Перейти к содержанию

Потребление данных из Kafka

Consumer API — основной инструмент дата-инженера для чтения данных из Kafka. В этой статье — механика poll loop, управление offset-ами, стратегии rebalancing, обработка ошибок и практические паттерны на Python.


Механика Consumer API: Poll Loop

Работа consumer-а — бесконечный цикл poll(). Внутри этого метода происходит всё: получение данных, отправка heartbeat-сигналов, координация в группе.

Python
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "etl-pipeline",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
})
consumer.subscribe(["orders"])

while True:
    msg = consumer.poll(timeout=1.0)  # ← здесь происходит вся магия
    if msg is None:
        continue
    if msg.error():
        handle_error(msg.error())
        continue

    process(msg)
    consumer.commit(asynchronous=False)

Критические параметры

Параметр Назначение Рекомендация
session.timeout.ms Время ожидания heartbeat перед исключением из группы 10–45 с (по умолчанию 45 с)
heartbeat.interval.ms Частота heartbeat-ов координатору 1/3 от session.timeout.ms
max.poll.interval.ms Макс. время между вызовами poll() Должно покрывать обработку батча
max.poll.records Максимум записей в одном poll() 100–500 для ETL
fetch.min.bytes Минимум данных для ответа брокера Больше → выше throughput, выше latency
max.partition.fetch.bytes Лимит данных с одной partition за запрос Должен быть ≥ max.message.bytes

OOM-ловушка

Потребление памяти ≈ кол-во partitions × max.partition.fetch.bytes. Если consumer читает 50 partitions по 1 МБ — это 50 МБ буфера. Рассчитывай заранее.

max.poll.interval.ms

Если обработка батча занимает больше max.poll.interval.ms — consumer вылетает из группы. Начинается rebalance. Уменьшай max.poll.records или увеличивай таймаут.


Offset Management

Offset — курсор в логе. Ошибки здесь стоят дорого: потеря данных или дубликаты.

Auto Commit

Properties
enable.auto.commit=true        # по умолчанию
auto.commit.interval.ms=5000   # каждые 5 секунд

Проблемы: - Data loss: consumer упал после auto commit, но до обработки - Data duplication: обработка прошла, а auto commit не успел

Правило

Auto commit допустим только для некритичных логов. Для денег и заказов — только manual commit.

Manual Commit: Sync vs Async

Python
# commitSync — блокирует до подтверждения. Надёжно, но медленнее.
consumer.commit(asynchronous=False)

# commitAsync — не блокирует. Быстрее, но при rebalance есть риск дублей.
consumer.commit(asynchronous=True)

Стратегия: commit после обработки (at-least-once)

Python
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue

    # 1. Обработать сообщение
    try:
        result = process(msg)
        save_to_db(result)
    except Exception as e:
        send_to_dlq(msg, e)  # Dead Letter Queue
        continue

    # 2. Зафиксировать offset ПОСЛЕ успешной обработки
    consumer.commit(asynchronous=False)

auto.offset.reset

Значение Описание Когда использовать
earliest Читать с начала Первый запуск, восстановление
latest Читать только новое Мониторинг, alerting

Протухшие offset-ы

Если offset-ы consumer-а старше offsets.retention.minutes (по умолчанию 7 дней), они удаляются. При следующем запуске сработает auto.offset.reset.

seek: ручное перемещение по логу

Python
from kafka import TopicPartition

# Перемотать на конкретный offset
tp = TopicPartition("orders", partition=0, offset=12345)
consumer.seek(tp)

# Перемотать на начало
consumer.seek_to_beginning(tp)

Используй seek для восстановления после сбоев или replay данных.


Consumer Groups и Rebalancing

Протокол ребалансировки

Когда consumer присоединяется или покидает группу, происходит rebalance — перераспределение partitions:

Text Only
До: Consumer A → [P0, P1, P2]

Consumer B присоединяется:
После: Consumer A → [P0, P1], Consumer B → [P2]

Стратегии назначения

Стратегия Описание Когда использовать
RangeAssignor Делит partitions по порядку По умолчанию, простые случаи
RoundRobinAssignor Распределяет по кругу Равномерная нагрузка
StickyAssignor Минимизирует перемещения Снижение простоев
CooperativeStickyAssignor Sticky + инкрементальный rebalance Рекомендуется для продакшна

Eager vs Cooperative Rebalancing

Eager (старый): stop-the-world — все consumer-ы бросают работу, отдают все partitions, ждут нового назначения. Простой = секунды-минуты.

Cooperative (Kafka 2.4+): хирургическая миграция — переназначаются только те partitions, которые нужно переместить. Остальные consumer-ы продолжают работу.

Properties
# Включить cooperative rebalancing
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

Static Membership

Проблема: в Kubernetes pod перезапускается → consumer уходит из группы → rebalance → простой 30+ секунд.

Решение: group.instance.id — consumer сохраняет свой Member ID после быстрого рестарта:

Python
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "etl-pipeline",
    "group.instance.id": "worker-1",  # Static Membership
    "session.timeout.ms": "45000",
})

session.timeout.ms для Static Membership

Увеличь session.timeout.ms до 45–300 секунд. Consumer сохраняет partitions на это время после отключения — хватит для rolling restart.


Паттерны обработки

At-Least-Once с идемпотентной записью

Стандартный паттерн для ETL: commit после обработки + идемпотентность в целевой системе:

Python
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue

    event = json.loads(msg.value())

    # Идемпотентная запись: UPSERT по event_id
    db.execute("""
        INSERT INTO fact_orders (event_id, amount, region, processed_at)
        VALUES (%s, %s, %s, NOW())
        ON CONFLICT (event_id) DO NOTHING
    """, (event["event_id"], event["amount"], event["region"]))

    consumer.commit(asynchronous=False)

Dead Letter Queue (DLQ)

Если сообщение вызывает ошибку (poison pill), не блокируй весь поток — отправь в DLQ-топик:

Python
from confluent_kafka import Producer

dlq_producer = Producer({"bootstrap.servers": "localhost:9092"})

def process_with_dlq(consumer):
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue

        try:
            process(msg)
        except Exception as e:
            # Отправить в DLQ с метаданными об ошибке
            dlq_producer.produce(
                "orders.dlq",
                key=msg.key(),
                value=msg.value(),
                headers={
                    "error": str(e).encode(),
                    "original_topic": msg.topic().encode(),
                    "original_partition": str(msg.partition()).encode(),
                    "original_offset": str(msg.offset()).encode(),
                },
            )
            dlq_producer.flush()

        consumer.commit(asynchronous=False)

Retry с backoff

Python
import time

MAX_RETRIES = 3

def process_with_retry(msg):
    for attempt in range(MAX_RETRIES):
        try:
            return process(msg)
        except TransientError:
            wait = 2 ** attempt  # exponential backoff: 1, 2, 4 сек
            time.sleep(wait)
    # Все попытки исчерпаны — в DLQ
    send_to_dlq(msg, "Max retries exceeded")

Многопоточное потребление

KafkaConsumer не потокобезопасен — нельзя вызывать методы из разных потоков (кроме wakeup()).

Модель 1: Thread per Consumer

По одному consumer-у на поток. Просто и надёжно:

Python
import threading
from confluent_kafka import Consumer

def consumer_worker(worker_id):
    c = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "etl-pipeline",
        "group.instance.id": f"worker-{worker_id}",
    })
    c.subscribe(["orders"])

    while True:
        msg = c.poll(1.0)
        if msg and not msg.error():
            process(msg)
            c.commit(asynchronous=False)

# Запустить 3 consumer-а в отдельных потоках
for i in range(3):
    t = threading.Thread(target=consumer_worker, args=(i,), daemon=True)
    t.start()

Модель 2: Consumer + Thread Pool

Один поток читает, пул обрабатывает. Выше throughput, но сложнее:

Python
from concurrent.futures import ThreadPoolExecutor
from confluent_kafka import Consumer

consumer = Consumer({...})
consumer.subscribe(["orders"])
pool = ThreadPoolExecutor(max_workers=8)

def handle(msg):
    process(msg)

while True:
    msg = consumer.poll(1.0)
    if msg and not msg.error():
        # Pause partition, чтобы не нарушить порядок
        tp = msg.topic_partition()
        consumer.pause([tp])

        future = pool.submit(handle, msg)
        future.add_done_callback(lambda f: consumer.resume([tp]))

Порядок при многопоточности

Используй pause() перед передачей сообщения в пул и resume() после завершения. Без этого можно начать обработку следующего батча, пока первый ещё в работе.


Graceful Shutdown

Корректное завершение — обязательно для продакшна:

Python
import signal
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "etl-pipeline",
    "group.instance.id": "worker-1",
    "enable.auto.commit": False,
})
consumer.subscribe(["orders"])

running = True

def shutdown(sig, frame):
    global running
    running = False

signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

try:
    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            continue

        process(msg)
        consumer.commit(asynchronous=False)
finally:
    consumer.close()  # Коммитит offset-ы и уходит из группы

Мониторинг Consumer Lag

Consumer Lag — разрыв между тем, что записано в topic, и тем, что прочитано. Главная метрика здоровья.

CLI

Bash
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group etl-pipeline

# TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
# orders   0          12340           12345           5    worker-1-xxx
# orders   1          8920            8950            30   worker-2-xxx
# orders   2          15000           15000           0    worker-3-xxx

Чек-лист при росте lag

Проверить Причина Решение
CommitFailedException в логах Обработка дольше max.poll.interval.ms Уменьшить max.poll.records
CPU/IO на воркерах Медленный внешний API или тяжёлый запрос Оптимизировать обработку
Кол-во consumer-ов < partitions Недостаточно параллелизма Добавить consumer-ы
Частые rebalance-ы Нестабильная сеть Увеличить session.timeout.ms, включить Static Membership

Инструменты мониторинга

  • Burrow — специализированный мониторинг consumer lag от LinkedIn
  • kafka-lag-exporter — экспортёр метрик для Prometheus
  • Kafka UI / AKHQ — веб-интерфейс для мониторинга

Что запомнить

Тема Ключевая мысль
Poll Loop Всё происходит внутри poll(): fetch, heartbeat, координация
Auto Commit Только для некритичных данных. Для ETL — manual commit
commitSync vs Async Sync надёжнее, Async быстрее. Выбирай по SLA
CooperativeSticky Рекомендуемая стратегия rebalancing для продакшна
Static Membership group.instance.id спасает от rebalance storms в K8s
DLQ Poison pill → в DLQ, а не блокировка всего потока
Идемпотентность Проектируй обработку так, чтобы повторный приход не ломал данные
Consumer Lag Главная метрика — мониторь через Burrow/Prometheus

Проверь себя


Что дальше?

  • Kafka Connect — коннекторы для интеграции с БД и облаками без кода
  • Kafka в продакшне — безопасность, Schema Registry, мониторинг и эксплуатация

Источники