Мониторинг данных¶
Зачем это DE?¶
Пайплайн отработал без ошибок, но данные — мусор: таблица не обновилась, объём упал в 10 раз, NULL'ы заполонили ключевой столбец. Мониторинг данных ловит проблемы, которые не видит мониторинг инфраструктуры.
Что мониторить¶
5 измерений качества данных¶
| Измерение | Что проверяем | Пример проблемы |
|---|---|---|
| Freshness | Когда последний раз обновились | Таблица не обновлялась 24 часа |
| Volume | Сколько строк за период | Вчера 100K строк, сегодня 100 |
| Schema | Структура таблицы | Удалили столбец, изменили тип |
| Distribution | Распределение значений | 90% NULL в customer_id |
| Uniqueness | Уникальность ключей | Дубли в primary key |
Freshness — свежесть данных¶
SQL-проверка¶
SQL
-- Когда последний раз обновлялась таблица?
SELECT
'orders' AS table_name,
MAX(updated_at) AS last_update,
NOW() - MAX(updated_at) AS age,
CASE
WHEN NOW() - MAX(updated_at) > interval '12 hours' THEN 'WARNING'
WHEN NOW() - MAX(updated_at) > interval '24 hours' THEN 'CRITICAL'
ELSE 'OK'
END AS status
FROM orders;
Python-мониторинг¶
Python
from datetime import datetime, timedelta
import structlog
logger = structlog.get_logger()
def check_freshness(conn, table: str, column: str, max_age_hours: int):
result = conn.execute(f"SELECT MAX({column}) FROM {table}").fetchone()
last_update = result[0]
age = datetime.now() - last_update
if age > timedelta(hours=max_age_hours):
logger.error("stale_data", table=table, age_hours=age.total_seconds()/3600)
raise ValueError(f"{table} stale: last update {last_update}")
logger.info("freshness_ok", table=table, age_hours=age.total_seconds()/3600)
Volume — аномалии объёма¶
День-к-дню¶
SQL
WITH daily_counts AS (
SELECT
order_date,
COUNT(*) AS row_count,
LAG(COUNT(*)) OVER (ORDER BY order_date) AS prev_count
FROM orders
GROUP BY order_date
ORDER BY order_date DESC
LIMIT 7
)
SELECT *,
CASE
WHEN row_count < prev_count * 0.5 THEN 'ALERT: drop > 50%'
WHEN row_count > prev_count * 2.0 THEN 'ALERT: spike > 200%'
ELSE 'OK'
END AS status
FROM daily_counts;
Скользящее среднее¶
SQL
SELECT
order_date,
COUNT(*) AS today_count,
AVG(COUNT(*)) OVER (
ORDER BY order_date
ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
) AS avg_7d,
CASE
WHEN COUNT(*) < 0.3 * AVG(COUNT(*)) OVER (
ORDER BY order_date ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
) THEN 'CRITICAL'
ELSE 'OK'
END AS status
FROM orders
GROUP BY order_date
ORDER BY order_date DESC;
Schema — отслеживание изменений¶
SQL
-- Текущая схема таблицы
SELECT
column_name,
data_type,
is_nullable,
column_default
FROM information_schema.columns
WHERE table_name = 'orders'
ORDER BY ordinal_position;
-- Обнаружение изменений: сравнить с сохранённым снимком
-- (хранить в таблице schema_snapshots)
Python: автопроверка при запуске пайплайна¶
Python
EXPECTED_SCHEMA = {
"orders": {
"order_id": "integer",
"customer_id": "integer",
"total_amount": "numeric",
"status": "character varying",
"order_date": "date"
}
}
def check_schema(conn, table: str):
result = conn.execute(f"""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = '{table}'
""").fetchall()
actual = {row[0]: row[1] for row in result}
expected = EXPECTED_SCHEMA[table]
missing = set(expected) - set(actual)
extra = set(actual) - set(expected)
type_mismatch = {
col for col in expected
if col in actual and actual[col] != expected[col]
}
if missing or type_mismatch:
raise ValueError(f"Schema drift! Missing: {missing}, Type mismatch: {type_mismatch}")
Distribution — аномалии значений¶
SQL
-- NULL rate по столбцам
SELECT
COUNT(*) AS total,
COUNT(*) FILTER (WHERE customer_id IS NULL) AS null_customer_id,
ROUND(100.0 * COUNT(*) FILTER (WHERE customer_id IS NULL) / COUNT(*), 2) AS null_pct,
COUNT(*) FILTER (WHERE total_amount IS NULL) AS null_amount,
COUNT(DISTINCT status) AS unique_statuses
FROM orders
WHERE order_date = CURRENT_DATE - 1;
Статистические проверки¶
SQL
-- Базовая статистика для выявления аномалий
SELECT
MIN(total_amount) AS min_amount,
MAX(total_amount) AS max_amount,
AVG(total_amount) AS avg_amount,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY total_amount) AS median,
STDDEV(total_amount) AS stddev,
-- Outliers: > 3 sigma
COUNT(*) FILTER (
WHERE total_amount > AVG(total_amount) OVER () + 3 * STDDEV(total_amount) OVER ()
) AS outliers
FROM orders
WHERE order_date = CURRENT_DATE - 1;
SLA для данных¶
Определение SLA¶
YAML
# data_sla.yml
tables:
- name: orders
freshness_sla: 6h # обновление каждые 6 часов
volume_min: 1000 # минимум 1000 строк в день
volume_max: 100000 # максимум 100K
null_threshold:
customer_id: 0% # не допускаем NULL
total_amount: 1% # до 1% NULL OK
unique_keys: [order_id] # без дублей
- name: events
freshness_sla: 1h
volume_min: 10000
null_threshold:
customer_id: 5%
event_type: 0%
Дашборд мониторинга¶
Вместо сложного UI — простая таблица-отчёт:
SQL
SELECT
'orders' AS table_name,
MAX(order_date) AS last_date,
COUNT(*) FILTER (WHERE order_date = CURRENT_DATE - 1) AS yesterday_rows,
ROUND(100.0 * COUNT(*) FILTER (WHERE customer_id IS NULL) / NULLIF(COUNT(*), 0), 2) AS null_pct
FROM orders
UNION ALL
SELECT
'events',
MAX(event_date::date),
COUNT(*) FILTER (WHERE event_date::date = CURRENT_DATE - 1),
ROUND(100.0 * COUNT(*) FILTER (WHERE customer_id IS NULL) / NULLIF(COUNT(*), 0), 2)
FROM events;
Elementary — мониторинг для dbt¶
Elementary — dbt-пакет для автоматического мониторинга:
YAML
# schema.yml
models:
- name: fct_orders
config:
elementary:
timestamp_column: order_date
columns:
- name: total_amount
tests:
- elementary.column_anomalies:
column_anomalies:
- zero_count
- null_count
- min
- max
Bash
dbt test # Elementary тесты запускаются как обычные dbt-тесты
edr report # генерирует HTML-отчёт
Алерты¶
В Airflow¶
Python
from airflow.operators.python import PythonOperator
def data_quality_check(**context):
conn = get_connection()
check_freshness(conn, "orders", "updated_at", max_age_hours=12)
check_volume(conn, "orders", min_rows=1000)
check_nulls(conn, "orders", "customer_id", max_null_pct=0)
quality_task = PythonOperator(
task_id="data_quality_check",
python_callable=data_quality_check,
on_failure_callback=send_slack_alert # алерт при провале
)
Что запомнить¶
| Проверка | Что ловит | Инструмент |
|---|---|---|
| Freshness | Данные устарели | SQL MAX(date), dbt source freshness |
| Volume | Аномальный объём | SQL COUNT + LAG, Elementary |
| Schema | Изменилась структура | information_schema, dbt schema tests |
| Distribution | Аномальные значения | SQL статистика, Elementary column_anomalies |
| Uniqueness | Дубли в PK | dbt unique test, SQL GROUP BY HAVING |