SQLAlchemy и работа с БД¶
Зачем это DE?¶
Data Engineer работает с базами постоянно: читает из source, пишет в staging, обновляет DWH. SQLAlchemy — стандарт Python для работы с реляционными БД: от сырых запросов до ORM. А Alembic — миграции схемы без потери данных.
Архитектура SQLAlchemy¶
┌─────────────────────────────────────────┐
│ SQLAlchemy ORM │ ← Классы-модели, Session
├─────────────────────────────────────────┤
│ SQLAlchemy Core │ ← Table, select(), insert()
├─────────────────────────────────────────┤
│ Engine │ ← Пул соединений + диалект
├─────────────────────────────────────────┤
│ DBAPI (psycopg2, asyncpg) │ ← Драйвер БД
└─────────────────────────────────────────┘
Для DE обычно достаточно Core — ORM нужен для веб-приложений, а не для пайплайнов.
Engine и Connection¶
Создание Engine¶
from sqlalchemy import create_engine
# Синхронный — psycopg2 (по умолчанию)
engine = create_engine(
"postgresql://user:pass@localhost:5432/dwh",
pool_size=5, # соединений в пуле
max_overflow=10, # дополнительных сверх пула
pool_timeout=30, # ожидание свободного соединения
pool_recycle=3600, # пересоздавать через 1 час
echo=False # True — логирует все SQL
)
Никогда не храни пароли в коде
Используй переменные окружения: os.environ["DB_DSN"] или pydantic-settings.
Выполнение запросов¶
from sqlalchemy import text
# Через context manager — соединение закрывается автоматически
with engine.connect() as conn:
result = conn.execute(text("SELECT * FROM orders WHERE status = :status"),
{"status": "delivered"})
for row in result:
print(row.order_id, row.total_amount)
# Commit вручную (autocommit выключен по умолчанию в 2.0)
conn.execute(text("UPDATE orders SET status = 'archived' WHERE id = :id"),
{"id": 42})
conn.commit()
begin() — автокоммит при успехе¶
with engine.begin() as conn:
# commit() вызовется автоматически при выходе без ошибки
# rollback() — при исключении
conn.execute(text("INSERT INTO logs (msg) VALUES (:msg)"),
{"msg": "pipeline started"})
Core: Table и MetaData¶
Определение таблиц¶
from sqlalchemy import MetaData, Table, Column, Integer, String, Numeric, DateTime, ForeignKey
metadata = MetaData()
customers = Table("customers", metadata,
Column("customer_id", Integer, primary_key=True),
Column("name", String(100), nullable=False),
Column("email", String(255), unique=True),
Column("city", String(50)),
Column("registered_at", DateTime)
)
orders = Table("orders", metadata,
Column("order_id", Integer, primary_key=True),
Column("customer_id", Integer, ForeignKey("customers.customer_id")),
Column("total_amount", Numeric(12, 2)),
Column("status", String(20))
)
Reflect — получить схему из БД¶
metadata = MetaData()
metadata.reflect(bind=engine) # читает ВСЕ таблицы из БД
orders = metadata.tables["orders"]
print(orders.columns.keys()) # ['order_id', 'customer_id', ...]
SELECT через Core¶
from sqlalchemy import select, func
# Простой SELECT
stmt = select(customers).where(customers.c.city == "Москва")
with engine.connect() as conn:
rows = conn.execute(stmt).fetchall()
# JOIN + GROUP BY
stmt = (
select(
customers.c.city,
func.count(orders.c.order_id).label("order_count"),
func.sum(orders.c.total_amount).label("revenue")
)
.join(orders, customers.c.customer_id == orders.c.customer_id)
.group_by(customers.c.city)
.order_by(func.sum(orders.c.total_amount).desc())
)
INSERT, UPDATE, DELETE¶
from sqlalchemy import insert, update, delete
# INSERT
stmt = insert(customers).values(name="Иван", email="ivan@example.com", city="Москва")
with engine.begin() as conn:
result = conn.execute(stmt)
print(result.inserted_primary_key) # (42,)
# Bulk INSERT — список словарей
data = [
{"name": "Анна", "email": "anna@ex.com", "city": "СПб"},
{"name": "Пётр", "email": "petr@ex.com", "city": "Казань"},
]
with engine.begin() as conn:
conn.execute(insert(customers), data)
# UPDATE
stmt = update(customers).where(customers.c.city == "Москва").values(city="Moscow")
# DELETE
stmt = delete(orders).where(orders.c.status == "cancelled")
Массовая загрузка данных¶
executemany vs execute_values vs COPY¶
| Метод | Скорость (100K строк) | Когда использовать |
|---|---|---|
executemany |
~30 сек | Никогда для bulk |
execute_values (psycopg2) |
~3 сек | Средние объёмы |
COPY |
~0.5 сек | Большие объёмы |
Быстрая загрузка через COPY¶
import csv
import io
def bulk_load_copy(engine, table_name, records, columns):
"""Загрузка через COPY FROM — самый быстрый способ."""
buffer = io.StringIO()
writer = csv.writer(buffer, delimiter='\t')
for record in records:
writer.writerow([record.get(col) for col in columns])
buffer.seek(0)
raw_conn = engine.raw_connection()
try:
cursor = raw_conn.cursor()
cursor.copy_from(buffer, table_name, columns=columns, sep='\t', null='')
raw_conn.commit()
finally:
raw_conn.close()
pandas to_sql — быстро, но не всегда оптимально¶
import pandas as pd
df = pd.read_csv("orders.csv")
# Медленно: INSERT по одной строке
df.to_sql("orders", engine, if_exists="append", index=False)
# Быстрее: через execute_values
from sqlalchemy import text
df.to_sql("orders", engine, if_exists="append", index=False,
method="multi", chunksize=5000)
Для > 100K строк используй COPY
pandas.to_sql() — удобен для прототипов. Для продакшена — COPY FROM или psycopg2.copy_expert().
ORM — когда нужен¶
ORM полезен для CRUD-приложений (API, веб). Для ETL — обычно Core.
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session
class Base(DeclarativeBase):
pass
class Customer(Base):
__tablename__ = "customers"
customer_id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
email: Mapped[str | None] = mapped_column(String(255), unique=True)
city: Mapped[str | None] = mapped_column(String(50))
# Запрос через ORM
with Session(engine) as session:
customers = session.query(Customer).filter(Customer.city == "Москва").all()
for c in customers:
print(c.name, c.email)
Alembic — миграции¶
Alembic отслеживает изменения схемы и генерирует миграции (как Django migrations, но для любого проекта).
Инициализация¶
Структура:
migrations/
env.py # конфигурация
versions/ # файлы миграций
alembic.ini # connection string
Создание миграции¶
# Автогенерация из моделей
alembic revision --autogenerate -m "add city column to customers"
# Ручная
alembic revision -m "create partitions"
Файл миграции¶
"""add city column to customers"""
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column("customers", sa.Column("city", sa.String(50)))
def downgrade():
op.drop_column("customers", "city")
Применение¶
alembic upgrade head # применить все
alembic upgrade +1 # одну вперёд
alembic downgrade -1 # откатить одну
alembic history # история
alembic current # текущая версия
Alembic в Docker
Запускай alembic upgrade head в init-контейнере или entrypoint до старта приложения.
Async: asyncpg + SQLAlchemy 2.0¶
Для высокой конкурентности (API-серверы, параллельные запросы):
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
async_engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/dwh",
pool_size=20
)
AsyncSessionLocal = sessionmaker(async_engine, class_=AsyncSession)
async def get_top_customers():
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Customer).order_by(Customer.name).limit(10)
)
return result.scalars().all()
Паттерны для DE¶
Connection Factory¶
from contextlib import contextmanager
@contextmanager
def get_engine(env: str = "prod"):
"""Создаёт engine по окружению, закрывает при выходе."""
dsn = os.environ[f"DB_{env.upper()}_DSN"]
engine = create_engine(dsn, pool_size=5)
try:
yield engine
finally:
engine.dispose()
with get_engine("staging") as engine:
df = pd.read_sql("SELECT * FROM raw_orders", engine)
Upsert (INSERT ... ON CONFLICT)¶
from sqlalchemy.dialects.postgresql import insert as pg_insert
stmt = pg_insert(customers).values(data)
stmt = stmt.on_conflict_do_update(
index_elements=["email"],
set_={
"name": stmt.excluded.name,
"city": stmt.excluded.city
}
)
with engine.begin() as conn:
conn.execute(stmt)
Что запомнить¶
| Компонент | Для чего | Когда |
|---|---|---|
| Engine | Пул соединений | Всегда |
| Core | SQL-выражения без ORM | ETL, аналитика |
| ORM | Объектная модель | CRUD, API |
| Alembic | Миграции схемы | Все проекты с эволюцией схемы |
| COPY | Массовая загрузка | > 100K строк |
| asyncpg | Асинхронные запросы | API-серверы |