Архитектура Airflow¶
Airflow — платформа для создания, планирования и мониторинга пайплайнов данных. Пайплайн описывается как DAG (Directed Acyclic Graph) — набор задач с зависимостями. Чтобы эффективно работать с Airflow, нужно понимать, из каких компонентов он состоит и как они взаимодействуют.
Компоненты Airflow¶
Scheduler (Планировщик)¶
Центральный компонент. Scheduler непрерывно:
- Сканирует папку
dags/и парсит DAG-файлы - Проверяет расписание — пора ли запускать DAG
- Создаёт DAG Run (экземпляр запуска) и Task Instances
- Отправляет готовые задачи на выполнение через Executor
Scheduler — единственная точка отказа
Если scheduler упал — новые задачи не запускаются. В продакшене используй --num-runs -1 (бесконечный цикл) и healthcheck.
API Server (Веб-сервер)¶
Веб-интерфейс и REST API:
- Визуализация DAG (граф зависимостей, Grid, Gantt)
- Запуск и остановка DAG вручную
- Просмотр логов задач
- Управление connections и variables
- REST API для программного управления
Доступен по умолчанию на http://localhost:8080.
Executor (Исполнитель)¶
Определяет как задачи выполняются. Executor — это стратегия, а не отдельный процесс.
| Executor | Как работает | Когда подходит |
|---|---|---|
| SequentialExecutor | Одна задача за раз, в процессе scheduler | Только для тестов и отладки |
| LocalExecutor | Параллельно, в отдельных процессах на одной машине | Локальная разработка, одиночный сервер |
| CeleryExecutor | Распределённо, через брокер (Redis/RabbitMQ) на воркерах | Продакшн, масштабирование на несколько машин |
| KubernetesExecutor | Каждая задача — отдельный Pod в Kubernetes | Облачный продакшн, динамическое масштабирование |
┌─────────────────────────────────────┐
│ Scheduler │
│ (парсинг DAG, проверка расписания) │
└────────────────┬────────────────────┘
│
отправляет задачу
│
┌────────────────▼────────────────────┐
│ Executor │
├──────────┬───────────┬──────────────┤
│ Local │ Celery │ Kubernetes │
│ (fork) │ (Redis→ │ (Pod в k8s) │
│ │ Worker) │ │
└──────────┴───────────┴──────────────┘
Как выбрать Executor
- Один сервер, до 50 DAG → LocalExecutor
- Команда, несколько серверов → CeleryExecutor
- Kubernetes-кластер → KubernetesExecutor
- Тесты → SequentialExecutor
Worker (Воркер)¶
Процесс, который выполняет задачи. При LocalExecutor — fork-процессы scheduler. При CeleryExecutor — отдельные контейнеры, которые забирают задачи из брокера.
Triggerer¶
Обрабатывает асинхронные операции (deferrable operators). Вместо того чтобы воркер ждал ответа (polling), задача «засыпает» и просыпается, когда Triggerer получит событие.
Пример: ждём появления файла в S3. Без Triggerer — воркер занят всё время ожидания. С Triggerer — воркер свободен, Triggerer разбудит задачу, когда файл появится.
DAG Processor¶
Отвечает за парсинг DAG-файлов. В стандартной конфигурации работает внутри Scheduler. В продвинутых — выносится в отдельный процесс для изоляции.
Metadata Database¶
PostgreSQL (или MySQL), где Airflow хранит:
- Определения DAG и их расписание
- Историю DAG Run и Task Instance (статус, время, попытки)
- Connections (подключения к БД, API, облакам)
- Variables (переменные конфигурации)
- XCom (данные, передаваемые между задачами)
- Пулы и очереди
Metadata DB — не для данных
Metadata DB хранит только служебную информацию Airflow. Данные пайплайнов (таблицы, файлы) живут в отдельных хранилищах. Не пиши результаты DAG в metadata DB.
Как компоненты взаимодействуют¶
┌──────────────┐
│ API Server │
│ (UI + REST) │
└──────┬───────┘
│ читает
▼
┌──────────────┐ пишет/читает ┌──────────────┐
│ Scheduler │◄──────────────────►│ Metadata DB │
│ │ │ (PostgreSQL) │
└──────┬───────┘ └──────▲───────┘
│ │
│ отправляет задачу │ пишет результат
│ │
┌──────▼───────┐ ┌──────┴───────┐
│ Executor │───────────────────►│ Worker │
│ │ │ │
└──────────────┘ └──────────────┘
- Scheduler читает DAG-файлы, проверяет расписание
- Когда пора — создаёт DAG Run и Task Instance в Metadata DB
- Готовые задачи отправляются в Executor
- Worker выполняет задачу, результат записывается в Metadata DB
- API Server читает из Metadata DB и показывает статус в UI
Жизненный цикл DAG¶
Состояния DAG Run¶
| Состояние | Значение |
|---|---|
queued |
DAG Run создан, задачи ждут выполнения |
running |
Хотя бы одна задача выполняется |
success |
Все задачи завершились успешно |
failed |
Хотя бы одна задача провалилась |
Состояния Task Instance¶
| Состояние | Значение |
|---|---|
none |
Задача ещё не была запланирована |
scheduled |
Scheduler определил, что задачу пора запускать |
queued |
Задача отправлена в Executor |
running |
Задача выполняется на Worker |
success |
Задача завершена успешно |
failed |
Задача провалилась |
up_for_retry |
Задача провалилась, но будет перезапущена |
skipped |
Задача пропущена (например, по условию BranchOperator) |
DAG: основные параметры¶
from datetime import datetime, timedelta
from airflow.decorators import dag, task
@dag(
dag_id="my_pipeline",
start_date=datetime(2026, 1, 1),
schedule="0 6 * * *", # каждый день в 06:00
catchup=False, # не запускать пропущенные
max_active_runs=1, # не более 1 параллельного запуска
default_args={
"retries": 2,
"retry_delay": timedelta(minutes=5),
},
tags=["production", "etl"],
)
def my_pipeline():
@task()
def extract():
...
@task()
def transform(data):
...
@task()
def load(data):
...
raw = extract()
clean = transform(raw)
load(clean)
my_pipeline()
| Параметр | Назначение |
|---|---|
schedule |
Cron-выражение или preset (@daily, @hourly) |
start_date |
С какой даты считать расписание |
catchup |
Запускать ли пропущенные интервалы при первом включении |
max_active_runs |
Ограничение параллельных запусков DAG |
default_args |
Настройки по умолчанию для всех задач (retries, timeout) |
catchup=False для новых DAG
Если catchup=True (по умолчанию) и start_date в прошлом — Airflow запустит DAG за все пропущенные интервалы. Для нового DAG почти всегда нужен catchup=False.
Connections и Variables¶
Connections¶
Подключения к внешним системам — БД, API, облака. Хранятся в Metadata DB, зашифрованы Fernet-ключом.
Способы создания:
# Через CLI
airflow connections add 'my_postgres' \
--conn-type postgres \
--conn-host localhost \
--conn-port 5432 \
--conn-login user \
--conn-password pass \
--conn-schema mydb
# Через переменную окружения (формат URI)
AIRFLOW_CONN_MY_POSTGRES=postgresql://user:pass@localhost:5432/mydb
Variables¶
Пары ключ-значение для конфигурации DAG:
from airflow.models import Variable
api_url = Variable.get("api_base_url")
batch_size = Variable.get("batch_size", default_var=100)
config = Variable.get("etl_config", deserialize_json=True)
Не злоупотребляй Variable.get() в теле DAG
Variable.get() делает запрос к БД при каждом парсинге DAG (каждые 30 секунд). Используй только внутри @task(), не на верхнем уровне файла.
XCom — обмен данными между задачами¶
XCom (Cross-Communication) позволяет задачам передавать данные друг другу через Metadata DB.
@task()
def extract() -> dict:
# return автоматически pushит в XCom
return {"user_count": 42, "date": "2026-01-01"}
@task()
def transform(data: dict) -> dict:
# data автоматически pullит из XCom
data["user_count"] *= 2
return data
XCom — для метаданных, не для данных
XCom хранится в Metadata DB. Не передавай через XCom большие DataFrame или файлы. Для больших данных — запиши в хранилище (S3, PostgreSQL) и передай путь/ключ.
Что запомнить¶
- Scheduler — мозг Airflow: парсит DAG, проверяет расписание, отправляет задачи
- Executor — стратегия выполнения: Local для одного сервера, Celery для кластера
- Metadata DB — хранит всё кроме самих данных пайплайна
- DAG = граф задач с зависимостями,
@dag+@task— современный TaskFlow API - Connections — подключения к внешним системам, шифруются Fernet-ключом
- XCom — обмен метаданными между задачами, не для больших данных
Проверь себя¶
Источники¶
- Airflow Documentation: Architecture Overview — архитектура и компоненты
- Airflow Documentation: Executors — типы executor-ов
- Airflow Documentation: DAGs — описание DAG и их параметров