Great Expectations — валидация данных¶
Great Expectations (GX) — Python-фреймворк для валидации данных. Ты описываешь ожидания (expectations) к данным, запускаешь проверки — и GX сообщает, какие правила нарушены. Результаты доступны в виде HTML-отчёта (Data Docs).
Зачем Great Expectations¶
| Без GX | С GX |
|---|---|
| Проверки — ручные SQL-запросы | Декларативные expectations |
| Результаты в терминале | Data Docs — интерактивный HTML-отчёт |
| Нет истории проверок | Checkpoints с логированием |
| Сложно интегрировать в пайплайн | Встроенная интеграция с Airflow |
Установка¶
Создание проекта:
Команда создаёт структуру:
great_expectations/
├── great_expectations.yml # Конфигурация проекта
├── expectations/ # Сохранённые наборы ожиданий
├── checkpoints/ # Конфигурации запуска проверок
├── plugins/ # Кастомные expectations
└── uncommitted/ # Data Docs и временные файлы
└── data_docs/
GX Core vs GX Cloud
Great Expectations существует в двух версиях: GX Core (open source, локальный) и GX Cloud (SaaS с дашбордом и алертами). Для старта достаточно GX Core.
Основные концепции¶
Data Context¶
Центральный объект — точка входа в GX:
get_context() находит great_expectations.yml в текущей директории или создаёт ephemeral-контекст в памяти.
Data Source¶
Подключение к данным. GX поддерживает pandas DataFrame, SQL-базы, Spark:
# Подключение к PostgreSQL
datasource = context.data_sources.add_postgres(
name="my_postgres",
connection_string="postgresql://user:pass@localhost:5432/mydb"
)
# Определение Data Asset (таблицы)
asset = datasource.add_table_asset(
name="orders",
table_name="orders"
)
# Создание Batch Definition
batch_def = asset.add_batch_definition_whole_table("full_table")
Expectations¶
Expectations — правила, которым данные должны соответствовать. Каждое expectation — метод, который проверяет конкретное свойство:
import great_expectations.expectations as gxe
# Колонка не содержит NULL
gxe.ExpectColumnValuesToNotBeNull(column="order_id")
# Значения в допустимом диапазоне
gxe.ExpectColumnValuesToBeBetween(
column="total_amount",
min_value=0,
max_value=1000000
)
# Значения из списка
gxe.ExpectColumnValuesToBeInSet(
column="status",
value_set=["paid", "pending", "cancelled", "refunded"]
)
# Уникальность
gxe.ExpectColumnValuesToBeUnique(column="order_id")
Expectation Suite¶
Набор expectations для одного Data Asset:
suite = context.suites.add(
gx.ExpectationSuite(name="orders_suite")
)
suite.add_expectation(
gxe.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
gxe.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
gxe.ExpectColumnValuesToBeInSet(
column="status",
value_set=["paid", "pending", "cancelled", "refunded"]
)
)
suite.add_expectation(
gxe.ExpectColumnValuesToBeBetween(
column="total_amount",
min_value=0
)
)
Пример: валидация CSV с продажами¶
Полный пример от загрузки до отчёта:
import great_expectations as gx
import great_expectations.expectations as gxe
# 1. Создаём контекст
context = gx.get_context()
# 2. Подключаем CSV
datasource = context.data_sources.add_pandas_filesystem(
name="local_files",
base_directory="./data"
)
asset = datasource.add_csv_asset(
name="sales",
batching_regex=r"sales_(?P<year>\d{4})\.csv"
)
batch_def = asset.add_batch_definition_monthly(
name="monthly", column="year"
)
# 3. Создаём набор ожиданий
suite = context.suites.add(
gx.ExpectationSuite(name="sales_quality")
)
suite.add_expectation(
gxe.ExpectColumnValuesToNotBeNull(column="sale_id")
)
suite.add_expectation(
gxe.ExpectColumnValuesToBeUnique(column="sale_id")
)
suite.add_expectation(
gxe.ExpectColumnValuesToBeBetween(
column="amount", min_value=0, max_value=100000
)
)
suite.add_expectation(
gxe.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
gxe.ExpectColumnValuesToBeInSet(
column="payment_type",
value_set=["card", "cash", "transfer"]
)
)
# 4. Создаём и запускаем Validation Definition
validation_def = context.validation_definitions.add(
gx.ValidationDefinition(
name="sales_validation",
data=batch_def,
suite=suite
)
)
# 5. Запускаем проверку
results = validation_def.run()
# 6. Смотрим результаты
print(f"Success: {results.success}")
for r in results.results:
exp = r.expectation_config.type
col = r.expectation_config.kwargs.get("column", "")
ok = "✓" if r.success else "✗"
print(f" {ok} {exp} ({col})")
Пример вывода:
Success: False
✓ expect_column_values_to_not_be_null (sale_id)
✓ expect_column_values_to_be_unique (sale_id)
✗ expect_column_values_to_be_between (amount)
✓ expect_column_values_to_not_be_null (customer_id)
✓ expect_column_values_to_be_in_set (payment_type)
Data Docs — автодокументация¶
GX генерирует HTML-отчёт с результатами всех проверок:
Data Docs показывают:
- Список проверенных Data Assets — какие таблицы/файлы проверялись
- Результаты по каждому expectation — пройден или нет, процент нарушений
- Детали нарушений — конкретные значения, которые не прошли проверку
- История запусков — когда проверялось, сколько строк обработано
Data Docs как документация качества
Data Docs можно захостить на внутреннем сервере. Аналитики и менеджеры видят актуальный статус качества данных без доступа к коду.
Checkpoints¶
Checkpoint объединяет Validation Definition с действиями (actions) — что делать после проверки:
checkpoint = context.checkpoints.add(
gx.Checkpoint(
name="sales_checkpoint",
validation_definitions=[validation_def],
actions=[
gx.checkpoint.UpdateDataDocsAction(
name="update_data_docs"
)
]
)
)
# Запуск
result = checkpoint.run()
Actions могут:
- Обновить Data Docs
- Отправить уведомление в Slack
- Записать результат в базу данных
- Остановить пайплайн при провале
Интеграция с Airflow¶
GX можно вызывать из Airflow DAG как Python-задачу:
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule="@daily",
start_date=datetime(2026, 1, 1),
catchup=False
)
def data_quality_pipeline():
@task
def validate_orders():
import great_expectations as gx
context = gx.get_context(
project_root_dir="/opt/airflow/great_expectations"
)
checkpoint = context.checkpoints.get("orders_checkpoint")
result = checkpoint.run()
if not result.success:
raise ValueError(
"Data quality check failed! "
"Check Data Docs for details."
)
@task
def build_marts():
# dbt run или другая трансформация
pass
# Сначала проверяем качество, потом строим витрины
validate_orders() >> build_marts()
data_quality_pipeline()
Проверяй данные ДО трансформаций
Размести проверку качества между загрузкой (EL) и трансформацией (T). Если данные плохие — пайплайн останавливается до того, как испорченные данные попадут в витрины.
Популярные expectations¶
| Expectation | Что проверяет |
|---|---|
expect_column_values_to_not_be_null |
Нет NULL |
expect_column_values_to_be_unique |
Нет дубликатов |
expect_column_values_to_be_in_set |
Значения из списка |
expect_column_values_to_be_between |
Значения в диапазоне |
expect_column_values_to_match_regex |
Значения по regex |
expect_table_row_count_to_be_between |
Количество строк в диапазоне |
expect_column_pair_values_a_to_be_greater_than_b |
Сравнение двух колонок |
expect_column_values_to_be_of_type |
Тип данных колонки |
expect_compound_columns_to_be_unique |
Уникальность составного ключа |
Всего в GX более 300 встроенных expectations. Полный список — в документации.
Что запомнить¶
- GX = expectations (правила) + Data Docs (отчёт) + Checkpoints (запуск)
- Начни с 4 базовых:
not_null,unique,be_in_set,be_between - Data Docs — HTML-отчёт для всей команды, не только для инженеров
- Checkpoints интегрируются с Airflow через Python-задачи
- Проверяй данные до трансформаций — между EL и T
Проверь себя¶
Источники¶
- Great Expectations Documentation — официальная документация GX
- GX Core: Set up a GX environment — установка и настройка
- GX Core: Expectations overview — обзор expectations