Логирование и конфигурация¶
Зачем это DE?¶
Пайплайн упал в 3 часа ночи. Без логов — чёрный ящик: что обработано, где ошибка, сколько строк загружено? Логирование — глаза пайплайна. Конфигурация — способ не хардкодить пароли и параметры.
Стандартный logging¶
Базовая настройка¶
Python
import logging
# Настройка — один раз при старте
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)
logger.debug("Отладка") # не видно при INFO
logger.info("Старт пайплайна")
logger.warning("Файл пустой, пропускаем")
logger.error("Ошибка подключения к БД")
logger.critical("Диск полон, аварийная остановка")
Уровни¶
Text Only
DEBUG → Детальная отладка (запросы, ответы)
INFO → Ход выполнения (старт, завершение, счётчики)
WARNING → Не ошибка, но подозрительно (пустой файл, retry)
ERROR → Ошибка, но пайплайн продолжает работу
CRITICAL → Фатальная ошибка, аварийная остановка
Логгер на модуль¶
Python
# etl/extract.py
logger = logging.getLogger("etl.extract")
# etl/transform.py
logger = logging.getLogger("etl.transform")
# Управление уровнями по иерархии
logging.getLogger("etl").setLevel(logging.DEBUG) # всё
logging.getLogger("etl.extract").setLevel(logging.WARNING) # только предупреждения
Файл + консоль¶
Python
import logging
logger = logging.getLogger("pipeline")
logger.setLevel(logging.DEBUG)
# Консоль — INFO и выше
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
# Файл — DEBUG и выше (полная отладка)
file_handler = logging.FileHandler("pipeline.log")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s"
))
logger.addHandler(console)
logger.addHandler(file_handler)
structlog — структурированные логи¶
JSON-логи легко парсить (ELK, Loki, CloudWatch). structlog — стандарт для структурированного логирования.
Настройка¶
Python
import structlog
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer() # → JSON
],
logger_factory=structlog.stdlib.LoggerFactory(),
)
logger = structlog.get_logger("pipeline")
Использование¶
Python
logger.info("extract_started", source="s3://raw/orders/", date="2025-01-15")
# {"event": "extract_started", "source": "s3://raw/orders/", "date": "2025-01-15",
# "level": "info", "timestamp": "2025-01-15T10:30:00Z"}
logger.info("transform_complete", rows_in=10000, rows_out=9847, dropped=153)
# {"event": "transform_complete", "rows_in": 10000, "rows_out": 9847, "dropped": 153, ...}
try:
process()
except Exception:
logger.exception("transform_failed", table="orders")
# автоматически добавляет traceback
Bind — контекст для всех последующих логов¶
Python
log = logger.bind(pipeline="daily_orders", run_id="abc123")
log.info("started")
# {"event": "started", "pipeline": "daily_orders", "run_id": "abc123", ...}
log.info("extract_done", rows=5000)
# {"event": "extract_done", "pipeline": "daily_orders", "run_id": "abc123", "rows": 5000, ...}
Конфигурация: pydantic-settings¶
Проблема: откуда брать параметры?¶
Python
# ❌ Хардкод
DB_HOST = "localhost"
DB_PASSWORD = "secret123"
# ❌ Голые os.environ — нет валидации, нет значений по умолчанию
DB_HOST = os.environ["DB_HOST"] # KeyError если не задано
# ✅ pydantic-settings — типизация + валидация + .env
pydantic-settings¶
Python
from pydantic_settings import BaseSettings
from pydantic import Field
class Settings(BaseSettings):
# Источники (приоритет: env > .env > default)
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
# База данных
db_host: str = "localhost"
db_port: int = 5432
db_name: str = "dwh"
db_user: str = "etl"
db_password: str # обязательное, нет default
# Пайплайн
batch_size: int = Field(default=10_000, ge=100, le=1_000_000)
log_level: str = "INFO"
debug: bool = False
@property
def db_dsn(self) -> str:
return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}"
settings = Settings()
# Читает из переменных окружения: DB_HOST, DB_PORT, DB_PASSWORD, ...
# Или из .env файла
.env файл¶
Bash
# .env
DB_HOST=pg-prod.internal
DB_PORT=5432
DB_NAME=dwh
DB_USER=etl_user
DB_PASSWORD=super_secret_123
BATCH_SIZE=50000
LOG_LEVEL=DEBUG
.env в .gitignore!
Никогда не коммить .env с паролями. Добавь в .gitignore и .dockerignore.
Вложенные настройки¶
Python
class DatabaseSettings(BaseSettings):
model_config = {"env_prefix": "DB_"}
host: str = "localhost"
port: int = 5432
name: str = "dwh"
user: str = "etl"
password: str
class KafkaSettings(BaseSettings):
model_config = {"env_prefix": "KAFKA_"}
brokers: str = "localhost:9092"
topic: str = "events"
group_id: str = "etl-consumer"
class Settings(BaseSettings):
db: DatabaseSettings = DatabaseSettings()
kafka: KafkaSettings = KafkaSettings()
batch_size: int = 10_000
Обработка ошибок в пайплайнах¶
Стратегия: fail-fast vs skip-and-log¶
| Стратегия | Когда | Пример |
|---|---|---|
| Fail-fast | Критические данные, нужна полнота | Финансовые транзакции |
| Skip-and-log | Допустима потеря < 1% строк | Логи событий, метрики |
| Dead Letter Queue | Нужно обработать позже | Kafka consumer, API webhook |
Skip-and-log паттерн¶
Python
import structlog
logger = structlog.get_logger()
def process_records(records):
success = 0
errors = 0
for record in records:
try:
transformed = transform(record)
load(transformed)
success += 1
except Exception:
errors += 1
logger.exception("record_failed",
record_id=record.get("id"),
record_preview=str(record)[:200])
if errors > len(records) * 0.05: # > 5% ошибок
raise RuntimeError(
f"Слишком много ошибок: {errors}/{success + errors}"
)
logger.info("batch_complete",
success=success, errors=errors,
error_rate=f"{errors/(success+errors)*100:.1f}%")
Retry с экспоненциальным backoff¶
Python
import time
import random
def retry_with_backoff(func, max_attempts=5, base_delay=1.0):
for attempt in range(1, max_attempts + 1):
try:
return func()
except Exception as e:
if attempt == max_attempts:
raise
delay = base_delay * (2 ** (attempt - 1))
jitter = random.uniform(0, delay * 0.1)
logger.warning("retry",
attempt=attempt,
delay=delay + jitter,
error=str(e))
time.sleep(delay + jitter)
CLI: typer¶
Для утилит и скриптов — typer (обёртка над Click с type hints):
Python
import typer
app = typer.Typer()
@app.command()
def run(
source: str = typer.Argument(..., help="Путь к файлу или DSN"),
batch_size: int = typer.Option(10_000, "--batch", "-b", help="Размер батча"),
dry_run: bool = typer.Option(False, "--dry-run", help="Только проверка"),
log_level: str = typer.Option("INFO", "--log-level", "-l")
):
"""Запуск ETL-пайплайна."""
setup_logging(log_level)
logger.info("pipeline_start", source=source, batch_size=batch_size)
if dry_run:
logger.info("dry_run_mode")
return
pipeline(source, batch_size)
if __name__ == "__main__":
app()
Собираем всё вместе¶
Python
# config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
model_config = {"env_file": ".env"}
db_dsn: str
batch_size: int = 10_000
log_level: str = "INFO"
settings = Settings()
# logging_setup.py
import structlog
def setup_logging(level: str = "INFO"):
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
)
# main.py
from config import settings
from logging_setup import setup_logging
setup_logging(settings.log_level)
logger = structlog.get_logger().bind(pipeline="daily_orders")
logger.info("started", batch_size=settings.batch_size)
# {"event": "started", "pipeline": "daily_orders", "batch_size": 10000, ...}