Перейти к содержанию

Kafka в продакшне

Запустить Kafka локально в Docker — просто. Эксплуатировать в продакшне — другой уровень. В этой статье — конфигурация кластера, безопасность, Schema Registry, мониторинг, траблшутинг и операции.


Конфигурация кластера

Ключевые параметры брокера

Properties
# Репликация по умолчанию (для новых топиков)
default.replication.factor=3

# Минимум ISR для записи с acks=all
min.insync.replicas=2

# Retention
log.retention.hours=168           # 7 дней
log.retention.bytes=-1            # без лимита по размеру
log.segment.bytes=1073741824      # 1 ГБ на сегмент

# Максимальный размер сообщения
message.max.bytes=10485760        # 10 МБ

# Количество потоков
num.io.threads=8
num.network.threads=3
num.replica.fetchers=2

# Запретить unclean leader election
unclean.leader.election.enable=false

Сайзинг

Ресурс Рекомендация Почему
CPU 8–16 cores Kafka CPU-bound на сжатии и TLS
RAM 32–64 ГБ 6 ГБ heap + остальное для page cache
Disk SSD/NVMe, JBOD Последовательные I/O, RAID не нужен
Network 10 Gbps Репликация × replication factor

Page Cache — секрет производительности Kafka

Kafka активно использует OS page cache. Чем больше RAM — тем больше данных читается из кэша, а не с диска. Не выделяй весь RAM на JVM heap — 6 ГБ достаточно.

Формула для количества partitions

Text Only
partitions = max(throughput / partition_throughput, consumer_count)
  • Один partition даёт ~10 МБ/с на запись
  • Consumer-ов не может быть больше, чем partitions
  • Начни с 6–12 partitions на topic, увеличивай по мере роста

Безопасность

Безопасность в Kafka — не опция, а фундамент продакшн-инсталляции.

Encryption: SSL/TLS

Защита данных в транзите (client ↔ broker, broker ↔ broker):

Properties
# Broker config
listeners=SSL://0.0.0.0:9093
ssl.keystore.location=/var/kafka/ssl/kafka.keystore.jks
ssl.keystore.password=${file:/secrets/ssl.properties:keystore_password}
ssl.truststore.location=/var/kafka/ssl/kafka.truststore.jks

# Inter-broker communication
security.inter.broker.protocol=SSL

Authentication: SASL/SCRAM

Аутентификация клиентов через логин/пароль:

Properties
# Broker
listeners=SASL_SSL://0.0.0.0:9094
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
Bash
# Создать пользователя
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --add-config 'SCRAM-SHA-512=[iterations=8192,password=etl_pass]' \
  --entity-type users --entity-name etl_user

Authorization: ACL

Разграничение прав через Access Control Lists:

Bash
# Разрешить etl_user читать из топика raw.orders
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:etl_user \
  --operation Read --topic raw.orders

# Разрешить запись в топик
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:producer_app \
  --operation Write --topic raw.orders

# Список ACL
kafka-acls.sh --bootstrap-server localhost:9092 --list

RBAC для Enterprise

В Confluent Platform доступен RBAC (Role-Based Access Control) — централизованное управление ролями через Control Center.


Schema Registry

Без управления схемами Kafka — «глупая труба» с невалидными байтами. Schema Registry делает данные самоописывающимися.

Зачем нужен

Text Only
Без Schema Registry:
Producer → [{"name": "Иван", "age": 30}] → Kafka → Consumer
                                                    ↑ а если завтра age станет string?

С Schema Registry:
Producer → [Schema ID: 42, bytes...] → Kafka → Consumer
                ↓                                  ↓
           Schema Registry ←─── запрос схемы ────→ Schema Registry

Producer регистрирует схему и отправляет только Schema ID в заголовках. Consumer получает схему по ID. Это экономит трафик и гарантирует контракт.

Форматы

Формат Размер Типизация Эволюция Когда использовать
JSON Schema Большой Слабая Да Прототипы, дебаг
Avro Малый Строгая Да Стандарт индустрии
Protobuf Малый Строгая Да gRPC-совместимость

Режимы совместимости

Режим Описание Что можно менять
BACKWARD Новые consumer-ы читают старые данные Удалять поля, добавлять опциональные
FORWARD Старые consumer-ы читают новые данные Добавлять поля, удалять опциональные
FULL Двусторонняя совместимость Добавлять/удалять только опциональные
NONE Без проверок Что угодно (опасно!)
Bash
# Зарегистрировать Avro-схему
curl -X POST http://localhost:8081/subjects/raw.orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"int\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\"}]}"}'

# Проверить совместимость перед деплоем
curl -X POST http://localhost:8081/compatibility/subjects/raw.orders-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "..."}'

# Установить режим совместимости
curl -X PUT http://localhost:8081/config/raw.orders-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

Schema Registry в ETL

Регистрируй схемы в CI/CD. Если новая версия схемы несовместима — пайплайн не деплоится. Это дешевле, чем ловить ошибки в продакшне.


Мониторинг

Критические метрики (JMX)

Метрика Описание Алерт
UnderReplicatedPartitions Partitions с неполной репликацией P1 если > 0 дольше 5 мин
ActiveControllerCount Контроллеров в кластере P1 если ≠ 1
OfflinePartitionsCount Partitions без лидера P1 если > 0
BytesInPerSec / BytesOutPerSec Скорость записи/чтения Алерт при > 80% пропускной способности
RequestsPerSec Запросов в секунду Перегрузка брокера
ConsumerLag Задержка потребления P2 если растёт 15+ мин
IsrShrinksPerSec / IsrExpandsPerSec Изменения ISR Нестабильность кластера

Prometheus + Grafana

docker-compose.yml (JMX Exporter)
services:
  kafka:
    image: apache/kafka:3.8.1
    environment:
      KAFKA_OPTS: "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"
    ports:
      - "7071:7071"   # Prometheus metrics

  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

Мониторинг Consumer Lag

Bash
# Встроенный CLI
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group etl-pipeline

# Вывод:
# TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
# orders   0          12340           12345           5    worker-1
# orders   1          8920            8950            30   worker-2

Инструменты:

Инструмент Описание
Burrow Мониторинг lag от LinkedIn, определяет «здоровье» consumer-а
kafka-lag-exporter Prometheus-экспортёр для consumer lag
AKHQ / Kafka UI Веб-интерфейс: топики, consumer groups, сообщения

Алертинг: runbook

Алерт Причина Первые шаги
UnderReplicatedPartitions > 0 Брокер упал или сеть Проверить статус брокеров, disk I/O, сеть
ActiveControllerCount ≠ 1 Потеря controller-а Проверить ZK/KRaft, рестартнуть «лишний» controller
ConsumerLag растёт Consumer не справляется CPU/IO воркеров → масштабирование → оптимизация
OfflinePartitions > 0 Все реплики недоступны Проверить брокеры, рассмотреть unclean election

Траблшутинг

Consumer Lag растёт

Text Only
Чек-лист:
1. Ресурсы воркеров (CPU, IO, сеть) — часто медленный внешний API
2. max.poll.records слишком большой — уменьшить
3. Consumer-ов < partitions — добавить воркеры
4. Частые rebalance-ы — включить Static Membership
5. Медленная запись в БД — оптимизировать или использовать batch insert

Rebalance Storms

Причина: consumer тратит на обработку больше max.poll.interval.ms → группа считает его мёртвым → rebalance → ещё медленнее → loop.

Решение:

Properties
# Уменьшить размер батча
max.poll.records=100

# Или увеличить таймаут (временная мера)
max.poll.interval.ms=600000

# Включить cooperative rebalancing
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

# Static Membership для K8s
group.instance.id=worker-1
session.timeout.ms=120000

Broker Out of Sync

Bash
# Проверить under-replicated partitions
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --under-replicated-partitions

# Проверить ISR для конкретного топика
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders

Причины: медленный диск, перегрузка сети, GC-паузы. Решение: проверить replica.lag.time.max.ms, disk I/O, GC логи.

Disk Full

Bash
# Проверить использование диска
du -sh /var/kafka-logs/*

# Уменьшить retention для горячих топиков
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name hot-topic \
  --add-config retention.ms=86400000  # 1 день

# Удалить старые данные из топика (осторожно!)
kafka-delete-records.sh --bootstrap-server localhost:9092 \
  --offset-json-file delete-offsets.json

Операции

Rolling Upgrade

Обновление брокеров по одному без простоя:

Text Only
1. Обновить софт на broker-1
2. Запустить broker-1
3. Дождаться UnderReplicatedPartitions = 0
4. Повторить для broker-2, broker-3, ...

Порядок

Никогда не обновляй следующий брокер, пока UnderReplicatedPartitions > 0. Это может привести к потере данных.

Partition Reassignment

При добавлении новых брокеров — перераспределение partitions:

Bash
# Сгенерировать план
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "0,1,2,3" --generate

# Выполнить
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file plan.json --execute

# Проверить прогресс
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file plan.json --verify

Preferred Leader Election

После рестарта брокеры могут стать followers. Вернуть лидерство «предпочтительным» узлам:

Bash
kafka-leader-election.sh --bootstrap-server localhost:9092 \
  --election-type PREFERRED --all-topic-partitions

Kafka на Kubernetes

Strimzi Operator

Strimzi — стандартный оператор для Kafka в K8s. Автоматизирует:

  • Создание кластера через Custom Resources (CRD)
  • Rolling upgrades
  • Управление сертификатами TLS
  • Управление пользователями и топиками
kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: etl-cluster
spec:
  kafka:
    version: 3.8.1
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      min.insync.replicas: 2
      default.replication.factor: 3
    storage:
      type: persistent-claim
      size: 100Gi
      class: gp3
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi

StatefulSet vs Bare Metal

Критерий Kubernetes Bare Metal
Развёртывание Минуты (CRD) Часы
Масштабирование replicas: 5 Ручное
Storage PersistentVolume (EBS, gp3) Локальные SSD
Latency Чуть выше (overlay network) Минимальная
Операции Strimzi автоматизирует Ansible/Chef

Anti-Affinity

Настрой pod anti-affinity, чтобы реплики одной partition не оказались на одном физическом узле:

YAML
template:
  pod:
    affinity:
      podAntiAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
          - topologyKey: kubernetes.io/hostname


Что запомнить

Тема Ключевая мысль
Сайзинг 6 ГБ heap + максимум RAM для page cache
Безопасность SSL/TLS + SASL/SCRAM + ACL — обязательная тройка
Schema Registry Avro + BACKWARD compatibility = стандарт
Мониторинг UnderReplicatedPartitions — алерт номер один
Consumer Lag Главная метрика здоровья ETL-пайплайнов
Rolling Upgrade По одному брокеру, ждать ISR sync
Kubernetes Strimzi + anti-affinity + PersistentVolume

Проверь себя


Источники