Перейти к содержанию

Архитектура Airflow

Airflow — платформа для создания, планирования и мониторинга пайплайнов данных. Пайплайн описывается как DAG (Directed Acyclic Graph) — набор задач с зависимостями. Чтобы эффективно работать с Airflow, нужно понимать, из каких компонентов он состоит и как они взаимодействуют.


Компоненты Airflow

Scheduler (Планировщик)

Центральный компонент. Scheduler непрерывно:

  1. Сканирует папку dags/ и парсит DAG-файлы
  2. Проверяет расписание — пора ли запускать DAG
  3. Создаёт DAG Run (экземпляр запуска) и Task Instances
  4. Отправляет готовые задачи на выполнение через Executor

Scheduler — единственная точка отказа

Если scheduler упал — новые задачи не запускаются. В продакшене используй --num-runs -1 (бесконечный цикл) и healthcheck.

API Server (Веб-сервер)

Веб-интерфейс и REST API:

  • Визуализация DAG (граф зависимостей, Grid, Gantt)
  • Запуск и остановка DAG вручную
  • Просмотр логов задач
  • Управление connections и variables
  • REST API для программного управления

Доступен по умолчанию на http://localhost:8080.

Executor (Исполнитель)

Определяет как задачи выполняются. Executor — это стратегия, а не отдельный процесс.

Executor Как работает Когда подходит
SequentialExecutor Одна задача за раз, в процессе scheduler Только для тестов и отладки
LocalExecutor Параллельно, в отдельных процессах на одной машине Локальная разработка, одиночный сервер
CeleryExecutor Распределённо, через брокер (Redis/RabbitMQ) на воркерах Продакшн, масштабирование на несколько машин
KubernetesExecutor Каждая задача — отдельный Pod в Kubernetes Облачный продакшн, динамическое масштабирование
Text Only
                    ┌─────────────────────────────────────┐
                    │           Scheduler                  │
                    │  (парсинг DAG, проверка расписания)   │
                    └────────────────┬────────────────────┘
                              отправляет задачу
                    ┌────────────────▼────────────────────┐
                    │           Executor                    │
                    ├──────────┬───────────┬──────────────┤
                    │ Local    │ Celery    │ Kubernetes   │
                    │ (fork)   │ (Redis→   │ (Pod в k8s)  │
                    │          │  Worker)  │              │
                    └──────────┴───────────┴──────────────┘

Как выбрать Executor

  • Один сервер, до 50 DAG → LocalExecutor
  • Команда, несколько серверов → CeleryExecutor
  • Kubernetes-кластер → KubernetesExecutor
  • Тесты → SequentialExecutor

Worker (Воркер)

Процесс, который выполняет задачи. При LocalExecutor — fork-процессы scheduler. При CeleryExecutor — отдельные контейнеры, которые забирают задачи из брокера.

Triggerer

Обрабатывает асинхронные операции (deferrable operators). Вместо того чтобы воркер ждал ответа (polling), задача «засыпает» и просыпается, когда Triggerer получит событие.

Пример: ждём появления файла в S3. Без Triggerer — воркер занят всё время ожидания. С Triggerer — воркер свободен, Triggerer разбудит задачу, когда файл появится.

DAG Processor

Отвечает за парсинг DAG-файлов. В стандартной конфигурации работает внутри Scheduler. В продвинутых — выносится в отдельный процесс для изоляции.

Metadata Database

PostgreSQL (или MySQL), где Airflow хранит:

  • Определения DAG и их расписание
  • Историю DAG Run и Task Instance (статус, время, попытки)
  • Connections (подключения к БД, API, облакам)
  • Variables (переменные конфигурации)
  • XCom (данные, передаваемые между задачами)
  • Пулы и очереди

Metadata DB — не для данных

Metadata DB хранит только служебную информацию Airflow. Данные пайплайнов (таблицы, файлы) живут в отдельных хранилищах. Не пиши результаты DAG в metadata DB.


Как компоненты взаимодействуют

Text Only
                                    ┌──────────────┐
                                    │  API Server   │
                                    │  (UI + REST)  │
                                    └──────┬───────┘
                                           │ читает
┌──────────────┐    пишет/читает    ┌──────────────┐
│  Scheduler   │◄──────────────────►│  Metadata DB │
│              │                    │  (PostgreSQL) │
└──────┬───────┘                    └──────▲───────┘
       │                                   │
       │ отправляет задачу                 │ пишет результат
       │                                   │
┌──────▼───────┐                    ┌──────┴───────┐
│   Executor   │───────────────────►│   Worker     │
│              │                    │              │
└──────────────┘                    └──────────────┘
  1. Scheduler читает DAG-файлы, проверяет расписание
  2. Когда пора — создаёт DAG Run и Task Instance в Metadata DB
  3. Готовые задачи отправляются в Executor
  4. Worker выполняет задачу, результат записывается в Metadata DB
  5. API Server читает из Metadata DB и показывает статус в UI

Жизненный цикл DAG

Состояния DAG Run

Состояние Значение
queued DAG Run создан, задачи ждут выполнения
running Хотя бы одна задача выполняется
success Все задачи завершились успешно
failed Хотя бы одна задача провалилась

Состояния Task Instance

Состояние Значение
none Задача ещё не была запланирована
scheduled Scheduler определил, что задачу пора запускать
queued Задача отправлена в Executor
running Задача выполняется на Worker
success Задача завершена успешно
failed Задача провалилась
up_for_retry Задача провалилась, но будет перезапущена
skipped Задача пропущена (например, по условию BranchOperator)

DAG: основные параметры

Python
from datetime import datetime, timedelta
from airflow.decorators import dag, task

@dag(
    dag_id="my_pipeline",
    start_date=datetime(2026, 1, 1),
    schedule="0 6 * * *",        # каждый день в 06:00
    catchup=False,                # не запускать пропущенные
    max_active_runs=1,            # не более 1 параллельного запуска
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
    tags=["production", "etl"],
)
def my_pipeline():
    @task()
    def extract():
        ...

    @task()
    def transform(data):
        ...

    @task()
    def load(data):
        ...

    raw = extract()
    clean = transform(raw)
    load(clean)

my_pipeline()
Параметр Назначение
schedule Cron-выражение или preset (@daily, @hourly)
start_date С какой даты считать расписание
catchup Запускать ли пропущенные интервалы при первом включении
max_active_runs Ограничение параллельных запусков DAG
default_args Настройки по умолчанию для всех задач (retries, timeout)

catchup=False для новых DAG

Если catchup=True (по умолчанию) и start_date в прошлом — Airflow запустит DAG за все пропущенные интервалы. Для нового DAG почти всегда нужен catchup=False.


Connections и Variables

Connections

Подключения к внешним системам — БД, API, облака. Хранятся в Metadata DB, зашифрованы Fernet-ключом.

Способы создания:

Bash
# Через CLI
airflow connections add 'my_postgres' \
    --conn-type postgres \
    --conn-host localhost \
    --conn-port 5432 \
    --conn-login user \
    --conn-password pass \
    --conn-schema mydb

# Через переменную окружения (формат URI)
AIRFLOW_CONN_MY_POSTGRES=postgresql://user:pass@localhost:5432/mydb

Variables

Пары ключ-значение для конфигурации DAG:

Python
from airflow.models import Variable

api_url = Variable.get("api_base_url")
batch_size = Variable.get("batch_size", default_var=100)
config = Variable.get("etl_config", deserialize_json=True)

Не злоупотребляй Variable.get() в теле DAG

Variable.get() делает запрос к БД при каждом парсинге DAG (каждые 30 секунд). Используй только внутри @task(), не на верхнем уровне файла.


XCom — обмен данными между задачами

XCom (Cross-Communication) позволяет задачам передавать данные друг другу через Metadata DB.

Python
@task()
def extract() -> dict:
    # return автоматически pushит в XCom
    return {"user_count": 42, "date": "2026-01-01"}

@task()
def transform(data: dict) -> dict:
    # data автоматически pullит из XCom
    data["user_count"] *= 2
    return data

XCom — для метаданных, не для данных

XCom хранится в Metadata DB. Не передавай через XCom большие DataFrame или файлы. Для больших данных — запиши в хранилище (S3, PostgreSQL) и передай путь/ключ.


Что запомнить

  • Scheduler — мозг Airflow: парсит DAG, проверяет расписание, отправляет задачи
  • Executor — стратегия выполнения: Local для одного сервера, Celery для кластера
  • Metadata DB — хранит всё кроме самих данных пайплайна
  • DAG = граф задач с зависимостями, @dag + @task — современный TaskFlow API
  • Connections — подключения к внешним системам, шифруются Fernet-ключом
  • XCom — обмен метаданными между задачами, не для больших данных

Проверь себя


Источники