Потребление данных из Kafka¶
Consumer API — основной инструмент дата-инженера для чтения данных из Kafka. В этой статье — механика poll loop, управление offset-ами, стратегии rebalancing, обработка ошибок и практические паттерны на Python.
Механика Consumer API: Poll Loop¶
Работа consumer-а — бесконечный цикл poll(). Внутри этого метода происходит всё: получение данных, отправка heartbeat-сигналов, координация в группе.
from confluent_kafka import Consumer
consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "etl-pipeline",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
})
consumer.subscribe(["orders"])
while True:
msg = consumer.poll(timeout=1.0) # ← здесь происходит вся магия
if msg is None:
continue
if msg.error():
handle_error(msg.error())
continue
process(msg)
consumer.commit(asynchronous=False)
Критические параметры¶
| Параметр | Назначение | Рекомендация |
|---|---|---|
session.timeout.ms |
Время ожидания heartbeat перед исключением из группы | 10–45 с (по умолчанию 45 с) |
heartbeat.interval.ms |
Частота heartbeat-ов координатору | 1/3 от session.timeout.ms |
max.poll.interval.ms |
Макс. время между вызовами poll() |
Должно покрывать обработку батча |
max.poll.records |
Максимум записей в одном poll() |
100–500 для ETL |
fetch.min.bytes |
Минимум данных для ответа брокера | Больше → выше throughput, выше latency |
max.partition.fetch.bytes |
Лимит данных с одной partition за запрос | Должен быть ≥ max.message.bytes |
OOM-ловушка
Потребление памяти ≈ кол-во partitions × max.partition.fetch.bytes. Если consumer читает 50 partitions по 1 МБ — это 50 МБ буфера. Рассчитывай заранее.
max.poll.interval.ms
Если обработка батча занимает больше max.poll.interval.ms — consumer вылетает из группы. Начинается rebalance. Уменьшай max.poll.records или увеличивай таймаут.
Offset Management¶
Offset — курсор в логе. Ошибки здесь стоят дорого: потеря данных или дубликаты.
Auto Commit¶
Проблемы: - Data loss: consumer упал после auto commit, но до обработки - Data duplication: обработка прошла, а auto commit не успел
Правило
Auto commit допустим только для некритичных логов. Для денег и заказов — только manual commit.
Manual Commit: Sync vs Async¶
# commitSync — блокирует до подтверждения. Надёжно, но медленнее.
consumer.commit(asynchronous=False)
# commitAsync — не блокирует. Быстрее, но при rebalance есть риск дублей.
consumer.commit(asynchronous=True)
Стратегия: commit после обработки (at-least-once)¶
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
continue
# 1. Обработать сообщение
try:
result = process(msg)
save_to_db(result)
except Exception as e:
send_to_dlq(msg, e) # Dead Letter Queue
continue
# 2. Зафиксировать offset ПОСЛЕ успешной обработки
consumer.commit(asynchronous=False)
auto.offset.reset¶
| Значение | Описание | Когда использовать |
|---|---|---|
earliest |
Читать с начала | Первый запуск, восстановление |
latest |
Читать только новое | Мониторинг, alerting |
Протухшие offset-ы
Если offset-ы consumer-а старше offsets.retention.minutes (по умолчанию 7 дней), они удаляются. При следующем запуске сработает auto.offset.reset.
seek: ручное перемещение по логу¶
from kafka import TopicPartition
# Перемотать на конкретный offset
tp = TopicPartition("orders", partition=0, offset=12345)
consumer.seek(tp)
# Перемотать на начало
consumer.seek_to_beginning(tp)
Используй seek для восстановления после сбоев или replay данных.
Consumer Groups и Rebalancing¶
Протокол ребалансировки¶
Когда consumer присоединяется или покидает группу, происходит rebalance — перераспределение partitions:
До: Consumer A → [P0, P1, P2]
Consumer B присоединяется:
После: Consumer A → [P0, P1], Consumer B → [P2]
Стратегии назначения¶
| Стратегия | Описание | Когда использовать |
|---|---|---|
| RangeAssignor | Делит partitions по порядку | По умолчанию, простые случаи |
| RoundRobinAssignor | Распределяет по кругу | Равномерная нагрузка |
| StickyAssignor | Минимизирует перемещения | Снижение простоев |
| CooperativeStickyAssignor | Sticky + инкрементальный rebalance | Рекомендуется для продакшна |
Eager vs Cooperative Rebalancing¶
Eager (старый): stop-the-world — все consumer-ы бросают работу, отдают все partitions, ждут нового назначения. Простой = секунды-минуты.
Cooperative (Kafka 2.4+): хирургическая миграция — переназначаются только те partitions, которые нужно переместить. Остальные consumer-ы продолжают работу.
# Включить cooperative rebalancing
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
Static Membership¶
Проблема: в Kubernetes pod перезапускается → consumer уходит из группы → rebalance → простой 30+ секунд.
Решение: group.instance.id — consumer сохраняет свой Member ID после быстрого рестарта:
consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "etl-pipeline",
"group.instance.id": "worker-1", # Static Membership
"session.timeout.ms": "45000",
})
session.timeout.ms для Static Membership
Увеличь session.timeout.ms до 45–300 секунд. Consumer сохраняет partitions на это время после отключения — хватит для rolling restart.
Паттерны обработки¶
At-Least-Once с идемпотентной записью¶
Стандартный паттерн для ETL: commit после обработки + идемпотентность в целевой системе:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
event = json.loads(msg.value())
# Идемпотентная запись: UPSERT по event_id
db.execute("""
INSERT INTO fact_orders (event_id, amount, region, processed_at)
VALUES (%s, %s, %s, NOW())
ON CONFLICT (event_id) DO NOTHING
""", (event["event_id"], event["amount"], event["region"]))
consumer.commit(asynchronous=False)
Dead Letter Queue (DLQ)¶
Если сообщение вызывает ошибку (poison pill), не блокируй весь поток — отправь в DLQ-топик:
from confluent_kafka import Producer
dlq_producer = Producer({"bootstrap.servers": "localhost:9092"})
def process_with_dlq(consumer):
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
try:
process(msg)
except Exception as e:
# Отправить в DLQ с метаданными об ошибке
dlq_producer.produce(
"orders.dlq",
key=msg.key(),
value=msg.value(),
headers={
"error": str(e).encode(),
"original_topic": msg.topic().encode(),
"original_partition": str(msg.partition()).encode(),
"original_offset": str(msg.offset()).encode(),
},
)
dlq_producer.flush()
consumer.commit(asynchronous=False)
Retry с backoff¶
import time
MAX_RETRIES = 3
def process_with_retry(msg):
for attempt in range(MAX_RETRIES):
try:
return process(msg)
except TransientError:
wait = 2 ** attempt # exponential backoff: 1, 2, 4 сек
time.sleep(wait)
# Все попытки исчерпаны — в DLQ
send_to_dlq(msg, "Max retries exceeded")
Многопоточное потребление¶
KafkaConsumer не потокобезопасен — нельзя вызывать методы из разных потоков (кроме wakeup()).
Модель 1: Thread per Consumer¶
По одному consumer-у на поток. Просто и надёжно:
import threading
from confluent_kafka import Consumer
def consumer_worker(worker_id):
c = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "etl-pipeline",
"group.instance.id": f"worker-{worker_id}",
})
c.subscribe(["orders"])
while True:
msg = c.poll(1.0)
if msg and not msg.error():
process(msg)
c.commit(asynchronous=False)
# Запустить 3 consumer-а в отдельных потоках
for i in range(3):
t = threading.Thread(target=consumer_worker, args=(i,), daemon=True)
t.start()
Модель 2: Consumer + Thread Pool¶
Один поток читает, пул обрабатывает. Выше throughput, но сложнее:
from concurrent.futures import ThreadPoolExecutor
from confluent_kafka import Consumer
consumer = Consumer({...})
consumer.subscribe(["orders"])
pool = ThreadPoolExecutor(max_workers=8)
def handle(msg):
process(msg)
while True:
msg = consumer.poll(1.0)
if msg and not msg.error():
# Pause partition, чтобы не нарушить порядок
tp = msg.topic_partition()
consumer.pause([tp])
future = pool.submit(handle, msg)
future.add_done_callback(lambda f: consumer.resume([tp]))
Порядок при многопоточности
Используй pause() перед передачей сообщения в пул и resume() после завершения. Без этого можно начать обработку следующего батча, пока первый ещё в работе.
Graceful Shutdown¶
Корректное завершение — обязательно для продакшна:
import signal
from confluent_kafka import Consumer
consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "etl-pipeline",
"group.instance.id": "worker-1",
"enable.auto.commit": False,
})
consumer.subscribe(["orders"])
running = True
def shutdown(sig, frame):
global running
running = False
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
try:
while running:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
continue
process(msg)
consumer.commit(asynchronous=False)
finally:
consumer.close() # Коммитит offset-ы и уходит из группы
Мониторинг Consumer Lag¶
Consumer Lag — разрыв между тем, что записано в topic, и тем, что прочитано. Главная метрика здоровья.
CLI¶
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group etl-pipeline
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
# orders 0 12340 12345 5 worker-1-xxx
# orders 1 8920 8950 30 worker-2-xxx
# orders 2 15000 15000 0 worker-3-xxx
Чек-лист при росте lag¶
| Проверить | Причина | Решение |
|---|---|---|
CommitFailedException в логах |
Обработка дольше max.poll.interval.ms |
Уменьшить max.poll.records |
| CPU/IO на воркерах | Медленный внешний API или тяжёлый запрос | Оптимизировать обработку |
| Кол-во consumer-ов < partitions | Недостаточно параллелизма | Добавить consumer-ы |
| Частые rebalance-ы | Нестабильная сеть | Увеличить session.timeout.ms, включить Static Membership |
Инструменты мониторинга¶
- Burrow — специализированный мониторинг consumer lag от LinkedIn
- kafka-lag-exporter — экспортёр метрик для Prometheus
- Kafka UI / AKHQ — веб-интерфейс для мониторинга
Что запомнить¶
| Тема | Ключевая мысль |
|---|---|
| Poll Loop | Всё происходит внутри poll(): fetch, heartbeat, координация |
| Auto Commit | Только для некритичных данных. Для ETL — manual commit |
| commitSync vs Async | Sync надёжнее, Async быстрее. Выбирай по SLA |
| CooperativeSticky | Рекомендуемая стратегия rebalancing для продакшна |
| Static Membership | group.instance.id спасает от rebalance storms в K8s |
| DLQ | Poison pill → в DLQ, а не блокировка всего потока |
| Идемпотентность | Проектируй обработку так, чтобы повторный приход не ломал данные |
| Consumer Lag | Главная метрика — мониторь через Burrow/Prometheus |
Проверь себя¶
Что дальше?¶
- Kafka Connect — коннекторы для интеграции с БД и облаками без кода
- Kafka в продакшне — безопасность, Schema Registry, мониторинг и эксплуатация
Источники¶
- Apache Kafka Consumer Documentation — Consumer API
- KIP-848: The Next Rebalance Protocol — новый протокол ребалансировки
- Confluent: Kafka Consumer Guide — руководство по consumer-ам
- confluent-kafka-python — Python-клиент