Перейти к содержанию

Паттерны DAG и TaskFlow

Паттерны DAG и TaskFlow API

DAG — это граф зависимостей между задачами. Но реальные пайплайны сложнее линейной цепочки: нужны ветвления, условные пути, динамическое количество задач и координация между DAG-ами.


TaskFlow API (@task)

TaskFlow (Airflow 2.0+) упрощает написание DAG-ов: декоратор @task заменяет PythonOperator + явный XCom.

Было (классический стиль)

Python
def _extract(**context):
    data = fetch_data()
    context["ti"].xcom_push(key="raw", value=data)

def _transform(**context):
    raw = context["ti"].xcom_pull(task_ids="extract", key="raw")
    return [row["amount"] * 1.2 for row in raw]

extract = PythonOperator(task_id="extract", python_callable=_extract)
transform = PythonOperator(task_id="transform", python_callable=_transform)
extract >> transform

Стало (TaskFlow)

Python
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def etl_pipeline():

    @task
    def extract():
        return fetch_data()

    @task
    def transform(raw_data):
        return [row["amount"] * 1.2 for row in raw_data]

    @task
    def load(transformed):
        save_to_db(transformed)

    raw = extract()
    clean = transform(raw)
    load(clean)

etl_pipeline()

XCom-передача происходит автоматически через аргументы функций.


Branching — условные пути

BranchPythonOperator

Python
from airflow.operators.python import BranchPythonOperator

def choose_path(**context):
    ds = context["ds"]
    day = datetime.strptime(ds, "%Y-%m-%d").weekday()
    if day < 5:  # будни
        return "process_workday"
    return "process_weekend"

branch = BranchPythonOperator(
    task_id="branch",
    python_callable=choose_path,
)

branch >> [process_workday, process_weekend] >> end

@task.branch (TaskFlow)

Python
@task.branch
def choose_path(ds=None):
    if is_workday(ds):
        return "process_workday"
    return "process_weekend"

Downstream после branch пропускается

Задачи, которые НЕ на выбранном пути, получают статус skipped. Если после ветвления есть общий downstream-task, используй trigger_rule="none_failed_min_one_success".


Trigger Rules

По умолчанию задача запускается, когда все upstream-задачи завершились успешно (all_success). Trigger rule меняет это поведение.

Rule Описание Пример
all_success Все успешны (default) Обычная цепочка
all_failed Все провалились Алертинг
all_done Все завершились (любой статус) Cleanup
one_success Хотя бы одна успешна После branching
one_failed Хотя бы одна провалилась Быстрый fail
none_failed Ни одна не провалилась (skipped ОК) После branch → общий downstream
none_failed_min_one_success Ни одна не failed + хотя бы одна success Рекомендуется после branch
none_skipped Ни одна не skipped Убедиться что все пути отработали
Python
end = EmptyOperator(
    task_id="end",
    trigger_rule="none_failed_min_one_success",
)

Dynamic Task Mapping

Airflow 2.3+ позволяет создавать задачи динамически на основе данных.

expand() — map-like

Python
@task
def get_file_list():
    return ["orders_01.csv", "orders_02.csv", "orders_03.csv"]

@task
def process_file(filename):
    df = pd.read_csv(f"/data/{filename}")
    save_to_db(df)
    return len(df)

@task
def summarize(counts):
    total = sum(counts)
    print(f"Total rows: {total}")

files = get_file_list()
results = process_file.expand(filename=files)  # 3 параллельных task
summarize(results)

expand_kwargs

Python
@task
def process(table, schema):
    run_dbt(table, schema)

configs = [
    {"table": "orders", "schema": "staging"},
    {"table": "customers", "schema": "staging"},
    {"table": "products", "schema": "raw"},
]

process.expand_kwargs(configs)  # 3 задачи с разными параметрами

map_index в XCom

Python
@task
def generate_reports(table):
    return f"report_{table}.pdf"

# Результаты mapped task — список
reports = generate_reports.expand(table=["orders", "customers"])
# reports → ["report_orders.pdf", "report_customers.pdf"]

Datasets — координация между DAG-ами

Airflow 2.4+ поддерживает datasets — логические имена данных. Один DAG создаёт dataset, другой DAG реагирует на обновление.

Python
from airflow.datasets import Dataset

# Определяем dataset
orders_dataset = Dataset("s3://etl-kitchen/orders/")

# DAG-продюсер
@dag(schedule="@hourly")
def produce_orders():
    @task(outlets=[orders_dataset])  # помечаем как продюсер
    def export_orders():
        df = extract_orders()
        df.to_parquet("s3://etl-kitchen/orders/latest.parquet")

    export_orders()

# DAG-консюмер — запускается АВТОМАТИЧЕСКИ при обновлении dataset
@dag(schedule=[orders_dataset])  # триггер — dataset, не cron!
def consume_orders():
    @task
    def process():
        df = pd.read_parquet("s3://etl-kitchen/orders/latest.parquet")
        transform_and_load(df)

    process()

Datasets заменяют ExternalTaskSensor

Вместо ExternalTaskSensor (polling, coupling) используй Datasets — event-driven, loosely coupled.


Паттерны DAG-дизайна

Идемпотентность

Python
# Плохо — дублирование при повторном запуске
@task
def load(data):
    insert_into_db(data)

# Хорошо — удаляем старое, вставляем новое
@task
def load(data, ds=None):
    delete_from_db(date=ds)
    insert_into_db(data)

Retry и Alerting

Python
@dag(
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
        "retry_exponential_backoff": True,
        "on_failure_callback": send_alert_to_slack,
    },
)

Sub-DAGs → TaskGroup

Python
from airflow.utils.task_group import TaskGroup

@dag(schedule="@daily")
def etl():
    with TaskGroup("extract") as extract_group:
        extract_orders = BashOperator(...)
        extract_customers = BashOperator(...)

    with TaskGroup("transform") as transform_group:
        transform_orders = PythonOperator(...)
        transform_customers = PythonOperator(...)

    extract_group >> transform_group

Проверь себя


Источники