Продакшн-паттерны
Airflow в продакшене¶
Локальный Airflow с SequentialExecutor и SQLite — это для разработки. Продакшн — это кластер с Celery или Kubernetes executor, secrets в Vault, логи в S3 и алерты в Slack.
Executors — как выполняются задачи¶
| Executor | Описание | Когда |
|---|---|---|
| SequentialExecutor | Задачи одна за другой | Только разработка |
| LocalExecutor | Параллельно на одной машине | Маленькие команды, < 50 DAG |
| CeleryExecutor | Worker'ы на разных машинах через Redis/RabbitMQ | Средние нагрузки |
| KubernetesExecutor | Каждая задача — отдельный Pod | Большие нагрузки, изоляция |
| CeleryKubernetesExecutor | Celery + возможность запуска отдельных задач в K8s | Гибридный подход |
CeleryExecutor¶
INI
# airflow.cfg
[core]
executor = CeleryExecutor
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 8
Python
# Задача на конкретной очереди
heavy_task = PythonOperator(
task_id="heavy_transform",
python_callable=run_spark_job,
queue="heavy", # выполнится на worker'е, слушающем "heavy"
)
KubernetesExecutor¶
YAML
# airflow.cfg или helm values
[kubernetes_executor]
namespace = airflow
worker_container_repository = myregistry/airflow
worker_container_tag = 2.8.0
delete_worker_pods = True
Python
# Каждая задача может иметь свой Docker-образ
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
spark_task = KubernetesPodOperator(
task_id="run_spark",
name="spark-job",
namespace="data",
image="bitnami/spark:3.5",
cmds=["spark-submit"],
arguments=["--master", "k8s://...", "job.py"],
resources={"request_memory": "4Gi", "request_cpu": "2"},
is_delete_operator_pod=True,
)
Secrets Management¶
Environment Variables¶
Python
# Airflow автоматически подхватывает переменные с префиксами:
# AIRFLOW_VAR_<name> → Variable.get("<name>")
# AIRFLOW_CONN_<conn_id> → Connection("<conn_id>")
# .env
AIRFLOW_VAR_S3_BUCKET=etl-kitchen-data
AIRFLOW_CONN_ETL_DB=postgresql://user:pass@host:5432/db
Secrets Backend¶
INI
# airflow.cfg
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {
"connections_path": "airflow/connections",
"variables_path": "airflow/variables",
"url": "https://vault.internal:8200",
"auth_type": "token",
"token": "s.xxxxx"
}
Приоритет поиска: 1. Secrets Backend (Vault, AWS SSM, GCP Secret Manager) 2. Environment Variables 3. Airflow Metadata DB (UI / CLI)
Никогда не храни секреты в DAG-коде
Ни пароли, ни API-ключи, ни токены. Используй Connections, Variables или Secrets Backend.
Remote Logging¶
По умолчанию логи хранятся локально в $AIRFLOW_HOME/logs/. В продакшене — отправляй в S3/GCS.
INI
# airflow.cfg
[logging]
remote_logging = True
remote_log_conn_id = aws_default
remote_base_log_folder = s3://etl-kitchen-logs/airflow/
Мониторинг и алерты¶
Callbacks¶
Python
from airflow.providers.slack.notifications.slack import send_slack_notification
def on_failure(context):
dag_id = context["dag"].dag_id
task_id = context["task_instance"].task_id
execution_date = context["execution_date"]
log_url = context["task_instance"].log_url
send_slack_notification(
slack_conn_id="slack_webhook",
text=f":red_circle: *{dag_id}.{task_id}* failed at {execution_date}\n{log_url}",
)
@dag(
default_args={"on_failure_callback": on_failure},
# SLA: алерт если DAG не завершился за 2 часа
dagrun_timeout=timedelta(hours=2),
)
SLA¶
Python
@task(sla=timedelta(hours=1))
def extract():
"""Если extract не завершится за 1 час — SLA miss."""
...
StatsD + Grafana¶
INI
# airflow.cfg
[metrics]
statsd_on = True
statsd_host = statsd
statsd_port = 8125
statsd_prefix = airflow
Ключевые метрики:
| Метрика | Описание | Алерт |
|---|---|---|
dag_processing.total_parse_time |
Время парсинга DAG-файлов | > 30 секунд |
scheduler.tasks.running |
Запущенные задачи | > 80% worker capacity |
scheduler.tasks.starving |
Задачи в очереди без worker'а | > 0 долгое время |
ti.failures |
Провалы задач | Любой |
dag_run.duration |
Время выполнения DAG | > SLA |
Масштабирование¶
Горизонтальное (больше worker'ов)¶
Bash
# Celery: добавить worker
airflow celery worker --queues default -c 16
# Kubernetes: Airflow автоматически создаёт Pod'ы
Вертикальное (больше ресурсов)¶
Python
# Ресурсы для конкретной задачи (K8s Executor)
@task(
executor_config={
"KubernetesExecutor": {
"request_memory": "8Gi",
"request_cpu": "4",
"limit_memory": "16Gi",
}
}
)
def heavy_transform():
...
Оптимизация Scheduler¶
INI
# airflow.cfg
[scheduler]
parsing_processes = 4 # параллельный парсинг DAG-файлов
min_file_process_interval = 30 # не парсить чаще чем раз в 30 сек
dag_dir_list_interval = 60 # сканировать dags/ каждые 60 сек
[core]
parallelism = 32 # макс задач одновременно во всём кластере
max_active_tasks_per_dag = 16 # макс задач на один DAG
max_active_runs_per_dag = 3 # макс одновременных запусков одного DAG
Деплой DAG-ов¶
| Метод | Описание | Плюсы | Минусы |
|---|---|---|---|
| Git-sync sidecar | Pod синхронизирует dags/ из git | Нет rebuild | Задержка синхронизации |
| Docker image | DAG-ы в Docker-образе | Версионирование | Rebuild при каждом изменении |
| S3/GCS | DAG-ы в object storage | Простота | Не git-native |
| PersistentVolume | Shared volume | Простота в K8s | Нет версионирования |
Git-sync (рекомендуется)¶
YAML
# Helm values (Airflow chart)
dags:
gitSync:
enabled: true
repo: git@github.com:org/airflow-dags.git
branch: main
subPath: dags/
wait: 60
Проверь себя¶
Источники¶
- Airflow: Production Deployment — руководство по деплою
- Airflow: Executors — Celery, Kubernetes
- Airflow: Logging & Monitoring — remote logging, metrics
- Airflow Helm Chart — Kubernetes деплой