Перейти к содержанию

Введение

Введение в Apache Spark

Spark — фреймворк для распределённой обработки данных. Когда pandas не справляется с объёмом, а SQL-запросы в хранилище работают слишком долго, Spark становится основным инструментом дата-инженера.

Зачем Spark

Задача pandas Spark
Файл 500 MB Справится Избыточно
Файл 50 GB Не поместится в память Распределит по кластеру
JOIN двух таблиц по 1 TB Невозможно Стандартная задача
ETL-пайплайн с трансформациями Один процесс Параллелизм на десятках узлов

Правило: если данные помещаются в память одной машины — используй pandas. Если нет — Spark.

Архитектура

Spark работает по модели master-worker:

  • Driver — главный процесс. Разбивает задачу на stages и tasks, координирует выполнение.
  • Executor — рабочий процесс на узле кластера. Выполняет задачи и хранит данные в памяти.
  • Cluster Manager — распределяет ресурсы (YARN, Kubernetes, Standalone).
Text Only
┌──────────┐
│  Driver   │
│ (SparkContext)
└────┬─────┘
     ├── Executor 1 (Node A) — tasks + cache
     ├── Executor 2 (Node B) — tasks + cache
     └── Executor 3 (Node C) — tasks + cache

RDD vs DataFrame vs Dataset

Абстракция Описание Когда использовать
RDD Низкоуровневый API, коллекция объектов Нестандартные преобразования
DataFrame Таблица со схемой, как в SQL 95% задач в DE
Dataset Типизированный DataFrame (только Scala/Java) Scala-проекты

В PySpark работаешь с DataFrame — это аналог таблицы в БД.

PySpark: быстрый старт

Создание SparkSession

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("etl-pipeline") \
    .master("local[*]") \
    .getOrCreate()

local[*] — запуск локально на всех доступных ядрах. В кластере master задаётся менеджером ресурсов.

Чтение данных

Python
# CSV
df = spark.read.csv("data/sales.csv", header=True, inferSchema=True)

# Parquet (предпочтительный формат)
df = spark.read.parquet("data/sales.parquet")

# JSON
df = spark.read.json("data/events.json")

# JDBC (PostgreSQL)
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/warehouse") \
    .option("dbtable", "public.sales") \
    .option("user", "etl_user") \
    .option("password", "etl_pass") \
    .load()

Трансформации

Python
from pyspark.sql import functions as F

# фильтрация
df_filtered = df.filter(F.col("amount") > 100)

# новый столбец
df_with_year = df.withColumn("year", F.year("order_date"))

# агрегация
df_agg = df.groupBy("region").agg(
    F.sum("amount").alias("total_amount"),
    F.count("order_id").alias("order_count")
)

# JOIN
df_result = df_orders.join(df_customers, "customer_id", "left")

# SQL
df.createOrReplaceTempView("sales")
df_sql = spark.sql("""
    SELECT region, SUM(amount) AS total
    FROM sales
    WHERE amount > 100
    GROUP BY region
""")

Запись результата

Python
# Parquet (рекомендуется)
df_result.write.parquet("output/sales_agg.parquet", mode="overwrite")

# Партиционирование по столбцу
df_result.write.partitionBy("year", "month") \
    .parquet("output/sales_partitioned/")

# В PostgreSQL
df_result.write.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/warehouse") \
    .option("dbtable", "public.sales_agg") \
    .option("user", "etl_user") \
    .option("password", "etl_pass") \
    .mode("overwrite") \
    .save()

Lazy evaluation

Spark не выполняет трансформации сразу. Он строит план (DAG) и запускает вычисления только при action:

Тип Примеры Что происходит
Transformation filter, select, groupBy, join Добавляется в план, ничего не считается
Action show, count, collect, write Запускает выполнение всего плана
Python
# Это ничего не считает — только строит план
df_filtered = df.filter(F.col("amount") > 100)
df_agg = df_filtered.groupBy("region").sum("amount")

# А вот это запускает вычисление
df_agg.show()

Зачем lazy evaluation

Spark может оптимизировать весь план целиком: убрать лишние столбцы, объединить фильтры, выбрать порядок JOIN. Это называется Catalyst optimizer.

Партиционирование и кеширование

Партиции

DataFrame разбит на партиции — куски данных, которые обрабатываются параллельно.

Python
# посмотреть количество партиций
df.rdd.getNumPartitions()

# перепартиционировать
df_repartitioned = df.repartition(8, "region")

# уменьшить число партиций (без shuffle)
df_coalesced = df.coalesce(4)

Кеширование

Если DataFrame используется несколько раз, его можно закешировать:

Python
df.cache()  # в память
df.persist(StorageLevel.MEMORY_AND_DISK)  # память + диск
df.unpersist()  # освободить

Spark vs pandas vs SQL

Критерий pandas Spark SQL в DWH
Объём данных До ~10 GB Десятки TB Зависит от DWH
Где выполняется Одна машина Кластер Движок DWH
Язык Python Python/Scala/SQL SQL
Latency Мгновенно на малых данных Overhead на запуск Зависит от запроса
Типичная задача Ad-hoc анализ ETL-пайплайн Аналитические запросы

Когда использовать Spark в DE

  • ETL-пайплайны с данными > 10 GB
  • Обработка логов, событий, clickstream
  • Чтение из S3/HDFS/GCS и запись обратно
  • Трансформации, которые не ложатся в SQL (ML-preprocessing, UDF)
  • Потоковая обработка (Spark Structured Streaming)

Проверь себя

Источники