Перейти к содержанию

pandas — трансформации данных

pandas — основная библиотека Python для табличных данных. Дата-инженер использует pandas для ad-hoc трансформаций, прототипирования ETL-логики и обработки небольших датасетов (до нескольких ГБ). Для больших объёмов есть Spark и DuckDB, но pandas — отправная точка.


DataFrame: создание и базовые операции

DataFrame — двумерная таблица с именованными колонками и типизированными данными.

Python
import pandas as pd

# Из словаря
df = pd.DataFrame({
    "user_id": [1, 2, 3, 4],
    "name": ["Alice", "Bob", "Carol", "Dave"],
    "region": ["EU", "US", "EU", "US"],
    "revenue": [1200, 800, 1500, 900],
})

print(df)
Text Only
   user_id   name region  revenue
0        1  Alice     EU     1200
1        2    Bob     US      800
2        3  Carol     EU     1500
3        4   Dave     US      900

Основные атрибуты

Python
df.shape      # (4, 4) — строки × колонки
df.dtypes     # типы данных каждой колонки
df.columns    # Index(['user_id', 'name', 'region', 'revenue'])
df.info()     # сводка: типы, non-null, память

Чтение и запись данных

CSV

Python
# Чтение
df = pd.read_csv("sales.csv", sep=",", encoding="utf-8")

# Запись
df.to_csv("output.csv", index=False, encoding="utf-8")

Полезные параметры read_csv

  • dtype={"user_id": "int64"} — явные типы (не угадывать)
  • parse_dates=["created_at"] — парсить даты
  • usecols=["id", "name"] — читать только нужные колонки
  • nrows=1000 — прочитать первые N строк для проверки

Parquet

Python
# Чтение (быстрее и компактнее CSV)
df = pd.read_parquet("sales.parquet")

# Запись
df.to_parquet("output.parquet", index=False, engine="pyarrow")

Почему Parquet

Parquet — колоночный формат. Он сжимает данные в 5–10 раз лучше CSV и читает только нужные колонки. Для аналитических пайплайнов это стандарт.

SQL

Python
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:pass@localhost:5432/mydb")

# Чтение
df = pd.read_sql("SELECT * FROM orders WHERE created_at > '2025-01-01'", engine)

# Запись в таблицу
df.to_sql("orders_clean", engine, if_exists="replace", index=False)

to_sql на больших таблицах

to_sql по умолчанию вставляет строки по одной. Для больших DataFrame добавь method="multi" и chunksize=5000 — это ускорит вставку в десятки раз.

JSON

Python
# Из файла
df = pd.read_json("data.json")

# Из вложенного JSON (API-ответ)
import json

raw = '{"data": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}'
records = json.loads(raw)["data"]
df = pd.DataFrame(records)

Фильтрация и выборка

Python
# Фильтр по условию
eu_users = df[df["region"] == "EU"]

# Несколько условий (& — и, | — или)
high_eu = df[(df["region"] == "EU") & (df["revenue"] > 1000)]

# query — читаемая альтернатива
high_eu = df.query("region == 'EU' and revenue > 1000")

# Выбрать колонки
names = df[["user_id", "name"]]

# Первые/последние строки
df.head(10)
df.tail(5)

Группировки и агрегации

Python
# Сумма выручки по регионам
by_region = (
    df.groupby("region")["revenue"]
    .sum()
    .reset_index()
    .rename(columns={"revenue": "total_revenue"})
)
print(by_region)
Text Only
  region  total_revenue
0     EU           2700
1     US           1700

Несколько агрегатов

Python
agg = (
    df.groupby("region")
    .agg(
        user_count=("user_id", "count"),
        total_revenue=("revenue", "sum"),
        avg_revenue=("revenue", "mean"),
    )
    .reset_index()
)
print(agg)
Text Only
  region  user_count  total_revenue  avg_revenue
0     EU           2           2700       1350.0
1     US           2           1700        850.0

Merge — соединение таблиц

merge в pandas — аналог SQL JOIN:

Python
orders = pd.DataFrame({
    "order_id": [101, 102, 103],
    "user_id": [1, 2, 5],
    "amount": [250, 180, 300],
})

users = pd.DataFrame({
    "user_id": [1, 2, 3],
    "name": ["Alice", "Bob", "Carol"],
})

# INNER JOIN (только совпадения)
result = orders.merge(users, on="user_id", how="inner")
print(result)
Text Only
   order_id  user_id  amount   name
0       101        1     250  Alice
1       102        2     180    Bob

Типы merge

pandas SQL Описание
how="inner" INNER JOIN Только совпадения
how="left" LEFT JOIN Все строки из левой таблицы
how="right" RIGHT JOIN Все строки из правой таблицы
how="outer" FULL OUTER JOIN Все строки из обеих таблиц

Если ключи называются по-разному

Python
orders.merge(users, left_on="customer_id", right_on="user_id", how="left")

Типичные паттерны DE

Дедупликация

Python
# Удалить полные дубли
df_clean = df.drop_duplicates()

# Оставить последнюю запись по ключу
df_latest = (
    df.sort_values("updated_at")
    .drop_duplicates(subset=["user_id"], keep="last")
)

Pivot — развернуть строки в колонки

Python
sales = pd.DataFrame({
    "month": ["Jan", "Jan", "Feb", "Feb"],
    "region": ["EU", "US", "EU", "US"],
    "revenue": [1000, 800, 1200, 900],
})

pivot = sales.pivot_table(
    values="revenue",
    index="month",
    columns="region",
    aggfunc="sum",
)
print(pivot)
Text Only
region    EU   US
month
Feb     1200  900
Jan     1000  800

Обработка пропусков

Python
# Проверить пропуски
df.isna().sum()

# Заполнить значением
df["revenue"] = df["revenue"].fillna(0)

# Удалить строки с пропусками в ключевых полях
df = df.dropna(subset=["user_id", "email"])

Приведение типов

Python
# Строка → дата
df["created_at"] = pd.to_datetime(df["created_at"])

# Объект → число
df["amount"] = pd.to_numeric(df["amount"], errors="coerce")

# Категории (экономит память)
df["region"] = df["region"].astype("category")

Когда pandas не хватает

Ситуация Альтернатива
Данные > 5 ГБ PySpark, Dask
Аналитика по файлам без загрузки в память DuckDB
Нужна параллельность Polars
Потоковая обработка Flink, Kafka Streams

DuckDB — SQL поверх файлов

Python
import duckdb
result = duckdb.sql("SELECT region, SUM(revenue) FROM 'sales.parquet' GROUP BY 1")
print(result.df())
DuckDB читает Parquet, CSV, JSON напрямую — без загрузки в DataFrame.


Что запомнить

Тема Ключевая мысль
Чтение данных read_csv, read_parquet, read_sql — единый интерфейс
Фильтрация df[условие] или df.query()
Группировки groupby().agg() — аналог SQL GROUP BY
Merge df.merge() — аналог SQL JOIN
Дедупликация drop_duplicates(subset=[...], keep="last")
Ограничения Для данных > 5 ГБ переходи на Spark или DuckDB

Проверь себя


Источники