Structured Streaming
Structured Streaming¶
Structured Streaming — потоковая обработка данных в Spark, построенная поверх DataFrame API. Ты пишешь тот же код, что и для batch, но данные приходят непрерывно.
Ключевая идея: поток — это бесконечная таблица, к которой постоянно добавляются строки. Каждый trigger (микро-батч) обрабатывает новые строки.
Архитектура¶
Источник (Kafka/Files/Socket)
↓
readStream ← определяет откуда читать
↓
Transformations ← тот же DataFrame API (filter, join, groupBy)
↓
writeStream ← определяет куда писать
↓
Sink (Kafka/Files/Console/JDBC)
Пример: чтение из Kafka → запись в Parquet¶
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder \
.appName("Streaming ETL") \
.getOrCreate()
# Читаем из Kafka
raw_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "events") \
.option("startingOffsets", "latest") \
.load()
# Парсим JSON из value
events = raw_stream \
.select(
F.col("key").cast("string"),
F.from_json(F.col("value").cast("string"),
"customer_id INT, event_type STRING, event_date TIMESTAMP, page_url STRING"
).alias("data"),
F.col("timestamp").alias("kafka_ts")
) \
.select("data.*", "kafka_ts")
# Трансформации — тот же API, что и batch
filtered = events.filter(F.col("event_type").isin("purchase", "add_to_cart"))
# Пишем в Parquet
query = filtered.writeStream \
.format("parquet") \
.option("path", "output/events_stream") \
.option("checkpointLocation", "checkpoints/events") \
.partitionBy("event_type") \
.trigger(processingTime="1 minute") \
.start()
query.awaitTermination()
Источники (Sources)¶
| Источник | Формат | Типичное использование |
|---|---|---|
| Kafka | kafka |
Основной для продакшн-стриминга |
| Файлы | parquet, json, csv |
Мониторинг директории |
| Socket | socket |
Только для тестов |
| Rate | rate |
Генерация тестовых данных |
Чтение файлов (File Source)¶
# Spark мониторит директорию и обрабатывает новые файлы
stream = spark.readStream \
.schema(my_schema) \
.format("json") \
.option("maxFilesPerTrigger", 10) \
.load("input/events/")
Output Modes¶
| Режим | Описание | Когда использовать |
|---|---|---|
append |
Только новые строки | Фильтрация, map-трансформации (без агрегаций) |
update |
Только изменённые строки | Агрегации с watermark |
complete |
Все строки результата | Агрегации без watermark (полный пересчёт) |
Triggers — когда обрабатывать¶
# Микро-батч каждые 30 секунд
.trigger(processingTime="30 seconds")
# Один раз (batch-like, полезно для backfill)
.trigger(once=True)
# Один раз, но без рестарта (Spark 3.3+)
.trigger(availableNow=True)
# Continuous processing (низкая латентность, ~1 ms, experimental)
.trigger(continuous="1 second")
Watermarks — работа с опозданиями¶
Реальные данные приходят с задержкой: событие произошло в 10:00, а попало в Kafka в 10:05. Watermark говорит Spark: «данные старше X минут можно игнорировать».
from pyspark.sql.functions import window
events_with_watermark = events \
.withWatermark("event_date", "10 minutes")
# Агрегация с окнами
event_counts = events_with_watermark \
.groupBy(
window("event_date", "5 minutes"), # 5-минутные окна
"event_type"
) \
.count()
Как watermark работает¶
Время событий: 10:00 10:02 10:05 10:03 10:08 10:01 10:12
Watermark (10 min): ↑
max_event_time = 10:12
watermark = 10:12 - 10min = 10:02
→ событие 10:01 отбрасывается
→ событие 10:03 всё ещё принимается
Watermark — не строгая граница
Watermark — это нижняя граница. Spark может принять данные чуть старше watermark, но не гарантирует это. Для бизнес-критичных данных используй запас (watermark = 2 × макс ожидаемая задержка).
Checkpointing¶
Checkpoint сохраняет состояние стриминга (offsets, state) для recovery после сбоя. Обязательно для продакшена.
query = df.writeStream \
.option("checkpointLocation", "hdfs:///checkpoints/my_stream") \
.start()
Что хранится: - Offsets (какие данные уже прочитаны) - State (промежуточные результаты агрегаций) - Commit log (какие батчи завершены)
Нельзя менять запрос и сохранить checkpoint
Если изменил схему, добавил/удалил столбцы в groupBy, или сменил source — удали checkpoint и запусти с нуля. Иначе — ошибки десериализации state.
foreachBatch — произвольная логика¶
foreachBatch даёт доступ к каждому микро-батчу как к обычному DataFrame. Полезно для:
- Запись в несколько sinks
- UPSERT (MERGE) в БД
- Дедупликация
def process_batch(batch_df, batch_id):
# Дедупликация внутри батча
deduped = batch_df.dropDuplicates(["event_id"])
# Запись в Parquet
deduped.write.mode("append").parquet("output/events")
# Запись в PostgreSQL (JDBC)
deduped.write.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/etl_kitchen") \
.option("dbtable", "events_stream") \
.mode("append") \
.save()
print(f"Batch {batch_id}: {deduped.count()} rows")
query = events.writeStream \
.foreachBatch(process_batch) \
.option("checkpointLocation", "checkpoints/events_multi") \
.trigger(processingTime="1 minute") \
.start()
Stateful операции¶
Агрегации с состоянием¶
# Подсчёт событий за скользящие 10-минутные окна с шагом 5 минут
windowed = events \
.withWatermark("event_date", "15 minutes") \
.groupBy(
window("event_date", "10 minutes", "5 minutes"),
"event_type"
) \
.agg(
F.count("*").alias("event_count"),
F.countDistinct("customer_id").alias("unique_users"),
)
Дедупликация в потоке¶
Stream-Static Join¶
Поток можно JOIN-ить с обычной (static) таблицей:
# Static таблица (загружается один раз)
customers = spark.read.parquet("data/customers.parquet")
# Stream JOIN static
enriched = events_stream.join(customers, "customer_id", "left")
Static таблица не обновляется автоматически
Если справочник изменился, нужно перезапустить стрим или использовать Delta Lake с версионированием.
Exactly-Once семантика¶
Structured Streaming гарантирует exactly-once обработку при:
- Источник поддерживает replay (Kafka с offsets, файлы)
- Checkpoint настроен
- Sink идемпотентен (Parquet, Delta) или поддерживает транзакции
Для JDBC/custom sinks exactly-once нужно обеспечивать вручную (UPSERT, дедупликация по batch_id).
Проверь себя¶
Источники¶
- Spark: Structured Streaming Guide — официальная документация
- Spark: Kafka Integration — Kafka source/sink