Перейти к содержанию

Операторы и провайдеры

Операторы и провайдеры Airflow

Оператор — единица работы в DAG. Каждый task — это экземпляр оператора. Airflow предоставляет десятки встроенных операторов, а провайдеры добавляют интеграции с внешними системами.


Базовые операторы

BashOperator

Python
from airflow.operators.bash import BashOperator

run_dbt = BashOperator(
    task_id="dbt_run",
    bash_command="cd /opt/dbt && dbt run --profiles-dir .",
    env={"DBT_TARGET": "prod"},
)

PythonOperator

Python
from airflow.operators.python import PythonOperator

def extract_data(**context):
    import requests
    response = requests.get("https://api.example.com/orders")
    data = response.json()
    # Передаём через XCom
    context["ti"].xcom_push(key="orders", value=data)

extract = PythonOperator(
    task_id="extract",
    python_callable=extract_data,
)

EmptyOperator (раньше DummyOperator)

Python
from airflow.operators.empty import EmptyOperator

start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")

start >> [task_a, task_b] >> end

Провайдеры — интеграции

Провайдеры — отдельные пакеты с операторами, хуками и сенсорами для внешних систем.

Bash
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-google

PostgresOperator

Python
from airflow.providers.postgres.operators.postgres import PostgresOperator

create_table = PostgresOperator(
    task_id="create_staging",
    postgres_conn_id="etl_db",  # Connection в Airflow UI
    sql="""
        CREATE TABLE IF NOT EXISTS staging_orders (
            order_id INT,
            customer_id INT,
            total NUMERIC,
            loaded_at TIMESTAMP DEFAULT now()
        );
    """,
)

S3 операторы

Python
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator

upload = S3CreateObjectOperator(
    task_id="upload_to_s3",
    s3_bucket="etl-kitchen-data",
    s3_key="raw/orders/{{ ds }}/data.json",
    data="{{ ti.xcom_pull(task_ids='extract', key='orders') }}",
    aws_conn_id="aws_default",
)

Connections и Hooks

Connections

Connection — хранилище учётных данных в Airflow. Настраивается через UI (Admin → Connections) или CLI.

Bash
airflow connections add 'etl_db' \
    --conn-type postgres \
    --conn-host localhost \
    --conn-port 5432 \
    --conn-login etl_user \
    --conn-password etl_pass \
    --conn-schema etl_kitchen

Hooks — низкоуровневый доступ

Hook — обёртка над Connection для прямого взаимодействия с системой.

Python
from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_to_postgres(**context):
    hook = PostgresHook(postgres_conn_id="etl_db")
    conn = hook.get_conn()
    cursor = conn.cursor()

    data = context["ti"].xcom_pull(task_ids="transform")
    for row in data:
        cursor.execute(
            "INSERT INTO orders (order_id, total) VALUES (%s, %s)",
            (row["id"], row["total"])
        )
    conn.commit()

load = PythonOperator(
    task_id="load",
    python_callable=load_to_postgres,
)
Python
# Bulk load через COPY (быстрее INSERT)
from io import StringIO

def bulk_load(**context):
    hook = PostgresHook(postgres_conn_id="etl_db")
    data = context["ti"].xcom_pull(task_ids="transform")

    buffer = StringIO()
    for row in data:
        buffer.write(f"{row['id']}\t{row['total']}\n")
    buffer.seek(0)

    hook.bulk_load("staging_orders", buffer)

Jinja-шаблоны

Airflow использует Jinja2 для динамических параметров. Шаблоны подставляются во время выполнения.

Python
# Дата выполнения
extract = BashOperator(
    task_id="extract",
    bash_command="python extract.py --date {{ ds }} --ts {{ ts }}",
)

# Предыдущий день
backfill = PostgresOperator(
    task_id="delete_and_insert",
    sql="""
        DELETE FROM orders WHERE order_date = '{{ ds }}';
        INSERT INTO orders
        SELECT * FROM staging_orders
        WHERE order_date = '{{ ds }}';
    """,
)

Основные переменные

Переменная Пример Описание
{{ ds }} 2025-03-15 Logical date (YYYY-MM-DD)
{{ ds_nodash }} 20250315 Без дефисов
{{ ts }} 2025-03-15T00:00:00+00:00 Timestamp
{{ data_interval_start }} DateTime Начало интервала
{{ data_interval_end }} DateTime Конец интервала
{{ prev_ds }} 2025-03-14 Предыдущий logical date
{{ macros.ds_add(ds, 7) }} 2025-03-22 +7 дней
{{ params.table }} orders Пользовательский параметр

Фильтры Jinja

Python
# Форматирование
bash_command="echo {{ ds | replace('-', '/') }}"
# → echo 2025/03/15

# Использование params
PostgresOperator(
    task_id="load",
    params={"schema": "staging", "table": "orders"},
    sql="TRUNCATE {{ params.schema }}.{{ params.table }};",
)

Sensors — ожидание условия

Сенсор — оператор, который ждёт, пока условие станет истинным.

Python
from airflow.sensors.filesystem import FileSensor
from airflow.providers.postgres.sensors.postgres import PostgresSensor

# Ждать файл
wait_for_file = FileSensor(
    task_id="wait_for_export",
    filepath="/data/export/orders_{{ ds }}.csv",
    poke_interval=60,      # проверять каждые 60 секунд
    timeout=3600,           # максимум 1 час
    mode="poke",            # poke = занимает worker slot; reschedule = освобождает
)

# Ждать данные в таблице
wait_for_data = PostgresSensor(
    task_id="wait_for_staging",
    postgres_conn_id="etl_db",
    sql="SELECT 1 FROM staging_orders WHERE date = '{{ ds }}' LIMIT 1",
    mode="reschedule",      # освобождает worker между проверками
)

mode='reschedule' для долгих ожиданий

poke держит worker slot всё время ожидания. reschedule освобождает slot между проверками — не блокирует другие задачи.


XCom — передача данных между задачами

Python
# Автоматический push (return из python_callable)
def extract():
    return {"orders": [1, 2, 3], "count": 3}

# Явный push/pull
def transform(**context):
    data = context["ti"].xcom_pull(task_ids="extract")
    result = [x * 2 for x in data["orders"]]
    context["ti"].xcom_push(key="transformed", value=result)

XCom — не для больших данных

XCom хранится в metadata DB (обычно PostgreSQL). Передавай через XCom только метаданные (пути к файлам, количество строк, статусы). Для данных используй S3/GCS/файловую систему.


Проверь себя


Источники