DataFrames в деталях
Spark DataFrames в деталях¶
DataFrame — основной API для работы с данными в Spark. Это распределённая коллекция строк с именованными столбцами, аналог таблицы в SQL. В отличие от RDD, DataFrame знает свою схему (типы столбцов), что позволяет Catalyst-оптимизатору строить эффективные планы.
Создание DataFrame¶
Из файлов¶
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("ETL Kitchen") \
.config("spark.sql.shuffle.partitions", 8) \
.getOrCreate()
# CSV
df = spark.read.csv("data/orders.csv", header=True, inferSchema=True)
# Parquet (рекомендуется для хранения)
df = spark.read.parquet("data/orders.parquet")
# JSON
df = spark.read.json("data/events.json")
# JDBC (PostgreSQL)
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/etl_kitchen") \
.option("dbtable", "orders") \
.option("user", "etl_user") \
.option("password", "etl_pass") \
.load()
Из коллекции¶
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("name", StringType(), nullable=False),
StructField("age", IntegerType(), nullable=True),
])
data = [("Алексей", 30), ("Мария", 25), ("Дмитрий", None)]
df = spark.createDataFrame(data, schema)
Всегда задавай схему явно
inferSchema=True удобен для прототипа, но: (1) читает данные дважды, (2) может угадать тип неправильно (строка "01" → integer 1). В продакшене — всегда явная схема.
Схема (Schema)¶
# Посмотреть схему
df.printSchema()
# root
# |-- order_id: integer
# |-- customer_id: integer
# |-- order_date: date
# |-- status: string
# |-- total_amount: decimal(12,2)
# Программный доступ
df.schema # StructType
df.columns # ['order_id', 'customer_id', ...]
df.dtypes # [('order_id', 'int'), ...]
StructType — декларация схемы¶
from pyspark.sql.types import *
orders_schema = StructType([
StructField("order_id", IntegerType(), False),
StructField("customer_id", IntegerType(), True),
StructField("order_date", DateType(), False),
StructField("status", StringType(), True),
StructField("total_amount", DecimalType(12, 2), True),
StructField("items", ArrayType(
StructType([
StructField("product_id", IntegerType()),
StructField("quantity", IntegerType()),
])
), True),
])
Column API — трансформации¶
Выборка и фильтрация¶
from pyspark.sql import functions as F
# Выбор столбцов
df.select("name", "city")
df.select(F.col("name"), F.col("city"))
# Фильтрация
df.filter(F.col("city") == "Москва")
df.filter((F.col("price") > 1000) & (F.col("is_available") == True))
df.where("status IN ('new', 'processing')") # SQL-синтаксис тоже работает
# Новый столбец
df.withColumn("total", F.col("quantity") * F.col("unit_price"))
df.withColumn("year", F.year("order_date"))
# Переименование
df.withColumnRenamed("order_date", "date")
# Удаление
df.drop("temporary_col")
Агрегации¶
# GROUP BY
df.groupBy("city").agg(
F.count("*").alias("total_orders"),
F.sum("total_amount").alias("revenue"),
F.avg("total_amount").alias("avg_order"),
F.max("order_date").alias("last_order"),
)
# Несколько агрегаций
from pyspark.sql.functions import countDistinct, round
df.groupBy("region").agg(
countDistinct("customer_id").alias("unique_customers"),
round(F.avg("total_amount"), 2).alias("avg_order"),
)
Оконные функции¶
from pyspark.sql.window import Window
# Определение окна
window_by_customer = Window.partitionBy("customer_id").orderBy("order_date")
# ROW_NUMBER
df.withColumn("order_num", F.row_number().over(window_by_customer))
# Running total
df.withColumn("running_total",
F.sum("total_amount").over(
window_by_customer.rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
)
# LAG / LEAD
df.withColumn("prev_order_date",
F.lag("order_date", 1).over(window_by_customer)
)
Joins¶
# INNER JOIN
orders_df.join(customers_df, "customer_id") # одноимённый ключ
orders_df.join(customers_df, orders_df.customer_id == customers_df.customer_id)
# LEFT JOIN
orders_df.join(customers_df, "customer_id", "left")
# Anti-join (клиенты без заказов)
customers_df.join(orders_df, "customer_id", "left_anti")
# Cross join
df1.crossJoin(df2)
Broadcast Join¶
Если одна таблица маленькая (< 10 MB по умолчанию), Spark отправляет её на все executor'ы — без shuffle.
from pyspark.sql.functions import broadcast
# Принудительный broadcast (если Spark не угадал)
orders_df.join(broadcast(categories_df), "category_id")
Broadcast только для маленьких таблиц
Broadcast 1 GB таблицы на 100 executor'ов = 100 GB сетевого трафика. Порог: spark.sql.autoBroadcastJoinThreshold (по умолчанию 10 MB).
UDF — пользовательские функции¶
Python UDF¶
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
@udf(returnType=StringType())
def classify_order(amount):
if amount is None:
return "unknown"
if amount > 100000:
return "premium"
if amount > 10000:
return "standard"
return "small"
df.withColumn("order_class", classify_order(F.col("total_amount")))
Python UDF — медленные
Python UDF сериализует данные из JVM в Python и обратно (через pickle/Arrow). В 10-100 раз медленнее встроенных функций. Используй:
- Встроенные функции (
F.when,F.regexp_extract, etc.) — всегда первый выбор - Pandas UDF — работает пакетами через Apache Arrow, в 3-10 раз быстрее Python UDF
- Python UDF — только если нет альтернативы
Pandas UDF (vectorized)¶
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def classify_order_pandas(amounts: pd.Series) -> pd.Series:
return pd.cut(amounts,
bins=[-float('inf'), 10000, 100000, float('inf')],
labels=["small", "standard", "premium"])
df.withColumn("order_class", classify_order_pandas(F.col("total_amount")))
explain() — план выполнения¶
# Простой план
df.filter(F.col("city") == "Москва").explain()
# Подробный план (все фазы: Parsed → Analyzed → Optimized → Physical)
df.filter(F.col("city") == "Москва").explain(mode="extended")
# Форматированный (PG-стиль)
df.explain(mode="formatted")
Что смотреть в плане:
| Узел | Описание | На что обратить внимание |
|---|---|---|
Exchange |
Shuffle (перераспределение данных) | Самая дорогая операция |
BroadcastExchange |
Broadcast маленькой таблицы | ОК если таблица маленькая |
Sort |
Сортировка | Если не нужна — лишний overhead |
Filter |
Фильтрация | Pushdown к источнику? |
Project |
Выбор столбцов | Column pruning? |
Кеширование¶
# Кешировать в память (десериализованный формат)
df.cache() # = df.persist(StorageLevel.MEMORY_AND_DISK)
df.count() # триггерит кеширование (lazy!)
# Явный уровень
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK_SER) # сериализованный, компактнее
# Освободить кеш
df.unpersist()
| Уровень | Память | Диск | Сериализация | Когда |
|---|---|---|---|---|
MEMORY_ONLY |
Да | Нет | Нет | Маленькие данные, быстрый доступ |
MEMORY_AND_DISK |
Да | Да (fallback) | Нет | По умолчанию (cache) |
MEMORY_AND_DISK_SER |
Да | Да | Да | Большие данные, экономия памяти |
DISK_ONLY |
Нет | Да | Да | Огромные данные |
Когда кешировать
Кешируй DataFrame, который используется несколько раз (JOIN, потом агрегация, потом запись). Не кешируй то, что читается один раз.
Запись результатов¶
# Parquet (рекомендуется)
df.write.mode("overwrite").parquet("output/orders")
# Партиционирование при записи (важно для Data Lake)
df.write.partitionBy("year", "month").parquet("output/orders")
# CSV
df.write.mode("append").option("header", True).csv("output/orders_csv")
# JDBC
df.write.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/etl_kitchen") \
.option("dbtable", "orders_aggregated") \
.option("user", "etl_user") \
.mode("overwrite") \
.save()
# Delta Lake
df.write.format("delta").mode("overwrite").save("output/orders_delta")
Режимы записи¶
| Режим | Поведение |
|---|---|
overwrite |
Удаляет существующие данные |
append |
Добавляет к существующим |
ignore |
Ничего не делает если данные есть |
error (default) |
Ошибка если данные есть |
Проверь себя¶
Источники¶
- Spark: DataFrame Guide — официальная документация
- PySpark API: Functions — встроенные функции
- Spark: Performance Tuning — оптимизация