Data Lineage¶
Зачем это DE?¶
Витрина показывает неправильные цифры. Откуда данные? Какие трансформации прошли? Кто менял пайплайн? Data Lineage — карта происхождения данных: от источника до дашборда. Без неё отладка — слепой поиск в темноте.
Что такое Lineage¶
┌──────────┐ ┌────────────┐ ┌──────────┐ ┌───────────┐
│ API │ ──→ │ stg_orders │ ──→ │ fct_sales│ ──→ │ Dashboard │
│ (source) │ │ (staging) │ │ (mart) │ │ (BI) │
└──────────┘ └────────────┘ └──────────┘ └───────────┘
↑ ↑ ↑ ↑
Table-level Column-level Transformation Consumer
lineage lineage logic impact
Уровни lineage¶
| Уровень | Что показывает | Пример |
|---|---|---|
| Table-level | Какие таблицы зависят от каких | stg_orders → fct_sales |
| Column-level | Какие столбцы из каких вычислены | fct_sales.revenue = stg_orders.quantity * stg_orders.price |
| Pipeline-level | Какие DAG/задачи обрабатывают | Airflow: extract_api → transform_dbt → load_dwh |
OpenLineage — открытый стандарт¶
OpenLineage — спецификация для описания lineage. Единый формат для Airflow, Spark, dbt.
Событие OpenLineage¶
{
"eventType": "COMPLETE",
"eventTime": "2025-01-15T10:30:00Z",
"run": {
"runId": "abc-123"
},
"job": {
"namespace": "airflow",
"name": "daily_orders_pipeline.transform_orders"
},
"inputs": [
{
"namespace": "postgres://source-db",
"name": "raw.orders",
"facets": {
"schema": {
"fields": [
{"name": "order_id", "type": "INTEGER"},
{"name": "amount", "type": "NUMERIC"}
]
}
}
}
],
"outputs": [
{
"namespace": "postgres://dwh",
"name": "analytics.fct_orders",
"facets": {
"schema": {...},
"columnLineage": {
"fields": {
"revenue": {
"inputFields": [
{"namespace": "postgres://source-db", "name": "raw.orders", "field": "amount"}
]
}
}
}
}
}
]
}
Lineage из dbt¶
dbt автоматически строит DAG моделей — это table-level lineage:
manifest.json — источник lineage¶
import json
with open("target/manifest.json") as f:
manifest = json.load(f)
# Зависимости модели fct_orders
model = manifest["nodes"]["model.my_project.fct_orders"]
print(model["depends_on"]["nodes"])
# ['model.my_project.stg_orders', 'model.my_project.dim_customer']
Column-level lineage¶
dbt 1.7+ поддерживает column-level lineage через метаданные:
# schema.yml
models:
- name: fct_orders
columns:
- name: revenue
description: "quantity * unit_price * (1 - discount/100)"
meta:
lineage:
- stg_order_items.quantity
- stg_order_items.unit_price
- stg_order_items.discount
Lineage из Airflow¶
OpenLineage + Airflow¶
# airflow.cfg или env
OPENLINEAGE_URL=http://marquez:5000 # Marquez API
OPENLINEAGE_NAMESPACE=production
Airflow автоматически отправляет lineage-события при выполнении операторов:
- PostgresOperator → input/output таблицы
- PythonOperator → кастомный lineage через @task(inlets=[...], outlets=[...])
- SparkSubmitOperator → Spark lineage
from airflow.lineage import AUTO
@task(inlets=[AUTO], outlets=[AUTO])
def transform_orders():
...
DataHub — платформа Data Discovery¶
DataHub — полноценная платформа для поиска, lineage и governance.
Что умеет¶
- Поиск: найти таблицу, столбец, дашборд по ключевым словам
- Lineage: визуальный граф от source до consumer
- Ownership: кто отвечает за таблицу
- Glossary: бизнес-термины (revenue = выручка за вычетом...)
- Tags: классификация данных (PII, financial, public)
Интеграции¶
| Источник | Что извлекает |
|---|---|
| PostgreSQL | Схемы, таблицы, столбцы, профили |
| dbt | Модели, тесты, lineage, описания |
| Airflow | DAG, задачи, расписание, lineage |
| Spark | Lineage из SparkListener |
| S3 | Бакеты, файлы, форматы |
| Looker/Superset | Дашборды, чарты, зависимости |
Запуск DataHub¶
Impact Analysis¶
Lineage позволяет отвечать на вопрос: «Если я изменю X, что сломается?»
Изменю stg_orders (убираю столбец discount)
↓
fct_orders (использует discount для revenue)
↓
weekly_revenue_dashboard (BI)
↓
📧 Алерт CEO
В dbt¶
# Downstream: что зависит от stg_orders?
dbt ls --select stg_orders+
# stg_orders
# fct_orders
# dim_customer_orders
# weekly_revenue
# Upstream: от чего зависит fct_orders?
dbt ls --select +fct_orders
# source:raw.orders
# source:raw.customers
# stg_orders
# stg_customers
# fct_orders
Что запомнить¶
| Инструмент | Для чего | Когда |
|---|---|---|
| dbt DAG | Table-level lineage из SQL-моделей | Каждый dbt-проект |
| OpenLineage | Универсальный стандарт lineage | Airflow + Spark + dbt |
| DataHub | Полная платформа: lineage + discovery + governance | Компании с 50+ таблицами |
| dbt ls +/− | Быстрый impact analysis в CLI | Перед изменением модели |