Перейти к содержанию

Apache Kafka — основы потоковой обработки

Kafka — распределённая платформа потоковой передачи данных. Дата-инженер использует Kafka для real-time интеграций: сбор логов, CDC из баз данных, событийные пайплайны. В этой статье — архитектура Kafka, ключевые концепции, внутреннее устройство и локальный запуск.


Зачем Kafka дата-инженеру

Batch-пайплайны (Airflow + SQL) работают по расписанию: раз в час, раз в день. Но бизнесу часто нужны данные быстрее:

Сценарий Batch Kafka (streaming)
Обновление дашборда Раз в час Секунды
Обнаружение фрода Следующий день Real-time
Синхронизация между БД По расписанию Непрерывно (CDC)
Сбор логов Файлы + cron Поток событий

Kafka — не замена Airflow

Kafka и Airflow решают разные задачи. Airflow оркестрирует batch-пайплайны (запустить → подождать → проверить). Kafka передаёт данные непрерывным потоком. В реальных системах они работают вместе.


Архитектура

Основные компоненты

Text Only
┌──────────────────────────────────────────────┐
│                 Kafka Cluster                 │
│                                              │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│  │ Broker 0 │  │ Broker 1 │  │ Broker 2 │  │
│  │          │  │          │  │          │  │
│  │ topic-A  │  │ topic-A  │  │ topic-B  │  │
│  │ part-0   │  │ part-1   │  │ part-0   │  │
│  └──────────┘  └──────────┘  └──────────┘  │
└──────────────────────────────────────────────┘
       ▲                              │
       │                              ▼
  ┌─────────┐                   ┌──────────┐
  │ Producer│                   │ Consumer │
  │ (app)   │                   │ (ETL)    │
  └─────────┘                   └──────────┘
Компонент Описание
Broker Сервер Kafka, хранит данные на диске
Topic Именованный канал сообщений (аналог таблицы)
Partition Часть topic, распределённая по broker-ам
Producer Приложение, отправляющее сообщения
Consumer Приложение, читающее сообщения
Consumer Group Группа consumer-ов, распределяющих partition-ы между собой

Анатомия события (Event)

Каждое сообщение в Kafka — это событие с четырьмя компонентами:

Поле Назначение
Key Определяет partition (через хеширование) и группирует связанные данные
Value Полезная нагрузка (payload) — JSON, Avro, Protobuf
Timestamp Временная метка события (event-time или ingestion-time)
Headers Метаданные: ID схемы, трассировка, теги

Topic и Partitions

Topic разделён на partitions для параллелизма и масштабирования:

Text Only
Topic: orders (3 partitions)

Partition 0: [msg-0] [msg-3] [msg-6] [msg-9]  → Consumer A
Partition 1: [msg-1] [msg-4] [msg-7] [msg-10] → Consumer B
Partition 2: [msg-2] [msg-5] [msg-8] [msg-11] → Consumer C
  • Сообщения внутри partition упорядочены (offset 0, 1, 2, ...);
  • Между partition-ами порядок не гарантирован;
  • Каждый partition читается ровно одним consumer из группы.

Сколько partitions?

Число partitions = максимальный параллелизм чтения. Для начала — 3–6 partitions на topic. Увеличить можно, уменьшить — нет.

Consumer Groups

Consumer group — группа consumer-ов, которые делят partitions между собой:

Text Only
Consumer Group "etl-pipeline":
  Consumer A → Partition 0, Partition 1
  Consumer B → Partition 2

Consumer Group "monitoring":
  Consumer C → Partition 0, Partition 1, Partition 2
  • Каждая группа получает все сообщения topic;
  • Внутри группы каждый partition читается одним consumer;
  • Разные группы — независимые потребители (каждая читает с начала).

Хранение данных на диске

На физическом уровне partition — это не один гигантский файл, а каталог с набором файлов-сегментов (segments).

Сегменты

Text Only
/tmp/kraft-logs/orders-0/
├── 00000000000000000000.log      ← активный сегмент
├── 00000000000000000000.index    ← индекс offset → позиция в .log
├── 00000000000000000000.timeindex
├── 00000000000045231.log         ← закрытый сегмент
├── 00000000000045231.index
└── ...

Запись всегда идёт в «активный» сегмент. Когда он достигает лимита (по размеру log.segment.bytes или времени log.roll.ms), он закрывается, создаётся новый. Удаление данных происходит целыми сегментами, а не построчно.

Нюанс retention

Сегмент удаляется только когда его самая свежая (последняя) запись становится старше retention.ms. Часть данных в сегменте может быть свежей — весь сегмент живёт.

Retention Policy

Properties
# Стратегия очистки: delete (по времени/размеру) или compact (по ключу)
cleanup.policy=delete

# Хранить данные 7 дней (по умолчанию)
retention.ms=604800000

# Ограничение размера одной партиции (по умолчанию -1 = без лимита)
retention.bytes=10737418240

Спектр в продакшне: от 15 минут (буферы) до бесконечности (системы-источники истины).

Log Compaction

Compaction — механизм для хранения «сущностей», где важно только последнее состояние для каждого ключа:

Text Only
До compaction:            После compaction:
key=user_1 → v1           key=user_1 → v3  (последнее)
key=user_2 → v1           key=user_2 → v2  (последнее)
key=user_1 → v2
key=user_2 → v2
key=user_1 → v3

Аналогия: «умный термостат» — нам не нужна история всех температур, нужно последнее значение.

Properties
# Включить compaction для топика
cleanup.policy=compact

Типичные кейсы: кэш профилей пользователей, текущие остатки на складе, конфигурации систем.


Репликация и высокая доступность

Kafka обеспечивает сохранность данных через распределённую репликацию. Каждая partition имеет одну реплику-лидера (Leader) и несколько последователей (Followers).

Leader, Followers и ISR

Text Only
Partition 0 (replication-factor=3):

  Broker 0: Leader    ← все чтения и записи
  Broker 1: Follower  ← копирует данные от лидера
  Broker 2: Follower  ← копирует данные от лидера

ISR (In-Sync Replicas) — «элитный клуб» реплик, которые успевают за лидером. Только реплика из ISR может стать новым лидером при падении старого (Leader Election).

Fetch from Follower (Kafka 2.5+)

Consumer-ы могут читать из ближайшей реплики в той же зоне доступности. Это критически важно для облаков (Multi-AZ): сокращает затраты на cross-AZ трафик.

acks и min.insync.replicas

Это механизм «заказного письма» с уведомлением о вручении:

Настройка Описание Компромисс
acks=0 Producer не ждёт подтверждения Максимальная скорость, риск потери
acks=1 Ждёт подтверждения от лидера Баланс скорости и надёжности
acks=all Ждёт подтверждения от всех ISR Максимальная надёжность
Properties
# Producer
acks=all

# Broker/Topic: минимум 2 реплики в ISR для приёма записей
min.insync.replicas=2

Если в ISR останется меньше реплик, чем min.insync.replicas, Kafka откажет в записи — предпочтёт ошибку потере данных.


Как Producer выбирает Partition

Когда вы вызываете send(), под капотом происходит: Сериализация → Partitioner → Батчинг.

С ключом

Kafka хэширует ключ и направляет в конкретную partition. Все сообщения с одним ключом всегда попадают в одну partition → гарантирован порядок:

Bash
# CLI: отправка с ключом
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic orders \
  --property parse.key=true --property key.separator=:
> user_123:{"item": "laptop", "price": 1000}
> user_123:{"item": "mouse", "price": 50}
# Оба сообщения попадут в одну partition

Без ключа: Sticky Partitioner

Раньше использовался Round-robin, но начиная с Kafka 2.4 стандартом стал Sticky Partitioner: сообщения «приклеиваются» к одной partition, пока не заполнится батч, затем переходят к следующей. Это снижает latency и нагрузку на CPU.

Батчинг

Kafka не отправляет каждое сообщение отдельно — собирает в пачки (batches):

Properties
# Максимальный размер батча (bytes)
batch.size=16384

# Максимальное ожидание перед отправкой (ms)
linger.ms=5

Чем больше батч → выше throughput, но выше latency одного сообщения.


Гарантии доставки

В распределённых системах три уровня гарантий:

Семантика Риск потери Риск дублей Настройки
At-most-once Да Нет acks=0
At-least-once Нет Да acks=all, retries > 0
Exactly-once Нет Нет enable.idempotence=true + Transactions

Idempotent Producer

При включении enable.idempotence=true producer присваивает каждому батчу порядковый номер (sequence number). Если лидер упадёт после записи, но до отправки ack, новый лидер увидит дубликат по номеру и проигнорирует:

Properties
enable.idempotence=true
# Включается автоматически с Kafka 3.0+

Exactly-Once Semantics (EOS)

Для паттерна «прочитал → обработал → записал» (read-process-write) Kafka использует транзакции:

  1. Transaction Coordinator (внутри брокера) управляет состоянием
  2. Transaction Log (внутренний топик) хранит историю транзакций
  3. Атомарно фиксируются и выходные данные, и offset-ы прочитанных сообщений
Properties
# Producer
enable.idempotence=true
transactional.id=my-etl-service-01

# Consumer
isolation.level=read_committed

EOS на практике

Exactly-once delivery физически невозможен (проблема двух генералов). Но Kafka гарантирует exactly-once processing — обработку ровно один раз. Падение throughput — всего ~3%.


Kafka vs очереди

Критерий Kafka RabbitMQ / SQS
Модель Log (append-only) Queue (consumed = deleted)
Хранение Дни/недели на диске До обработки
Replay Можно перечитать с любого offset Нельзя
Порядок Гарантирован в partition Гарантирован в queue
Масштабирование Horizontal (partition-ы) Vertical (больше памяти)
Паттерн Pub/Sub + Stream processing Task queue

Kafka — не очередь задач

Kafka хранит сообщения даже после чтения. Это log, не queue. Используй RabbitMQ/Celery для задач типа «отправить email» или «обработать заказ».


Продакшн-проблемы: что ломается в реальности

Consumer Lag

Разрыв между последним offset-ом в topic и offset-ом, который прочитал consumer. Если lag растёт — система не справляется.

Bash
# Проверить lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-etl-group
Симптом Причина Решение
Lag растёт линейно Мало consumer-ов Добавить consumer-ы (до числа partitions)
Lag скачками Медленный внешний API Оптимизировать обработку или добавить буфер
Lag после рестарта auto.offset.reset=latest Переключить на earliest

Rebalance Storms

При добавлении/удалении consumer-а происходит перераспределение partitions:

  • Eager Rebalance (старый): «stop-the-world» — все consumer-ы бросают работу, отдают partitions, ждут переназначения
  • Incremental Cooperative (2.4+): хирургическая миграция — переназначаются только те partitions, которые нужно переместить
Properties
# Включить cooperative rebalancing
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

Static Membership

Используй group.instance.id для стабильных consumer-ов. После быстрого рестарта consumer сохраняет свои partitions без rebalance.

Unclean Leader Election

Если все ISR-реплики недоступны — выбор: ждать (простой) или назначить лидером «грязную» реплику (потеря данных):

Properties
# Запретить unclean leader election (рекомендуется для финансовых систем)
unclean.leader.election.enable=false

Zombie Producers

Если producer «завис» (GC-пауза), а кластер выбрал замену — при «оживлении» старый producer становится зомби. Kafka использует fencing через transactional.id: брокер проверяет epoch и отклоняет записи от старых экземпляров.


Мониторинг: ключевые метрики

Метрика Описание Аномалия
UnderReplicatedPartitions Partitions с неполной репликацией Critical! Сбой брокера/сети
ActiveControllerCount Контроллеров в кластере Должно быть строго 1
BytesInPerSec / BytesOutPerSec Скорость записи/чтения Перегрузка диска/сети
ConsumerLag Задержка потребления Consumer-ы не справляются
RequestsPerSec Количество запросов Перегрузка брокера
Bash
# CLI-мониторинг
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group analytics-group

Локальный Kafka в Docker

docker-compose.yml
services:
  kafka:
    image: apache/kafka:3.8.1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: /tmp/kraft-logs
Bash
# Запустить
docker compose up -d

# Создать topic
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --create --topic orders --partitions 3 --replication-factor 1 \
  --bootstrap-server localhost:9092

# Список topic-ов
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --list --bootstrap-server localhost:9092

# Producer (отправить сообщения)
docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
  --topic orders --bootstrap-server localhost:9092

# Consumer (прочитать сообщения)
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --topic orders --from-beginning --bootstrap-server localhost:9092

KRaft вместо ZooKeeper

Начиная с Kafka 3.3+, ZooKeeper не нужен — Kafka использует встроенный KRaft для координации. Docker-образ apache/kafka уже настроен на KRaft.


Python-клиент

kafka-python-ng

Python
from kafka import KafkaProducer, KafkaConsumer
import json

# Producer — отправка JSON-сообщений
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

producer.send("orders", {"order_id": 1, "amount": 99.99, "region": "EU"})
producer.send("orders", {"order_id": 2, "amount": 149.50, "region": "US"})
producer.flush()
producer.close()

# Consumer — чтение сообщений
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:9092",
    group_id="etl-pipeline",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    print(f"Offset {message.offset}: {message.value}")

confluent-kafka (быстрее, на базе librdkafka)

Python
from confluent_kafka import Producer, Consumer
import json

# Producer
p = Producer({"bootstrap.servers": "localhost:9092"})

def delivery_report(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()}[{msg.partition()}] @ {msg.offset()}")

p.produce("orders", key="user_123", value=json.dumps({"amount": 99.99}).encode(),
          callback=delivery_report)
p.flush()

# Consumer
c = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "etl-pipeline",
    "auto.offset.reset": "earliest",
})
c.subscribe(["orders"])

while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Error: {msg.error()}")
        continue
    print(f"Key: {msg.key()}, Value: {msg.value().decode()}")
Bash
pip install kafka-python-ng    # чистый Python
pip install confluent-kafka    # обёртка над librdkafka (быстрее)

Что запомнить

Тема Ключевая мысль
Topic & Partition Partition — единица параллелизма и порядка
Segments Данные хранятся в сегментах, удаляются целиком
Log Compaction Хранит последнее значение по ключу (для сущностей)
ISR & acks acks=all + min.insync.replicas=2 = надёжность
Partitioner С ключом — hash, без ключа — Sticky Partitioner
Exactly-Once Idempotent Producer + Transactions API
Consumer Lag Главная метрика здоровья потокового пайплайна
Rebalance Cooperative Sticky > Eager для стабильности
KRaft С Kafka 3.3+ ZooKeeper не нужен

Проверь себя


Что дальше?


Источники