Перейти к содержанию

Data Lineage

Зачем это DE?

Витрина показывает неправильные цифры. Откуда данные? Какие трансформации прошли? Кто менял пайплайн? Data Lineage — карта происхождения данных: от источника до дашборда. Без неё отладка — слепой поиск в темноте.


Что такое Lineage

Text Only
┌──────────┐     ┌────────────┐     ┌──────────┐     ┌───────────┐
│ API      │ ──→ │ stg_orders │ ──→ │ fct_sales│ ──→ │ Dashboard │
│ (source) │     │ (staging)  │     │ (mart)   │     │ (BI)      │
└──────────┘     └────────────┘     └──────────┘     └───────────┘
     ↑                 ↑                 ↑                ↑
  Table-level      Column-level     Transformation   Consumer
   lineage          lineage            logic          impact

Уровни lineage

Уровень Что показывает Пример
Table-level Какие таблицы зависят от каких stg_orders → fct_sales
Column-level Какие столбцы из каких вычислены fct_sales.revenue = stg_orders.quantity * stg_orders.price
Pipeline-level Какие DAG/задачи обрабатывают Airflow: extract_api → transform_dbt → load_dwh

OpenLineage — открытый стандарт

OpenLineage — спецификация для описания lineage. Единый формат для Airflow, Spark, dbt.

Событие OpenLineage

JSON
{
  "eventType": "COMPLETE",
  "eventTime": "2025-01-15T10:30:00Z",
  "run": {
    "runId": "abc-123"
  },
  "job": {
    "namespace": "airflow",
    "name": "daily_orders_pipeline.transform_orders"
  },
  "inputs": [
    {
      "namespace": "postgres://source-db",
      "name": "raw.orders",
      "facets": {
        "schema": {
          "fields": [
            {"name": "order_id", "type": "INTEGER"},
            {"name": "amount", "type": "NUMERIC"}
          ]
        }
      }
    }
  ],
  "outputs": [
    {
      "namespace": "postgres://dwh",
      "name": "analytics.fct_orders",
      "facets": {
        "schema": {...},
        "columnLineage": {
          "fields": {
            "revenue": {
              "inputFields": [
                {"namespace": "postgres://source-db", "name": "raw.orders", "field": "amount"}
              ]
            }
          }
        }
      }
    }
  ]
}

Lineage из dbt

dbt автоматически строит DAG моделей — это table-level lineage:

Bash
dbt docs generate
dbt docs serve
# Открывается DAG-граф: source → staging → mart

manifest.json — источник lineage

Python
import json

with open("target/manifest.json") as f:
    manifest = json.load(f)

# Зависимости модели fct_orders
model = manifest["nodes"]["model.my_project.fct_orders"]
print(model["depends_on"]["nodes"])
# ['model.my_project.stg_orders', 'model.my_project.dim_customer']

Column-level lineage

dbt 1.7+ поддерживает column-level lineage через метаданные:

YAML
# schema.yml
models:
  - name: fct_orders
    columns:
      - name: revenue
        description: "quantity * unit_price * (1 - discount/100)"
        meta:
          lineage:
            - stg_order_items.quantity
            - stg_order_items.unit_price
            - stg_order_items.discount

Lineage из Airflow

OpenLineage + Airflow

Bash
pip install openlineage-airflow
Python
# airflow.cfg или env
OPENLINEAGE_URL=http://marquez:5000  # Marquez API
OPENLINEAGE_NAMESPACE=production

Airflow автоматически отправляет lineage-события при выполнении операторов: - PostgresOperator → input/output таблицы - PythonOperator → кастомный lineage через @task(inlets=[...], outlets=[...]) - SparkSubmitOperator → Spark lineage

Python
from airflow.lineage import AUTO

@task(inlets=[AUTO], outlets=[AUTO])
def transform_orders():
    ...

DataHub — платформа Data Discovery

DataHub — полноценная платформа для поиска, lineage и governance.

Что умеет

  • Поиск: найти таблицу, столбец, дашборд по ключевым словам
  • Lineage: визуальный граф от source до consumer
  • Ownership: кто отвечает за таблицу
  • Glossary: бизнес-термины (revenue = выручка за вычетом...)
  • Tags: классификация данных (PII, financial, public)

Интеграции

Источник Что извлекает
PostgreSQL Схемы, таблицы, столбцы, профили
dbt Модели, тесты, lineage, описания
Airflow DAG, задачи, расписание, lineage
Spark Lineage из SparkListener
S3 Бакеты, файлы, форматы
Looker/Superset Дашборды, чарты, зависимости

Запуск DataHub

Bash
python3 -m datahub docker quickstart
# UI: http://localhost:9002
# Логин: datahub / datahub

Impact Analysis

Lineage позволяет отвечать на вопрос: «Если я изменю X, что сломается?»

Text Only
Изменю stg_orders (убираю столбец discount)
   fct_orders (использует discount для revenue)
   weekly_revenue_dashboard (BI)
   📧 Алерт CEO

В dbt

Bash
# Downstream: что зависит от stg_orders?
dbt ls --select stg_orders+
# stg_orders
# fct_orders
# dim_customer_orders
# weekly_revenue

# Upstream: от чего зависит fct_orders?
dbt ls --select +fct_orders
# source:raw.orders
# source:raw.customers
# stg_orders
# stg_customers
# fct_orders

Что запомнить

Инструмент Для чего Когда
dbt DAG Table-level lineage из SQL-моделей Каждый dbt-проект
OpenLineage Универсальный стандарт lineage Airflow + Spark + dbt
DataHub Полная платформа: lineage + discovery + governance Компании с 50+ таблицами
dbt ls +/− Быстрый impact analysis в CLI Перед изменением модели

Проверь себя


Источники