Перейти к содержанию

Kafka Connect — коннекторы к базам и файлам

Kafka Connect — фреймворк для интеграции Kafka с внешними системами без написания кода. Вместо Python-скриптов ты описываешь интеграцию в JSON-конфиге: откуда забирать данные (source) и куда складывать (sink). Kafka Connect масштабируется горизонтально и обрабатывает ошибки автоматически.


Архитектура

Text Only
┌───────────┐     ┌──────────────────────────┐     ┌───────────┐
│ PostgreSQL│ ──→ │     Kafka Connect         │ ──→ │   S3      │
│ (source)  │     │                          │     │  (sink)   │
└───────────┘     │  ┌────────────────────┐  │     └───────────┘
                  │  │ JDBC Source        │  │
                  │  │ Connector          │──┼──→ Kafka Topic
                  │  └────────────────────┘  │
                  │                          │
                  │  ┌────────────────────┐  │
           ◀──────┤  │ S3 Sink           │  │
                  │  │ Connector          │◀─┼── Kafka Topic
                  │  └────────────────────┘  │
                  └──────────────────────────┘

Компоненты

Компонент Описание
Source Connector Читает данные из внешней системы → пишет в Kafka topic
Sink Connector Читает данные из Kafka topic → пишет во внешнюю систему
Worker JVM-процесс, выполняющий коннекторы
Task Единица параллелизма внутри коннектора
Converter Сериализация: JSON, Avro, Protobuf
SMT Single Message Transform — трансформации на лету

Standalone vs Distributed

Режим Описание Когда использовать
Standalone Один процесс, конфиг в файле Тесты, разработка
Distributed Кластер workers, конфиг через REST API Продакшн

В distributed-режиме workers автоматически перераспределяют tasks при сбоях. Конфиги, offset-ы и статусы хранятся в внутренних Kafka-топиках.

Жизненный цикл записи (Source)

Text Only
БД → [Source Connector] → ConnectRecord → [SMT] → [Converter] → [Producer] → Kafka Topic
  1. Source Connector извлекает данные (polling или чтение логов)
  2. Конвертирует во внутреннюю структуру ConnectRecord
  3. Применяются SMT-трансформации
  4. Converter преобразует в байты (JSON, Avro)
  5. Producer отправляет в Kafka (с idempotent writes)

Source vs Sink

Тип Направление Примеры
Source Внешняя система → Kafka JDBC Source, Debezium, File Source
Sink Kafka → Внешняя система JDBC Sink, S3 Sink, Elasticsearch Sink

Типичный DE-пайплайн:

Text Only
PostgreSQL → [JDBC Source] → Kafka → [S3 Sink] → S3 (Parquet)
                                   → [JDBC Sink] → ClickHouse

topic.prefix

Используй префиксы для организации: raw. для source-данных, clean. для обработанных, agg. для агрегатов.


JDBC Source Connector

Забирает данные из реляционной БД по расписанию (polling).

Режимы работы

Режим Описание Видит INSERT Видит UPDATE Видит DELETE
bulk Полная копия таблицы каждый раз Да Да Да
incrementing Новые строки по auto-increment ID Да Нет Нет
timestamp Строки с updated_at > последнего опроса Да Да Нет
timestamp+incrementing Комбинация: новые + обновлённые Да Да Нет

JDBC Source не видит DELETE

Ни один режим JDBC Source (кроме bulk) не захватывает удаления. Для DELETE нужен Debezium CDC.

Конфигурация

jdbc-source-orders.json
{
  "name": "jdbc-source-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/shop",
    "connection.user": "etl_user",
    "connection.password": "${file:/secrets/jdbc.properties:password}",
    "table.whitelist": "orders,order_items",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "topic.prefix": "raw.",
    "poll.interval.ms": 10000,
    "batch.max.rows": 1000,
    "transforms": "createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id"
  }
}

Тюнинг нагрузки на БД

Параметр Описание Компромисс
poll.interval.ms Частота опроса Чаще → ниже latency, выше нагрузка
batch.max.rows Записей за один SELECT Больше → выше throughput, дольше запрос
table.whitelist Список таблиц Только нужные, не *
query Кастомный SQL Гибко, но нужен incrementing.column.name
Bash
# Создать коннектор через REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @jdbc-source-orders.json

# Проверить статус
curl http://localhost:8083/connectors/jdbc-source-orders/status | jq .

Debezium — CDC из PostgreSQL

Debezium — специализированный source-коннектор для Change Data Capture. В отличие от JDBC Source (polling), Debezium читает WAL (Write-Ahead Log) и захватывает изменения в реальном времени.

JDBC Source vs Debezium

Критерий JDBC Source Debezium CDC
Метод Polling (SELECT каждые N секунд) WAL streaming
Задержка Секунды–минуты Миллисекунды
DELETE Не видит Видит
Schema changes Не видит Видит
Нагрузка на БД Запросы к таблицам Минимальная (читает WAL)
Настройка Простая Нужен replication slot

Настройка PostgreSQL для CDC

SQL
-- Включить logical replication
ALTER SYSTEM SET wal_level = logical;
-- Перезапустить PostgreSQL

-- Создать publication (какие таблицы слушать)
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- Проверить
SELECT * FROM pg_publication;

Конфигурация Debezium

debezium-postgres.json
{
  "name": "debezium-postgres",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "etl_user",
    "database.password": "etl_pass",
    "database.dbname": "shop",
    "topic.prefix": "cdc",
    "schema.include.list": "public",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "dbz_publication",
    "snapshot.mode": "initial",
    "heartbeat.interval.ms": 10000
  }
}

Snapshot Modes

Режим Описание Когда использовать
initial Полный снимок → стриминг WAL Первый запуск
schema_only Только схема, без данных Данные уже синхронизированы
never Только WAL, без снимка Рестарт с сохранённым offset
when_needed Снимок если offset потерян Автоматическое восстановление

Signal Table и Incremental Snapshots

Debezium 1.7+ поддерживает incremental snapshots через signal table — можно запустить snapshot одной таблицы без остановки CDC:

SQL
-- Создать signal table
CREATE TABLE debezium_signal (
    id VARCHAR(42) PRIMARY KEY,
    type VARCHAR(32) NOT NULL,
    data VARCHAR(2048)
);

-- Запустить incremental snapshot
INSERT INTO debezium_signal (id, type, data) VALUES (
    'snap-001',
    'execute-snapshot',
    '{"data-collections": ["public.orders"]}'
);

Replication slots и диск

Если Debezium остановлен, PostgreSQL продолжает хранить WAL для slot-а. Это может заполнить диск!

SQL
-- Мониторинг размера WAL
SELECT slot_name, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots;


S3 Sink Connector

Записывает данные из Kafka в S3 (или MinIO).

Конфигурация

s3-sink-orders.json
{
  "name": "s3-sink-orders",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "raw.orders",
    "s3.bucket.name": "data-lake",
    "s3.region": "eu-west-1",
    "flush.size": 1000,
    "rotate.interval.ms": 60000,
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
    "partition.duration.ms": 86400000,
    "locale": "en-US",
    "timezone": "UTC",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage"
  }
}

Результат: s3://data-lake/topics/raw.orders/year=2025/month=06/day=15/raw.orders+0+0000000000.parquet

Параметры производительности

Параметр Описание Компромисс
flush.size Записей в одном файле Больше → меньше файлов, выше latency
rotate.interval.ms Макс. время до закрытия файла Гарантирует «свежесть» данных
rotate.schedule.interval.ms Закрытие по расписанию (wall clock) Предсказуемые файлы для downstream

flush.size + rotate.interval.ms

Используй оба параметра. flush.size=1000 закрывает файл по количеству, rotate.interval.ms=60000 — по времени. Что сработает первым — то и закроет файл.


SMT: Single Message Transforms

SMT — лёгкие трансформации без Kafka Streams. Применяются на лету в коннекторе.

Типичные SMT

SMT Описание Кейс
ValueToKey Создаёт ключ из полей value Ключ по order_id для compacted topic
TimestampRouter Добавляет timestamp в имя topic raw.ordersraw.orders-2025-06
RegexRouter Переименование topic по regex Маршрутизация по имени таблицы
MaskField Маскирует значения полей PII/GDPR: маскировка email, телефона
InsertField Добавляет поля Добавить ingestion_ts или source_system
Filter Фильтрация записей Пропустить записи со status=test

Пример: маскирование PII + routing

JSON
"transforms": "MaskPII,AddTimestamp,RouteByDate",

"transforms.MaskPII.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskPII.fields": "email,phone",

"transforms.AddTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddTimestamp.timestamp.field": "ingested_at",

"transforms.RouteByDate.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.RouteByDate.topic.format": "${topic}-${timestamp}",
"transforms.RouteByDate.timestamp.format": "yyyyMMdd"

Ограничения SMT

SMT — для простых преобразований (rename, mask, filter). Для JOIN, агрегаций и сложной логики используй Kafka Streams или Flink.


Dead Letter Queue (DLQ) для коннекторов

Если сообщение вызывает ошибку (например, невалидный JSON), коннектор может упасть. DLQ позволяет продолжить работу:

JSON
{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq-s3-sink",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.deadletterqueue.context.headers.enable": true,
  "errors.log.enable": true,
  "errors.log.include.messages": true
}
Параметр Описание
errors.tolerance=all Пропускать ошибочные записи (не падать)
errors.deadletterqueue.topic.name Куда складывать ошибки
errors.deadletterqueue.context.headers.enable Добавить метаданные об ошибке в headers

Мониторинг DLQ

Настрой алерт на рост сообщений в DLQ-топике. Без этого «тихие» ошибки пропадут незамеченными.


Docker Compose с Kafka Connect

docker-compose.yml
services:
  kafka:
    image: apache/kafka:3.8.1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: confluentinc/cp-kafka-connect:7.7.0
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    depends_on:
      - kafka

REST API для управления

Bash
# Список коннекторов
curl http://localhost:8083/connectors | jq .

# Создать коннектор
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @jdbc-source-orders.json

# Статус (включая все tasks)
curl http://localhost:8083/connectors/jdbc-source-orders/status | jq .

# Валидация конфига перед деплоем
curl -X PUT http://localhost:8083/connector-plugins/JdbcSourceConnector/config/validate \
  -H "Content-Type: application/json" \
  -d @jdbc-source-orders.json | jq .

# Список установленных плагинов
curl http://localhost:8083/connector-plugins | jq .

# Перезапустить упавшую task
curl -X POST http://localhost:8083/connectors/jdbc-source-orders/tasks/0/restart

# Приостановить / возобновить / удалить
curl -X PUT http://localhost:8083/connectors/jdbc-source-orders/pause
curl -X PUT http://localhost:8083/connectors/jdbc-source-orders/resume
curl -X DELETE http://localhost:8083/connectors/jdbc-source-orders

Продакшн: Troubleshooting

Проблема Причина Решение
Connector в статусе FAILED Ошибка десериализации, сетевой таймаут Проверь логи (SerializationException), настрой DLQ
Task restart loop Превышен max.poll.interval.ms Уменьшить max.poll.records, оптимизировать SMT
Дубликаты в Sink Нет идемпотентности в целевой системе Включить exactly.once, использовать UPSERT
Zombie instances Проблемы сети, fencing Настроить transactional.id для exactly-once
Offset-ы потеряны Retention offset-топика истёк Увеличить offset.storage.replication.factor, мониторить

Чек-лист перед продакшном

  • [ ] Уникальный group.id для каждого кластера Connect
  • [ ] enable.idempotence=true для гарантий доставки
  • [ ] max.poll.interval.ms > времени обработки самого тяжёлого батча
  • [ ] Настроен DLQ с алертингом на рост
  • [ ] JMX-метрики экспортируются в Prometheus
  • [ ] replication.factor ≥ 3 для внутренних топиков Connect
  • [ ] Пароли вынесены в ConfigProvider (не в JSON-конфиге)

Что запомнить

Тема Ключевая мысль
Source vs Sink Source: внешняя система → Kafka. Sink: Kafka → внешняя система
JDBC Source Polling по расписанию, не видит DELETE
Debezium CDC через WAL, real-time, видит всё (INSERT/UPDATE/DELETE)
SMT Лёгкие трансформации на лету (mask, route, filter)
DLQ Poison pill → в DLQ, а не остановка конвейера
REST API Управление коннекторами через HTTP на порту 8083
Distributed Автоматическое перераспределение tasks при сбоях

Проверь себя


Что дальше?


Источники