CDC: Change Data Capture¶
Зачем это DE?¶
Полная выгрузка таблицы с 100 млн строк каждую ночь — медленно и расточительно. CDC захватывает только изменения (INSERT, UPDATE, DELETE) и передаёт их в реальном времени. Это основа modern data stack: PostgreSQL → Debezium → Kafka → DWH.
Подходы к CDC¶
| Подход | Как работает | Задержка | Нагрузка на source |
|---|---|---|---|
| Timestamp-based | WHERE updated_at > last_run | Минуты | Запрос к БД |
| Trigger-based | Триггер пишет в audit-таблицу | Секунды | Триггер на каждой операции |
| Log-based | Читает WAL/binlog БД | Секунды | Минимальная (читает лог) |
Timestamp-based (простой)¶
-- Airflow: каждый час забираем свежие заказы из etl_kitchen_db
SELECT * FROM orders
WHERE order_date > '{{ prev_execution_date }}'
AND order_date <= '{{ execution_date }}';
Плюсы: просто, работает везде. Минусы: не ловит DELETE, нужен столбец с датой обновления, нагрузка на БД при больших таблицах.
Trigger-based CDC¶
Триггеры автоматически записывают изменения в отдельную таблицу аудита при каждой DML-операции. Реализуем на таблице orders из etl_kitchen_db.
SQL-реализация¶
-- Таблица для хранения истории изменений
CREATE TABLE orders_audit_log (
audit_id SERIAL PRIMARY KEY,
order_id INT,
old_status TEXT,
new_status TEXT,
changed_at TIMESTAMP DEFAULT now()
);
-- Функция: записываем изменение статуса
CREATE OR REPLACE FUNCTION track_order_status()
RETURNS TRIGGER AS $$
BEGIN
IF OLD.status IS DISTINCT FROM NEW.status THEN
INSERT INTO orders_audit_log(order_id, old_status, new_status)
VALUES (OLD.order_id, OLD.status, NEW.status);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Привязываем триггер к таблице orders
CREATE TRIGGER trg_order_status_cdc
AFTER UPDATE ON orders
FOR EACH ROW EXECUTE FUNCTION track_order_status();
Пример работы¶
-- Обновляем статус заказа
UPDATE orders SET status = 'delivered' WHERE order_id = 28;
-- Проверяем лог
SELECT * FROM orders_audit_log;
| audit_id | order_id | old_status | new_status | changed_at |
|---|---|---|---|---|
| 1 | 28 | shipped | delivered | 2026-04-03 12:00:00 |
Плюсы: не нужны Kafka и Debezium, ловит DELETE (если добавить AFTER DELETE), работает в той же транзакции.
Минусы: высокая нагрузка (каждая операция = две записи), нужно менять схему БД и поддерживать триггеры на каждой таблице.
Как работает WAL и логическое декодирование¶
Чтобы понять log-based CDC, нужно разобраться, как PostgreSQL гарантирует сохранность данных.
WAL: сначала пишем в лог¶
Изменения никогда не попадают сразу в файлы таблиц и индексов. Сначала описание изменения записывается в WAL (Write-Ahead Log) — последовательный поток бинарных данных на диске. Если сервер упадёт — база восстановится, «проиграв» лог заново (операция REDO).
Уровни wal_level¶
| Уровень | Что в логе | Для чего |
|---|---|---|
minimal |
Только данные для восстановления после сбоя | Standalone без репликации |
replica |
+ данные для физической репликации (блоки) | Standby-серверы |
logical |
+ метаданные для декодирования строк (INSERT/UPDATE/DELETE) | CDC, Debezium |
Для CDC обязателен уровень logical — он позволяет «раскодировать» бинарный WAL обратно в логические операции.
Репликационный слот¶
Слот — это «умная закладка» на стороне сервера. PostgreSQL не удалит WAL-файлы, пока consumer (Debezium) не подтвердит, что прочитал их.
Риск: растущий WAL
Если consumer отвалится или начнёт отставать, WAL-файлы будут копиться бесконечно. Это может привести к остановке БД из-за нехватки места на диске. Мониторь pg_replication_slots и настраивай max_slot_wal_keep_size.
Плагин pgoutput¶
Сам WAL — бинарная «каша». Плагин логического декодирования pgoutput превращает её в структурированный поток данных. Он встроен в ядро PostgreSQL (с версии 10), и Debezium рекомендует использовать именно его.
Настройка и мониторинг¶
-- Включаем логическое декодирование (рестарт PostgreSQL обязателен)
ALTER SYSTEM SET wal_level = 'logical';
-- После рестарта и запуска Debezium — проверяем слоты
SELECT slot_name, plugin, slot_type, active, restart_lsn
FROM pg_replication_slots;
| slot_name | plugin | slot_type | active | restart_lsn |
|---|---|---|---|---|
| debezium_orders | pgoutput | logical | t | 0/1A3B4C0 |
Log-based CDC: Debezium¶
Debezium читает WAL PostgreSQL — поток изменений, который БД и так пишет для репликации.
Архитектура¶
┌──────────────┐ ┌──────────────┐ ┌──────────┐ ┌────────────┐
│ PostgreSQL │ ──→ │ Debezium │ ──→ │ Kafka │ ──→ │ Consumer │
│ (WAL) │ │ (Kafka │ │ Topic │ │ (Spark/ │
│ │ │ Connect) │ │ │ │ Python) │
└──────────────┘ └──────────────┘ └──────────┘ └────────────┘
Docker Compose¶
services:
postgres:
image: postgres:15
command: >
-c wal_level=logical
-c max_wal_senders=4
-c max_replication_slots=4
environment:
POSTGRES_DB: etl_kitchen_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on: [zookeeper]
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
debezium:
image: debezium/connect:2.5
depends_on: [kafka, postgres]
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statuses
Создание коннектора¶
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" -d '{
"name": "orders-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "etl_kitchen_db",
"topic.prefix": "cdc",
"table.include.list": "public.orders,public.customers",
"plugin.name": "pgoutput",
"slot.name": "debezium_orders"
}
}'
Формат CDC-события¶
Когда заказ №1 в etl_kitchen_db меняет статус с processing на shipped:
{
"before": {"order_id": 1, "status": "processing", "total_amount": 89990.00},
"after": {"order_id": 1, "status": "shipped", "total_amount": 89990.00},
"source": {
"version": "2.5.0",
"connector": "postgresql",
"db": "etl_kitchen_db",
"schema": "public",
"table": "orders",
"txId": 12345,
"lsn": 67890
},
"op": "u",
"ts_ms": 1712150000000
}
Поле op — тип операции: c = create, u = update, d = delete, r = read (initial snapshot).
Outbox Pattern¶
Проблема: нужно и обновить БД, и отправить событие в Kafka. Если Kafka недоступен — данные рассинхронизируются.
Решение: пишем в outbox-таблицу в той же транзакции. Debezium читает outbox и отправляет в Kafka.
-- Одна транзакция — атомарность гарантирована
BEGIN;
UPDATE orders SET status = 'shipped' WHERE order_id = 1;
INSERT INTO outbox (aggregate_type, aggregate_id, payload)
VALUES ('order', '1', '{"status": "shipped"}');
COMMIT;
Зачем outbox, а не напрямую?
Если отправлять событие в Kafka из кода приложения — при сбое между COMMIT и отправкой событие потеряется. Outbox гарантирует: если транзакция прошла, событие будет доставлено.
Обработка CDC-событий¶
Python consumer¶
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
"cdc.public.orders",
bootstrap_servers="localhost:9092",
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
auto_offset_reset="earliest",
group_id="etl-consumer"
)
for message in consumer:
event = message.value
op = event["payload"]["op"]
after = event["payload"]["after"]
if op in ("c", "u"): # INSERT или UPDATE
upsert_to_dwh(after)
elif op == "d": # DELETE
mark_deleted_in_dwh(event["payload"]["before"])
Итоги: сравнение подходов¶
| Подход | Задержка | Нагрузка | DELETE | Сложность |
|---|---|---|---|---|
| Timestamp | Минуты | Средняя | Нет | Простой |
| Trigger | Секунды | Высокая | Да (если добавить) | Средний |
| Log-based (Debezium) | Секунды | Минимальная | Да | Сложный |
Что запомнить¶
- CDC превращает базу данных в поток событий — ты получаешь дельту, а не полную копию
- Timestamp-based — самый простой, но не ловит DELETE и нагружает БД
- Trigger-based — работает без внешних зависимостей, но создаёт двойную нагрузку на запись
- Log-based (Debezium) — стандарт для DE: минимальная нагрузка, ловит все операции, real-time
wal_level=logical— обязательная настройка PostgreSQL для CDC через Debezium- Репликационный слот удерживает WAL — мониторь его, иначе диск переполнится
- Outbox Pattern гарантирует атомарность «обновить БД + отправить событие»
Проверь себя¶
Источники¶
- PostgreSQL: Write-Ahead Logging — механизм WAL и durability
- PostgreSQL: Logical Replication — логическое декодирование и pgoutput
- Debezium Documentation — коннекторы, архитектура, конфигурация
- Transactional Outbox Pattern — паттерн для надёжной доставки событий