Операторы и провайдеры
Операторы и провайдеры Airflow¶
Оператор — единица работы в DAG. Каждый task — это экземпляр оператора. Airflow предоставляет десятки встроенных операторов, а провайдеры добавляют интеграции с внешними системами.
Базовые операторы¶
BashOperator¶
from airflow.operators.bash import BashOperator
run_dbt = BashOperator(
task_id="dbt_run",
bash_command="cd /opt/dbt && dbt run --profiles-dir .",
env={"DBT_TARGET": "prod"},
)
PythonOperator¶
from airflow.operators.python import PythonOperator
def extract_data(**context):
import requests
response = requests.get("https://api.example.com/orders")
data = response.json()
# Передаём через XCom
context["ti"].xcom_push(key="orders", value=data)
extract = PythonOperator(
task_id="extract",
python_callable=extract_data,
)
EmptyOperator (раньше DummyOperator)¶
from airflow.operators.empty import EmptyOperator
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
start >> [task_a, task_b] >> end
Провайдеры — интеграции¶
Провайдеры — отдельные пакеты с операторами, хуками и сенсорами для внешних систем.
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-google
PostgresOperator¶
from airflow.providers.postgres.operators.postgres import PostgresOperator
create_table = PostgresOperator(
task_id="create_staging",
postgres_conn_id="etl_db", # Connection в Airflow UI
sql="""
CREATE TABLE IF NOT EXISTS staging_orders (
order_id INT,
customer_id INT,
total NUMERIC,
loaded_at TIMESTAMP DEFAULT now()
);
""",
)
S3 операторы¶
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator
upload = S3CreateObjectOperator(
task_id="upload_to_s3",
s3_bucket="etl-kitchen-data",
s3_key="raw/orders/{{ ds }}/data.json",
data="{{ ti.xcom_pull(task_ids='extract', key='orders') }}",
aws_conn_id="aws_default",
)
Connections и Hooks¶
Connections¶
Connection — хранилище учётных данных в Airflow. Настраивается через UI (Admin → Connections) или CLI.
airflow connections add 'etl_db' \
--conn-type postgres \
--conn-host localhost \
--conn-port 5432 \
--conn-login etl_user \
--conn-password etl_pass \
--conn-schema etl_kitchen
Hooks — низкоуровневый доступ¶
Hook — обёртка над Connection для прямого взаимодействия с системой.
from airflow.providers.postgres.hooks.postgres import PostgresHook
def load_to_postgres(**context):
hook = PostgresHook(postgres_conn_id="etl_db")
conn = hook.get_conn()
cursor = conn.cursor()
data = context["ti"].xcom_pull(task_ids="transform")
for row in data:
cursor.execute(
"INSERT INTO orders (order_id, total) VALUES (%s, %s)",
(row["id"], row["total"])
)
conn.commit()
load = PythonOperator(
task_id="load",
python_callable=load_to_postgres,
)
# Bulk load через COPY (быстрее INSERT)
from io import StringIO
def bulk_load(**context):
hook = PostgresHook(postgres_conn_id="etl_db")
data = context["ti"].xcom_pull(task_ids="transform")
buffer = StringIO()
for row in data:
buffer.write(f"{row['id']}\t{row['total']}\n")
buffer.seek(0)
hook.bulk_load("staging_orders", buffer)
Jinja-шаблоны¶
Airflow использует Jinja2 для динамических параметров. Шаблоны подставляются во время выполнения.
# Дата выполнения
extract = BashOperator(
task_id="extract",
bash_command="python extract.py --date {{ ds }} --ts {{ ts }}",
)
# Предыдущий день
backfill = PostgresOperator(
task_id="delete_and_insert",
sql="""
DELETE FROM orders WHERE order_date = '{{ ds }}';
INSERT INTO orders
SELECT * FROM staging_orders
WHERE order_date = '{{ ds }}';
""",
)
Основные переменные¶
| Переменная | Пример | Описание |
|---|---|---|
{{ ds }} |
2025-03-15 |
Logical date (YYYY-MM-DD) |
{{ ds_nodash }} |
20250315 |
Без дефисов |
{{ ts }} |
2025-03-15T00:00:00+00:00 |
Timestamp |
{{ data_interval_start }} |
DateTime | Начало интервала |
{{ data_interval_end }} |
DateTime | Конец интервала |
{{ prev_ds }} |
2025-03-14 |
Предыдущий logical date |
{{ macros.ds_add(ds, 7) }} |
2025-03-22 |
+7 дней |
{{ params.table }} |
orders |
Пользовательский параметр |
Фильтры Jinja¶
# Форматирование
bash_command="echo {{ ds | replace('-', '/') }}"
# → echo 2025/03/15
# Использование params
PostgresOperator(
task_id="load",
params={"schema": "staging", "table": "orders"},
sql="TRUNCATE {{ params.schema }}.{{ params.table }};",
)
Sensors — ожидание условия¶
Сенсор — оператор, который ждёт, пока условие станет истинным.
from airflow.sensors.filesystem import FileSensor
from airflow.providers.postgres.sensors.postgres import PostgresSensor
# Ждать файл
wait_for_file = FileSensor(
task_id="wait_for_export",
filepath="/data/export/orders_{{ ds }}.csv",
poke_interval=60, # проверять каждые 60 секунд
timeout=3600, # максимум 1 час
mode="poke", # poke = занимает worker slot; reschedule = освобождает
)
# Ждать данные в таблице
wait_for_data = PostgresSensor(
task_id="wait_for_staging",
postgres_conn_id="etl_db",
sql="SELECT 1 FROM staging_orders WHERE date = '{{ ds }}' LIMIT 1",
mode="reschedule", # освобождает worker между проверками
)
mode='reschedule' для долгих ожиданий
poke держит worker slot всё время ожидания. reschedule освобождает slot между проверками — не блокирует другие задачи.
XCom — передача данных между задачами¶
# Автоматический push (return из python_callable)
def extract():
return {"orders": [1, 2, 3], "count": 3}
# Явный push/pull
def transform(**context):
data = context["ti"].xcom_pull(task_ids="extract")
result = [x * 2 for x in data["orders"]]
context["ti"].xcom_push(key="transformed", value=result)
XCom — не для больших данных
XCom хранится в metadata DB (обычно PostgreSQL). Передавай через XCom только метаданные (пути к файлам, количество строк, статусы). Для данных используй S3/GCS/файловую систему.
Проверь себя¶
Источники¶
- Airflow: Operators — официальная документация
- Airflow: Connections — настройка подключений
- Airflow: Templates — Jinja-переменные
- Airflow: Providers — каталог провайдеров