Перейти к содержанию

Airflow для локальной разработки

Эта конфигурация — золотая середина между быстрым стартом и продакшном. Один контейнер Airflow с LocalExecutor, кастомный образ с твоими библиотеками, два Postgres — один для метаданных Airflow, другой для данных.


Чем отличается от быстрого старта

Аспект Быстрый старт Эта конфигурация
Executor CeleryExecutor (избыточно) LocalExecutor (достаточно для одного сервера)
Контейнеры Airflow 5 (webserver, scheduler, worker, triggerer, init) 1 (scheduler + api-server + dag-processor)
Docker-образ Стандартный Кастомный с requirements.txt
Redis Да (для Celery) Не нужен
Postgres для данных Нет Да (target-postgres)
Python-библиотеки Нужен docker exec pip install Встроены в образ

Почему LocalExecutor

LocalExecutor запускает задачи в отдельных процессах на одной машине. Не нужен Redis или RabbitMQ. Для локальной разработки и одиночного сервера — идеальный выбор. CeleryExecutor нужен только при масштабировании на несколько машин.


Структура проекта

Text Only
airflow-project/
├── dags/                   # DAG-файлы
├── logs/                   # Логи выполнения задач
├── plugins/                # Кастомные плагины
├── postgres-data/          # Данные метабазы Airflow
├── target-postgres-data/   # Данные целевой базы
├── Dockerfile              # Кастомный образ
├── requirements.txt        # Python-зависимости
├── docker-compose.yml      # Конфигурация
└── .env                    # Переменные окружения

Подготовка

1. Создай директории

Bash
mkdir -p airflow-project/{dags,logs,plugins,postgres-data,target-postgres-data}
cd airflow-project

2. Создай .env

Bash
echo "AIRFLOW_UID=$(id -u)" > .env

3. Создай requirements.txt

Text Only
requests
pandas
psycopg2-binary
apache-airflow-providers-postgres==5.10.0

4. Создай Dockerfile

Docker
FROM apache/airflow:3.1.1

USER airflow

COPY requirements.txt /requirements.txt
RUN pip install --no-cache-dir -r /requirements.txt

USER airflow

Устанавливай пакеты от пользователя airflow, не от root. Это стандарт официального образа.


Docker Compose

YAML
# Общие ENV для Airflow
x-airflow-env: &airflow-env
  AIRFLOW__CORE__EXECUTOR: LocalExecutor
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
  AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS: "False"
  AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "True"
  AIRFLOW__API__EXPOSE_CONFIG: "True"
  AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
  AIRFLOW__CORE__LOAD_EXAMPLES: "False"
  AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_ALL_ADMINS: "True"
  # Connection для целевого Postgres
  AIRFLOW_CONN_TARGET_POSTGRES: postgresql+psycopg2://demo:demo@target-postgres:5432/demo
  AIRFLOW_UID: ${AIRFLOW_UID:-50000}

# Общие настройки контейнеров Airflow
x-airflow-common: &airflow-common
  build: .
  environment:
    <<: *airflow-env
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  networks:
    - airflow-net

services:
  # Postgres для метаданных Airflow
  postgres:
    image: postgres:15
    container_name: airflow-postgres
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5434:5432"
    volumes:
      - ./postgres-data:/var/lib/postgresql/data
    networks:
      - airflow-net

  # Postgres для данных DAG (целевая БД)
  target-postgres:
    image: postgres:15
    container_name: target-postgres
    environment:
      POSTGRES_USER: demo
      POSTGRES_PASSWORD: demo
      POSTGRES_DB: demo
    ports:
      - "5433:5432"
    volumes:
      - ./target-postgres-data:/var/lib/postgresql/data
    networks:
      - airflow-net

  # Одноразовая миграция БД
  airflow-init:
    <<: *airflow-common
    container_name: airflow-init
    command:
      - bash
      - -c
      - |
        echo "Running airflow db migrate..." &&
        airflow db migrate &&
        echo "Done."
    depends_on:
      - postgres

  # Основной контейнер: scheduler + dag-processor + api-server
  airflow:
    <<: *airflow-common
    container_name: airflow
    command:
      - bash
      - -c
      - |
        airflow scheduler &
        airflow dag-processor &
        airflow api-server
    depends_on:
      - postgres
      - target-postgres
      - airflow-init
    ports:
      - "8080:8080"
    restart: always

networks:
  airflow-net:
    driver: bridge

Что происходит

  • YAML-якоря (&airflow-env, &airflow-common) — избавляют от дублирования настроек
  • LocalExecutor — один процесс, без Redis и воркеров
  • Два Postgres — метаданные Airflow (порт 5434) и целевая БД для данных (порт 5433)
  • AIRFLOW_CONN_TARGET_POSTGRES — connection target_postgres создаётся автоматически из переменной окружения
  • Один контейнер Airflow — scheduler, dag-processor и api-server запускаются в одном контейнере

Запуск

Bash
# 1. Собери образ
docker compose build

# 2. Инициализируй БД (миграции)
docker compose up airflow-init
# Дождись "Done." и exit code 0

# 3. Запусти Airflow
docker compose up -d airflow

UI доступен на http://localhost:8080. Авторизация не требуется — SIMPLE_AUTH_MANAGER_ALL_ADMINS: True пускает всех.

Повторный запуск

При повторном запуске init и build не нужны. Просто docker compose up -d airflow.


Пример DAG: API → PostgreSQL

Python
# dags/dummyjson_to_postgres.py
from datetime import datetime

import requests
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@dag(
    dag_id="dummyjson_to_postgres",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
    tags=["api", "postgres"],
)
def dummyjson_to_postgres():

    @task()
    def fetch_product() -> dict:
        resp = requests.get("https://dummyjson.com/products/1", timeout=10)
        resp.raise_for_status()
        data = resp.json()
        return {
            "id": data["id"],
            "title": data["title"],
            "price": data["price"],
            "category": data["category"],
        }

    @task()
    def write_to_postgres(product: dict):
        hook = PostgresHook(postgres_conn_id="target_postgres")
        conn = hook.get_conn()
        with conn:
            with conn.cursor() as cur:
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS products (
                        id INTEGER PRIMARY KEY,
                        title TEXT,
                        price NUMERIC,
                        category TEXT,
                        created_at TIMESTAMP DEFAULT now()
                    )
                """)
                cur.execute("""
                    INSERT INTO products (id, title, price, category)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (id) DO UPDATE SET
                        title = EXCLUDED.title,
                        price = EXCLUDED.price,
                        category = EXCLUDED.category
                """, (product["id"], product["title"],
                      product["price"], product["category"]))

    write_to_postgres(fetch_product())


dummyjson_to_postgres()

DAG использует:

  • TaskFlow API (@dag, @task) — декораторы вместо явного создания операторов
  • PostgresHook — подключение через connection target_postgres (из переменной AIRFLOW_CONN_TARGET_POSTGRES)
  • INSERT ... ON CONFLICT — upsert, безопасно запускать повторно

Как добавить библиотеку

  1. Добавь в requirements.txt
  2. Пересобери образ: docker compose build
  3. Перезапусти: docker compose up -d airflow

Подключение к базам данных

База Хост Порт БД Логин Пароль
Метаданные Airflow localhost 5434 airflow airflow airflow
Целевая БД (данные) localhost 5433 demo demo demo

Что запомнить

  • LocalExecutor — достаточно для одного сервера, не нужен Redis
  • Кастомный Dockerfile — библиотеки сохраняются при перезапуске
  • Два Postgres — метаданные отдельно от данных
  • AIRFLOW_CONN_* — connections из переменных окружения, без UI
  • Один контейнер Airflow = scheduler + dag-processor + api-server

Проверь себя


Что дальше?

Чтобы понять, как устроен Airflow внутри, читай архитектуру Airflow — компоненты, executor-ы и как они взаимодействуют.


Источники