Kafka Connect — коннекторы к базам и файлам¶
Kafka Connect — фреймворк для интеграции Kafka с внешними системами без написания кода. Вместо Python-скриптов ты описываешь интеграцию в JSON-конфиге: откуда забирать данные (source) и куда складывать (sink). Kafka Connect масштабируется горизонтально и обрабатывает ошибки автоматически.
Архитектура¶
┌───────────┐ ┌──────────────────────────┐ ┌───────────┐
│ PostgreSQL│ ──→ │ Kafka Connect │ ──→ │ S3 │
│ (source) │ │ │ │ (sink) │
└───────────┘ │ ┌────────────────────┐ │ └───────────┘
│ │ JDBC Source │ │
│ │ Connector │──┼──→ Kafka Topic
│ └────────────────────┘ │
│ │
│ ┌────────────────────┐ │
◀──────┤ │ S3 Sink │ │
│ │ Connector │◀─┼── Kafka Topic
│ └────────────────────┘ │
└──────────────────────────┘
Компоненты¶
| Компонент | Описание |
|---|---|
| Source Connector | Читает данные из внешней системы → пишет в Kafka topic |
| Sink Connector | Читает данные из Kafka topic → пишет во внешнюю систему |
| Worker | JVM-процесс, выполняющий коннекторы |
| Task | Единица параллелизма внутри коннектора |
| Converter | Сериализация: JSON, Avro, Protobuf |
| SMT | Single Message Transform — трансформации на лету |
Standalone vs Distributed¶
| Режим | Описание | Когда использовать |
|---|---|---|
| Standalone | Один процесс, конфиг в файле | Тесты, разработка |
| Distributed | Кластер workers, конфиг через REST API | Продакшн |
В distributed-режиме workers автоматически перераспределяют tasks при сбоях. Конфиги, offset-ы и статусы хранятся в внутренних Kafka-топиках.
Жизненный цикл записи (Source)¶
- Source Connector извлекает данные (polling или чтение логов)
- Конвертирует во внутреннюю структуру
ConnectRecord - Применяются SMT-трансформации
- Converter преобразует в байты (JSON, Avro)
- Producer отправляет в Kafka (с idempotent writes)
Source vs Sink¶
| Тип | Направление | Примеры |
|---|---|---|
| Source | Внешняя система → Kafka | JDBC Source, Debezium, File Source |
| Sink | Kafka → Внешняя система | JDBC Sink, S3 Sink, Elasticsearch Sink |
Типичный DE-пайплайн:
topic.prefix
Используй префиксы для организации: raw. для source-данных, clean. для обработанных, agg. для агрегатов.
JDBC Source Connector¶
Забирает данные из реляционной БД по расписанию (polling).
Режимы работы¶
| Режим | Описание | Видит INSERT | Видит UPDATE | Видит DELETE |
|---|---|---|---|---|
| bulk | Полная копия таблицы каждый раз | Да | Да | Да |
| incrementing | Новые строки по auto-increment ID | Да | Нет | Нет |
| timestamp | Строки с updated_at > последнего опроса |
Да | Да | Нет |
| timestamp+incrementing | Комбинация: новые + обновлённые | Да | Да | Нет |
JDBC Source не видит DELETE
Ни один режим JDBC Source (кроме bulk) не захватывает удаления. Для DELETE нужен Debezium CDC.
Конфигурация¶
{
"name": "jdbc-source-orders",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/shop",
"connection.user": "etl_user",
"connection.password": "${file:/secrets/jdbc.properties:password}",
"table.whitelist": "orders,order_items",
"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id",
"topic.prefix": "raw.",
"poll.interval.ms": 10000,
"batch.max.rows": 1000,
"transforms": "createKey",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "id"
}
}
Тюнинг нагрузки на БД¶
| Параметр | Описание | Компромисс |
|---|---|---|
poll.interval.ms |
Частота опроса | Чаще → ниже latency, выше нагрузка |
batch.max.rows |
Записей за один SELECT | Больше → выше throughput, дольше запрос |
table.whitelist |
Список таблиц | Только нужные, не * |
query |
Кастомный SQL | Гибко, но нужен incrementing.column.name |
# Создать коннектор через REST API
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @jdbc-source-orders.json
# Проверить статус
curl http://localhost:8083/connectors/jdbc-source-orders/status | jq .
Debezium — CDC из PostgreSQL¶
Debezium — специализированный source-коннектор для Change Data Capture. В отличие от JDBC Source (polling), Debezium читает WAL (Write-Ahead Log) и захватывает изменения в реальном времени.
JDBC Source vs Debezium¶
| Критерий | JDBC Source | Debezium CDC |
|---|---|---|
| Метод | Polling (SELECT каждые N секунд) | WAL streaming |
| Задержка | Секунды–минуты | Миллисекунды |
| DELETE | Не видит | Видит |
| Schema changes | Не видит | Видит |
| Нагрузка на БД | Запросы к таблицам | Минимальная (читает WAL) |
| Настройка | Простая | Нужен replication slot |
Настройка PostgreSQL для CDC¶
-- Включить logical replication
ALTER SYSTEM SET wal_level = logical;
-- Перезапустить PostgreSQL
-- Создать publication (какие таблицы слушать)
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Проверить
SELECT * FROM pg_publication;
Конфигурация Debezium¶
{
"name": "debezium-postgres",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "etl_user",
"database.password": "etl_pass",
"database.dbname": "shop",
"topic.prefix": "cdc",
"schema.include.list": "public",
"table.include.list": "public.orders,public.customers",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "dbz_publication",
"snapshot.mode": "initial",
"heartbeat.interval.ms": 10000
}
}
Snapshot Modes¶
| Режим | Описание | Когда использовать |
|---|---|---|
initial |
Полный снимок → стриминг WAL | Первый запуск |
schema_only |
Только схема, без данных | Данные уже синхронизированы |
never |
Только WAL, без снимка | Рестарт с сохранённым offset |
when_needed |
Снимок если offset потерян | Автоматическое восстановление |
Signal Table и Incremental Snapshots¶
Debezium 1.7+ поддерживает incremental snapshots через signal table — можно запустить snapshot одной таблицы без остановки CDC:
-- Создать signal table
CREATE TABLE debezium_signal (
id VARCHAR(42) PRIMARY KEY,
type VARCHAR(32) NOT NULL,
data VARCHAR(2048)
);
-- Запустить incremental snapshot
INSERT INTO debezium_signal (id, type, data) VALUES (
'snap-001',
'execute-snapshot',
'{"data-collections": ["public.orders"]}'
);
Replication slots и диск
Если Debezium остановлен, PostgreSQL продолжает хранить WAL для slot-а. Это может заполнить диск!
S3 Sink Connector¶
Записывает данные из Kafka в S3 (или MinIO).
Конфигурация¶
{
"name": "s3-sink-orders",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "raw.orders",
"s3.bucket.name": "data-lake",
"s3.region": "eu-west-1",
"flush.size": 1000,
"rotate.interval.ms": 60000,
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd",
"partition.duration.ms": 86400000,
"locale": "en-US",
"timezone": "UTC",
"storage.class": "io.confluent.connect.s3.storage.S3Storage"
}
}
Результат: s3://data-lake/topics/raw.orders/year=2025/month=06/day=15/raw.orders+0+0000000000.parquet
Параметры производительности¶
| Параметр | Описание | Компромисс |
|---|---|---|
flush.size |
Записей в одном файле | Больше → меньше файлов, выше latency |
rotate.interval.ms |
Макс. время до закрытия файла | Гарантирует «свежесть» данных |
rotate.schedule.interval.ms |
Закрытие по расписанию (wall clock) | Предсказуемые файлы для downstream |
flush.size + rotate.interval.ms
Используй оба параметра. flush.size=1000 закрывает файл по количеству, rotate.interval.ms=60000 — по времени. Что сработает первым — то и закроет файл.
SMT: Single Message Transforms¶
SMT — лёгкие трансформации без Kafka Streams. Применяются на лету в коннекторе.
Типичные SMT¶
| SMT | Описание | Кейс |
|---|---|---|
ValueToKey |
Создаёт ключ из полей value | Ключ по order_id для compacted topic |
TimestampRouter |
Добавляет timestamp в имя topic | raw.orders → raw.orders-2025-06 |
RegexRouter |
Переименование topic по regex | Маршрутизация по имени таблицы |
MaskField |
Маскирует значения полей | PII/GDPR: маскировка email, телефона |
InsertField |
Добавляет поля | Добавить ingestion_ts или source_system |
Filter |
Фильтрация записей | Пропустить записи со status=test |
Пример: маскирование PII + routing¶
"transforms": "MaskPII,AddTimestamp,RouteByDate",
"transforms.MaskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskPII.fields": "email,phone",
"transforms.AddTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddTimestamp.timestamp.field": "ingested_at",
"transforms.RouteByDate.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.RouteByDate.topic.format": "${topic}-${timestamp}",
"transforms.RouteByDate.timestamp.format": "yyyyMMdd"
Ограничения SMT
SMT — для простых преобразований (rename, mask, filter). Для JOIN, агрегаций и сложной логики используй Kafka Streams или Flink.
Dead Letter Queue (DLQ) для коннекторов¶
Если сообщение вызывает ошибку (например, невалидный JSON), коннектор может упасть. DLQ позволяет продолжить работу:
{
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-s3-sink",
"errors.deadletterqueue.topic.replication.factor": 3,
"errors.deadletterqueue.context.headers.enable": true,
"errors.log.enable": true,
"errors.log.include.messages": true
}
| Параметр | Описание |
|---|---|
errors.tolerance=all |
Пропускать ошибочные записи (не падать) |
errors.deadletterqueue.topic.name |
Куда складывать ошибки |
errors.deadletterqueue.context.headers.enable |
Добавить метаданные об ошибке в headers |
Мониторинг DLQ
Настрой алерт на рост сообщений в DLQ-топике. Без этого «тихие» ошибки пропадут незамеченными.
Docker Compose с Kafka Connect¶
services:
kafka:
image: apache/kafka:3.8.1
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
connect:
image: confluentinc/cp-kafka-connect:7.7.0
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: connect-cluster
CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
depends_on:
- kafka
REST API для управления¶
# Список коннекторов
curl http://localhost:8083/connectors | jq .
# Создать коннектор
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @jdbc-source-orders.json
# Статус (включая все tasks)
curl http://localhost:8083/connectors/jdbc-source-orders/status | jq .
# Валидация конфига перед деплоем
curl -X PUT http://localhost:8083/connector-plugins/JdbcSourceConnector/config/validate \
-H "Content-Type: application/json" \
-d @jdbc-source-orders.json | jq .
# Список установленных плагинов
curl http://localhost:8083/connector-plugins | jq .
# Перезапустить упавшую task
curl -X POST http://localhost:8083/connectors/jdbc-source-orders/tasks/0/restart
# Приостановить / возобновить / удалить
curl -X PUT http://localhost:8083/connectors/jdbc-source-orders/pause
curl -X PUT http://localhost:8083/connectors/jdbc-source-orders/resume
curl -X DELETE http://localhost:8083/connectors/jdbc-source-orders
Продакшн: Troubleshooting¶
| Проблема | Причина | Решение |
|---|---|---|
| Connector в статусе FAILED | Ошибка десериализации, сетевой таймаут | Проверь логи (SerializationException), настрой DLQ |
| Task restart loop | Превышен max.poll.interval.ms |
Уменьшить max.poll.records, оптимизировать SMT |
| Дубликаты в Sink | Нет идемпотентности в целевой системе | Включить exactly.once, использовать UPSERT |
| Zombie instances | Проблемы сети, fencing | Настроить transactional.id для exactly-once |
| Offset-ы потеряны | Retention offset-топика истёк | Увеличить offset.storage.replication.factor, мониторить |
Чек-лист перед продакшном¶
- [ ] Уникальный
group.idдля каждого кластера Connect - [ ]
enable.idempotence=trueдля гарантий доставки - [ ]
max.poll.interval.ms> времени обработки самого тяжёлого батча - [ ] Настроен DLQ с алертингом на рост
- [ ] JMX-метрики экспортируются в Prometheus
- [ ]
replication.factor ≥ 3для внутренних топиков Connect - [ ] Пароли вынесены в
ConfigProvider(не в JSON-конфиге)
Что запомнить¶
| Тема | Ключевая мысль |
|---|---|
| Source vs Sink | Source: внешняя система → Kafka. Sink: Kafka → внешняя система |
| JDBC Source | Polling по расписанию, не видит DELETE |
| Debezium | CDC через WAL, real-time, видит всё (INSERT/UPDATE/DELETE) |
| SMT | Лёгкие трансформации на лету (mask, route, filter) |
| DLQ | Poison pill → в DLQ, а не остановка конвейера |
| REST API | Управление коннекторами через HTTP на порту 8083 |
| Distributed | Автоматическое перераспределение tasks при сбоях |
Проверь себя¶
Что дальше?¶
- Потребление данных из Kafka — Consumer API, offset management, Python-примеры
- Kafka в продакшне — безопасность, Schema Registry, мониторинг и эксплуатация
Источники¶
- Kafka Connect Documentation — официальная документация
- Confluent JDBC Connector — настройка JDBC Source/Sink
- Debezium PostgreSQL Connector — CDC для PostgreSQL
- Confluent S3 Sink Connector — запись в S3