Перейти к содержанию

Продакшн-паттерны

Airflow в продакшене

Локальный Airflow с SequentialExecutor и SQLite — это для разработки. Продакшн — это кластер с Celery или Kubernetes executor, secrets в Vault, логи в S3 и алерты в Slack.


Executors — как выполняются задачи

Executor Описание Когда
SequentialExecutor Задачи одна за другой Только разработка
LocalExecutor Параллельно на одной машине Маленькие команды, < 50 DAG
CeleryExecutor Worker'ы на разных машинах через Redis/RabbitMQ Средние нагрузки
KubernetesExecutor Каждая задача — отдельный Pod Большие нагрузки, изоляция
CeleryKubernetesExecutor Celery + возможность запуска отдельных задач в K8s Гибридный подход

CeleryExecutor

INI
# airflow.cfg
[core]
executor = CeleryExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow
worker_concurrency = 8
Bash
# Запуск worker'а
airflow celery worker --queues default,heavy
Python
# Задача на конкретной очереди
heavy_task = PythonOperator(
    task_id="heavy_transform",
    python_callable=run_spark_job,
    queue="heavy",  # выполнится на worker'е, слушающем "heavy"
)

KubernetesExecutor

YAML
# airflow.cfg или helm values
[kubernetes_executor]
namespace = airflow
worker_container_repository = myregistry/airflow
worker_container_tag = 2.8.0
delete_worker_pods = True
Python
# Каждая задача может иметь свой Docker-образ
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

spark_task = KubernetesPodOperator(
    task_id="run_spark",
    name="spark-job",
    namespace="data",
    image="bitnami/spark:3.5",
    cmds=["spark-submit"],
    arguments=["--master", "k8s://...", "job.py"],
    resources={"request_memory": "4Gi", "request_cpu": "2"},
    is_delete_operator_pod=True,
)

Secrets Management

Environment Variables

Python
# Airflow автоматически подхватывает переменные с префиксами:
# AIRFLOW_VAR_<name> → Variable.get("<name>")
# AIRFLOW_CONN_<conn_id> → Connection("<conn_id>")

# .env
AIRFLOW_VAR_S3_BUCKET=etl-kitchen-data
AIRFLOW_CONN_ETL_DB=postgresql://user:pass@host:5432/db

Secrets Backend

INI
# airflow.cfg
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {
    "connections_path": "airflow/connections",
    "variables_path": "airflow/variables",
    "url": "https://vault.internal:8200",
    "auth_type": "token",
    "token": "s.xxxxx"
}

Приоритет поиска: 1. Secrets Backend (Vault, AWS SSM, GCP Secret Manager) 2. Environment Variables 3. Airflow Metadata DB (UI / CLI)

Никогда не храни секреты в DAG-коде

Ни пароли, ни API-ключи, ни токены. Используй Connections, Variables или Secrets Backend.


Remote Logging

По умолчанию логи хранятся локально в $AIRFLOW_HOME/logs/. В продакшене — отправляй в S3/GCS.

INI
# airflow.cfg
[logging]
remote_logging = True
remote_log_conn_id = aws_default
remote_base_log_folder = s3://etl-kitchen-logs/airflow/
Python
# Или для GCS
remote_base_log_folder = gs://etl-kitchen-logs/airflow/

Мониторинг и алерты

Callbacks

Python
from airflow.providers.slack.notifications.slack import send_slack_notification

def on_failure(context):
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    execution_date = context["execution_date"]
    log_url = context["task_instance"].log_url
    send_slack_notification(
        slack_conn_id="slack_webhook",
        text=f":red_circle: *{dag_id}.{task_id}* failed at {execution_date}\n{log_url}",
    )

@dag(
    default_args={"on_failure_callback": on_failure},
    # SLA: алерт если DAG не завершился за 2 часа
    dagrun_timeout=timedelta(hours=2),
)

SLA

Python
@task(sla=timedelta(hours=1))
def extract():
    """Если extract не завершится за 1 час — SLA miss."""
    ...

StatsD + Grafana

INI
# airflow.cfg
[metrics]
statsd_on = True
statsd_host = statsd
statsd_port = 8125
statsd_prefix = airflow

Ключевые метрики:

Метрика Описание Алерт
dag_processing.total_parse_time Время парсинга DAG-файлов > 30 секунд
scheduler.tasks.running Запущенные задачи > 80% worker capacity
scheduler.tasks.starving Задачи в очереди без worker'а > 0 долгое время
ti.failures Провалы задач Любой
dag_run.duration Время выполнения DAG > SLA

Масштабирование

Горизонтальное (больше worker'ов)

Bash
# Celery: добавить worker
airflow celery worker --queues default -c 16

# Kubernetes: Airflow автоматически создаёт Pod'ы

Вертикальное (больше ресурсов)

Python
# Ресурсы для конкретной задачи (K8s Executor)
@task(
    executor_config={
        "KubernetesExecutor": {
            "request_memory": "8Gi",
            "request_cpu": "4",
            "limit_memory": "16Gi",
        }
    }
)
def heavy_transform():
    ...

Оптимизация Scheduler

INI
# airflow.cfg
[scheduler]
parsing_processes = 4          # параллельный парсинг DAG-файлов
min_file_process_interval = 30 # не парсить чаще чем раз в 30 сек
dag_dir_list_interval = 60     # сканировать dags/ каждые 60 сек

[core]
parallelism = 32               # макс задач одновременно во всём кластере
max_active_tasks_per_dag = 16  # макс задач на один DAG
max_active_runs_per_dag = 3    # макс одновременных запусков одного DAG

Деплой DAG-ов

Метод Описание Плюсы Минусы
Git-sync sidecar Pod синхронизирует dags/ из git Нет rebuild Задержка синхронизации
Docker image DAG-ы в Docker-образе Версионирование Rebuild при каждом изменении
S3/GCS DAG-ы в object storage Простота Не git-native
PersistentVolume Shared volume Простота в K8s Нет версионирования

Git-sync (рекомендуется)

YAML
# Helm values (Airflow chart)
dags:
  gitSync:
    enabled: true
    repo: git@github.com:org/airflow-dags.git
    branch: main
    subPath: dags/
    wait: 60

Проверь себя


Источники