Введение
Введение в Apache Spark¶
Spark — фреймворк для распределённой обработки данных. Когда pandas не справляется с объёмом, а SQL-запросы в хранилище работают слишком долго, Spark становится основным инструментом дата-инженера.
Зачем Spark¶
| Задача | pandas | Spark |
|---|---|---|
| Файл 500 MB | Справится | Избыточно |
| Файл 50 GB | Не поместится в память | Распределит по кластеру |
| JOIN двух таблиц по 1 TB | Невозможно | Стандартная задача |
| ETL-пайплайн с трансформациями | Один процесс | Параллелизм на десятках узлов |
Правило: если данные помещаются в память одной машины — используй pandas. Если нет — Spark.
Архитектура¶
Spark работает по модели master-worker:
- Driver — главный процесс. Разбивает задачу на stages и tasks, координирует выполнение.
- Executor — рабочий процесс на узле кластера. Выполняет задачи и хранит данные в памяти.
- Cluster Manager — распределяет ресурсы (YARN, Kubernetes, Standalone).
┌──────────┐
│ Driver │
│ (SparkContext)
└────┬─────┘
│
├── Executor 1 (Node A) — tasks + cache
├── Executor 2 (Node B) — tasks + cache
└── Executor 3 (Node C) — tasks + cache
RDD vs DataFrame vs Dataset¶
| Абстракция | Описание | Когда использовать |
|---|---|---|
| RDD | Низкоуровневый API, коллекция объектов | Нестандартные преобразования |
| DataFrame | Таблица со схемой, как в SQL | 95% задач в DE |
| Dataset | Типизированный DataFrame (только Scala/Java) | Scala-проекты |
В PySpark работаешь с DataFrame — это аналог таблицы в БД.
PySpark: быстрый старт¶
Создание SparkSession¶
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("etl-pipeline") \
.master("local[*]") \
.getOrCreate()
local[*] — запуск локально на всех доступных ядрах. В кластере master задаётся менеджером ресурсов.
Чтение данных¶
# CSV
df = spark.read.csv("data/sales.csv", header=True, inferSchema=True)
# Parquet (предпочтительный формат)
df = spark.read.parquet("data/sales.parquet")
# JSON
df = spark.read.json("data/events.json")
# JDBC (PostgreSQL)
df = spark.read.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/warehouse") \
.option("dbtable", "public.sales") \
.option("user", "etl_user") \
.option("password", "etl_pass") \
.load()
Трансформации¶
from pyspark.sql import functions as F
# фильтрация
df_filtered = df.filter(F.col("amount") > 100)
# новый столбец
df_with_year = df.withColumn("year", F.year("order_date"))
# агрегация
df_agg = df.groupBy("region").agg(
F.sum("amount").alias("total_amount"),
F.count("order_id").alias("order_count")
)
# JOIN
df_result = df_orders.join(df_customers, "customer_id", "left")
# SQL
df.createOrReplaceTempView("sales")
df_sql = spark.sql("""
SELECT region, SUM(amount) AS total
FROM sales
WHERE amount > 100
GROUP BY region
""")
Запись результата¶
# Parquet (рекомендуется)
df_result.write.parquet("output/sales_agg.parquet", mode="overwrite")
# Партиционирование по столбцу
df_result.write.partitionBy("year", "month") \
.parquet("output/sales_partitioned/")
# В PostgreSQL
df_result.write.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/warehouse") \
.option("dbtable", "public.sales_agg") \
.option("user", "etl_user") \
.option("password", "etl_pass") \
.mode("overwrite") \
.save()
Lazy evaluation¶
Spark не выполняет трансформации сразу. Он строит план (DAG) и запускает вычисления только при action:
| Тип | Примеры | Что происходит |
|---|---|---|
| Transformation | filter, select, groupBy, join |
Добавляется в план, ничего не считается |
| Action | show, count, collect, write |
Запускает выполнение всего плана |
# Это ничего не считает — только строит план
df_filtered = df.filter(F.col("amount") > 100)
df_agg = df_filtered.groupBy("region").sum("amount")
# А вот это запускает вычисление
df_agg.show()
Зачем lazy evaluation
Spark может оптимизировать весь план целиком: убрать лишние столбцы, объединить фильтры, выбрать порядок JOIN. Это называется Catalyst optimizer.
Партиционирование и кеширование¶
Партиции¶
DataFrame разбит на партиции — куски данных, которые обрабатываются параллельно.
# посмотреть количество партиций
df.rdd.getNumPartitions()
# перепартиционировать
df_repartitioned = df.repartition(8, "region")
# уменьшить число партиций (без shuffle)
df_coalesced = df.coalesce(4)
Кеширование¶
Если DataFrame используется несколько раз, его можно закешировать:
df.cache() # в память
df.persist(StorageLevel.MEMORY_AND_DISK) # память + диск
df.unpersist() # освободить
Spark vs pandas vs SQL¶
| Критерий | pandas | Spark | SQL в DWH |
|---|---|---|---|
| Объём данных | До ~10 GB | Десятки TB | Зависит от DWH |
| Где выполняется | Одна машина | Кластер | Движок DWH |
| Язык | Python | Python/Scala/SQL | SQL |
| Latency | Мгновенно на малых данных | Overhead на запуск | Зависит от запроса |
| Типичная задача | Ad-hoc анализ | ETL-пайплайн | Аналитические запросы |
Когда использовать Spark в DE¶
- ETL-пайплайны с данными > 10 GB
- Обработка логов, событий, clickstream
- Чтение из S3/HDFS/GCS и запись обратно
- Трансформации, которые не ложатся в SQL (ML-preprocessing, UDF)
- Потоковая обработка (Spark Structured Streaming)
Проверь себя¶
Источники¶
- Spark documentation — официальная документация
- PySpark API reference — API для Python