Декораторы, генераторы и контексты¶
Зачем это DE?¶
Data Engineer пишет не просто скрипты — он строит переиспользуемый, поддерживаемый код. Декораторы автоматизируют retry и логирование, генераторы позволяют обрабатывать гигабайты без OOM, а контекстные менеджеры гарантируют закрытие соединений.
Декораторы¶
Что такое декоратор¶
Декоратор — функция, которая принимает функцию и возвращает новую функцию (обёртку). Синтаксис @decorator — сахар для func = decorator(func).
import functools
import time
def timer(func):
"""Замеряет время выполнения."""
@functools.wraps(func) # сохраняет __name__, __doc__
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
print(f"{func.__name__} выполнена за {elapsed:.2f}s")
return result
return wrapper
@timer
def load_data():
time.sleep(1)
return [1, 2, 3]
load_data() # load_data выполнена за 1.00s
Не забывай @functools.wraps
Без wraps обёртка «съедает» имя и docstring оригинальной функции. Это ломает логи, отладку и help().
Декоратор с параметрами¶
Декоратор с параметрами — это фабрика декораторов: функция, возвращающая декоратор.
def retry(max_attempts=3, delay=1.0):
"""Повторяет вызов при ошибке."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_error = None
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_error = e
print(f"Попытка {attempt}/{max_attempts} не удалась: {e}")
time.sleep(delay)
raise last_error
return wrapper
return decorator
@retry(max_attempts=5, delay=2.0)
def fetch_api(url):
...
Практические декораторы для DE¶
import logging
logger = logging.getLogger(__name__)
def log_calls(func):
"""Логирует вход и выход из функции."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
logger.info(f"→ {func.__name__}(args={args}, kwargs={kwargs})")
result = func(*args, **kwargs)
logger.info(f"← {func.__name__} вернула {type(result).__name__}")
return result
return wrapper
def validate_dataframe(required_columns):
"""Проверяет наличие колонок в DataFrame."""
def decorator(func):
@functools.wraps(func)
def wrapper(df, *args, **kwargs):
missing = set(required_columns) - set(df.columns)
if missing:
raise ValueError(f"Отсутствуют колонки: {missing}")
return func(df, *args, **kwargs)
return wrapper
return decorator
@validate_dataframe(["customer_id", "amount"])
def aggregate_sales(df):
return df.groupby("customer_id")["amount"].sum()
Генераторы¶
Обычная функция vs генератор¶
Обычная функция return — возвращает всё сразу и умирает. Генератор yield — отдаёт по одному элементу и замораживается до следующего вызова next().
# Обычная — загружает ВСЕ строки в память
def read_all_lines(path):
with open(path) as f:
return f.readlines() # 10 GB → OOM
# Генератор — читает по одной строке
def read_lines(path):
with open(path) as f:
for line in f:
yield line.strip() # одна строка в памяти
for line in read_lines("huge_file.csv"):
process(line)
Генератор для чтения чанками¶
def read_chunks(path, chunk_size=10_000):
"""Читает CSV чанками через pandas."""
import pandas as pd
for chunk in pd.read_csv(path, chunksize=chunk_size):
yield chunk
# Обработка 100 GB CSV при 50 MB RAM
for chunk in read_chunks("orders.csv", chunk_size=50_000):
transformed = transform(chunk)
load_to_db(transformed)
Генераторные выражения¶
Как list comprehension, но ленивые — не создают список в памяти:
# List comprehension — создаёт список (все в памяти)
squares = [x**2 for x in range(10_000_000)] # ~80 MB
# Generator expression — ленивое вычисление
squares = (x**2 for x in range(10_000_000)) # ~0 MB
total = sum(squares) # вычисляет по одному
yield from — делегирование генераторов¶
def read_multiple_files(paths):
for path in paths:
yield from read_lines(path)
# Эквивалентно, но чище чем:
# for line in read_lines(path):
# yield line
send() — двусторонняя коммуникация¶
def accumulator():
total = 0
while True:
value = yield total
if value is None:
break
total += value
acc = accumulator()
next(acc) # инициализация → 0
acc.send(10) # → 10
acc.send(20) # → 30
acc.send(5) # → 35
Контекстные менеджеры¶
with и протокол __enter__/__exit__¶
Контекстный менеджер гарантирует: ресурс будет закрыт, даже если произойдёт исключение.
class DBConnection:
def __init__(self, dsn):
self.dsn = dsn
self.conn = None
def __enter__(self):
self.conn = psycopg2.connect(self.dsn)
return self.conn
def __exit__(self, exc_type, exc_val, exc_tb):
if self.conn:
if exc_type:
self.conn.rollback()
else:
self.conn.commit()
self.conn.close()
return False # не подавляем исключение
with DBConnection("postgresql://...") as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO ...")
# commit автоматический, при ошибке — rollback
contextlib.contextmanager — генератор как менеджер¶
from contextlib import contextmanager
@contextmanager
def temp_table(conn, name, schema_sql):
"""Создаёт временную таблицу, удаляет после использования."""
cursor = conn.cursor()
cursor.execute(f"CREATE TEMP TABLE {name} ({schema_sql})")
try:
yield cursor
finally:
cursor.execute(f"DROP TABLE IF EXISTS {name}")
cursor.close()
with temp_table(conn, "staging", "id INT, val TEXT") as cur:
cur.execute("INSERT INTO staging VALUES (1, 'a')")
cur.execute("INSERT INTO target SELECT * FROM staging")
# staging автоматически удалена
Практический паттерн: пул соединений¶
@contextmanager
def get_connection(pool):
"""Берёт соединение из пула, возвращает после использования."""
conn = pool.getconn()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
pool.putconn(conn)
with get_connection(pool) as conn:
...
Dataclasses¶
Dataclass — класс для хранения данных без boilerplate:
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class PipelineConfig:
source: str
destination: str
batch_size: int = 10_000
created_at: datetime = field(default_factory=datetime.now)
tags: list[str] = field(default_factory=list)
config = PipelineConfig(
source="s3://raw/orders/",
destination="postgresql://dwh/public.orders",
batch_size=50_000,
tags=["daily", "critical"]
)
print(config)
# PipelineConfig(source='s3://raw/orders/', destination='postgresql://...', ...)
frozen=True — неизменяемый конфиг¶
@dataclass(frozen=True)
class TableSchema:
name: str
columns: tuple[str, ...]
primary_key: str
schema = TableSchema("orders", ("id", "customer_id", "amount"), "id")
# schema.name = "new" → FrozenInstanceError
__post_init__ — валидация¶
@dataclass
class DateRange:
start: datetime
end: datetime
def __post_init__(self):
if self.start >= self.end:
raise ValueError(f"start ({self.start}) must be before end ({self.end})")
Type Hints¶
Type hints не влияют на runtime, но помогают IDE, mypy и коллегам понять код:
from typing import Iterator, Optional
def extract_records(
path: str,
limit: Optional[int] = None,
columns: list[str] | None = None
) -> Iterator[dict[str, any]]:
"""Читает записи из файла."""
import json
with open(path) as f:
for i, line in enumerate(f):
if limit and i >= limit:
break
record = json.loads(line)
if columns:
record = {k: v for k, v in record.items() if k in columns}
yield record
TypeAlias и NewType¶
from typing import TypeAlias, NewType
# TypeAlias — просто псевдоним
Row: TypeAlias = dict[str, any]
DataFrame: TypeAlias = list[Row]
# NewType — отдельный тип (mypy различает)
UserId = NewType("UserId", int)
OrderId = NewType("OrderId", int)
def get_orders(user_id: UserId) -> list[OrderId]:
...
Паттерн: Pipeline из генераторов¶
Классический DE-паттерн — цепочка генераторов (Extract → Transform → Load):
def extract(path: str) -> Iterator[dict]:
"""Читает JSONL построчно."""
with open(path) as f:
for line in f:
yield json.loads(line)
def transform(records: Iterator[dict]) -> Iterator[dict]:
"""Фильтрует и обогащает."""
for record in records:
if record.get("status") == "active":
record["processed_at"] = datetime.now().isoformat()
yield record
def load(records: Iterator[dict], conn) -> int:
"""Вставляет пачками."""
batch = []
count = 0
for record in records:
batch.append(record)
if len(batch) >= 1000:
insert_batch(conn, batch)
count += len(batch)
batch.clear()
if batch:
insert_batch(conn, batch)
count += len(batch)
return count
# Вся цепочка ленивая — одна запись в памяти
raw = extract("events.jsonl")
clean = transform(raw)
total = load(clean, conn)
Что запомнить¶
| Концепция | Когда использовать | Пример в DE |
|---|---|---|
| Декораторы | Cross-cutting concerns (retry, log, validate) | @retry, @timer, @validate_dataframe |
| Генераторы | Потоковая обработка больших данных | Чтение файлов чанками, ETL-pipeline |
| Контексты | Гарантированная очистка ресурсов | DB-соединения, временные таблицы, lock-файлы |
| Dataclasses | Структурированные конфиги и DTO | PipelineConfig, TableSchema |
| Type Hints | Документация + статический анализ | Все публичные функции |