Перейти к содержанию

Мониторинг данных

Зачем это DE?

Пайплайн отработал без ошибок, но данные — мусор: таблица не обновилась, объём упал в 10 раз, NULL'ы заполонили ключевой столбец. Мониторинг данных ловит проблемы, которые не видит мониторинг инфраструктуры.


Что мониторить

5 измерений качества данных

Измерение Что проверяем Пример проблемы
Freshness Когда последний раз обновились Таблица не обновлялась 24 часа
Volume Сколько строк за период Вчера 100K строк, сегодня 100
Schema Структура таблицы Удалили столбец, изменили тип
Distribution Распределение значений 90% NULL в customer_id
Uniqueness Уникальность ключей Дубли в primary key

Freshness — свежесть данных

SQL-проверка

SQL
-- Когда последний раз обновлялась таблица?
SELECT 
    'orders' AS table_name,
    MAX(updated_at) AS last_update,
    NOW() - MAX(updated_at) AS age,
    CASE 
        WHEN NOW() - MAX(updated_at) > interval '12 hours' THEN 'WARNING'
        WHEN NOW() - MAX(updated_at) > interval '24 hours' THEN 'CRITICAL'
        ELSE 'OK'
    END AS status
FROM orders;

Python-мониторинг

Python
from datetime import datetime, timedelta
import structlog

logger = structlog.get_logger()

def check_freshness(conn, table: str, column: str, max_age_hours: int):
    result = conn.execute(f"SELECT MAX({column}) FROM {table}").fetchone()
    last_update = result[0]
    age = datetime.now() - last_update

    if age > timedelta(hours=max_age_hours):
        logger.error("stale_data", table=table, age_hours=age.total_seconds()/3600)
        raise ValueError(f"{table} stale: last update {last_update}")

    logger.info("freshness_ok", table=table, age_hours=age.total_seconds()/3600)

Volume — аномалии объёма

День-к-дню

SQL
WITH daily_counts AS (
    SELECT 
        order_date,
        COUNT(*) AS row_count,
        LAG(COUNT(*)) OVER (ORDER BY order_date) AS prev_count
    FROM orders
    GROUP BY order_date
    ORDER BY order_date DESC
    LIMIT 7
)
SELECT *,
    CASE 
        WHEN row_count < prev_count * 0.5 THEN 'ALERT: drop > 50%'
        WHEN row_count > prev_count * 2.0 THEN 'ALERT: spike > 200%'
        ELSE 'OK'
    END AS status
FROM daily_counts;

Скользящее среднее

SQL
SELECT 
    order_date,
    COUNT(*) AS today_count,
    AVG(COUNT(*)) OVER (
        ORDER BY order_date 
        ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
    ) AS avg_7d,
    CASE 
        WHEN COUNT(*) < 0.3 * AVG(COUNT(*)) OVER (
            ORDER BY order_date ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
        ) THEN 'CRITICAL'
        ELSE 'OK'
    END AS status
FROM orders
GROUP BY order_date
ORDER BY order_date DESC;

Schema — отслеживание изменений

SQL
-- Текущая схема таблицы
SELECT 
    column_name, 
    data_type, 
    is_nullable,
    column_default
FROM information_schema.columns
WHERE table_name = 'orders'
ORDER BY ordinal_position;

-- Обнаружение изменений: сравнить с сохранённым снимком
-- (хранить в таблице schema_snapshots)

Python: автопроверка при запуске пайплайна

Python
EXPECTED_SCHEMA = {
    "orders": {
        "order_id": "integer",
        "customer_id": "integer",
        "total_amount": "numeric",
        "status": "character varying",
        "order_date": "date"
    }
}

def check_schema(conn, table: str):
    result = conn.execute(f"""
        SELECT column_name, data_type 
        FROM information_schema.columns 
        WHERE table_name = '{table}'
    """).fetchall()

    actual = {row[0]: row[1] for row in result}
    expected = EXPECTED_SCHEMA[table]

    missing = set(expected) - set(actual)
    extra = set(actual) - set(expected)
    type_mismatch = {
        col for col in expected 
        if col in actual and actual[col] != expected[col]
    }

    if missing or type_mismatch:
        raise ValueError(f"Schema drift! Missing: {missing}, Type mismatch: {type_mismatch}")

Distribution — аномалии значений

SQL
-- NULL rate по столбцам
SELECT 
    COUNT(*) AS total,
    COUNT(*) FILTER (WHERE customer_id IS NULL) AS null_customer_id,
    ROUND(100.0 * COUNT(*) FILTER (WHERE customer_id IS NULL) / COUNT(*), 2) AS null_pct,
    COUNT(*) FILTER (WHERE total_amount IS NULL) AS null_amount,
    COUNT(DISTINCT status) AS unique_statuses
FROM orders
WHERE order_date = CURRENT_DATE - 1;

Статистические проверки

SQL
-- Базовая статистика для выявления аномалий
SELECT 
    MIN(total_amount) AS min_amount,
    MAX(total_amount) AS max_amount,
    AVG(total_amount) AS avg_amount,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY total_amount) AS median,
    STDDEV(total_amount) AS stddev,
    -- Outliers: > 3 sigma
    COUNT(*) FILTER (
        WHERE total_amount > AVG(total_amount) OVER () + 3 * STDDEV(total_amount) OVER ()
    ) AS outliers
FROM orders
WHERE order_date = CURRENT_DATE - 1;

SLA для данных

Определение SLA

YAML
# data_sla.yml
tables:
  - name: orders
    freshness_sla: 6h          # обновление каждые 6 часов
    volume_min: 1000            # минимум 1000 строк в день
    volume_max: 100000          # максимум 100K
    null_threshold:
      customer_id: 0%           # не допускаем NULL
      total_amount: 1%          # до 1% NULL OK
    unique_keys: [order_id]     # без дублей

  - name: events
    freshness_sla: 1h
    volume_min: 10000
    null_threshold:
      customer_id: 5%
      event_type: 0%

Дашборд мониторинга

Вместо сложного UI — простая таблица-отчёт:

SQL
SELECT 
    'orders' AS table_name,
    MAX(order_date) AS last_date,
    COUNT(*) FILTER (WHERE order_date = CURRENT_DATE - 1) AS yesterday_rows,
    ROUND(100.0 * COUNT(*) FILTER (WHERE customer_id IS NULL) / NULLIF(COUNT(*), 0), 2) AS null_pct
FROM orders
UNION ALL
SELECT 
    'events',
    MAX(event_date::date),
    COUNT(*) FILTER (WHERE event_date::date = CURRENT_DATE - 1),
    ROUND(100.0 * COUNT(*) FILTER (WHERE customer_id IS NULL) / NULLIF(COUNT(*), 0), 2)
FROM events;

Elementary — мониторинг для dbt

Elementary — dbt-пакет для автоматического мониторинга:

YAML
# packages.yml
packages:
  - package: elementary-data/elementary
    version: ">=0.13.0"
YAML
# schema.yml
models:
  - name: fct_orders
    config:
      elementary:
        timestamp_column: order_date
    columns:
      - name: total_amount
        tests:
          - elementary.column_anomalies:
              column_anomalies:
                - zero_count
                - null_count
                - min
                - max
Bash
dbt test  # Elementary тесты запускаются как обычные dbt-тесты
edr report  # генерирует HTML-отчёт

Алерты

В Airflow

Python
from airflow.operators.python import PythonOperator

def data_quality_check(**context):
    conn = get_connection()
    check_freshness(conn, "orders", "updated_at", max_age_hours=12)
    check_volume(conn, "orders", min_rows=1000)
    check_nulls(conn, "orders", "customer_id", max_null_pct=0)

quality_task = PythonOperator(
    task_id="data_quality_check",
    python_callable=data_quality_check,
    on_failure_callback=send_slack_alert  # алерт при провале
)

Что запомнить

Проверка Что ловит Инструмент
Freshness Данные устарели SQL MAX(date), dbt source freshness
Volume Аномальный объём SQL COUNT + LAG, Elementary
Schema Изменилась структура information_schema, dbt schema tests
Distribution Аномальные значения SQL статистика, Elementary column_anomalies
Uniqueness Дубли в PK dbt unique test, SQL GROUP BY HAVING

Проверь себя


Источники