Перейти к содержанию

dbt: макросы, пакеты и incremental

Зачем это DE?

Базовый dbt — это SELECT. Продвинутый dbt — это макросы для DRY, пакеты для переиспользования, incremental для обработки только новых данных, snapshots для SCD Type 2 и hooks для кастомной логики.


Макросы — Jinja-функции

Макрос — переиспользуемая Jinja-функция. Как функция в Python, но для SQL.

Базовый макрос

SQL
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
    ({{ column_name }} / 100.0)::numeric(12,2)
{% endmacro %}

Использование:

SQL
-- models/orders.sql
SELECT
    order_id,
    {{ cents_to_dollars('amount_cents') }} AS amount
FROM {{ source('raw', 'orders') }}

Макрос с условной логикой

SQL
-- macros/generate_surrogate_key.sql
{% macro surrogate_key(fields) %}
    md5(concat(
        {% for field in fields %}
            coalesce(cast({{ field }} as text), '')
            {% if not loop.last %} || '|' || {% endif %}
        {% endfor %}
    ))
{% endmacro %}

Макрос для аудита

SQL
-- macros/audit_columns.sql
{% macro audit_columns() %}
    current_timestamp AS _loaded_at,
    '{{ invocation_id }}' AS _dbt_run_id,
    '{{ this.name }}' AS _model_name
{% endmacro %}
SQL
-- models/dim_customer.sql
SELECT
    customer_id,
    name,
    city,
    {{ audit_columns() }}
FROM {{ ref('stg_customers') }}

Пакеты (packages)

dbt_utils — must-have

YAML
# packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: ">=1.0.0"
  - package: dbt-labs/dbt_date
    version: ">=0.7.0"
Bash
dbt deps  # установить пакеты

Полезные макросы dbt_utils:

SQL
-- Surrogate key
{{ dbt_utils.generate_surrogate_key(['customer_id', 'order_date']) }}

-- Pivot
{{ dbt_utils.pivot('status', dbt_utils.get_column_values(ref('orders'), 'status')) }}

-- Date spine (генерация дат)
{{ dbt_utils.date_spine(
    datepart="day",
    start_date="cast('2020-01-01' as date)",
    end_date="current_date"
) }}

-- Union нескольких моделей
{{ dbt_utils.union_relations(
    relations=[ref('orders_2023'), ref('orders_2024'), ref('orders_2025')]
) }}

Incremental — обработка только новых данных

Зачем

Полная перестройка таблицы с 100 млн строк — 30 минут. Incremental обрабатывает только новые/изменённые строки — 30 секунд.

Базовый incremental

SQL
-- models/fct_orders.sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

SELECT
    order_id,
    customer_id,
    total_amount,
    status,
    order_date,
    current_timestamp AS _loaded_at
FROM {{ source('raw', 'orders') }}

{% if is_incremental() %}
    -- Только новые/изменённые строки
    WHERE order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}

Стратегии incremental

Стратегия Как работает Когда
append INSERT новых строк Логи, события (без обновлений)
merge MERGE (upsert) по unique_key Обновляемые данные
delete+insert DELETE по ключу + INSERT PostgreSQL (нет MERGE)
insert_overwrite Перезаписать целые партиции BigQuery, Spark

Full refresh

Bash
# Иногда нужна полная перестройка
dbt run --full-refresh -s fct_orders
# Удаляет таблицу и создаёт заново

Snapshots — SCD Type 2

Snapshot отслеживает изменения в source и строит историю (SCD Type 2).

SQL
-- snapshots/snap_customers.sql
{% snapshot snap_customers %}

{{ config(
    target_database='analytics',
    target_schema='snapshots',
    unique_key='customer_id',
    strategy='check',
    check_cols=['name', 'city', 'segment']  -- отслеживаемые столбцы
) }}

SELECT
    customer_id,
    name,
    city,
    segment
FROM {{ source('raw', 'customers') }}

{% endsnapshot %}
Bash
dbt snapshot

Результат:

Text Only
customer_id | name  | city    | dbt_valid_from | dbt_valid_to       | dbt_scd_id
1           | Иван  | Москва  | 2025-01-01     | 2025-03-15         | abc123
1           | Иван  | Казань  | 2025-03-15     | NULL (текущая)     | def456

Стратегии snapshot

Стратегия Как определяет изменения
check Сравнивает значения указанных столбцов
timestamp Сравнивает столбец updated_at

Hooks — кастомная логика

Pre/Post hooks на модели

SQL
{{ config(
    materialized='table',
    post_hook=[
        "GRANT SELECT ON {{ this }} TO analytics_role",
        "ANALYZE {{ this }}"
    ]
) }}

Project-level hooks

YAML
# dbt_project.yml
on-run-start:
  - "CREATE SCHEMA IF NOT EXISTS {{ target.schema }}_staging"
on-run-end:
  - "DROP SCHEMA IF EXISTS {{ target.schema }}_staging CASCADE"

Exposures — документация потребителей

YAML
# models/exposures.yml
exposures:
  - name: weekly_revenue_dashboard
    type: dashboard
    maturity: high
    url: https://bi.company.com/dashboard/42
    description: "Еженедельный дашборд выручки для CEO"
    depends_on:
      - ref('fct_revenue')
      - ref('dim_date')
    owner:
      name: Руслан
      email: r@example.com
Bash
dbt docs generate  # exposure появится в DAG

Тестирование продвинутое

Custom generic test

SQL
-- tests/generic/test_accepted_range.sql
{% test accepted_range(model, column_name, min_value, max_value) %}
SELECT *
FROM {{ model }}
WHERE {{ column_name }} < {{ min_value }}
   OR {{ column_name }} > {{ max_value }}
{% endtest %}
YAML
# schema.yml
models:
  - name: fct_orders
    columns:
      - name: total_amount
        tests:
          - accepted_range:
              min_value: 0
              max_value: 1000000

Unit tests (dbt 1.8+)

YAML
# schema.yml
unit_tests:
  - name: test_revenue_calculation
    model: fct_revenue
    given:
      - input: ref('stg_order_items')
        rows:
          - {order_id: 1, quantity: 2, unit_price: 100, discount: 10}
          - {order_id: 1, quantity: 1, unit_price: 50, discount: 0}
    expect:
      rows:
        - {order_id: 1, revenue: 230}  # 2*100*0.9 + 1*50

Организация проекта

Text Only
dbt_project/
├── models/
│   ├── staging/          # 1:1 с source, переименование, приведение типов
│   │   ├── _stg_models.yml
│   │   ├── stg_customers.sql
│   │   └── stg_orders.sql
│   ├── intermediate/     # бизнес-логика, подготовка
│   │   └── int_order_items_enriched.sql
│   └── marts/            # финальные витрины
│       ├── fct_revenue.sql
│       └── dim_customer.sql
├── snapshots/
├── macros/
├── tests/
├── seeds/                # CSV-справочники (маленькие)
└── packages.yml

Что запомнить

Фича Для чего Пример
Макросы DRY-код, переиспользование cents_to_dollars, audit_columns
Пакеты Готовые макросы сообщества dbt_utils, dbt_date
Incremental Обработка только новых данных fct_orders с WHERE > MAX(date)
Snapshots SCD Type 2 автоматически snap_customers с check strategy
Hooks Post-deploy действия GRANT, ANALYZE, DROP staging
Exposures Документация потребителей BI-дашборды, ML-модели

Проверь себя


Источники