Перейти к содержанию

Оптимизация

Оптимизация Spark

Spark может быть и в 100 раз быстрее, и в 10 раз медленнее, чем нужно. Разница — в понимании, как данные перемещаются между узлами и как Catalyst строит план выполнения.


Catalyst Optimizer

Catalyst — оптимизатор запросов Spark SQL. Он трансформирует логический план в физический за 4 фазы:

Text Only
SQL / DataFrame API
  1. Parsing      → Unresolved Logical Plan
  2. Analysis     → Resolved Logical Plan (типы, имена)
  3. Optimization → Optimized Logical Plan (pushdown, pruning, fold)
  4. Planning     → Physical Plan (выбор алгоритмов JOIN, сортировки)
    Execution (RDD)

Что делает Catalyst автоматически

Оптимизация Описание
Predicate Pushdown WHERE передаётся к источнику (Parquet, JDBC)
Column Pruning Читаются только нужные столбцы
Constant Folding 1 + 1 вычисляется на этапе планирования, не в runtime
Filter Reordering Более селективные фильтры — первыми
Join Reordering Оптимальный порядок JOIN (с PG 3.0 — statistics-based)

Shuffle — главный враг производительности

Shuffle — перераспределение данных между executor'ами. Происходит при:

  • groupBy().agg()
  • join() (кроме broadcast)
  • repartition()
  • distinct()
  • orderBy() (global sort)

Shuffle означает: сериализация → запись на диск → сетевая передача → десериализация. Это самая дорогая операция в Spark.

Как уменьшить shuffle

Python
# 1. Broadcast join вместо shuffle join
from pyspark.sql.functions import broadcast
result = big_df.join(broadcast(small_df), "key")

# 2. Правильное количество shuffle-партиций
spark.conf.set("spark.sql.shuffle.partitions", 200)  # дефолт 200
# Для маленьких данных (< 1 GB) — ставь 8-20
# Для больших (> 100 GB) — 200-1000

# 3. Избегай unnecessary repartition
# Плохо: df.repartition(100).filter(...)  — сначала shuffle, потом фильтр
# Хорошо: df.filter(...).repartition(100) — сначала фильтр, меньше данных

Data Skew — перекос данных

Если один ключ содержит 90% строк (например, city = 'Москва'), один executor обрабатывает почти всё, остальные простаивают.

Диагностика

Python
# Распределение по ключу JOIN/GROUP
df.groupBy("city").count().orderBy(F.desc("count")).show(10)

В Spark UI: одна задача работает 30 минут, остальные — 10 секунд.

Salting — борьба с перекосом

Python
import pyspark.sql.functions as F

# Добавляем «соль» — случайное число к ключу
NUM_SALTS = 10

# К большой таблице — добавляем соль
big_df = big_df.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))
big_df = big_df.withColumn("salted_key", F.concat("city", F.lit("_"), "salt"))

# К маленькой таблице — размножаем (explode по всем значениям соли)
from pyspark.sql.functions import explode, array, lit

salts = [lit(i) for i in range(NUM_SALTS)]
small_df = small_df.withColumn("salt", explode(array(*salts)))
small_df = small_df.withColumn("salted_key", F.concat("city", F.lit("_"), "salt"))

# JOIN по salted_key → данные распределяются равномерно
result = big_df.join(small_df, "salted_key")
result = result.drop("salt", "salted_key")

Adaptive Query Execution (AQE)

AQE (Spark 3.0+) оптимизирует план во время выполнения на основе реальной статистики.

Python
spark.conf.set("spark.sql.adaptive.enabled", True)  # дефолт True с Spark 3.2

Что делает AQE

Функция Описание
Coalescing partitions Объединяет мелкие shuffle-партиции (если < 64 MB)
Skew join handling Автоматически разбивает перекошенные партиции
Broadcast conversion Переключает shuffle join → broadcast, если таблица оказалась маленькой
Python
# Настройки AQE
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")

Управление партициями

repartition vs coalesce

Python
# repartition — полный shuffle (дорого, но равномерно)
df.repartition(100)               # 100 партиций
df.repartition("city")            # по столбцу (все строки одного города → одна партиция)
df.repartition(50, "city")        # 50 партиций по столбцу

# coalesce — без shuffle (только уменьшение)
df.coalesce(10)                   # объединяет партиции локально
Метод Shuffle Направление Когда
repartition(n) Да Увеличить или уменьшить После фильтрации, перед записью
coalesce(n) Нет Только уменьшить Перед записью маленького результата

Оптимальный размер партиции

  • Цель: 128 MB - 256 MB на партицию
  • Слишком мелкие (< 10 MB) → overhead на планирование
  • Слишком крупные (> 1 GB) → OOM, long GC pauses
Python
# Оценка: файл 100 GB в Parquet → 100 GB / 200 MB = ~500 партиций
df.rdd.getNumPartitions()  # текущее количество

Spark UI — диагностика

Spark UI доступен на http://driver:4040 во время выполнения.

На что смотреть

Вкладка Что искать
Jobs Общее время, количество stages
Stages Shuffle Read/Write — если большие → оптимизируй
Tasks Разброс времени задач (skew: одна задача 10 мин, остальные 5 сек)
SQL Графический план, метрики каждого узла
Storage Кешированные DataFrame, размер
Executors GC Time, Memory, Disk spillage

Красные флаги

  • Shuffle Write > 10 GB → слишком много данных перемещается
  • Spill (Memory → Disk) → не хватает памяти, увеличь spark.executor.memory или уменьши партиции
  • GC Time > 10% → слишком много объектов, используй серилизованное хранение или Kryo
  • Один task длится в 10x дольше → data skew

Управление памятью

Python
# Основные параметры
spark.conf.set("spark.executor.memory", "8g")      # heap
spark.conf.set("spark.executor.memoryOverhead", "2g")  # off-heap (Python, Arrow)
spark.conf.set("spark.memory.fraction", 0.6)       # доля heap для execution + storage
spark.conf.set("spark.memory.storageFraction", 0.5) # доля для storage (cache)
Text Only
Executor Memory (8 GB):
├── Reserved (300 MB)
├── User Memory (40%): UDF, RDD metadata
└── Unified Memory (60% = 4.8 GB):
    ├── Execution: joins, sorts, aggregations
    └── Storage: cached DataFrames

Чеклист оптимизации

  1. Используй встроенные функции вместо UDF
  2. Parquet/Delta вместо CSV/JSON (column pruning, predicate pushdown)
  3. Broadcast join для маленьких таблиц (< 100 MB)
  4. Включи AQE (spark.sql.adaptive.enabled = true)
  5. Правильный размер партиций (128-256 MB)
  6. coalesce перед записью маленького результата
  7. Кешируй DataFrame, используемый несколько раз
  8. Filter early — чем раньше фильтруешь, тем меньше данных
  9. Избегай collect() — он тянет все данные на driver
  10. Мониторь Spark UI — shuffle, skew, spill

Проверь себя


Источники