Delta Lake
Delta Lake¶
Delta Lake — open-source storage layer, который добавляет ACID-транзакции поверх Parquet-файлов. Это стандарт де-факто для Data Lakehouse.
Зачем: обычный Parquet в Data Lake — это набор файлов без гарантий. Два процесса пишут одновременно → конфликт. Чтение во время записи → неконсистентные данные. Delta решает эти проблемы.
Архитектура¶
Delta Table = Parquet файлы + Transaction Log (_delta_log/)
data/
├── _delta_log/
│ ├── 00000000000000000000.json ← commit 0: CREATE TABLE
│ ├── 00000000000000000001.json ← commit 1: INSERT
│ ├── 00000000000000000002.json ← commit 2: UPDATE
│ └── 00000000000000000010.checkpoint.parquet ← snapshot
├── part-00000-...-.parquet
├── part-00001-...-.parquet
└── part-00002-...-.parquet
Transaction Log (_delta_log/) — журнал всех операций. Каждый коммит — JSON-файл с описанием добавленных/удалённых файлов. Это обеспечивает ACID.
CRUD-операции¶
Создание и запись¶
# Запись DataFrame как Delta
df.write.format("delta").mode("overwrite").save("data/orders_delta")
# Создание managed table
df.write.format("delta").saveAsTable("orders")
# Append
new_orders.write.format("delta").mode("append").save("data/orders_delta")
Чтение¶
df = spark.read.format("delta").load("data/orders_delta")
# SQL
spark.sql("SELECT * FROM delta.`data/orders_delta` WHERE status = 'delivered'")
UPDATE¶
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "data/orders_delta")
delta_table.update(
condition="status = 'processing' AND order_date < '2024-06-01'",
set={"status": "'cancelled'"}
)
DELETE¶
MERGE INTO — upsert¶
Самая мощная операция Delta. Обновляет существующие строки и вставляет новые за одну атомарную операцию.
# Incoming data
updates = spark.read.parquet("data/orders_updates.parquet")
delta_table = DeltaTable.forPath(spark, "data/orders_delta")
delta_table.alias("target") \
.merge(updates.alias("source"), "target.order_id = source.order_id") \
.whenMatchedUpdate(set={
"status": "source.status",
"total_amount": "source.total_amount",
"delivery_date": "source.delivery_date",
}) \
.whenNotMatchedInsertAll() \
.execute()
MERGE с условиями¶
delta_table.alias("t") \
.merge(updates.alias("s"), "t.order_id = s.order_id") \
.whenMatchedUpdate(
condition="s.updated_at > t.updated_at", # обновляем только если новее
set={"status": "s.status", "total_amount": "s.total_amount"}
) \
.whenMatchedDelete(
condition="s.is_deleted = true" # удаляем помеченные
) \
.whenNotMatchedInsertAll() \
.execute()
MERGE заменяет антипаттерн read → filter → write
Без Delta: read → left_anti join → union → overwrite. С Delta: одна операция MERGE, атомарная и оптимизированная.
Time Travel¶
Delta хранит историю всех изменений. Можно читать данные на любой момент в прошлом.
# По версии
df_v0 = spark.read.format("delta") \
.option("versionAsOf", 0) \
.load("data/orders_delta")
# По timestamp
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2025-03-14 10:00:00") \
.load("data/orders_delta")
-- SQL
SELECT * FROM delta.`data/orders_delta` VERSION AS OF 5;
SELECT * FROM delta.`data/orders_delta` TIMESTAMP AS OF '2025-03-14';
История изменений¶
delta_table = DeltaTable.forPath(spark, "data/orders_delta")
delta_table.history().show()
# version | timestamp | operation | operationParameters
# 3 | 2025-03-15 10:00:00 | MERGE | {predicate: ...}
# 2 | 2025-03-14 15:00:00 | UPDATE | {predicate: status = ...}
# 1 | 2025-03-14 10:00:00 | WRITE | {mode: Append}
# 0 | 2025-03-13 09:00:00 | WRITE | {mode: Overwrite}
Восстановление¶
# Откатить к версии 2
delta_table.restoreToVersion(2)
# Откатить к timestamp
delta_table.restoreToTimestamp("2025-03-14 15:00:00")
Schema Evolution¶
# Разрешить добавление новых столбцов при записи
new_df.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("data/orders_delta")
Правила:
- Новые столбцы добавляются с NULL для старых строк
- Тип столбца можно расширить (INT → LONG, но не STRING → INT)
- Удаление столбцов через
ALTER TABLE ... DROP COLUMN(Delta 2.0+)
Оптимизация¶
OPTIMIZE — компактификация файлов¶
-- Объединить мелкие файлы в крупные (1 GB)
OPTIMIZE delta.`data/orders_delta`;
-- С Z-ORDER для ускорения запросов по customer_id
OPTIMIZE delta.`data/orders_delta` ZORDER BY (customer_id);
Z-ORDER¶
Z-ORDER перегруппирует данные так, чтобы строки с близкими значениями ключа оказались в одних файлах. Это ускоряет фильтрацию — Spark пропускает файлы через data skipping.
Без Z-ORDER: С Z-ORDER по customer_id:
File 1: [1,5,20,3] File 1: [1,2,3,4,5]
File 2: [2,15,8,1] File 2: [6,7,8,9,10]
File 3: [10,4,25,7] File 3: [11,15,20,25]
WHERE customer_id = 3: WHERE customer_id = 3:
Читать все 3 файла Читать только File 1!
VACUUM — удаление старых файлов¶
-- Удалить файлы старше 7 дней (дефолт)
VACUUM delta.`data/orders_delta`;
-- Удалить файлы старше 24 часов
VACUUM delta.`data/orders_delta` RETAIN 24 HOURS;
VACUUM удаляет возможность time travel
После VACUUM нельзя читать версии старше retention period. Не устанавливай RETAIN < 7 дней без веской причины.
Delta vs Iceberg vs Hudi¶
| Критерий | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| Создатель | Databricks | Netflix → Apache | Uber → Apache |
| Формат данных | Parquet | Parquet, ORC, Avro | Parquet, ORC |
| Transaction Log | JSON + checkpoint | Metadata (Avro/JSON) | Timeline |
| MERGE | Да | Да (Spark 3.x) | Да (core feature) |
| Time Travel | Да | Да | Ограниченно |
| Z-ORDER | Да | Sort Order (hidden partitioning) | Clustering |
| Партиционирование | Явное (Hive-style) | Hidden partitioning | Явное |
| Multi-engine | Spark, Flink, Trino, DuckDB | Spark, Flink, Trino, DuckDB | Spark, Flink |
| OSS vs Managed | OSS + Databricks | Полностью OSS | Полностью OSS |
Выбор:
- Delta Lake — если используешь Databricks или Spark-first стек
- Iceberg — если нужен multi-engine (Spark + Trino + Flink), hidden partitioning
- Hudi — если основной паттерн — CDC/upsert (Uber-стиль)
Проверь себя¶
Источники¶
- Delta Lake Documentation — официальная документация
- Delta Lake: MERGE — MERGE INTO
- Delta Lake: Optimizations — OPTIMIZE, Z-ORDER, VACUUM