Apache Kafka — основы потоковой обработки¶
Kafka — распределённая платформа потоковой передачи данных. Дата-инженер использует Kafka для real-time интеграций: сбор логов, CDC из баз данных, событийные пайплайны. В этой статье — архитектура Kafka, ключевые концепции, внутреннее устройство и локальный запуск.
Зачем Kafka дата-инженеру¶
Batch-пайплайны (Airflow + SQL) работают по расписанию: раз в час, раз в день. Но бизнесу часто нужны данные быстрее:
| Сценарий | Batch | Kafka (streaming) |
|---|---|---|
| Обновление дашборда | Раз в час | Секунды |
| Обнаружение фрода | Следующий день | Real-time |
| Синхронизация между БД | По расписанию | Непрерывно (CDC) |
| Сбор логов | Файлы + cron | Поток событий |
Kafka — не замена Airflow
Kafka и Airflow решают разные задачи. Airflow оркестрирует batch-пайплайны (запустить → подождать → проверить). Kafka передаёт данные непрерывным потоком. В реальных системах они работают вместе.
Архитектура¶
Основные компоненты¶
┌──────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Broker 0 │ │ Broker 1 │ │ Broker 2 │ │
│ │ │ │ │ │ │ │
│ │ topic-A │ │ topic-A │ │ topic-B │ │
│ │ part-0 │ │ part-1 │ │ part-0 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└──────────────────────────────────────────────┘
▲ │
│ ▼
┌─────────┐ ┌──────────┐
│ Producer│ │ Consumer │
│ (app) │ │ (ETL) │
└─────────┘ └──────────┘
| Компонент | Описание |
|---|---|
| Broker | Сервер Kafka, хранит данные на диске |
| Topic | Именованный канал сообщений (аналог таблицы) |
| Partition | Часть topic, распределённая по broker-ам |
| Producer | Приложение, отправляющее сообщения |
| Consumer | Приложение, читающее сообщения |
| Consumer Group | Группа consumer-ов, распределяющих partition-ы между собой |
Анатомия события (Event)¶
Каждое сообщение в Kafka — это событие с четырьмя компонентами:
| Поле | Назначение |
|---|---|
| Key | Определяет partition (через хеширование) и группирует связанные данные |
| Value | Полезная нагрузка (payload) — JSON, Avro, Protobuf |
| Timestamp | Временная метка события (event-time или ingestion-time) |
| Headers | Метаданные: ID схемы, трассировка, теги |
Topic и Partitions¶
Topic разделён на partitions для параллелизма и масштабирования:
Topic: orders (3 partitions)
Partition 0: [msg-0] [msg-3] [msg-6] [msg-9] → Consumer A
Partition 1: [msg-1] [msg-4] [msg-7] [msg-10] → Consumer B
Partition 2: [msg-2] [msg-5] [msg-8] [msg-11] → Consumer C
- Сообщения внутри partition упорядочены (offset 0, 1, 2, ...);
- Между partition-ами порядок не гарантирован;
- Каждый partition читается ровно одним consumer из группы.
Сколько partitions?
Число partitions = максимальный параллелизм чтения. Для начала — 3–6 partitions на topic. Увеличить можно, уменьшить — нет.
Consumer Groups¶
Consumer group — группа consumer-ов, которые делят partitions между собой:
Consumer Group "etl-pipeline":
Consumer A → Partition 0, Partition 1
Consumer B → Partition 2
Consumer Group "monitoring":
Consumer C → Partition 0, Partition 1, Partition 2
- Каждая группа получает все сообщения topic;
- Внутри группы каждый partition читается одним consumer;
- Разные группы — независимые потребители (каждая читает с начала).
Хранение данных на диске¶
На физическом уровне partition — это не один гигантский файл, а каталог с набором файлов-сегментов (segments).
Сегменты¶
/tmp/kraft-logs/orders-0/
├── 00000000000000000000.log ← активный сегмент
├── 00000000000000000000.index ← индекс offset → позиция в .log
├── 00000000000000000000.timeindex
├── 00000000000045231.log ← закрытый сегмент
├── 00000000000045231.index
└── ...
Запись всегда идёт в «активный» сегмент. Когда он достигает лимита (по размеру log.segment.bytes или времени log.roll.ms), он закрывается, создаётся новый. Удаление данных происходит целыми сегментами, а не построчно.
Нюанс retention
Сегмент удаляется только когда его самая свежая (последняя) запись становится старше retention.ms. Часть данных в сегменте может быть свежей — весь сегмент живёт.
Retention Policy¶
# Стратегия очистки: delete (по времени/размеру) или compact (по ключу)
cleanup.policy=delete
# Хранить данные 7 дней (по умолчанию)
retention.ms=604800000
# Ограничение размера одной партиции (по умолчанию -1 = без лимита)
retention.bytes=10737418240
Спектр в продакшне: от 15 минут (буферы) до бесконечности (системы-источники истины).
Log Compaction¶
Compaction — механизм для хранения «сущностей», где важно только последнее состояние для каждого ключа:
До compaction: После compaction:
key=user_1 → v1 key=user_1 → v3 (последнее)
key=user_2 → v1 key=user_2 → v2 (последнее)
key=user_1 → v2
key=user_2 → v2
key=user_1 → v3
Аналогия: «умный термостат» — нам не нужна история всех температур, нужно последнее значение.
Типичные кейсы: кэш профилей пользователей, текущие остатки на складе, конфигурации систем.
Репликация и высокая доступность¶
Kafka обеспечивает сохранность данных через распределённую репликацию. Каждая partition имеет одну реплику-лидера (Leader) и несколько последователей (Followers).
Leader, Followers и ISR¶
Partition 0 (replication-factor=3):
Broker 0: Leader ← все чтения и записи
Broker 1: Follower ← копирует данные от лидера
Broker 2: Follower ← копирует данные от лидера
ISR (In-Sync Replicas) — «элитный клуб» реплик, которые успевают за лидером. Только реплика из ISR может стать новым лидером при падении старого (Leader Election).
Fetch from Follower (Kafka 2.5+)
Consumer-ы могут читать из ближайшей реплики в той же зоне доступности. Это критически важно для облаков (Multi-AZ): сокращает затраты на cross-AZ трафик.
acks и min.insync.replicas¶
Это механизм «заказного письма» с уведомлением о вручении:
| Настройка | Описание | Компромисс |
|---|---|---|
acks=0 |
Producer не ждёт подтверждения | Максимальная скорость, риск потери |
acks=1 |
Ждёт подтверждения от лидера | Баланс скорости и надёжности |
acks=all |
Ждёт подтверждения от всех ISR | Максимальная надёжность |
# Producer
acks=all
# Broker/Topic: минимум 2 реплики в ISR для приёма записей
min.insync.replicas=2
Если в ISR останется меньше реплик, чем min.insync.replicas, Kafka откажет в записи — предпочтёт ошибку потере данных.
Как Producer выбирает Partition¶
Когда вы вызываете send(), под капотом происходит: Сериализация → Partitioner → Батчинг.
С ключом¶
Kafka хэширует ключ и направляет в конкретную partition. Все сообщения с одним ключом всегда попадают в одну partition → гарантирован порядок:
# CLI: отправка с ключом
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic orders \
--property parse.key=true --property key.separator=:
> user_123:{"item": "laptop", "price": 1000}
> user_123:{"item": "mouse", "price": 50}
# Оба сообщения попадут в одну partition
Без ключа: Sticky Partitioner¶
Раньше использовался Round-robin, но начиная с Kafka 2.4 стандартом стал Sticky Partitioner: сообщения «приклеиваются» к одной partition, пока не заполнится батч, затем переходят к следующей. Это снижает latency и нагрузку на CPU.
Батчинг¶
Kafka не отправляет каждое сообщение отдельно — собирает в пачки (batches):
# Максимальный размер батча (bytes)
batch.size=16384
# Максимальное ожидание перед отправкой (ms)
linger.ms=5
Чем больше батч → выше throughput, но выше latency одного сообщения.
Гарантии доставки¶
В распределённых системах три уровня гарантий:
| Семантика | Риск потери | Риск дублей | Настройки |
|---|---|---|---|
| At-most-once | Да | Нет | acks=0 |
| At-least-once | Нет | Да | acks=all, retries > 0 |
| Exactly-once | Нет | Нет | enable.idempotence=true + Transactions |
Idempotent Producer¶
При включении enable.idempotence=true producer присваивает каждому батчу порядковый номер (sequence number). Если лидер упадёт после записи, но до отправки ack, новый лидер увидит дубликат по номеру и проигнорирует:
Exactly-Once Semantics (EOS)¶
Для паттерна «прочитал → обработал → записал» (read-process-write) Kafka использует транзакции:
- Transaction Coordinator (внутри брокера) управляет состоянием
- Transaction Log (внутренний топик) хранит историю транзакций
- Атомарно фиксируются и выходные данные, и offset-ы прочитанных сообщений
# Producer
enable.idempotence=true
transactional.id=my-etl-service-01
# Consumer
isolation.level=read_committed
EOS на практике
Exactly-once delivery физически невозможен (проблема двух генералов). Но Kafka гарантирует exactly-once processing — обработку ровно один раз. Падение throughput — всего ~3%.
Kafka vs очереди¶
| Критерий | Kafka | RabbitMQ / SQS |
|---|---|---|
| Модель | Log (append-only) | Queue (consumed = deleted) |
| Хранение | Дни/недели на диске | До обработки |
| Replay | Можно перечитать с любого offset | Нельзя |
| Порядок | Гарантирован в partition | Гарантирован в queue |
| Масштабирование | Horizontal (partition-ы) | Vertical (больше памяти) |
| Паттерн | Pub/Sub + Stream processing | Task queue |
Kafka — не очередь задач
Kafka хранит сообщения даже после чтения. Это log, не queue. Используй RabbitMQ/Celery для задач типа «отправить email» или «обработать заказ».
Продакшн-проблемы: что ломается в реальности¶
Consumer Lag¶
Разрыв между последним offset-ом в topic и offset-ом, который прочитал consumer. Если lag растёт — система не справляется.
# Проверить lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group my-etl-group
| Симптом | Причина | Решение |
|---|---|---|
| Lag растёт линейно | Мало consumer-ов | Добавить consumer-ы (до числа partitions) |
| Lag скачками | Медленный внешний API | Оптимизировать обработку или добавить буфер |
| Lag после рестарта | auto.offset.reset=latest |
Переключить на earliest |
Rebalance Storms¶
При добавлении/удалении consumer-а происходит перераспределение partitions:
- Eager Rebalance (старый): «stop-the-world» — все consumer-ы бросают работу, отдают partitions, ждут переназначения
- Incremental Cooperative (2.4+): хирургическая миграция — переназначаются только те partitions, которые нужно переместить
# Включить cooperative rebalancing
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
Static Membership
Используй group.instance.id для стабильных consumer-ов. После быстрого рестарта consumer сохраняет свои partitions без rebalance.
Unclean Leader Election¶
Если все ISR-реплики недоступны — выбор: ждать (простой) или назначить лидером «грязную» реплику (потеря данных):
# Запретить unclean leader election (рекомендуется для финансовых систем)
unclean.leader.election.enable=false
Zombie Producers¶
Если producer «завис» (GC-пауза), а кластер выбрал замену — при «оживлении» старый producer становится зомби. Kafka использует fencing через transactional.id: брокер проверяет epoch и отклоняет записи от старых экземпляров.
Мониторинг: ключевые метрики¶
| Метрика | Описание | Аномалия |
|---|---|---|
UnderReplicatedPartitions |
Partitions с неполной репликацией | Critical! Сбой брокера/сети |
ActiveControllerCount |
Контроллеров в кластере | Должно быть строго 1 |
BytesInPerSec / BytesOutPerSec |
Скорость записи/чтения | Перегрузка диска/сети |
ConsumerLag |
Задержка потребления | Consumer-ы не справляются |
RequestsPerSec |
Количество запросов | Перегрузка брокера |
# CLI-мониторинг
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group analytics-group
Локальный Kafka в Docker¶
services:
kafka:
image: apache/kafka:3.8.1
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: /tmp/kraft-logs
# Запустить
docker compose up -d
# Создать topic
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create --topic orders --partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092
# Список topic-ов
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--list --bootstrap-server localhost:9092
# Producer (отправить сообщения)
docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh \
--topic orders --bootstrap-server localhost:9092
# Consumer (прочитать сообщения)
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
--topic orders --from-beginning --bootstrap-server localhost:9092
KRaft вместо ZooKeeper
Начиная с Kafka 3.3+, ZooKeeper не нужен — Kafka использует встроенный KRaft для координации. Docker-образ apache/kafka уже настроен на KRaft.
Python-клиент¶
kafka-python-ng¶
from kafka import KafkaProducer, KafkaConsumer
import json
# Producer — отправка JSON-сообщений
producer = KafkaProducer(
bootstrap_servers="localhost:9092",
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("orders", {"order_id": 1, "amount": 99.99, "region": "EU"})
producer.send("orders", {"order_id": 2, "amount": 149.50, "region": "US"})
producer.flush()
producer.close()
# Consumer — чтение сообщений
consumer = KafkaConsumer(
"orders",
bootstrap_servers="localhost:9092",
group_id="etl-pipeline",
auto_offset_reset="earliest",
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for message in consumer:
print(f"Offset {message.offset}: {message.value}")
confluent-kafka (быстрее, на базе librdkafka)¶
from confluent_kafka import Producer, Consumer
import json
# Producer
p = Producer({"bootstrap.servers": "localhost:9092"})
def delivery_report(err, msg):
if err:
print(f"Delivery failed: {err}")
else:
print(f"Delivered to {msg.topic()}[{msg.partition()}] @ {msg.offset()}")
p.produce("orders", key="user_123", value=json.dumps({"amount": 99.99}).encode(),
callback=delivery_report)
p.flush()
# Consumer
c = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "etl-pipeline",
"auto.offset.reset": "earliest",
})
c.subscribe(["orders"])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Error: {msg.error()}")
continue
print(f"Key: {msg.key()}, Value: {msg.value().decode()}")
pip install kafka-python-ng # чистый Python
pip install confluent-kafka # обёртка над librdkafka (быстрее)
Что запомнить¶
| Тема | Ключевая мысль |
|---|---|
| Topic & Partition | Partition — единица параллелизма и порядка |
| Segments | Данные хранятся в сегментах, удаляются целиком |
| Log Compaction | Хранит последнее значение по ключу (для сущностей) |
| ISR & acks | acks=all + min.insync.replicas=2 = надёжность |
| Partitioner | С ключом — hash, без ключа — Sticky Partitioner |
| Exactly-Once | Idempotent Producer + Transactions API |
| Consumer Lag | Главная метрика здоровья потокового пайплайна |
| Rebalance | Cooperative Sticky > Eager для стабильности |
| KRaft | С Kafka 3.3+ ZooKeeper не нужен |
Проверь себя¶
Что дальше?¶
- Потребление данных из Kafka — offset management, consumer groups, exactly-once на практике
- Kafka Connect — коннекторы для интеграции с БД и облаками
- Kafka в продакшне — безопасность, Schema Registry, мониторинг, Kubernetes
Источники¶
- Apache Kafka Documentation — полная документация
- Kafka: The Definitive Guide (Confluent) — обзор архитектуры
- KRaft — Kafka without ZooKeeper — настройка KRaft
- kafka-python-ng — Python-клиент для Kafka
- confluent-kafka-python — Python-клиент на базе librdkafka