Тестирование и CI/CD
Тестирование и CI/CD для Airflow¶
DAG без тестов — бомба замедленного действия. Ошибка в DAG обнаруживается в 3 часа ночи, когда пайплайн падает и утренний отчёт пуст. Тестирование ловит проблемы до деплоя.
Уровни тестирования DAG¶
| Уровень | Что проверяет | Скорость | Пример |
|---|---|---|---|
| DAG Integrity | DAG валиден, парсится без ошибок | Секунды | Нет циклов, нет import errors |
| Unit | Логика отдельных функций | Секунды | transform() возвращает правильный результат |
| Integration | Взаимодействие с внешними системами | Минуты | PostgresHook реально пишет в тестовую БД |
| DAG.test() | Полный прогон DAG локально | Минуты | Все задачи выполняются последовательно |
DAG Integrity Tests¶
Минимальный тест — убедиться, что все DAG-файлы парсятся без ошибок.
Python
# tests/test_dag_integrity.py
import pytest
from airflow.models import DagBag
@pytest.fixture(scope="session")
def dagbag():
return DagBag(dag_folder="dags/", include_examples=False)
def test_no_import_errors(dagbag):
"""Все DAG-файлы парсятся без ошибок."""
assert dagbag.import_errors == {}, f"Import errors: {dagbag.import_errors}"
def test_dag_count(dagbag):
"""Минимальное количество DAG-ов."""
assert len(dagbag.dags) >= 3, f"Expected >= 3 DAGs, got {len(dagbag.dags)}"
@pytest.mark.parametrize("dag_id", [
"etl_orders_daily",
"etl_customers_sync",
"dbt_run_daily",
])
def test_dag_exists(dagbag, dag_id):
"""Конкретные DAG-ы существуют."""
assert dag_id in dagbag.dags, f"DAG '{dag_id}' not found"
def test_no_cycles(dagbag):
"""Нет циклических зависимостей."""
for dag_id, dag in dagbag.dags.items():
# Airflow не позволяет создать DAG с циклом, но проверим
assert not dag.test_cycle(), f"Cycle in {dag_id}"
Проверка структуры DAG¶
Python
def test_etl_orders_structure(dagbag):
dag = dagbag.dags["etl_orders_daily"]
# Правильное количество задач
assert len(dag.tasks) == 5
# Правильные зависимости
extract = dag.get_task("extract")
transform = dag.get_task("transform")
assert transform in extract.downstream_list
# Schedule
assert dag.schedule_interval == "@daily"
# Теги
assert "etl" in dag.tags
# Нет запрещённых операторов
for task in dag.tasks:
assert type(task).__name__ != "SubDagOperator", "SubDag deprecated!"
Unit-тесты логики¶
Тестируй функции, а не операторы. Вынеси логику из DAG-файла.
Python
# dags/etl/transform.py
def calculate_revenue(orders: list[dict]) -> list[dict]:
result = []
for order in orders:
revenue = order["quantity"] * order["price"] * (1 - order.get("discount", 0) / 100)
result.append({**order, "revenue": round(revenue, 2)})
return result
Python
# tests/test_transform.py
from dags.etl.transform import calculate_revenue
def test_revenue_no_discount():
orders = [{"quantity": 2, "price": 100.0}]
result = calculate_revenue(orders)
assert result[0]["revenue"] == 200.0
def test_revenue_with_discount():
orders = [{"quantity": 1, "price": 1000.0, "discount": 10}]
result = calculate_revenue(orders)
assert result[0]["revenue"] == 900.0
def test_empty_orders():
assert calculate_revenue([]) == []
Мокирование Connections¶
Python
# tests/test_extract.py
import pytest
from unittest.mock import patch, MagicMock
from dags.etl.extract import extract_from_api
@patch("dags.etl.extract.requests.get")
def test_extract_success(mock_get):
mock_get.return_value = MagicMock(
status_code=200,
json=lambda: [{"id": 1, "amount": 100}],
)
result = extract_from_api("https://api.example.com/orders")
assert len(result) == 1
assert result[0]["amount"] == 100
@patch("dags.etl.extract.PostgresHook")
def test_load_to_postgres(mock_hook_class):
mock_hook = MagicMock()
mock_hook_class.return_value = mock_hook
from dags.etl.load import load_orders
load_orders([{"id": 1, "amount": 100}])
mock_hook.run.assert_called_once()
Airflow Variables и Connections в тестах¶
Python
# conftest.py
import os
@pytest.fixture(autouse=True)
def airflow_test_env(monkeypatch):
"""Подставляем тестовые значения."""
monkeypatch.setenv("AIRFLOW__CORE__DAGS_FOLDER", "dags/")
monkeypatch.setenv("AIRFLOW_VAR_ETL_SCHEMA", "test_staging")
monkeypatch.setenv(
"AIRFLOW_CONN_ETL_DB",
"postgresql://test:test@localhost:5432/test_db"
)
dag.test() — локальный прогон¶
Airflow 2.5+ позволяет запустить DAG локально без scheduler/webserver.
Python
# Из командной строки
airflow dags test etl_orders_daily 2025-03-15
# Из Python
from airflow.models import DagBag
dagbag = DagBag("dags/")
dag = dagbag.dags["etl_orders_daily"]
dag.test()
dag.test() для разработки
Вместо docker compose up + ждать scheduler + trigger → просто dag.test(). Выполняет все задачи последовательно в текущем процессе.
CI Pipeline¶
GitHub Actions¶
YAML
# .github/workflows/airflow-ci.yml
name: Airflow CI
on:
pull_request:
paths: ["dags/**", "tests/**"]
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
ports: ["5432:5432"]
options: --health-cmd pg_isready
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install dependencies
run: |
pip install apache-airflow==2.8.0 \
apache-airflow-providers-postgres \
pytest pytest-cov
- name: Init Airflow DB
env:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@localhost/airflow
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
run: airflow db init
- name: Run DAG integrity tests
env:
AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/dags
run: pytest tests/test_dag_integrity.py -v
- name: Run unit tests
run: pytest tests/ -v --ignore=tests/test_dag_integrity.py --cov=dags
Чеклист CI¶
- [ ] DAG Integrity — все файлы парсятся
- [ ] Unit tests — логика трансформаций
- [ ] Linting — ruff/flake8 на DAG-файлы
- [ ] SQL linting — SQLFluff на SQL в операторах
- [ ] No hardcoded credentials — grep на пароли/ключи
Проверь себя¶
Источники¶
- Airflow: Testing DAGs — best practices
- Airflow: dag.test() — локальный прогон