pandas — трансформации данных¶
pandas — основная библиотека Python для табличных данных. Дата-инженер использует pandas для ad-hoc трансформаций, прототипирования ETL-логики и обработки небольших датасетов (до нескольких ГБ). Для больших объёмов есть Spark и DuckDB, но pandas — отправная точка.
DataFrame: создание и базовые операции¶
DataFrame — двумерная таблица с именованными колонками и типизированными данными.
import pandas as pd
# Из словаря
df = pd.DataFrame({
"user_id": [1, 2, 3, 4],
"name": ["Alice", "Bob", "Carol", "Dave"],
"region": ["EU", "US", "EU", "US"],
"revenue": [1200, 800, 1500, 900],
})
print(df)
user_id name region revenue
0 1 Alice EU 1200
1 2 Bob US 800
2 3 Carol EU 1500
3 4 Dave US 900
Основные атрибуты¶
df.shape # (4, 4) — строки × колонки
df.dtypes # типы данных каждой колонки
df.columns # Index(['user_id', 'name', 'region', 'revenue'])
df.info() # сводка: типы, non-null, память
Чтение и запись данных¶
CSV¶
# Чтение
df = pd.read_csv("sales.csv", sep=",", encoding="utf-8")
# Запись
df.to_csv("output.csv", index=False, encoding="utf-8")
Полезные параметры read_csv
dtype={"user_id": "int64"}— явные типы (не угадывать)parse_dates=["created_at"]— парсить датыusecols=["id", "name"]— читать только нужные колонкиnrows=1000— прочитать первые N строк для проверки
Parquet¶
# Чтение (быстрее и компактнее CSV)
df = pd.read_parquet("sales.parquet")
# Запись
df.to_parquet("output.parquet", index=False, engine="pyarrow")
Почему Parquet
Parquet — колоночный формат. Он сжимает данные в 5–10 раз лучше CSV и читает только нужные колонки. Для аналитических пайплайнов это стандарт.
SQL¶
from sqlalchemy import create_engine
engine = create_engine("postgresql://user:pass@localhost:5432/mydb")
# Чтение
df = pd.read_sql("SELECT * FROM orders WHERE created_at > '2025-01-01'", engine)
# Запись в таблицу
df.to_sql("orders_clean", engine, if_exists="replace", index=False)
to_sql на больших таблицах
to_sql по умолчанию вставляет строки по одной. Для больших DataFrame добавь method="multi" и chunksize=5000 — это ускорит вставку в десятки раз.
JSON¶
# Из файла
df = pd.read_json("data.json")
# Из вложенного JSON (API-ответ)
import json
raw = '{"data": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}'
records = json.loads(raw)["data"]
df = pd.DataFrame(records)
Фильтрация и выборка¶
# Фильтр по условию
eu_users = df[df["region"] == "EU"]
# Несколько условий (& — и, | — или)
high_eu = df[(df["region"] == "EU") & (df["revenue"] > 1000)]
# query — читаемая альтернатива
high_eu = df.query("region == 'EU' and revenue > 1000")
# Выбрать колонки
names = df[["user_id", "name"]]
# Первые/последние строки
df.head(10)
df.tail(5)
Группировки и агрегации¶
# Сумма выручки по регионам
by_region = (
df.groupby("region")["revenue"]
.sum()
.reset_index()
.rename(columns={"revenue": "total_revenue"})
)
print(by_region)
Несколько агрегатов¶
agg = (
df.groupby("region")
.agg(
user_count=("user_id", "count"),
total_revenue=("revenue", "sum"),
avg_revenue=("revenue", "mean"),
)
.reset_index()
)
print(agg)
Merge — соединение таблиц¶
merge в pandas — аналог SQL JOIN:
orders = pd.DataFrame({
"order_id": [101, 102, 103],
"user_id": [1, 2, 5],
"amount": [250, 180, 300],
})
users = pd.DataFrame({
"user_id": [1, 2, 3],
"name": ["Alice", "Bob", "Carol"],
})
# INNER JOIN (только совпадения)
result = orders.merge(users, on="user_id", how="inner")
print(result)
Типы merge¶
| pandas | SQL | Описание |
|---|---|---|
how="inner" |
INNER JOIN | Только совпадения |
how="left" |
LEFT JOIN | Все строки из левой таблицы |
how="right" |
RIGHT JOIN | Все строки из правой таблицы |
how="outer" |
FULL OUTER JOIN | Все строки из обеих таблиц |
Если ключи называются по-разному
Типичные паттерны DE¶
Дедупликация¶
# Удалить полные дубли
df_clean = df.drop_duplicates()
# Оставить последнюю запись по ключу
df_latest = (
df.sort_values("updated_at")
.drop_duplicates(subset=["user_id"], keep="last")
)
Pivot — развернуть строки в колонки¶
sales = pd.DataFrame({
"month": ["Jan", "Jan", "Feb", "Feb"],
"region": ["EU", "US", "EU", "US"],
"revenue": [1000, 800, 1200, 900],
})
pivot = sales.pivot_table(
values="revenue",
index="month",
columns="region",
aggfunc="sum",
)
print(pivot)
Обработка пропусков¶
# Проверить пропуски
df.isna().sum()
# Заполнить значением
df["revenue"] = df["revenue"].fillna(0)
# Удалить строки с пропусками в ключевых полях
df = df.dropna(subset=["user_id", "email"])
Приведение типов¶
# Строка → дата
df["created_at"] = pd.to_datetime(df["created_at"])
# Объект → число
df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
# Категории (экономит память)
df["region"] = df["region"].astype("category")
Когда pandas не хватает¶
| Ситуация | Альтернатива |
|---|---|
| Данные > 5 ГБ | PySpark, Dask |
| Аналитика по файлам без загрузки в память | DuckDB |
| Нужна параллельность | Polars |
| Потоковая обработка | Flink, Kafka Streams |
DuckDB — SQL поверх файлов
DuckDB читает Parquet, CSV, JSON напрямую — без загрузки в DataFrame.Что запомнить¶
| Тема | Ключевая мысль |
|---|---|
| Чтение данных | read_csv, read_parquet, read_sql — единый интерфейс |
| Фильтрация | df[условие] или df.query() |
| Группировки | groupby().agg() — аналог SQL GROUP BY |
| Merge | df.merge() — аналог SQL JOIN |
| Дедупликация | drop_duplicates(subset=[...], keep="last") |
| Ограничения | Для данных > 5 ГБ переходи на Spark или DuckDB |
Проверь себя¶
Источники¶
- pandas User Guide — полная документация по всем операциям
- pandas Getting Started Tutorials — пошаговые уроки
- pandas.DataFrame.merge — справка по merge
- DuckDB — Python API — альтернатива для больших файлов