Оптимизация
Оптимизация Spark¶
Spark может быть и в 100 раз быстрее, и в 10 раз медленнее, чем нужно. Разница — в понимании, как данные перемещаются между узлами и как Catalyst строит план выполнения.
Catalyst Optimizer¶
Catalyst — оптимизатор запросов Spark SQL. Он трансформирует логический план в физический за 4 фазы:
SQL / DataFrame API
↓
1. Parsing → Unresolved Logical Plan
2. Analysis → Resolved Logical Plan (типы, имена)
3. Optimization → Optimized Logical Plan (pushdown, pruning, fold)
4. Planning → Physical Plan (выбор алгоритмов JOIN, сортировки)
↓
Execution (RDD)
Что делает Catalyst автоматически¶
| Оптимизация | Описание |
|---|---|
| Predicate Pushdown | WHERE передаётся к источнику (Parquet, JDBC) |
| Column Pruning | Читаются только нужные столбцы |
| Constant Folding | 1 + 1 вычисляется на этапе планирования, не в runtime |
| Filter Reordering | Более селективные фильтры — первыми |
| Join Reordering | Оптимальный порядок JOIN (с PG 3.0 — statistics-based) |
Shuffle — главный враг производительности¶
Shuffle — перераспределение данных между executor'ами. Происходит при:
groupBy().agg()join()(кроме broadcast)repartition()distinct()orderBy()(global sort)
Shuffle означает: сериализация → запись на диск → сетевая передача → десериализация. Это самая дорогая операция в Spark.
Как уменьшить shuffle¶
# 1. Broadcast join вместо shuffle join
from pyspark.sql.functions import broadcast
result = big_df.join(broadcast(small_df), "key")
# 2. Правильное количество shuffle-партиций
spark.conf.set("spark.sql.shuffle.partitions", 200) # дефолт 200
# Для маленьких данных (< 1 GB) — ставь 8-20
# Для больших (> 100 GB) — 200-1000
# 3. Избегай unnecessary repartition
# Плохо: df.repartition(100).filter(...) — сначала shuffle, потом фильтр
# Хорошо: df.filter(...).repartition(100) — сначала фильтр, меньше данных
Data Skew — перекос данных¶
Если один ключ содержит 90% строк (например, city = 'Москва'), один executor обрабатывает почти всё, остальные простаивают.
Диагностика¶
# Распределение по ключу JOIN/GROUP
df.groupBy("city").count().orderBy(F.desc("count")).show(10)
В Spark UI: одна задача работает 30 минут, остальные — 10 секунд.
Salting — борьба с перекосом¶
import pyspark.sql.functions as F
# Добавляем «соль» — случайное число к ключу
NUM_SALTS = 10
# К большой таблице — добавляем соль
big_df = big_df.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))
big_df = big_df.withColumn("salted_key", F.concat("city", F.lit("_"), "salt"))
# К маленькой таблице — размножаем (explode по всем значениям соли)
from pyspark.sql.functions import explode, array, lit
salts = [lit(i) for i in range(NUM_SALTS)]
small_df = small_df.withColumn("salt", explode(array(*salts)))
small_df = small_df.withColumn("salted_key", F.concat("city", F.lit("_"), "salt"))
# JOIN по salted_key → данные распределяются равномерно
result = big_df.join(small_df, "salted_key")
result = result.drop("salt", "salted_key")
Adaptive Query Execution (AQE)¶
AQE (Spark 3.0+) оптимизирует план во время выполнения на основе реальной статистики.
Что делает AQE¶
| Функция | Описание |
|---|---|
| Coalescing partitions | Объединяет мелкие shuffle-партиции (если < 64 MB) |
| Skew join handling | Автоматически разбивает перекошенные партиции |
| Broadcast conversion | Переключает shuffle join → broadcast, если таблица оказалась маленькой |
# Настройки AQE
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
Управление партициями¶
repartition vs coalesce¶
# repartition — полный shuffle (дорого, но равномерно)
df.repartition(100) # 100 партиций
df.repartition("city") # по столбцу (все строки одного города → одна партиция)
df.repartition(50, "city") # 50 партиций по столбцу
# coalesce — без shuffle (только уменьшение)
df.coalesce(10) # объединяет партиции локально
| Метод | Shuffle | Направление | Когда |
|---|---|---|---|
repartition(n) |
Да | Увеличить или уменьшить | После фильтрации, перед записью |
coalesce(n) |
Нет | Только уменьшить | Перед записью маленького результата |
Оптимальный размер партиции¶
- Цель: 128 MB - 256 MB на партицию
- Слишком мелкие (< 10 MB) → overhead на планирование
- Слишком крупные (> 1 GB) → OOM, long GC pauses
# Оценка: файл 100 GB в Parquet → 100 GB / 200 MB = ~500 партиций
df.rdd.getNumPartitions() # текущее количество
Spark UI — диагностика¶
Spark UI доступен на http://driver:4040 во время выполнения.
На что смотреть¶
| Вкладка | Что искать |
|---|---|
| Jobs | Общее время, количество stages |
| Stages | Shuffle Read/Write — если большие → оптимизируй |
| Tasks | Разброс времени задач (skew: одна задача 10 мин, остальные 5 сек) |
| SQL | Графический план, метрики каждого узла |
| Storage | Кешированные DataFrame, размер |
| Executors | GC Time, Memory, Disk spillage |
Красные флаги¶
- Shuffle Write > 10 GB → слишком много данных перемещается
- Spill (Memory → Disk) → не хватает памяти, увеличь
spark.executor.memoryили уменьши партиции - GC Time > 10% → слишком много объектов, используй серилизованное хранение или Kryo
- Один task длится в 10x дольше → data skew
Управление памятью¶
# Основные параметры
spark.conf.set("spark.executor.memory", "8g") # heap
spark.conf.set("spark.executor.memoryOverhead", "2g") # off-heap (Python, Arrow)
spark.conf.set("spark.memory.fraction", 0.6) # доля heap для execution + storage
spark.conf.set("spark.memory.storageFraction", 0.5) # доля для storage (cache)
Executor Memory (8 GB):
├── Reserved (300 MB)
├── User Memory (40%): UDF, RDD metadata
└── Unified Memory (60% = 4.8 GB):
├── Execution: joins, sorts, aggregations
└── Storage: cached DataFrames
Чеклист оптимизации¶
- Используй встроенные функции вместо UDF
- Parquet/Delta вместо CSV/JSON (column pruning, predicate pushdown)
- Broadcast join для маленьких таблиц (< 100 MB)
- Включи AQE (
spark.sql.adaptive.enabled = true) - Правильный размер партиций (128-256 MB)
- coalesce перед записью маленького результата
- Кешируй DataFrame, используемый несколько раз
- Filter early — чем раньше фильтруешь, тем меньше данных
- Избегай collect() — он тянет все данные на driver
- Мониторь Spark UI — shuffle, skew, spill
Проверь себя¶
Источники¶
- Spark: Performance Tuning — официальный гайд
- Spark: Adaptive Query Execution — AQE
- Spark UI: Monitoring — описание вкладок