Перейти к содержанию

SQLAlchemy и работа с БД

Зачем это DE?

Data Engineer работает с базами постоянно: читает из source, пишет в staging, обновляет DWH. SQLAlchemy — стандарт Python для работы с реляционными БД: от сырых запросов до ORM. А Alembic — миграции схемы без потери данных.


Архитектура SQLAlchemy

Text Only
┌─────────────────────────────────────────┐
│              SQLAlchemy ORM             │  ← Классы-модели, Session
├─────────────────────────────────────────┤
│            SQLAlchemy Core              │  ← Table, select(), insert()
├─────────────────────────────────────────┤
│               Engine                    │  ← Пул соединений + диалект
├─────────────────────────────────────────┤
│          DBAPI (psycopg2, asyncpg)      │  ← Драйвер БД
└─────────────────────────────────────────┘

Для DE обычно достаточно Core — ORM нужен для веб-приложений, а не для пайплайнов.


Engine и Connection

Создание Engine

Python
from sqlalchemy import create_engine

# Синхронный — psycopg2 (по умолчанию)
engine = create_engine(
    "postgresql://user:pass@localhost:5432/dwh",
    pool_size=5,           # соединений в пуле
    max_overflow=10,       # дополнительных сверх пула
    pool_timeout=30,       # ожидание свободного соединения
    pool_recycle=3600,     # пересоздавать через 1 час
    echo=False             # True — логирует все SQL
)

Никогда не храни пароли в коде

Используй переменные окружения: os.environ["DB_DSN"] или pydantic-settings.

Выполнение запросов

Python
from sqlalchemy import text

# Через context manager — соединение закрывается автоматически
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM orders WHERE status = :status"),
                          {"status": "delivered"})
    for row in result:
        print(row.order_id, row.total_amount)

    # Commit вручную (autocommit выключен по умолчанию в 2.0)
    conn.execute(text("UPDATE orders SET status = 'archived' WHERE id = :id"),
                 {"id": 42})
    conn.commit()

begin() — автокоммит при успехе

Python
with engine.begin() as conn:
    # commit() вызовется автоматически при выходе без ошибки
    # rollback() — при исключении
    conn.execute(text("INSERT INTO logs (msg) VALUES (:msg)"),
                 {"msg": "pipeline started"})

Core: Table и MetaData

Определение таблиц

Python
from sqlalchemy import MetaData, Table, Column, Integer, String, Numeric, DateTime, ForeignKey

metadata = MetaData()

customers = Table("customers", metadata,
    Column("customer_id", Integer, primary_key=True),
    Column("name", String(100), nullable=False),
    Column("email", String(255), unique=True),
    Column("city", String(50)),
    Column("registered_at", DateTime)
)

orders = Table("orders", metadata,
    Column("order_id", Integer, primary_key=True),
    Column("customer_id", Integer, ForeignKey("customers.customer_id")),
    Column("total_amount", Numeric(12, 2)),
    Column("status", String(20))
)

Reflect — получить схему из БД

Python
metadata = MetaData()
metadata.reflect(bind=engine)  # читает ВСЕ таблицы из БД

orders = metadata.tables["orders"]
print(orders.columns.keys())  # ['order_id', 'customer_id', ...]

SELECT через Core

Python
from sqlalchemy import select, func

# Простой SELECT
stmt = select(customers).where(customers.c.city == "Москва")
with engine.connect() as conn:
    rows = conn.execute(stmt).fetchall()

# JOIN + GROUP BY
stmt = (
    select(
        customers.c.city,
        func.count(orders.c.order_id).label("order_count"),
        func.sum(orders.c.total_amount).label("revenue")
    )
    .join(orders, customers.c.customer_id == orders.c.customer_id)
    .group_by(customers.c.city)
    .order_by(func.sum(orders.c.total_amount).desc())
)

INSERT, UPDATE, DELETE

Python
from sqlalchemy import insert, update, delete

# INSERT
stmt = insert(customers).values(name="Иван", email="ivan@example.com", city="Москва")
with engine.begin() as conn:
    result = conn.execute(stmt)
    print(result.inserted_primary_key)  # (42,)

# Bulk INSERT — список словарей
data = [
    {"name": "Анна", "email": "anna@ex.com", "city": "СПб"},
    {"name": "Пётр", "email": "petr@ex.com", "city": "Казань"},
]
with engine.begin() as conn:
    conn.execute(insert(customers), data)

# UPDATE
stmt = update(customers).where(customers.c.city == "Москва").values(city="Moscow")

# DELETE
stmt = delete(orders).where(orders.c.status == "cancelled")

Массовая загрузка данных

executemany vs execute_values vs COPY

Метод Скорость (100K строк) Когда использовать
executemany ~30 сек Никогда для bulk
execute_values (psycopg2) ~3 сек Средние объёмы
COPY ~0.5 сек Большие объёмы

Быстрая загрузка через COPY

Python
import csv
import io

def bulk_load_copy(engine, table_name, records, columns):
    """Загрузка через COPY FROM — самый быстрый способ."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter='\t')
    for record in records:
        writer.writerow([record.get(col) for col in columns])
    buffer.seek(0)

    raw_conn = engine.raw_connection()
    try:
        cursor = raw_conn.cursor()
        cursor.copy_from(buffer, table_name, columns=columns, sep='\t', null='')
        raw_conn.commit()
    finally:
        raw_conn.close()

pandas to_sql — быстро, но не всегда оптимально

Python
import pandas as pd

df = pd.read_csv("orders.csv")

# Медленно: INSERT по одной строке
df.to_sql("orders", engine, if_exists="append", index=False)

# Быстрее: через execute_values
from sqlalchemy import text
df.to_sql("orders", engine, if_exists="append", index=False,
          method="multi", chunksize=5000)

Для > 100K строк используй COPY

pandas.to_sql() — удобен для прототипов. Для продакшена — COPY FROM или psycopg2.copy_expert().


ORM — когда нужен

ORM полезен для CRUD-приложений (API, веб). Для ETL — обычно Core.

Python
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session

class Base(DeclarativeBase):
    pass

class Customer(Base):
    __tablename__ = "customers"

    customer_id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    email: Mapped[str | None] = mapped_column(String(255), unique=True)
    city: Mapped[str | None] = mapped_column(String(50))

# Запрос через ORM
with Session(engine) as session:
    customers = session.query(Customer).filter(Customer.city == "Москва").all()
    for c in customers:
        print(c.name, c.email)

Alembic — миграции

Alembic отслеживает изменения схемы и генерирует миграции (как Django migrations, но для любого проекта).

Инициализация

Bash
pip install alembic
alembic init migrations

Структура:

Text Only
migrations/
  env.py           # конфигурация
  versions/        # файлы миграций
alembic.ini        # connection string

Создание миграции

Bash
# Автогенерация из моделей
alembic revision --autogenerate -m "add city column to customers"

# Ручная
alembic revision -m "create partitions"

Файл миграции

Python
"""add city column to customers"""
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.add_column("customers", sa.Column("city", sa.String(50)))

def downgrade():
    op.drop_column("customers", "city")

Применение

Bash
alembic upgrade head      # применить все
alembic upgrade +1        # одну вперёд
alembic downgrade -1      # откатить одну
alembic history           # история
alembic current           # текущая версия

Alembic в Docker

Запускай alembic upgrade head в init-контейнере или entrypoint до старта приложения.


Async: asyncpg + SQLAlchemy 2.0

Для высокой конкурентности (API-серверы, параллельные запросы):

Python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/dwh",
    pool_size=20
)

AsyncSessionLocal = sessionmaker(async_engine, class_=AsyncSession)

async def get_top_customers():
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Customer).order_by(Customer.name).limit(10)
        )
        return result.scalars().all()

Паттерны для DE

Connection Factory

Python
from contextlib import contextmanager

@contextmanager
def get_engine(env: str = "prod"):
    """Создаёт engine по окружению, закрывает при выходе."""
    dsn = os.environ[f"DB_{env.upper()}_DSN"]
    engine = create_engine(dsn, pool_size=5)
    try:
        yield engine
    finally:
        engine.dispose()

with get_engine("staging") as engine:
    df = pd.read_sql("SELECT * FROM raw_orders", engine)

Upsert (INSERT ... ON CONFLICT)

Python
from sqlalchemy.dialects.postgresql import insert as pg_insert

stmt = pg_insert(customers).values(data)
stmt = stmt.on_conflict_do_update(
    index_elements=["email"],
    set_={
        "name": stmt.excluded.name,
        "city": stmt.excluded.city
    }
)
with engine.begin() as conn:
    conn.execute(stmt)

Что запомнить

Компонент Для чего Когда
Engine Пул соединений Всегда
Core SQL-выражения без ORM ETL, аналитика
ORM Объектная модель CRUD, API
Alembic Миграции схемы Все проекты с эволюцией схемы
COPY Массовая загрузка > 100K строк
asyncpg Асинхронные запросы API-серверы

Проверь себя


Источники