Перейти к содержанию

CDC: Change Data Capture

Зачем это DE?

Полная выгрузка таблицы с 100 млн строк каждую ночь — медленно и расточительно. CDC захватывает только изменения (INSERT, UPDATE, DELETE) и передаёт их в реальном времени. Это основа modern data stack: PostgreSQL → Debezium → Kafka → DWH.


Подходы к CDC

Подход Как работает Задержка Нагрузка на source
Timestamp-based WHERE updated_at > last_run Минуты Запрос к БД
Trigger-based Триггер пишет в audit-таблицу Секунды Триггер на каждой операции
Log-based Читает WAL/binlog БД Секунды Минимальная (читает лог)

Timestamp-based (простой)

SQL
-- Airflow: каждый час забираем свежие заказы из etl_kitchen_db
SELECT * FROM orders
WHERE order_date > '{{ prev_execution_date }}'
  AND order_date <= '{{ execution_date }}';

Плюсы: просто, работает везде. Минусы: не ловит DELETE, нужен столбец с датой обновления, нагрузка на БД при больших таблицах.


Trigger-based CDC

Триггеры автоматически записывают изменения в отдельную таблицу аудита при каждой DML-операции. Реализуем на таблице orders из etl_kitchen_db.

SQL-реализация

SQL
-- Таблица для хранения истории изменений
CREATE TABLE orders_audit_log (
    audit_id   SERIAL PRIMARY KEY,
    order_id   INT,
    old_status TEXT,
    new_status TEXT,
    changed_at TIMESTAMP DEFAULT now()
);

-- Функция: записываем изменение статуса
CREATE OR REPLACE FUNCTION track_order_status()
RETURNS TRIGGER AS $$
BEGIN
    IF OLD.status IS DISTINCT FROM NEW.status THEN
        INSERT INTO orders_audit_log(order_id, old_status, new_status)
        VALUES (OLD.order_id, OLD.status, NEW.status);
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Привязываем триггер к таблице orders
CREATE TRIGGER trg_order_status_cdc
AFTER UPDATE ON orders
FOR EACH ROW EXECUTE FUNCTION track_order_status();

Пример работы

SQL
-- Обновляем статус заказа
UPDATE orders SET status = 'delivered' WHERE order_id = 28;

-- Проверяем лог
SELECT * FROM orders_audit_log;
audit_id order_id old_status new_status changed_at
1 28 shipped delivered 2026-04-03 12:00:00

Плюсы: не нужны Kafka и Debezium, ловит DELETE (если добавить AFTER DELETE), работает в той же транзакции. Минусы: высокая нагрузка (каждая операция = две записи), нужно менять схему БД и поддерживать триггеры на каждой таблице.


Как работает WAL и логическое декодирование

Чтобы понять log-based CDC, нужно разобраться, как PostgreSQL гарантирует сохранность данных.

WAL: сначала пишем в лог

Изменения никогда не попадают сразу в файлы таблиц и индексов. Сначала описание изменения записывается в WAL (Write-Ahead Log) — последовательный поток бинарных данных на диске. Если сервер упадёт — база восстановится, «проиграв» лог заново (операция REDO).

Уровни wal_level

Уровень Что в логе Для чего
minimal Только данные для восстановления после сбоя Standalone без репликации
replica + данные для физической репликации (блоки) Standby-серверы
logical + метаданные для декодирования строк (INSERT/UPDATE/DELETE) CDC, Debezium

Для CDC обязателен уровень logical — он позволяет «раскодировать» бинарный WAL обратно в логические операции.

Репликационный слот

Слот — это «умная закладка» на стороне сервера. PostgreSQL не удалит WAL-файлы, пока consumer (Debezium) не подтвердит, что прочитал их.

Риск: растущий WAL

Если consumer отвалится или начнёт отставать, WAL-файлы будут копиться бесконечно. Это может привести к остановке БД из-за нехватки места на диске. Мониторь pg_replication_slots и настраивай max_slot_wal_keep_size.

Плагин pgoutput

Сам WAL — бинарная «каша». Плагин логического декодирования pgoutput превращает её в структурированный поток данных. Он встроен в ядро PostgreSQL (с версии 10), и Debezium рекомендует использовать именно его.

Настройка и мониторинг

SQL
-- Включаем логическое декодирование (рестарт PostgreSQL обязателен)
ALTER SYSTEM SET wal_level = 'logical';

-- После рестарта и запуска Debezium — проверяем слоты
SELECT slot_name, plugin, slot_type, active, restart_lsn
FROM pg_replication_slots;
slot_name plugin slot_type active restart_lsn
debezium_orders pgoutput logical t 0/1A3B4C0

Log-based CDC: Debezium

Debezium читает WAL PostgreSQL — поток изменений, который БД и так пишет для репликации.

Архитектура

Text Only
┌──────────────┐     ┌──────────────┐     ┌──────────┐     ┌────────────┐
│  PostgreSQL  │ ──→ │   Debezium   │ ──→ │  Kafka   │ ──→ │  Consumer  │
│  (WAL)       │     │ (Kafka       │     │  Topic   │     │ (Spark/    │
│              │     │  Connect)    │     │          │     │  Python)   │
└──────────────┘     └──────────────┘     └──────────┘     └────────────┘

Docker Compose

YAML
services:
  postgres:
    image: postgres:15
    command: >
      -c wal_level=logical
      -c max_wal_senders=4
      -c max_replication_slots=4
    environment:
      POSTGRES_DB: etl_kitchen_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  debezium:
    image: debezium/connect:2.5
    depends_on: [kafka, postgres]
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

Создание коннектора

Bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" -d '{
  "name": "orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "etl_kitchen_db",
    "topic.prefix": "cdc",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_orders"
  }
}'

Формат CDC-события

Когда заказ №1 в etl_kitchen_db меняет статус с processing на shipped:

JSON
{
  "before": {"order_id": 1, "status": "processing", "total_amount": 89990.00},
  "after":  {"order_id": 1, "status": "shipped", "total_amount": 89990.00},
  "source": {
    "version": "2.5.0",
    "connector": "postgresql",
    "db": "etl_kitchen_db",
    "schema": "public",
    "table": "orders",
    "txId": 12345,
    "lsn": 67890
  },
  "op": "u",
  "ts_ms": 1712150000000
}

Поле op — тип операции: c = create, u = update, d = delete, r = read (initial snapshot).


Outbox Pattern

Проблема: нужно и обновить БД, и отправить событие в Kafka. Если Kafka недоступен — данные рассинхронизируются.

Решение: пишем в outbox-таблицу в той же транзакции. Debezium читает outbox и отправляет в Kafka.

SQL
-- Одна транзакция — атомарность гарантирована
BEGIN;
UPDATE orders SET status = 'shipped' WHERE order_id = 1;
INSERT INTO outbox (aggregate_type, aggregate_id, payload)
VALUES ('order', '1', '{"status": "shipped"}');
COMMIT;
Text Only
outbox таблица → Debezium → Kafka topic

Зачем outbox, а не напрямую?

Если отправлять событие в Kafka из кода приложения — при сбое между COMMIT и отправкой событие потеряется. Outbox гарантирует: если транзакция прошла, событие будет доставлено.


Обработка CDC-событий

Python consumer

Python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    "cdc.public.orders",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    auto_offset_reset="earliest",
    group_id="etl-consumer"
)

for message in consumer:
    event = message.value
    op = event["payload"]["op"]
    after = event["payload"]["after"]

    if op in ("c", "u"):  # INSERT или UPDATE
        upsert_to_dwh(after)
    elif op == "d":       # DELETE
        mark_deleted_in_dwh(event["payload"]["before"])

Итоги: сравнение подходов

Подход Задержка Нагрузка DELETE Сложность
Timestamp Минуты Средняя Нет Простой
Trigger Секунды Высокая Да (если добавить) Средний
Log-based (Debezium) Секунды Минимальная Да Сложный

Что запомнить

  • CDC превращает базу данных в поток событий — ты получаешь дельту, а не полную копию
  • Timestamp-based — самый простой, но не ловит DELETE и нагружает БД
  • Trigger-based — работает без внешних зависимостей, но создаёт двойную нагрузку на запись
  • Log-based (Debezium) — стандарт для DE: минимальная нагрузка, ловит все операции, real-time
  • wal_level=logical — обязательная настройка PostgreSQL для CDC через Debezium
  • Репликационный слот удерживает WAL — мониторь его, иначе диск переполнится
  • Outbox Pattern гарантирует атомарность «обновить БД + отправить событие»

Проверь себя


Источники