Паттерны DAG и TaskFlow
Паттерны DAG и TaskFlow API¶
DAG — это граф зависимостей между задачами. Но реальные пайплайны сложнее линейной цепочки: нужны ветвления, условные пути, динамическое количество задач и координация между DAG-ами.
TaskFlow API (@task)¶
TaskFlow (Airflow 2.0+) упрощает написание DAG-ов: декоратор @task заменяет PythonOperator + явный XCom.
Было (классический стиль)¶
def _extract(**context):
data = fetch_data()
context["ti"].xcom_push(key="raw", value=data)
def _transform(**context):
raw = context["ti"].xcom_pull(task_ids="extract", key="raw")
return [row["amount"] * 1.2 for row in raw]
extract = PythonOperator(task_id="extract", python_callable=_extract)
transform = PythonOperator(task_id="transform", python_callable=_transform)
extract >> transform
Стало (TaskFlow)¶
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def etl_pipeline():
@task
def extract():
return fetch_data()
@task
def transform(raw_data):
return [row["amount"] * 1.2 for row in raw_data]
@task
def load(transformed):
save_to_db(transformed)
raw = extract()
clean = transform(raw)
load(clean)
etl_pipeline()
XCom-передача происходит автоматически через аргументы функций.
Branching — условные пути¶
BranchPythonOperator¶
from airflow.operators.python import BranchPythonOperator
def choose_path(**context):
ds = context["ds"]
day = datetime.strptime(ds, "%Y-%m-%d").weekday()
if day < 5: # будни
return "process_workday"
return "process_weekend"
branch = BranchPythonOperator(
task_id="branch",
python_callable=choose_path,
)
branch >> [process_workday, process_weekend] >> end
@task.branch (TaskFlow)¶
@task.branch
def choose_path(ds=None):
if is_workday(ds):
return "process_workday"
return "process_weekend"
Downstream после branch пропускается
Задачи, которые НЕ на выбранном пути, получают статус skipped. Если после ветвления есть общий downstream-task, используй trigger_rule="none_failed_min_one_success".
Trigger Rules¶
По умолчанию задача запускается, когда все upstream-задачи завершились успешно (all_success). Trigger rule меняет это поведение.
| Rule | Описание | Пример |
|---|---|---|
all_success |
Все успешны (default) | Обычная цепочка |
all_failed |
Все провалились | Алертинг |
all_done |
Все завершились (любой статус) | Cleanup |
one_success |
Хотя бы одна успешна | После branching |
one_failed |
Хотя бы одна провалилась | Быстрый fail |
none_failed |
Ни одна не провалилась (skipped ОК) | После branch → общий downstream |
none_failed_min_one_success |
Ни одна не failed + хотя бы одна success | Рекомендуется после branch |
none_skipped |
Ни одна не skipped | Убедиться что все пути отработали |
Dynamic Task Mapping¶
Airflow 2.3+ позволяет создавать задачи динамически на основе данных.
expand() — map-like¶
@task
def get_file_list():
return ["orders_01.csv", "orders_02.csv", "orders_03.csv"]
@task
def process_file(filename):
df = pd.read_csv(f"/data/{filename}")
save_to_db(df)
return len(df)
@task
def summarize(counts):
total = sum(counts)
print(f"Total rows: {total}")
files = get_file_list()
results = process_file.expand(filename=files) # 3 параллельных task
summarize(results)
expand_kwargs¶
@task
def process(table, schema):
run_dbt(table, schema)
configs = [
{"table": "orders", "schema": "staging"},
{"table": "customers", "schema": "staging"},
{"table": "products", "schema": "raw"},
]
process.expand_kwargs(configs) # 3 задачи с разными параметрами
map_index в XCom¶
@task
def generate_reports(table):
return f"report_{table}.pdf"
# Результаты mapped task — список
reports = generate_reports.expand(table=["orders", "customers"])
# reports → ["report_orders.pdf", "report_customers.pdf"]
Datasets — координация между DAG-ами¶
Airflow 2.4+ поддерживает datasets — логические имена данных. Один DAG создаёт dataset, другой DAG реагирует на обновление.
from airflow.datasets import Dataset
# Определяем dataset
orders_dataset = Dataset("s3://etl-kitchen/orders/")
# DAG-продюсер
@dag(schedule="@hourly")
def produce_orders():
@task(outlets=[orders_dataset]) # помечаем как продюсер
def export_orders():
df = extract_orders()
df.to_parquet("s3://etl-kitchen/orders/latest.parquet")
export_orders()
# DAG-консюмер — запускается АВТОМАТИЧЕСКИ при обновлении dataset
@dag(schedule=[orders_dataset]) # триггер — dataset, не cron!
def consume_orders():
@task
def process():
df = pd.read_parquet("s3://etl-kitchen/orders/latest.parquet")
transform_and_load(df)
process()
Datasets заменяют ExternalTaskSensor
Вместо ExternalTaskSensor (polling, coupling) используй Datasets — event-driven, loosely coupled.
Паттерны DAG-дизайна¶
Идемпотентность¶
# Плохо — дублирование при повторном запуске
@task
def load(data):
insert_into_db(data)
# Хорошо — удаляем старое, вставляем новое
@task
def load(data, ds=None):
delete_from_db(date=ds)
insert_into_db(data)
Retry и Alerting¶
@dag(
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
"on_failure_callback": send_alert_to_slack,
},
)
Sub-DAGs → TaskGroup¶
from airflow.utils.task_group import TaskGroup
@dag(schedule="@daily")
def etl():
with TaskGroup("extract") as extract_group:
extract_orders = BashOperator(...)
extract_customers = BashOperator(...)
with TaskGroup("transform") as transform_group:
transform_orders = PythonOperator(...)
transform_customers = PythonOperator(...)
extract_group >> transform_group
Проверь себя¶
Источники¶
- Airflow: TaskFlow API — @task декоратор
- Airflow: Dynamic Task Mapping — expand()
- Airflow: Datasets — event-driven scheduling
- Airflow: Branching — условные пути