Перейти к содержанию

Great Expectations — валидация данных

Great Expectations (GX) — Python-фреймворк для валидации данных. Ты описываешь ожидания (expectations) к данным, запускаешь проверки — и GX сообщает, какие правила нарушены. Результаты доступны в виде HTML-отчёта (Data Docs).


Зачем Great Expectations

Без GX С GX
Проверки — ручные SQL-запросы Декларативные expectations
Результаты в терминале Data Docs — интерактивный HTML-отчёт
Нет истории проверок Checkpoints с логированием
Сложно интегрировать в пайплайн Встроенная интеграция с Airflow

Установка

Bash
pip install great_expectations

Создание проекта:

Bash
great_expectations init

Команда создаёт структуру:

Text Only
great_expectations/
├── great_expectations.yml   # Конфигурация проекта
├── expectations/            # Сохранённые наборы ожиданий
├── checkpoints/             # Конфигурации запуска проверок
├── plugins/                 # Кастомные expectations
└── uncommitted/             # Data Docs и временные файлы
    └── data_docs/

GX Core vs GX Cloud

Great Expectations существует в двух версиях: GX Core (open source, локальный) и GX Cloud (SaaS с дашбордом и алертами). Для старта достаточно GX Core.


Основные концепции

Data Context

Центральный объект — точка входа в GX:

Python
import great_expectations as gx

context = gx.get_context()

get_context() находит great_expectations.yml в текущей директории или создаёт ephemeral-контекст в памяти.

Data Source

Подключение к данным. GX поддерживает pandas DataFrame, SQL-базы, Spark:

Python
# Подключение к PostgreSQL
datasource = context.data_sources.add_postgres(
    name="my_postgres",
    connection_string="postgresql://user:pass@localhost:5432/mydb"
)

# Определение Data Asset (таблицы)
asset = datasource.add_table_asset(
    name="orders",
    table_name="orders"
)

# Создание Batch Definition
batch_def = asset.add_batch_definition_whole_table("full_table")

Expectations

Expectations — правила, которым данные должны соответствовать. Каждое expectation — метод, который проверяет конкретное свойство:

Python
import great_expectations.expectations as gxe

# Колонка не содержит NULL
gxe.ExpectColumnValuesToNotBeNull(column="order_id")

# Значения в допустимом диапазоне
gxe.ExpectColumnValuesToBeBetween(
    column="total_amount",
    min_value=0,
    max_value=1000000
)

# Значения из списка
gxe.ExpectColumnValuesToBeInSet(
    column="status",
    value_set=["paid", "pending", "cancelled", "refunded"]
)

# Уникальность
gxe.ExpectColumnValuesToBeUnique(column="order_id")

Expectation Suite

Набор expectations для одного Data Asset:

Python
suite = context.suites.add(
    gx.ExpectationSuite(name="orders_suite")
)

suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gxe.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["paid", "pending", "cancelled", "refunded"]
    )
)
suite.add_expectation(
    gxe.ExpectColumnValuesToBeBetween(
        column="total_amount",
        min_value=0
    )
)

Пример: валидация CSV с продажами

Полный пример от загрузки до отчёта:

Python
import great_expectations as gx
import great_expectations.expectations as gxe

# 1. Создаём контекст
context = gx.get_context()

# 2. Подключаем CSV
datasource = context.data_sources.add_pandas_filesystem(
    name="local_files",
    base_directory="./data"
)

asset = datasource.add_csv_asset(
    name="sales",
    batching_regex=r"sales_(?P<year>\d{4})\.csv"
)

batch_def = asset.add_batch_definition_monthly(
    name="monthly", column="year"
)

# 3. Создаём набор ожиданий
suite = context.suites.add(
    gx.ExpectationSuite(name="sales_quality")
)

suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="sale_id")
)
suite.add_expectation(
    gxe.ExpectColumnValuesToBeUnique(column="sale_id")
)
suite.add_expectation(
    gxe.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=100000
    )
)
suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(
        column="payment_type",
        value_set=["card", "cash", "transfer"]
    )
)

# 4. Создаём и запускаем Validation Definition
validation_def = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="sales_validation",
        data=batch_def,
        suite=suite
    )
)

# 5. Запускаем проверку
results = validation_def.run()

# 6. Смотрим результаты
print(f"Success: {results.success}")
for r in results.results:
    exp = r.expectation_config.type
    col = r.expectation_config.kwargs.get("column", "")
    ok = "✓" if r.success else "✗"
    print(f"  {ok} {exp} ({col})")

Пример вывода:

Text Only
Success: False
  ✓ expect_column_values_to_not_be_null (sale_id)
  ✓ expect_column_values_to_be_unique (sale_id)
  ✗ expect_column_values_to_be_between (amount)
  ✓ expect_column_values_to_not_be_null (customer_id)
  ✓ expect_column_values_to_be_in_set (payment_type)

Data Docs — автодокументация

GX генерирует HTML-отчёт с результатами всех проверок:

Python
context.build_data_docs()
context.open_data_docs()

Data Docs показывают:

  • Список проверенных Data Assets — какие таблицы/файлы проверялись
  • Результаты по каждому expectation — пройден или нет, процент нарушений
  • Детали нарушений — конкретные значения, которые не прошли проверку
  • История запусков — когда проверялось, сколько строк обработано

Data Docs как документация качества

Data Docs можно захостить на внутреннем сервере. Аналитики и менеджеры видят актуальный статус качества данных без доступа к коду.


Checkpoints

Checkpoint объединяет Validation Definition с действиями (actions) — что делать после проверки:

Python
checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="sales_checkpoint",
        validation_definitions=[validation_def],
        actions=[
            gx.checkpoint.UpdateDataDocsAction(
                name="update_data_docs"
            )
        ]
    )
)

# Запуск
result = checkpoint.run()

Actions могут:

  • Обновить Data Docs
  • Отправить уведомление в Slack
  • Записать результат в базу данных
  • Остановить пайплайн при провале

Интеграция с Airflow

GX можно вызывать из Airflow DAG как Python-задачу:

Python
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False
)
def data_quality_pipeline():

    @task
    def validate_orders():
        import great_expectations as gx

        context = gx.get_context(
            project_root_dir="/opt/airflow/great_expectations"
        )
        checkpoint = context.checkpoints.get("orders_checkpoint")
        result = checkpoint.run()

        if not result.success:
            raise ValueError(
                "Data quality check failed! "
                "Check Data Docs for details."
            )

    @task
    def build_marts():
        # dbt run или другая трансформация
        pass

    # Сначала проверяем качество, потом строим витрины
    validate_orders() >> build_marts()

data_quality_pipeline()

Проверяй данные ДО трансформаций

Размести проверку качества между загрузкой (EL) и трансформацией (T). Если данные плохие — пайплайн останавливается до того, как испорченные данные попадут в витрины.


Популярные expectations

Expectation Что проверяет
expect_column_values_to_not_be_null Нет NULL
expect_column_values_to_be_unique Нет дубликатов
expect_column_values_to_be_in_set Значения из списка
expect_column_values_to_be_between Значения в диапазоне
expect_column_values_to_match_regex Значения по regex
expect_table_row_count_to_be_between Количество строк в диапазоне
expect_column_pair_values_a_to_be_greater_than_b Сравнение двух колонок
expect_column_values_to_be_of_type Тип данных колонки
expect_compound_columns_to_be_unique Уникальность составного ключа

Всего в GX более 300 встроенных expectations. Полный список — в документации.


Что запомнить

  • GX = expectations (правила) + Data Docs (отчёт) + Checkpoints (запуск)
  • Начни с 4 базовых: not_null, unique, be_in_set, be_between
  • Data Docs — HTML-отчёт для всей команды, не только для инженеров
  • Checkpoints интегрируются с Airflow через Python-задачи
  • Проверяй данные до трансформаций — между EL и T

Проверь себя


Источники