Перейти к содержанию

Декораторы, генераторы и контексты

Зачем это DE?

Data Engineer пишет не просто скрипты — он строит переиспользуемый, поддерживаемый код. Декораторы автоматизируют retry и логирование, генераторы позволяют обрабатывать гигабайты без OOM, а контекстные менеджеры гарантируют закрытие соединений.


Декораторы

Что такое декоратор

Декоратор — функция, которая принимает функцию и возвращает новую функцию (обёртку). Синтаксис @decorator — сахар для func = decorator(func).

Python
import functools
import time

def timer(func):
    """Замеряет время выполнения."""
    @functools.wraps(func)  # сохраняет __name__, __doc__
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} выполнена за {elapsed:.2f}s")
        return result
    return wrapper

@timer
def load_data():
    time.sleep(1)
    return [1, 2, 3]

load_data()  # load_data выполнена за 1.00s

Не забывай @functools.wraps

Без wraps обёртка «съедает» имя и docstring оригинальной функции. Это ломает логи, отладку и help().

Декоратор с параметрами

Декоратор с параметрами — это фабрика декораторов: функция, возвращающая декоратор.

Python
def retry(max_attempts=3, delay=1.0):
    """Повторяет вызов при ошибке."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    print(f"Попытка {attempt}/{max_attempts} не удалась: {e}")
                    time.sleep(delay)
            raise last_error
        return wrapper
    return decorator

@retry(max_attempts=5, delay=2.0)
def fetch_api(url):
    ...

Практические декораторы для DE

Python
import logging

logger = logging.getLogger(__name__)

def log_calls(func):
    """Логирует вход и выход из функции."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info(f"→ {func.__name__}(args={args}, kwargs={kwargs})")
        result = func(*args, **kwargs)
        logger.info(f"← {func.__name__} вернула {type(result).__name__}")
        return result
    return wrapper

def validate_dataframe(required_columns):
    """Проверяет наличие колонок в DataFrame."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(df, *args, **kwargs):
            missing = set(required_columns) - set(df.columns)
            if missing:
                raise ValueError(f"Отсутствуют колонки: {missing}")
            return func(df, *args, **kwargs)
        return wrapper
    return decorator

@validate_dataframe(["customer_id", "amount"])
def aggregate_sales(df):
    return df.groupby("customer_id")["amount"].sum()

Генераторы

Обычная функция vs генератор

Обычная функция return — возвращает всё сразу и умирает. Генератор yield — отдаёт по одному элементу и замораживается до следующего вызова next().

Python
# Обычная — загружает ВСЕ строки в память
def read_all_lines(path):
    with open(path) as f:
        return f.readlines()  # 10 GB → OOM

# Генератор — читает по одной строке
def read_lines(path):
    with open(path) as f:
        for line in f:
            yield line.strip()  # одна строка в памяти

for line in read_lines("huge_file.csv"):
    process(line)

Генератор для чтения чанками

Python
def read_chunks(path, chunk_size=10_000):
    """Читает CSV чанками через pandas."""
    import pandas as pd
    for chunk in pd.read_csv(path, chunksize=chunk_size):
        yield chunk

# Обработка 100 GB CSV при 50 MB RAM
for chunk in read_chunks("orders.csv", chunk_size=50_000):
    transformed = transform(chunk)
    load_to_db(transformed)

Генераторные выражения

Как list comprehension, но ленивые — не создают список в памяти:

Python
# List comprehension — создаёт список (все в памяти)
squares = [x**2 for x in range(10_000_000)]  # ~80 MB

# Generator expression — ленивое вычисление
squares = (x**2 for x in range(10_000_000))  # ~0 MB
total = sum(squares)  # вычисляет по одному

yield from — делегирование генераторов

Python
def read_multiple_files(paths):
    for path in paths:
        yield from read_lines(path)

# Эквивалентно, но чище чем:
# for line in read_lines(path):
#     yield line

send() — двусторонняя коммуникация

Python
def accumulator():
    total = 0
    while True:
        value = yield total
        if value is None:
            break
        total += value

acc = accumulator()
next(acc)         # инициализация → 0
acc.send(10)      # → 10
acc.send(20)      # → 30
acc.send(5)       # → 35

Контекстные менеджеры

with и протокол __enter__/__exit__

Контекстный менеджер гарантирует: ресурс будет закрыт, даже если произойдёт исключение.

Python
class DBConnection:
    def __init__(self, dsn):
        self.dsn = dsn
        self.conn = None

    def __enter__(self):
        self.conn = psycopg2.connect(self.dsn)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            if exc_type:
                self.conn.rollback()
            else:
                self.conn.commit()
            self.conn.close()
        return False  # не подавляем исключение

with DBConnection("postgresql://...") as conn:
    cursor = conn.cursor()
    cursor.execute("INSERT INTO ...")
# commit автоматический, при ошибке — rollback

contextlib.contextmanager — генератор как менеджер

Python
from contextlib import contextmanager

@contextmanager
def temp_table(conn, name, schema_sql):
    """Создаёт временную таблицу, удаляет после использования."""
    cursor = conn.cursor()
    cursor.execute(f"CREATE TEMP TABLE {name} ({schema_sql})")
    try:
        yield cursor
    finally:
        cursor.execute(f"DROP TABLE IF EXISTS {name}")
        cursor.close()

with temp_table(conn, "staging", "id INT, val TEXT") as cur:
    cur.execute("INSERT INTO staging VALUES (1, 'a')")
    cur.execute("INSERT INTO target SELECT * FROM staging")
# staging автоматически удалена

Практический паттерн: пул соединений

Python
@contextmanager
def get_connection(pool):
    """Берёт соединение из пула, возвращает после использования."""
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)

with get_connection(pool) as conn:
    ...

Dataclasses

Dataclass — класс для хранения данных без boilerplate:

Python
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class PipelineConfig:
    source: str
    destination: str
    batch_size: int = 10_000
    created_at: datetime = field(default_factory=datetime.now)
    tags: list[str] = field(default_factory=list)

config = PipelineConfig(
    source="s3://raw/orders/",
    destination="postgresql://dwh/public.orders",
    batch_size=50_000,
    tags=["daily", "critical"]
)
print(config)
# PipelineConfig(source='s3://raw/orders/', destination='postgresql://...', ...)

frozen=True — неизменяемый конфиг

Python
@dataclass(frozen=True)
class TableSchema:
    name: str
    columns: tuple[str, ...]
    primary_key: str

schema = TableSchema("orders", ("id", "customer_id", "amount"), "id")
# schema.name = "new"  → FrozenInstanceError

__post_init__ — валидация

Python
@dataclass
class DateRange:
    start: datetime
    end: datetime

    def __post_init__(self):
        if self.start >= self.end:
            raise ValueError(f"start ({self.start}) must be before end ({self.end})")

Type Hints

Type hints не влияют на runtime, но помогают IDE, mypy и коллегам понять код:

Python
from typing import Iterator, Optional

def extract_records(
    path: str,
    limit: Optional[int] = None,
    columns: list[str] | None = None
) -> Iterator[dict[str, any]]:
    """Читает записи из файла."""
    import json
    with open(path) as f:
        for i, line in enumerate(f):
            if limit and i >= limit:
                break
            record = json.loads(line)
            if columns:
                record = {k: v for k, v in record.items() if k in columns}
            yield record

TypeAlias и NewType

Python
from typing import TypeAlias, NewType

# TypeAlias — просто псевдоним
Row: TypeAlias = dict[str, any]
DataFrame: TypeAlias = list[Row]

# NewType — отдельный тип (mypy различает)
UserId = NewType("UserId", int)
OrderId = NewType("OrderId", int)

def get_orders(user_id: UserId) -> list[OrderId]:
    ...

Паттерн: Pipeline из генераторов

Классический DE-паттерн — цепочка генераторов (Extract → Transform → Load):

Python
def extract(path: str) -> Iterator[dict]:
    """Читает JSONL построчно."""
    with open(path) as f:
        for line in f:
            yield json.loads(line)

def transform(records: Iterator[dict]) -> Iterator[dict]:
    """Фильтрует и обогащает."""
    for record in records:
        if record.get("status") == "active":
            record["processed_at"] = datetime.now().isoformat()
            yield record

def load(records: Iterator[dict], conn) -> int:
    """Вставляет пачками."""
    batch = []
    count = 0
    for record in records:
        batch.append(record)
        if len(batch) >= 1000:
            insert_batch(conn, batch)
            count += len(batch)
            batch.clear()
    if batch:
        insert_batch(conn, batch)
        count += len(batch)
    return count

# Вся цепочка ленивая — одна запись в памяти
raw = extract("events.jsonl")
clean = transform(raw)
total = load(clean, conn)

Что запомнить

Концепция Когда использовать Пример в DE
Декораторы Cross-cutting concerns (retry, log, validate) @retry, @timer, @validate_dataframe
Генераторы Потоковая обработка больших данных Чтение файлов чанками, ETL-pipeline
Контексты Гарантированная очистка ресурсов DB-соединения, временные таблицы, lock-файлы
Dataclasses Структурированные конфиги и DTO PipelineConfig, TableSchema
Type Hints Документация + статический анализ Все публичные функции

Проверь себя


Источники