Перейти к содержанию

DataFrames в деталях

Spark DataFrames в деталях

DataFrame — основной API для работы с данными в Spark. Это распределённая коллекция строк с именованными столбцами, аналог таблицы в SQL. В отличие от RDD, DataFrame знает свою схему (типы столбцов), что позволяет Catalyst-оптимизатору строить эффективные планы.


Создание DataFrame

Из файлов

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ETL Kitchen") \
    .config("spark.sql.shuffle.partitions", 8) \
    .getOrCreate()

# CSV
df = spark.read.csv("data/orders.csv", header=True, inferSchema=True)

# Parquet (рекомендуется для хранения)
df = spark.read.parquet("data/orders.parquet")

# JSON
df = spark.read.json("data/events.json")

# JDBC (PostgreSQL)
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/etl_kitchen") \
    .option("dbtable", "orders") \
    .option("user", "etl_user") \
    .option("password", "etl_pass") \
    .load()

Из коллекции

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=True),
])

data = [("Алексей", 30), ("Мария", 25), ("Дмитрий", None)]
df = spark.createDataFrame(data, schema)

Всегда задавай схему явно

inferSchema=True удобен для прототипа, но: (1) читает данные дважды, (2) может угадать тип неправильно (строка "01" → integer 1). В продакшене — всегда явная схема.


Схема (Schema)

Python
# Посмотреть схему
df.printSchema()
# root
#  |-- order_id: integer
#  |-- customer_id: integer
#  |-- order_date: date
#  |-- status: string
#  |-- total_amount: decimal(12,2)

# Программный доступ
df.schema            # StructType
df.columns           # ['order_id', 'customer_id', ...]
df.dtypes            # [('order_id', 'int'), ...]

StructType — декларация схемы

Python
from pyspark.sql.types import *

orders_schema = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_id", IntegerType(), True),
    StructField("order_date", DateType(), False),
    StructField("status", StringType(), True),
    StructField("total_amount", DecimalType(12, 2), True),
    StructField("items", ArrayType(
        StructType([
            StructField("product_id", IntegerType()),
            StructField("quantity", IntegerType()),
        ])
    ), True),
])

Column API — трансформации

Выборка и фильтрация

Python
from pyspark.sql import functions as F

# Выбор столбцов
df.select("name", "city")
df.select(F.col("name"), F.col("city"))

# Фильтрация
df.filter(F.col("city") == "Москва")
df.filter((F.col("price") > 1000) & (F.col("is_available") == True))
df.where("status IN ('new', 'processing')")  # SQL-синтаксис тоже работает

# Новый столбец
df.withColumn("total", F.col("quantity") * F.col("unit_price"))
df.withColumn("year", F.year("order_date"))

# Переименование
df.withColumnRenamed("order_date", "date")

# Удаление
df.drop("temporary_col")

Агрегации

Python
# GROUP BY
df.groupBy("city").agg(
    F.count("*").alias("total_orders"),
    F.sum("total_amount").alias("revenue"),
    F.avg("total_amount").alias("avg_order"),
    F.max("order_date").alias("last_order"),
)

# Несколько агрегаций
from pyspark.sql.functions import countDistinct, round

df.groupBy("region").agg(
    countDistinct("customer_id").alias("unique_customers"),
    round(F.avg("total_amount"), 2).alias("avg_order"),
)

Оконные функции

Python
from pyspark.sql.window import Window

# Определение окна
window_by_customer = Window.partitionBy("customer_id").orderBy("order_date")

# ROW_NUMBER
df.withColumn("order_num", F.row_number().over(window_by_customer))

# Running total
df.withColumn("running_total",
    F.sum("total_amount").over(
        window_by_customer.rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )
)

# LAG / LEAD
df.withColumn("prev_order_date",
    F.lag("order_date", 1).over(window_by_customer)
)

Joins

Python
# INNER JOIN
orders_df.join(customers_df, "customer_id")          # одноимённый ключ
orders_df.join(customers_df, orders_df.customer_id == customers_df.customer_id)

# LEFT JOIN
orders_df.join(customers_df, "customer_id", "left")

# Anti-join (клиенты без заказов)
customers_df.join(orders_df, "customer_id", "left_anti")

# Cross join
df1.crossJoin(df2)

Broadcast Join

Если одна таблица маленькая (< 10 MB по умолчанию), Spark отправляет её на все executor'ы — без shuffle.

Python
from pyspark.sql.functions import broadcast

# Принудительный broadcast (если Spark не угадал)
orders_df.join(broadcast(categories_df), "category_id")

Broadcast только для маленьких таблиц

Broadcast 1 GB таблицы на 100 executor'ов = 100 GB сетевого трафика. Порог: spark.sql.autoBroadcastJoinThreshold (по умолчанию 10 MB).


UDF — пользовательские функции

Python UDF

Python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def classify_order(amount):
    if amount is None:
        return "unknown"
    if amount > 100000:
        return "premium"
    if amount > 10000:
        return "standard"
    return "small"

df.withColumn("order_class", classify_order(F.col("total_amount")))

Python UDF — медленные

Python UDF сериализует данные из JVM в Python и обратно (через pickle/Arrow). В 10-100 раз медленнее встроенных функций. Используй:

  1. Встроенные функции (F.when, F.regexp_extract, etc.) — всегда первый выбор
  2. Pandas UDF — работает пакетами через Apache Arrow, в 3-10 раз быстрее Python UDF
  3. Python UDF — только если нет альтернативы

Pandas UDF (vectorized)

Python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def classify_order_pandas(amounts: pd.Series) -> pd.Series:
    return pd.cut(amounts,
                  bins=[-float('inf'), 10000, 100000, float('inf')],
                  labels=["small", "standard", "premium"])

df.withColumn("order_class", classify_order_pandas(F.col("total_amount")))

explain() — план выполнения

Python
# Простой план
df.filter(F.col("city") == "Москва").explain()

# Подробный план (все фазы: Parsed → Analyzed → Optimized → Physical)
df.filter(F.col("city") == "Москва").explain(mode="extended")

# Форматированный (PG-стиль)
df.explain(mode="formatted")

Что смотреть в плане:

Узел Описание На что обратить внимание
Exchange Shuffle (перераспределение данных) Самая дорогая операция
BroadcastExchange Broadcast маленькой таблицы ОК если таблица маленькая
Sort Сортировка Если не нужна — лишний overhead
Filter Фильтрация Pushdown к источнику?
Project Выбор столбцов Column pruning?

Кеширование

Python
# Кешировать в память (десериализованный формат)
df.cache()       # = df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()       # триггерит кеширование (lazy!)

# Явный уровень
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK_SER)   # сериализованный, компактнее

# Освободить кеш
df.unpersist()
Уровень Память Диск Сериализация Когда
MEMORY_ONLY Да Нет Нет Маленькие данные, быстрый доступ
MEMORY_AND_DISK Да Да (fallback) Нет По умолчанию (cache)
MEMORY_AND_DISK_SER Да Да Да Большие данные, экономия памяти
DISK_ONLY Нет Да Да Огромные данные

Когда кешировать

Кешируй DataFrame, который используется несколько раз (JOIN, потом агрегация, потом запись). Не кешируй то, что читается один раз.


Запись результатов

Python
# Parquet (рекомендуется)
df.write.mode("overwrite").parquet("output/orders")

# Партиционирование при записи (важно для Data Lake)
df.write.partitionBy("year", "month").parquet("output/orders")

# CSV
df.write.mode("append").option("header", True).csv("output/orders_csv")

# JDBC
df.write.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/etl_kitchen") \
    .option("dbtable", "orders_aggregated") \
    .option("user", "etl_user") \
    .mode("overwrite") \
    .save()

# Delta Lake
df.write.format("delta").mode("overwrite").save("output/orders_delta")

Режимы записи

Режим Поведение
overwrite Удаляет существующие данные
append Добавляет к существующим
ignore Ничего не делает если данные есть
error (default) Ошибка если данные есть

Проверь себя


Источники