Перейти к содержанию

Тестирование и CI/CD

Тестирование и CI/CD для Airflow

DAG без тестов — бомба замедленного действия. Ошибка в DAG обнаруживается в 3 часа ночи, когда пайплайн падает и утренний отчёт пуст. Тестирование ловит проблемы до деплоя.


Уровни тестирования DAG

Уровень Что проверяет Скорость Пример
DAG Integrity DAG валиден, парсится без ошибок Секунды Нет циклов, нет import errors
Unit Логика отдельных функций Секунды transform() возвращает правильный результат
Integration Взаимодействие с внешними системами Минуты PostgresHook реально пишет в тестовую БД
DAG.test() Полный прогон DAG локально Минуты Все задачи выполняются последовательно

DAG Integrity Tests

Минимальный тест — убедиться, что все DAG-файлы парсятся без ошибок.

Python
# tests/test_dag_integrity.py
import pytest
from airflow.models import DagBag

@pytest.fixture(scope="session")
def dagbag():
    return DagBag(dag_folder="dags/", include_examples=False)

def test_no_import_errors(dagbag):
    """Все DAG-файлы парсятся без ошибок."""
    assert dagbag.import_errors == {}, f"Import errors: {dagbag.import_errors}"

def test_dag_count(dagbag):
    """Минимальное количество DAG-ов."""
    assert len(dagbag.dags) >= 3, f"Expected >= 3 DAGs, got {len(dagbag.dags)}"

@pytest.mark.parametrize("dag_id", [
    "etl_orders_daily",
    "etl_customers_sync",
    "dbt_run_daily",
])
def test_dag_exists(dagbag, dag_id):
    """Конкретные DAG-ы существуют."""
    assert dag_id in dagbag.dags, f"DAG '{dag_id}' not found"

def test_no_cycles(dagbag):
    """Нет циклических зависимостей."""
    for dag_id, dag in dagbag.dags.items():
        # Airflow не позволяет создать DAG с циклом, но проверим
        assert not dag.test_cycle(), f"Cycle in {dag_id}"

Проверка структуры DAG

Python
def test_etl_orders_structure(dagbag):
    dag = dagbag.dags["etl_orders_daily"]

    # Правильное количество задач
    assert len(dag.tasks) == 5

    # Правильные зависимости
    extract = dag.get_task("extract")
    transform = dag.get_task("transform")
    assert transform in extract.downstream_list

    # Schedule
    assert dag.schedule_interval == "@daily"

    # Теги
    assert "etl" in dag.tags

    # Нет запрещённых операторов
    for task in dag.tasks:
        assert type(task).__name__ != "SubDagOperator", "SubDag deprecated!"

Unit-тесты логики

Тестируй функции, а не операторы. Вынеси логику из DAG-файла.

Python
# dags/etl/transform.py
def calculate_revenue(orders: list[dict]) -> list[dict]:
    result = []
    for order in orders:
        revenue = order["quantity"] * order["price"] * (1 - order.get("discount", 0) / 100)
        result.append({**order, "revenue": round(revenue, 2)})
    return result
Python
# tests/test_transform.py
from dags.etl.transform import calculate_revenue

def test_revenue_no_discount():
    orders = [{"quantity": 2, "price": 100.0}]
    result = calculate_revenue(orders)
    assert result[0]["revenue"] == 200.0

def test_revenue_with_discount():
    orders = [{"quantity": 1, "price": 1000.0, "discount": 10}]
    result = calculate_revenue(orders)
    assert result[0]["revenue"] == 900.0

def test_empty_orders():
    assert calculate_revenue([]) == []

Мокирование Connections

Python
# tests/test_extract.py
import pytest
from unittest.mock import patch, MagicMock
from dags.etl.extract import extract_from_api

@patch("dags.etl.extract.requests.get")
def test_extract_success(mock_get):
    mock_get.return_value = MagicMock(
        status_code=200,
        json=lambda: [{"id": 1, "amount": 100}],
    )
    result = extract_from_api("https://api.example.com/orders")
    assert len(result) == 1
    assert result[0]["amount"] == 100

@patch("dags.etl.extract.PostgresHook")
def test_load_to_postgres(mock_hook_class):
    mock_hook = MagicMock()
    mock_hook_class.return_value = mock_hook

    from dags.etl.load import load_orders
    load_orders([{"id": 1, "amount": 100}])

    mock_hook.run.assert_called_once()

Airflow Variables и Connections в тестах

Python
# conftest.py
import os

@pytest.fixture(autouse=True)
def airflow_test_env(monkeypatch):
    """Подставляем тестовые значения."""
    monkeypatch.setenv("AIRFLOW__CORE__DAGS_FOLDER", "dags/")
    monkeypatch.setenv("AIRFLOW_VAR_ETL_SCHEMA", "test_staging")
    monkeypatch.setenv(
        "AIRFLOW_CONN_ETL_DB",
        "postgresql://test:test@localhost:5432/test_db"
    )

dag.test() — локальный прогон

Airflow 2.5+ позволяет запустить DAG локально без scheduler/webserver.

Python
# Из командной строки
airflow dags test etl_orders_daily 2025-03-15

# Из Python
from airflow.models import DagBag

dagbag = DagBag("dags/")
dag = dagbag.dags["etl_orders_daily"]
dag.test()
Python
# С конкретной датой
dag.test(execution_date=datetime(2025, 3, 15))

dag.test() для разработки

Вместо docker compose up + ждать scheduler + trigger → просто dag.test(). Выполняет все задачи последовательно в текущем процессе.


CI Pipeline

GitHub Actions

YAML
# .github/workflows/airflow-ci.yml
name: Airflow CI

on:
  pull_request:
    paths: ["dags/**", "tests/**"]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
        ports: ["5432:5432"]
        options: --health-cmd pg_isready

    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: |
          pip install apache-airflow==2.8.0 \
                      apache-airflow-providers-postgres \
                      pytest pytest-cov

      - name: Init Airflow DB
        env:
          AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@localhost/airflow
          AIRFLOW__CORE__LOAD_EXAMPLES: "false"
        run: airflow db init

      - name: Run DAG integrity tests
        env:
          AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/dags
        run: pytest tests/test_dag_integrity.py -v

      - name: Run unit tests
        run: pytest tests/ -v --ignore=tests/test_dag_integrity.py --cov=dags

Чеклист CI

  • [ ] DAG Integrity — все файлы парсятся
  • [ ] Unit tests — логика трансформаций
  • [ ] Linting — ruff/flake8 на DAG-файлы
  • [ ] SQL linting — SQLFluff на SQL в операторах
  • [ ] No hardcoded credentials — grep на пароли/ключи

Проверь себя


Источники