dbt: макросы, пакеты и incremental¶
Зачем это DE?¶
Базовый dbt — это SELECT. Продвинутый dbt — это макросы для DRY, пакеты для переиспользования, incremental для обработки только новых данных, snapshots для SCD Type 2 и hooks для кастомной логики.
Макросы — Jinja-функции¶
Макрос — переиспользуемая Jinja-функция. Как функция в Python, но для SQL.
Базовый макрос¶
SQL
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
({{ column_name }} / 100.0)::numeric(12,2)
{% endmacro %}
Использование:
SQL
-- models/orders.sql
SELECT
order_id,
{{ cents_to_dollars('amount_cents') }} AS amount
FROM {{ source('raw', 'orders') }}
Макрос с условной логикой¶
SQL
-- macros/generate_surrogate_key.sql
{% macro surrogate_key(fields) %}
md5(concat(
{% for field in fields %}
coalesce(cast({{ field }} as text), '')
{% if not loop.last %} || '|' || {% endif %}
{% endfor %}
))
{% endmacro %}
Макрос для аудита¶
SQL
-- macros/audit_columns.sql
{% macro audit_columns() %}
current_timestamp AS _loaded_at,
'{{ invocation_id }}' AS _dbt_run_id,
'{{ this.name }}' AS _model_name
{% endmacro %}
SQL
-- models/dim_customer.sql
SELECT
customer_id,
name,
city,
{{ audit_columns() }}
FROM {{ ref('stg_customers') }}
Пакеты (packages)¶
dbt_utils — must-have¶
YAML
# packages.yml
packages:
- package: dbt-labs/dbt_utils
version: ">=1.0.0"
- package: dbt-labs/dbt_date
version: ">=0.7.0"
Полезные макросы dbt_utils:
SQL
-- Surrogate key
{{ dbt_utils.generate_surrogate_key(['customer_id', 'order_date']) }}
-- Pivot
{{ dbt_utils.pivot('status', dbt_utils.get_column_values(ref('orders'), 'status')) }}
-- Date spine (генерация дат)
{{ dbt_utils.date_spine(
datepart="day",
start_date="cast('2020-01-01' as date)",
end_date="current_date"
) }}
-- Union нескольких моделей
{{ dbt_utils.union_relations(
relations=[ref('orders_2023'), ref('orders_2024'), ref('orders_2025')]
) }}
Incremental — обработка только новых данных¶
Зачем¶
Полная перестройка таблицы с 100 млн строк — 30 минут. Incremental обрабатывает только новые/изменённые строки — 30 секунд.
Базовый incremental¶
SQL
-- models/fct_orders.sql
{{ config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge'
) }}
SELECT
order_id,
customer_id,
total_amount,
status,
order_date,
current_timestamp AS _loaded_at
FROM {{ source('raw', 'orders') }}
{% if is_incremental() %}
-- Только новые/изменённые строки
WHERE order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
Стратегии incremental¶
| Стратегия | Как работает | Когда |
|---|---|---|
append |
INSERT новых строк | Логи, события (без обновлений) |
merge |
MERGE (upsert) по unique_key | Обновляемые данные |
delete+insert |
DELETE по ключу + INSERT | PostgreSQL (нет MERGE) |
insert_overwrite |
Перезаписать целые партиции | BigQuery, Spark |
Full refresh¶
Bash
# Иногда нужна полная перестройка
dbt run --full-refresh -s fct_orders
# Удаляет таблицу и создаёт заново
Snapshots — SCD Type 2¶
Snapshot отслеживает изменения в source и строит историю (SCD Type 2).
SQL
-- snapshots/snap_customers.sql
{% snapshot snap_customers %}
{{ config(
target_database='analytics',
target_schema='snapshots',
unique_key='customer_id',
strategy='check',
check_cols=['name', 'city', 'segment'] -- отслеживаемые столбцы
) }}
SELECT
customer_id,
name,
city,
segment
FROM {{ source('raw', 'customers') }}
{% endsnapshot %}
Результат:
Text Only
customer_id | name | city | dbt_valid_from | dbt_valid_to | dbt_scd_id
1 | Иван | Москва | 2025-01-01 | 2025-03-15 | abc123
1 | Иван | Казань | 2025-03-15 | NULL (текущая) | def456
Стратегии snapshot¶
| Стратегия | Как определяет изменения |
|---|---|
check |
Сравнивает значения указанных столбцов |
timestamp |
Сравнивает столбец updated_at |
Hooks — кастомная логика¶
Pre/Post hooks на модели¶
SQL
{{ config(
materialized='table',
post_hook=[
"GRANT SELECT ON {{ this }} TO analytics_role",
"ANALYZE {{ this }}"
]
) }}
Project-level hooks¶
YAML
# dbt_project.yml
on-run-start:
- "CREATE SCHEMA IF NOT EXISTS {{ target.schema }}_staging"
on-run-end:
- "DROP SCHEMA IF EXISTS {{ target.schema }}_staging CASCADE"
Exposures — документация потребителей¶
YAML
# models/exposures.yml
exposures:
- name: weekly_revenue_dashboard
type: dashboard
maturity: high
url: https://bi.company.com/dashboard/42
description: "Еженедельный дашборд выручки для CEO"
depends_on:
- ref('fct_revenue')
- ref('dim_date')
owner:
name: Руслан
email: r@example.com
Тестирование продвинутое¶
Custom generic test¶
SQL
-- tests/generic/test_accepted_range.sql
{% test accepted_range(model, column_name, min_value, max_value) %}
SELECT *
FROM {{ model }}
WHERE {{ column_name }} < {{ min_value }}
OR {{ column_name }} > {{ max_value }}
{% endtest %}
YAML
# schema.yml
models:
- name: fct_orders
columns:
- name: total_amount
tests:
- accepted_range:
min_value: 0
max_value: 1000000
Unit tests (dbt 1.8+)¶
YAML
# schema.yml
unit_tests:
- name: test_revenue_calculation
model: fct_revenue
given:
- input: ref('stg_order_items')
rows:
- {order_id: 1, quantity: 2, unit_price: 100, discount: 10}
- {order_id: 1, quantity: 1, unit_price: 50, discount: 0}
expect:
rows:
- {order_id: 1, revenue: 230} # 2*100*0.9 + 1*50
Организация проекта¶
Text Only
dbt_project/
├── models/
│ ├── staging/ # 1:1 с source, переименование, приведение типов
│ │ ├── _stg_models.yml
│ │ ├── stg_customers.sql
│ │ └── stg_orders.sql
│ ├── intermediate/ # бизнес-логика, подготовка
│ │ └── int_order_items_enriched.sql
│ └── marts/ # финальные витрины
│ ├── fct_revenue.sql
│ └── dim_customer.sql
├── snapshots/
├── macros/
├── tests/
├── seeds/ # CSV-справочники (маленькие)
└── packages.yml
Что запомнить¶
| Фича | Для чего | Пример |
|---|---|---|
| Макросы | DRY-код, переиспользование | cents_to_dollars, audit_columns |
| Пакеты | Готовые макросы сообщества | dbt_utils, dbt_date |
| Incremental | Обработка только новых данных | fct_orders с WHERE > MAX(date) |
| Snapshots | SCD Type 2 автоматически | snap_customers с check strategy |
| Hooks | Post-deploy действия | GRANT, ANALYZE, DROP staging |
| Exposures | Документация потребителей | BI-дашборды, ML-модели |