Перейти к содержанию

Delta Lake

Delta Lake

Delta Lake — open-source storage layer, который добавляет ACID-транзакции поверх Parquet-файлов. Это стандарт де-факто для Data Lakehouse.

Зачем: обычный Parquet в Data Lake — это набор файлов без гарантий. Два процесса пишут одновременно → конфликт. Чтение во время записи → неконсистентные данные. Delta решает эти проблемы.


Архитектура

Text Only
Delta Table = Parquet файлы + Transaction Log (_delta_log/)

data/
├── _delta_log/
│   ├── 00000000000000000000.json   ← commit 0: CREATE TABLE
│   ├── 00000000000000000001.json   ← commit 1: INSERT
│   ├── 00000000000000000002.json   ← commit 2: UPDATE
│   └── 00000000000000000010.checkpoint.parquet  ← snapshot
├── part-00000-...-.parquet
├── part-00001-...-.parquet
└── part-00002-...-.parquet

Transaction Log (_delta_log/) — журнал всех операций. Каждый коммит — JSON-файл с описанием добавленных/удалённых файлов. Это обеспечивает ACID.


CRUD-операции

Создание и запись

Python
# Запись DataFrame как Delta
df.write.format("delta").mode("overwrite").save("data/orders_delta")

# Создание managed table
df.write.format("delta").saveAsTable("orders")

# Append
new_orders.write.format("delta").mode("append").save("data/orders_delta")

Чтение

Python
df = spark.read.format("delta").load("data/orders_delta")

# SQL
spark.sql("SELECT * FROM delta.`data/orders_delta` WHERE status = 'delivered'")

UPDATE

Python
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "data/orders_delta")

delta_table.update(
    condition="status = 'processing' AND order_date < '2024-06-01'",
    set={"status": "'cancelled'"}
)

DELETE

Python
delta_table.delete("status = 'cancelled' AND order_date < '2024-01-01'")

MERGE INTO — upsert

Самая мощная операция Delta. Обновляет существующие строки и вставляет новые за одну атомарную операцию.

Python
# Incoming data
updates = spark.read.parquet("data/orders_updates.parquet")

delta_table = DeltaTable.forPath(spark, "data/orders_delta")

delta_table.alias("target") \
    .merge(updates.alias("source"), "target.order_id = source.order_id") \
    .whenMatchedUpdate(set={
        "status": "source.status",
        "total_amount": "source.total_amount",
        "delivery_date": "source.delivery_date",
    }) \
    .whenNotMatchedInsertAll() \
    .execute()

MERGE с условиями

Python
delta_table.alias("t") \
    .merge(updates.alias("s"), "t.order_id = s.order_id") \
    .whenMatchedUpdate(
        condition="s.updated_at > t.updated_at",  # обновляем только если новее
        set={"status": "s.status", "total_amount": "s.total_amount"}
    ) \
    .whenMatchedDelete(
        condition="s.is_deleted = true"  # удаляем помеченные
    ) \
    .whenNotMatchedInsertAll() \
    .execute()

MERGE заменяет антипаттерн read → filter → write

Без Delta: read → left_anti join → union → overwrite. С Delta: одна операция MERGE, атомарная и оптимизированная.


Time Travel

Delta хранит историю всех изменений. Можно читать данные на любой момент в прошлом.

Python
# По версии
df_v0 = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("data/orders_delta")

# По timestamp
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2025-03-14 10:00:00") \
    .load("data/orders_delta")
SQL
-- SQL
SELECT * FROM delta.`data/orders_delta` VERSION AS OF 5;
SELECT * FROM delta.`data/orders_delta` TIMESTAMP AS OF '2025-03-14';

История изменений

Python
delta_table = DeltaTable.forPath(spark, "data/orders_delta")
delta_table.history().show()

# version | timestamp           | operation | operationParameters
# 3       | 2025-03-15 10:00:00 | MERGE     | {predicate: ...}
# 2       | 2025-03-14 15:00:00 | UPDATE    | {predicate: status = ...}
# 1       | 2025-03-14 10:00:00 | WRITE     | {mode: Append}
# 0       | 2025-03-13 09:00:00 | WRITE     | {mode: Overwrite}

Восстановление

Python
# Откатить к версии 2
delta_table.restoreToVersion(2)

# Откатить к timestamp
delta_table.restoreToTimestamp("2025-03-14 15:00:00")

Schema Evolution

Python
# Разрешить добавление новых столбцов при записи
new_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("data/orders_delta")
Python
# Или глобально
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

Правила:

  • Новые столбцы добавляются с NULL для старых строк
  • Тип столбца можно расширить (INT → LONG, но не STRING → INT)
  • Удаление столбцов через ALTER TABLE ... DROP COLUMN (Delta 2.0+)

Оптимизация

OPTIMIZE — компактификация файлов

SQL
-- Объединить мелкие файлы в крупные (1 GB)
OPTIMIZE delta.`data/orders_delta`;

-- С Z-ORDER для ускорения запросов по customer_id
OPTIMIZE delta.`data/orders_delta` ZORDER BY (customer_id);

Z-ORDER

Z-ORDER перегруппирует данные так, чтобы строки с близкими значениями ключа оказались в одних файлах. Это ускоряет фильтрацию — Spark пропускает файлы через data skipping.

Text Only
Без Z-ORDER:          С Z-ORDER по customer_id:
File 1: [1,5,20,3]    File 1: [1,2,3,4,5]
File 2: [2,15,8,1]    File 2: [6,7,8,9,10]
File 3: [10,4,25,7]   File 3: [11,15,20,25]

WHERE customer_id = 3: WHERE customer_id = 3:
Читать все 3 файла     Читать только File 1!

VACUUM — удаление старых файлов

SQL
-- Удалить файлы старше 7 дней (дефолт)
VACUUM delta.`data/orders_delta`;

-- Удалить файлы старше 24 часов
VACUUM delta.`data/orders_delta` RETAIN 24 HOURS;

VACUUM удаляет возможность time travel

После VACUUM нельзя читать версии старше retention period. Не устанавливай RETAIN < 7 дней без веской причины.


Delta vs Iceberg vs Hudi

Критерий Delta Lake Apache Iceberg Apache Hudi
Создатель Databricks Netflix → Apache Uber → Apache
Формат данных Parquet Parquet, ORC, Avro Parquet, ORC
Transaction Log JSON + checkpoint Metadata (Avro/JSON) Timeline
MERGE Да Да (Spark 3.x) Да (core feature)
Time Travel Да Да Ограниченно
Z-ORDER Да Sort Order (hidden partitioning) Clustering
Партиционирование Явное (Hive-style) Hidden partitioning Явное
Multi-engine Spark, Flink, Trino, DuckDB Spark, Flink, Trino, DuckDB Spark, Flink
OSS vs Managed OSS + Databricks Полностью OSS Полностью OSS

Выбор:

  • Delta Lake — если используешь Databricks или Spark-first стек
  • Iceberg — если нужен multi-engine (Spark + Trino + Flink), hidden partitioning
  • Hudi — если основной паттерн — CDC/upsert (Uber-стиль)

Проверь себя


Источники