Kafka в продакшне¶
Запустить Kafka локально в Docker — просто. Эксплуатировать в продакшне — другой уровень. В этой статье — конфигурация кластера, безопасность, Schema Registry, мониторинг, траблшутинг и операции.
Конфигурация кластера¶
Ключевые параметры брокера¶
# Репликация по умолчанию (для новых топиков)
default.replication.factor=3
# Минимум ISR для записи с acks=all
min.insync.replicas=2
# Retention
log.retention.hours=168 # 7 дней
log.retention.bytes=-1 # без лимита по размеру
log.segment.bytes=1073741824 # 1 ГБ на сегмент
# Максимальный размер сообщения
message.max.bytes=10485760 # 10 МБ
# Количество потоков
num.io.threads=8
num.network.threads=3
num.replica.fetchers=2
# Запретить unclean leader election
unclean.leader.election.enable=false
Сайзинг¶
| Ресурс | Рекомендация | Почему |
|---|---|---|
| CPU | 8–16 cores | Kafka CPU-bound на сжатии и TLS |
| RAM | 32–64 ГБ | 6 ГБ heap + остальное для page cache |
| Disk | SSD/NVMe, JBOD | Последовательные I/O, RAID не нужен |
| Network | 10 Gbps | Репликация × replication factor |
Page Cache — секрет производительности Kafka
Kafka активно использует OS page cache. Чем больше RAM — тем больше данных читается из кэша, а не с диска. Не выделяй весь RAM на JVM heap — 6 ГБ достаточно.
Формула для количества partitions¶
- Один partition даёт ~10 МБ/с на запись
- Consumer-ов не может быть больше, чем partitions
- Начни с 6–12 partitions на topic, увеличивай по мере роста
Безопасность¶
Безопасность в Kafka — не опция, а фундамент продакшн-инсталляции.
Encryption: SSL/TLS¶
Защита данных в транзите (client ↔ broker, broker ↔ broker):
# Broker config
listeners=SSL://0.0.0.0:9093
ssl.keystore.location=/var/kafka/ssl/kafka.keystore.jks
ssl.keystore.password=${file:/secrets/ssl.properties:keystore_password}
ssl.truststore.location=/var/kafka/ssl/kafka.truststore.jks
# Inter-broker communication
security.inter.broker.protocol=SSL
Authentication: SASL/SCRAM¶
Аутентификация клиентов через логин/пароль:
# Broker
listeners=SASL_SSL://0.0.0.0:9094
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
# Создать пользователя
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --add-config 'SCRAM-SHA-512=[iterations=8192,password=etl_pass]' \
--entity-type users --entity-name etl_user
Authorization: ACL¶
Разграничение прав через Access Control Lists:
# Разрешить etl_user читать из топика raw.orders
kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:etl_user \
--operation Read --topic raw.orders
# Разрешить запись в топик
kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:producer_app \
--operation Write --topic raw.orders
# Список ACL
kafka-acls.sh --bootstrap-server localhost:9092 --list
RBAC для Enterprise
В Confluent Platform доступен RBAC (Role-Based Access Control) — централизованное управление ролями через Control Center.
Schema Registry¶
Без управления схемами Kafka — «глупая труба» с невалидными байтами. Schema Registry делает данные самоописывающимися.
Зачем нужен¶
Без Schema Registry:
Producer → [{"name": "Иван", "age": 30}] → Kafka → Consumer
↑ а если завтра age станет string?
С Schema Registry:
Producer → [Schema ID: 42, bytes...] → Kafka → Consumer
↓ ↓
Schema Registry ←─── запрос схемы ────→ Schema Registry
Producer регистрирует схему и отправляет только Schema ID в заголовках. Consumer получает схему по ID. Это экономит трафик и гарантирует контракт.
Форматы¶
| Формат | Размер | Типизация | Эволюция | Когда использовать |
|---|---|---|---|---|
| JSON Schema | Большой | Слабая | Да | Прототипы, дебаг |
| Avro | Малый | Строгая | Да | Стандарт индустрии |
| Protobuf | Малый | Строгая | Да | gRPC-совместимость |
Режимы совместимости¶
| Режим | Описание | Что можно менять |
|---|---|---|
BACKWARD |
Новые consumer-ы читают старые данные | Удалять поля, добавлять опциональные |
FORWARD |
Старые consumer-ы читают новые данные | Добавлять поля, удалять опциональные |
FULL |
Двусторонняя совместимость | Добавлять/удалять только опциональные |
NONE |
Без проверок | Что угодно (опасно!) |
# Зарегистрировать Avro-схему
curl -X POST http://localhost:8081/subjects/raw.orders-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"int\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"region\",\"type\":\"string\"}]}"}'
# Проверить совместимость перед деплоем
curl -X POST http://localhost:8081/compatibility/subjects/raw.orders-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "..."}'
# Установить режим совместимости
curl -X PUT http://localhost:8081/config/raw.orders-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
Schema Registry в ETL
Регистрируй схемы в CI/CD. Если новая версия схемы несовместима — пайплайн не деплоится. Это дешевле, чем ловить ошибки в продакшне.
Мониторинг¶
Критические метрики (JMX)¶
| Метрика | Описание | Алерт |
|---|---|---|
UnderReplicatedPartitions |
Partitions с неполной репликацией | P1 если > 0 дольше 5 мин |
ActiveControllerCount |
Контроллеров в кластере | P1 если ≠ 1 |
OfflinePartitionsCount |
Partitions без лидера | P1 если > 0 |
BytesInPerSec / BytesOutPerSec |
Скорость записи/чтения | Алерт при > 80% пропускной способности |
RequestsPerSec |
Запросов в секунду | Перегрузка брокера |
ConsumerLag |
Задержка потребления | P2 если растёт 15+ мин |
IsrShrinksPerSec / IsrExpandsPerSec |
Изменения ISR | Нестабильность кластера |
Prometheus + Grafana¶
services:
kafka:
image: apache/kafka:3.8.1
environment:
KAFKA_OPTS: "-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"
ports:
- "7071:7071" # Prometheus metrics
prometheus:
image: prom/prometheus
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
Мониторинг Consumer Lag¶
# Встроенный CLI
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group etl-pipeline
# Вывод:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
# orders 0 12340 12345 5 worker-1
# orders 1 8920 8950 30 worker-2
Инструменты:
| Инструмент | Описание |
|---|---|
| Burrow | Мониторинг lag от LinkedIn, определяет «здоровье» consumer-а |
| kafka-lag-exporter | Prometheus-экспортёр для consumer lag |
| AKHQ / Kafka UI | Веб-интерфейс: топики, consumer groups, сообщения |
Алертинг: runbook¶
| Алерт | Причина | Первые шаги |
|---|---|---|
UnderReplicatedPartitions > 0 |
Брокер упал или сеть | Проверить статус брокеров, disk I/O, сеть |
ActiveControllerCount ≠ 1 |
Потеря controller-а | Проверить ZK/KRaft, рестартнуть «лишний» controller |
ConsumerLag растёт |
Consumer не справляется | CPU/IO воркеров → масштабирование → оптимизация |
OfflinePartitions > 0 |
Все реплики недоступны | Проверить брокеры, рассмотреть unclean election |
Траблшутинг¶
Consumer Lag растёт¶
Чек-лист:
1. Ресурсы воркеров (CPU, IO, сеть) — часто медленный внешний API
2. max.poll.records слишком большой — уменьшить
3. Consumer-ов < partitions — добавить воркеры
4. Частые rebalance-ы — включить Static Membership
5. Медленная запись в БД — оптимизировать или использовать batch insert
Rebalance Storms¶
Причина: consumer тратит на обработку больше max.poll.interval.ms → группа считает его мёртвым → rebalance → ещё медленнее → loop.
Решение:
# Уменьшить размер батча
max.poll.records=100
# Или увеличить таймаут (временная мера)
max.poll.interval.ms=600000
# Включить cooperative rebalancing
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
# Static Membership для K8s
group.instance.id=worker-1
session.timeout.ms=120000
Broker Out of Sync¶
# Проверить under-replicated partitions
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --under-replicated-partitions
# Проверить ISR для конкретного топика
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic orders
Причины: медленный диск, перегрузка сети, GC-паузы. Решение: проверить replica.lag.time.max.ms, disk I/O, GC логи.
Disk Full¶
# Проверить использование диска
du -sh /var/kafka-logs/*
# Уменьшить retention для горячих топиков
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --entity-type topics --entity-name hot-topic \
--add-config retention.ms=86400000 # 1 день
# Удалить старые данные из топика (осторожно!)
kafka-delete-records.sh --bootstrap-server localhost:9092 \
--offset-json-file delete-offsets.json
Операции¶
Rolling Upgrade¶
Обновление брокеров по одному без простоя:
1. Обновить софт на broker-1
2. Запустить broker-1
3. Дождаться UnderReplicatedPartitions = 0
4. Повторить для broker-2, broker-3, ...
Порядок
Никогда не обновляй следующий брокер, пока UnderReplicatedPartitions > 0. Это может привести к потере данных.
Partition Reassignment¶
При добавлении новых брокеров — перераспределение partitions:
# Сгенерировать план
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--topics-to-move-json-file topics.json \
--broker-list "0,1,2,3" --generate
# Выполнить
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file plan.json --execute
# Проверить прогресс
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file plan.json --verify
Preferred Leader Election¶
После рестарта брокеры могут стать followers. Вернуть лидерство «предпочтительным» узлам:
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--election-type PREFERRED --all-topic-partitions
Kafka на Kubernetes¶
Strimzi Operator¶
Strimzi — стандартный оператор для Kafka в K8s. Автоматизирует:
- Создание кластера через Custom Resources (CRD)
- Rolling upgrades
- Управление сертификатами TLS
- Управление пользователями и топиками
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: etl-cluster
spec:
kafka:
version: 3.8.1
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 3
min.insync.replicas: 2
default.replication.factor: 3
storage:
type: persistent-claim
size: 100Gi
class: gp3
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 10Gi
StatefulSet vs Bare Metal¶
| Критерий | Kubernetes | Bare Metal |
|---|---|---|
| Развёртывание | Минуты (CRD) | Часы |
| Масштабирование | replicas: 5 |
Ручное |
| Storage | PersistentVolume (EBS, gp3) | Локальные SSD |
| Latency | Чуть выше (overlay network) | Минимальная |
| Операции | Strimzi автоматизирует | Ansible/Chef |
Anti-Affinity
Настрой pod anti-affinity, чтобы реплики одной partition не оказались на одном физическом узле:
Что запомнить¶
| Тема | Ключевая мысль |
|---|---|
| Сайзинг | 6 ГБ heap + максимум RAM для page cache |
| Безопасность | SSL/TLS + SASL/SCRAM + ACL — обязательная тройка |
| Schema Registry | Avro + BACKWARD compatibility = стандарт |
| Мониторинг | UnderReplicatedPartitions — алерт номер один |
| Consumer Lag | Главная метрика здоровья ETL-пайплайнов |
| Rolling Upgrade | По одному брокеру, ждать ISR sync |
| Kubernetes | Strimzi + anti-affinity + PersistentVolume |
Проверь себя¶
Источники¶
- Apache Kafka Operations — операции с кластером
- Confluent Schema Registry — управление схемами
- Strimzi — Kafka on Kubernetes — оператор для K8s
- Kafka Monitoring with Prometheus — мониторинг через JMX