Перейти к содержанию

Structured Streaming

Structured Streaming

Structured Streaming — потоковая обработка данных в Spark, построенная поверх DataFrame API. Ты пишешь тот же код, что и для batch, но данные приходят непрерывно.

Ключевая идея: поток — это бесконечная таблица, к которой постоянно добавляются строки. Каждый trigger (микро-батч) обрабатывает новые строки.


Архитектура

Text Only
Источник (Kafka/Files/Socket)
  readStream          ← определяет откуда читать
  Transformations     ← тот же DataFrame API (filter, join, groupBy)
  writeStream         ← определяет куда писать
  Sink (Kafka/Files/Console/JDBC)

Пример: чтение из Kafka → запись в Parquet

Python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("Streaming ETL") \
    .getOrCreate()

# Читаем из Kafka
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .load()

# Парсим JSON из value
events = raw_stream \
    .select(
        F.col("key").cast("string"),
        F.from_json(F.col("value").cast("string"),
            "customer_id INT, event_type STRING, event_date TIMESTAMP, page_url STRING"
        ).alias("data"),
        F.col("timestamp").alias("kafka_ts")
    ) \
    .select("data.*", "kafka_ts")

# Трансформации — тот же API, что и batch
filtered = events.filter(F.col("event_type").isin("purchase", "add_to_cart"))

# Пишем в Parquet
query = filtered.writeStream \
    .format("parquet") \
    .option("path", "output/events_stream") \
    .option("checkpointLocation", "checkpoints/events") \
    .partitionBy("event_type") \
    .trigger(processingTime="1 minute") \
    .start()

query.awaitTermination()

Источники (Sources)

Источник Формат Типичное использование
Kafka kafka Основной для продакшн-стриминга
Файлы parquet, json, csv Мониторинг директории
Socket socket Только для тестов
Rate rate Генерация тестовых данных

Чтение файлов (File Source)

Python
# Spark мониторит директорию и обрабатывает новые файлы
stream = spark.readStream \
    .schema(my_schema) \
    .format("json") \
    .option("maxFilesPerTrigger", 10) \
    .load("input/events/")

Output Modes

Режим Описание Когда использовать
append Только новые строки Фильтрация, map-трансформации (без агрегаций)
update Только изменённые строки Агрегации с watermark
complete Все строки результата Агрегации без watermark (полный пересчёт)
Python
query = aggregated.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

Triggers — когда обрабатывать

Python
# Микро-батч каждые 30 секунд
.trigger(processingTime="30 seconds")

# Один раз (batch-like, полезно для backfill)
.trigger(once=True)

# Один раз, но без рестарта (Spark 3.3+)
.trigger(availableNow=True)

# Continuous processing (низкая латентность, ~1 ms, experimental)
.trigger(continuous="1 second")

Watermarks — работа с опозданиями

Реальные данные приходят с задержкой: событие произошло в 10:00, а попало в Kafka в 10:05. Watermark говорит Spark: «данные старше X минут можно игнорировать».

Python
from pyspark.sql.functions import window

events_with_watermark = events \
    .withWatermark("event_date", "10 minutes")

# Агрегация с окнами
event_counts = events_with_watermark \
    .groupBy(
        window("event_date", "5 minutes"),  # 5-минутные окна
        "event_type"
    ) \
    .count()

Как watermark работает

Text Only
Время событий:  10:00  10:02  10:05  10:03  10:08  10:01  10:12
Watermark (10 min):                                        ↑
                                                    max_event_time = 10:12
                                                    watermark = 10:12 - 10min = 10:02
                                                    → событие 10:01 отбрасывается
                                                    → событие 10:03 всё ещё принимается

Watermark — не строгая граница

Watermark — это нижняя граница. Spark может принять данные чуть старше watermark, но не гарантирует это. Для бизнес-критичных данных используй запас (watermark = 2 × макс ожидаемая задержка).


Checkpointing

Checkpoint сохраняет состояние стриминга (offsets, state) для recovery после сбоя. Обязательно для продакшена.

Python
query = df.writeStream \
    .option("checkpointLocation", "hdfs:///checkpoints/my_stream") \
    .start()

Что хранится: - Offsets (какие данные уже прочитаны) - State (промежуточные результаты агрегаций) - Commit log (какие батчи завершены)

Нельзя менять запрос и сохранить checkpoint

Если изменил схему, добавил/удалил столбцы в groupBy, или сменил source — удали checkpoint и запусти с нуля. Иначе — ошибки десериализации state.


foreachBatch — произвольная логика

foreachBatch даёт доступ к каждому микро-батчу как к обычному DataFrame. Полезно для:

  • Запись в несколько sinks
  • UPSERT (MERGE) в БД
  • Дедупликация
Python
def process_batch(batch_df, batch_id):
    # Дедупликация внутри батча
    deduped = batch_df.dropDuplicates(["event_id"])

    # Запись в Parquet
    deduped.write.mode("append").parquet("output/events")

    # Запись в PostgreSQL (JDBC)
    deduped.write.format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/etl_kitchen") \
        .option("dbtable", "events_stream") \
        .mode("append") \
        .save()

    print(f"Batch {batch_id}: {deduped.count()} rows")

query = events.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "checkpoints/events_multi") \
    .trigger(processingTime="1 minute") \
    .start()

Stateful операции

Агрегации с состоянием

Python
# Подсчёт событий за скользящие 10-минутные окна с шагом 5 минут
windowed = events \
    .withWatermark("event_date", "15 minutes") \
    .groupBy(
        window("event_date", "10 minutes", "5 minutes"),
        "event_type"
    ) \
    .agg(
        F.count("*").alias("event_count"),
        F.countDistinct("customer_id").alias("unique_users"),
    )

Дедупликация в потоке

Python
deduped = events \
    .withWatermark("event_date", "1 hour") \
    .dropDuplicates(["event_id"])

Stream-Static Join

Поток можно JOIN-ить с обычной (static) таблицей:

Python
# Static таблица (загружается один раз)
customers = spark.read.parquet("data/customers.parquet")

# Stream JOIN static
enriched = events_stream.join(customers, "customer_id", "left")

Static таблица не обновляется автоматически

Если справочник изменился, нужно перезапустить стрим или использовать Delta Lake с версионированием.


Exactly-Once семантика

Structured Streaming гарантирует exactly-once обработку при:

  1. Источник поддерживает replay (Kafka с offsets, файлы)
  2. Checkpoint настроен
  3. Sink идемпотентен (Parquet, Delta) или поддерживает транзакции

Для JDBC/custom sinks exactly-once нужно обеспечивать вручную (UPSERT, дедупликация по batch_id).


Проверь себя


Источники