Перейти к содержанию

Логирование и конфигурация

Зачем это DE?

Пайплайн упал в 3 часа ночи. Без логов — чёрный ящик: что обработано, где ошибка, сколько строк загружено? Логирование — глаза пайплайна. Конфигурация — способ не хардкодить пароли и параметры.


Стандартный logging

Базовая настройка

Python
import logging

# Настройка — один раз при старте
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

logger = logging.getLogger(__name__)

logger.debug("Отладка")      # не видно при INFO
logger.info("Старт пайплайна")
logger.warning("Файл пустой, пропускаем")
logger.error("Ошибка подключения к БД")
logger.critical("Диск полон, аварийная остановка")

Уровни

Text Only
DEBUG    → Детальная отладка (запросы, ответы)
INFO     → Ход выполнения (старт, завершение, счётчики)
WARNING  → Не ошибка, но подозрительно (пустой файл, retry)
ERROR    → Ошибка, но пайплайн продолжает работу
CRITICAL → Фатальная ошибка, аварийная остановка

Логгер на модуль

Python
# etl/extract.py
logger = logging.getLogger("etl.extract")

# etl/transform.py
logger = logging.getLogger("etl.transform")

# Управление уровнями по иерархии
logging.getLogger("etl").setLevel(logging.DEBUG)        # всё
logging.getLogger("etl.extract").setLevel(logging.WARNING)  # только предупреждения

Файл + консоль

Python
import logging

logger = logging.getLogger("pipeline")
logger.setLevel(logging.DEBUG)

# Консоль — INFO и выше
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))

# Файл — DEBUG и выше (полная отладка)
file_handler = logging.FileHandler("pipeline.log")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
    "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
))

logger.addHandler(console)
logger.addHandler(file_handler)

structlog — структурированные логи

JSON-логи легко парсить (ELK, Loki, CloudWatch). structlog — стандарт для структурированного логирования.

Bash
pip install structlog

Настройка

Python
import structlog

structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer()  # → JSON
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
)

logger = structlog.get_logger("pipeline")

Использование

Python
logger.info("extract_started", source="s3://raw/orders/", date="2025-01-15")
# {"event": "extract_started", "source": "s3://raw/orders/", "date": "2025-01-15",
#  "level": "info", "timestamp": "2025-01-15T10:30:00Z"}

logger.info("transform_complete", rows_in=10000, rows_out=9847, dropped=153)
# {"event": "transform_complete", "rows_in": 10000, "rows_out": 9847, "dropped": 153, ...}

try:
    process()
except Exception:
    logger.exception("transform_failed", table="orders")
    # автоматически добавляет traceback

Bind — контекст для всех последующих логов

Python
log = logger.bind(pipeline="daily_orders", run_id="abc123")

log.info("started")
# {"event": "started", "pipeline": "daily_orders", "run_id": "abc123", ...}

log.info("extract_done", rows=5000)
# {"event": "extract_done", "pipeline": "daily_orders", "run_id": "abc123", "rows": 5000, ...}

Конфигурация: pydantic-settings

Проблема: откуда брать параметры?

Python
# ❌ Хардкод
DB_HOST = "localhost"
DB_PASSWORD = "secret123"

# ❌ Голые os.environ — нет валидации, нет значений по умолчанию
DB_HOST = os.environ["DB_HOST"]  # KeyError если не задано

# ✅ pydantic-settings — типизация + валидация + .env

pydantic-settings

Bash
pip install pydantic-settings
Python
from pydantic_settings import BaseSettings
from pydantic import Field

class Settings(BaseSettings):
    # Источники (приоритет: env > .env > default)
    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}

    # База данных
    db_host: str = "localhost"
    db_port: int = 5432
    db_name: str = "dwh"
    db_user: str = "etl"
    db_password: str  # обязательное, нет default

    # Пайплайн
    batch_size: int = Field(default=10_000, ge=100, le=1_000_000)
    log_level: str = "INFO"
    debug: bool = False

    @property
    def db_dsn(self) -> str:
        return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}"

settings = Settings()
# Читает из переменных окружения: DB_HOST, DB_PORT, DB_PASSWORD, ...
# Или из .env файла

.env файл

Bash
# .env
DB_HOST=pg-prod.internal
DB_PORT=5432
DB_NAME=dwh
DB_USER=etl_user
DB_PASSWORD=super_secret_123
BATCH_SIZE=50000
LOG_LEVEL=DEBUG

.env в .gitignore!

Никогда не коммить .env с паролями. Добавь в .gitignore и .dockerignore.

Вложенные настройки

Python
class DatabaseSettings(BaseSettings):
    model_config = {"env_prefix": "DB_"}
    host: str = "localhost"
    port: int = 5432
    name: str = "dwh"
    user: str = "etl"
    password: str

class KafkaSettings(BaseSettings):
    model_config = {"env_prefix": "KAFKA_"}
    brokers: str = "localhost:9092"
    topic: str = "events"
    group_id: str = "etl-consumer"

class Settings(BaseSettings):
    db: DatabaseSettings = DatabaseSettings()
    kafka: KafkaSettings = KafkaSettings()
    batch_size: int = 10_000

Обработка ошибок в пайплайнах

Стратегия: fail-fast vs skip-and-log

Стратегия Когда Пример
Fail-fast Критические данные, нужна полнота Финансовые транзакции
Skip-and-log Допустима потеря < 1% строк Логи событий, метрики
Dead Letter Queue Нужно обработать позже Kafka consumer, API webhook

Skip-and-log паттерн

Python
import structlog

logger = structlog.get_logger()

def process_records(records):
    success = 0
    errors = 0
    for record in records:
        try:
            transformed = transform(record)
            load(transformed)
            success += 1
        except Exception:
            errors += 1
            logger.exception("record_failed",
                           record_id=record.get("id"),
                           record_preview=str(record)[:200])
            if errors > len(records) * 0.05:  # > 5% ошибок
                raise RuntimeError(
                    f"Слишком много ошибок: {errors}/{success + errors}"
                )

    logger.info("batch_complete",
               success=success, errors=errors,
               error_rate=f"{errors/(success+errors)*100:.1f}%")

Retry с экспоненциальным backoff

Python
import time
import random

def retry_with_backoff(func, max_attempts=5, base_delay=1.0):
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as e:
            if attempt == max_attempts:
                raise
            delay = base_delay * (2 ** (attempt - 1))
            jitter = random.uniform(0, delay * 0.1)
            logger.warning("retry",
                         attempt=attempt,
                         delay=delay + jitter,
                         error=str(e))
            time.sleep(delay + jitter)

CLI: typer

Для утилит и скриптов — typer (обёртка над Click с type hints):

Python
import typer

app = typer.Typer()

@app.command()
def run(
    source: str = typer.Argument(..., help="Путь к файлу или DSN"),
    batch_size: int = typer.Option(10_000, "--batch", "-b", help="Размер батча"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Только проверка"),
    log_level: str = typer.Option("INFO", "--log-level", "-l")
):
    """Запуск ETL-пайплайна."""
    setup_logging(log_level)
    logger.info("pipeline_start", source=source, batch_size=batch_size)

    if dry_run:
        logger.info("dry_run_mode")
        return

    pipeline(source, batch_size)

if __name__ == "__main__":
    app()
Bash
python pipeline.py /data/orders.csv --batch 50000 --log-level DEBUG
python pipeline.py --help

Собираем всё вместе

Python
# config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    model_config = {"env_file": ".env"}
    db_dsn: str
    batch_size: int = 10_000
    log_level: str = "INFO"

settings = Settings()

# logging_setup.py
import structlog

def setup_logging(level: str = "INFO"):
    structlog.configure(
        processors=[
            structlog.stdlib.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer()
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
    )

# main.py
from config import settings
from logging_setup import setup_logging

setup_logging(settings.log_level)
logger = structlog.get_logger().bind(pipeline="daily_orders")

logger.info("started", batch_size=settings.batch_size)
# {"event": "started", "pipeline": "daily_orders", "batch_size": 10000, ...}

Проверь себя


Источники