Аналитические паттерны DE
Зачем это нужно DE?
Data Engineer строит витрины, на основе которых аналитики принимают решения. Сессии, когорты, воронки, дедупликация, SCD — это не абстрактная теория, а ежедневные задачи в любом DWH.
Сессионизация (Gap-and-Island)
Задача
Есть таблица событий (events). Нужно разбить поток событий каждого пользователя на сессии: если с предыдущего события прошло больше 30 минут, начинается новая сессия.
Решение
-- Sessionization: split each customer's event stream into sessions,
-- starting a new session whenever the pause since that customer's
-- previous event exceeds 30 minutes.
WITH event_gaps AS (
    -- Time elapsed since the same customer's previous event
    -- (NULL for the customer's very first event).
    SELECT
        e.event_id,
        e.customer_id,
        e.event_date,
        e.event_type,
        e.event_date - LAG(e.event_date) OVER by_customer AS gap
    FROM events AS e
    WINDOW by_customer AS (PARTITION BY e.customer_id ORDER BY e.event_date)
),
session_numbers AS (
    -- A flag of 1 marks a session start (first event, or a pause > 30 min);
    -- the running sum of the flags numbers the sessions 1, 2, 3, ...
    SELECT
        g.*,
        SUM(
            CASE
                WHEN g.gap IS NULL OR g.gap > INTERVAL '30 minutes' THEN 1
                ELSE 0
            END
        ) OVER (PARTITION BY g.customer_id ORDER BY g.event_date) AS session_num
    FROM event_gaps AS g
)
SELECT
    s.customer_id,
    s.session_num,
    MIN(s.event_date)                     AS session_start,
    MAX(s.event_date)                     AS session_end,
    COUNT(*)                              AS events_count,
    MAX(s.event_date) - MIN(s.event_date) AS duration
FROM session_numbers AS s
GROUP BY s.customer_id, s.session_num
ORDER BY s.customer_id, session_start;
Ключевая идея: LAG() → gap → SUM(CASE WHEN gap > threshold OR gap IS NULL) OVER (...) → нарастающий номер сессии.
Когортный анализ
Задача
Определить retention по когортам: какой процент пользователей, зарегистрированных в месяце X, продолжает делать заказы в месяцах X+1, X+2, ...
Решение
-- Monthly retention by registration cohort, for months 0..6 after sign-up.
WITH cohorts AS (
    -- Cohort = calendar month in which the customer registered.
    SELECT
        customer_id,
        DATE_TRUNC('month', registered_at) AS cohort_month
    FROM customers
),
activity AS (
    -- One row per (customer, month) in which the customer placed an order.
    SELECT DISTINCT
        customer_id,
        DATE_TRUNC('month', order_date) AS activity_month
    FROM orders
),
cohort_data AS (
    -- Active customers per (cohort, activity month), plus the distance in
    -- whole months between the two month starts.
    SELECT
        c.cohort_month,
        a.activity_month,
        (EXTRACT(YEAR FROM a.activity_month) - EXTRACT(YEAR FROM c.cohort_month)) * 12
            + (EXTRACT(MONTH FROM a.activity_month) - EXTRACT(MONTH FROM c.cohort_month)) AS month_number,
        COUNT(DISTINCT c.customer_id) AS users
    FROM cohorts AS c
    INNER JOIN activity AS a USING (customer_id)
    GROUP BY c.cohort_month, a.activity_month
),
cohort_sizes AS (
    -- Number of customers registered in each cohort month (the denominator).
    SELECT
        cohort_month,
        COUNT(*) AS cohort_size
    FROM cohorts
    GROUP BY cohort_month
)
SELECT
    cd.cohort_month,
    cs.cohort_size,
    cd.month_number,
    cd.users,
    ROUND(100.0 * cd.users / cs.cohort_size, 1) AS retention_pct
FROM cohort_data AS cd
INNER JOIN cohort_sizes AS cs USING (cohort_month)
WHERE cd.month_number >= 0
  AND cd.month_number <= 6
ORDER BY cd.cohort_month, cd.month_number;
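Пример результата (данные условные; month_number = 0 — месяц регистрации, и retention в нём равен 100% только если все клиенты когорты сделали заказ в этом же месяце):

| cohort_month | cohort_size | month_number | users | retention_pct |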
|---|---|---|---|---|
| 2025-01 | 50 | 0 | 50 | 100.0 |
| 2025-01 | 50 | 1 | 32 | 64.0 |
| 2025-01 | 50 | 2 | 25 | 50.0 |
| 2025-02 | 45 | 0 | 45 | 100.0 |
| 2025-02 | 45 | 1 | 28 | 62.2 |
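Воронка конверсии
Задача
Пользователи проходят шаги: page_view → add_to_cart → checkout → purchase. Посчитать конверсию между соседними шагами и общую конверсию (от просмотра до покупки). Это нестрогая воронка: порядок событий здесь не учитывается.
Решение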
-- Non-strict conversion funnel for events since 2025-01-01: a customer
-- counts for a step if they produced that event at least once in the
-- period, regardless of the order of steps.
WITH funnel AS (
    -- One row per customer; each step_* column is 1 if the customer reached
    -- that step and NULL otherwise.
    SELECT
        customer_id,
        MAX(CASE WHEN event_type = 'page_view'   THEN 1 END) AS step_1_view,
        MAX(CASE WHEN event_type = 'add_to_cart' THEN 1 END) AS step_2_cart,
        MAX(CASE WHEN event_type = 'checkout'    THEN 1 END) AS step_3_checkout,
        MAX(CASE WHEN event_type = 'purchase'    THEN 1 END) AS step_4_purchase
    FROM events
    WHERE event_date >= DATE '2025-01-01'
      -- Other event types never set a step flag, so skip them early.
      AND event_type IN ('page_view', 'add_to_cart', 'checkout', 'purchase')
    GROUP BY customer_id
),
step_counts AS (
    -- Number of customers who reached each step.
    SELECT
        COUNT(*) FILTER (WHERE step_1_view = 1)     AS views,
        COUNT(*) FILTER (WHERE step_2_cart = 1)     AS carts,
        COUNT(*) FILTER (WHERE step_3_checkout = 1) AS checkouts,
        COUNT(*) FILTER (WHERE step_4_purchase = 1) AS purchases
    FROM funnel
)
SELECT
    views,
    carts,
    checkouts,
    purchases,
    -- NULLIF turns a zero denominator into NULL instead of a division error.
    ROUND(100.0 * carts     / NULLIF(views, 0), 1)     AS view_to_cart_pct,
    ROUND(100.0 * purchases / NULLIF(views, 0), 1)     AS overall_conversion_pct,
    -- Step-to-step conversions for the remaining transitions (appended last
    -- so the original column positions are unchanged).
    ROUND(100.0 * checkouts / NULLIF(carts, 0), 1)     AS cart_to_checkout_pct,
    ROUND(100.0 * purchases / NULLIF(checkouts, 0), 1) AS checkout_to_purchase_pct
FROM step_counts;
Строгая воронка (с учётом порядка шагов)
-- Strict funnel: customers who went page_view -> add_to_cart -> checkout
-- -> purchase IN THIS ORDER (strictly increasing event_date).
-- Each step is matched to the earliest event of that type strictly after
-- the previous matched step. Comparing only the FIRST occurrence of every
-- step would wrongly reject a customer whose stray early event (e.g. a
-- checkout before their first add_to_cart) precedes a later, correctly
-- ordered sequence. Earliest-next matching finds an ordered sequence
-- whenever one exists.
WITH step_view AS (
    -- Earliest page view per customer.
    SELECT customer_id, MIN(event_date) AS step_at
    FROM events
    WHERE event_type = 'page_view'
    GROUP BY customer_id
),
step_cart AS (
    -- Earliest add_to_cart strictly after that page view.
    SELECT e.customer_id, MIN(e.event_date) AS step_at
    FROM step_view AS v
    INNER JOIN events AS e
        ON e.customer_id = v.customer_id
       AND e.event_type = 'add_to_cart'
       AND e.event_date > v.step_at
    GROUP BY e.customer_id
),
step_checkout AS (
    -- Earliest checkout strictly after that add_to_cart.
    SELECT e.customer_id, MIN(e.event_date) AS step_at
    FROM step_cart AS c
    INNER JOIN events AS e
        ON e.customer_id = c.customer_id
       AND e.event_type = 'checkout'
       AND e.event_date > c.step_at
    GROUP BY e.customer_id
)
-- Customers with any purchase strictly after the matched checkout
-- (one row per customer, as step_checkout is already grouped).
SELECT ch.customer_id
FROM step_checkout AS ch
WHERE EXISTS (
    SELECT 1
    FROM events AS e
    WHERE e.customer_id = ch.customer_id
      AND e.event_type = 'purchase'
      AND e.event_date > ch.step_at
);
Дедупликация
По ROW_NUMBER
-- Keep only the most recently registered record for each email and delete
-- the other records with that email.
-- Assumes customer_id uniquely identifies a row in customers — TODO confirm.
WITH ranked AS (
    SELECT
        customer_id,
        ROW_NUMBER() OVER (
            PARTITION BY email
            -- NULLS LAST: PostgreSQL sorts NULLs first under DESC, which would
            -- keep a row with an unknown registered_at as the "latest" one.
            -- customer_id breaks ties so the surviving row is deterministic.
            ORDER BY registered_at DESC NULLS LAST, customer_id DESC
        ) AS rn
    FROM customers
    -- PARTITION BY puts all NULL emails into a single partition; without this
    -- filter every customer without an email except one would be deleted.
    WHERE email IS NOT NULL
)
DELETE FROM customers
WHERE customer_id IN (
    SELECT customer_id FROM ranked WHERE rn > 1
);
По DISTINCT ON (PostgreSQL)
-- One row per email: the most recently registered customer.
-- Faster for SELECT (not applicable to DELETE).
-- The sort keys after email pick the survivor: NULLS LAST stops a NULL
-- registered_at from winning (PostgreSQL sorts NULLs first under DESC), and
-- customer_id makes ties deterministic.
-- NOTE: DISTINCT ON treats all NULL emails as one group, so only one customer
-- without an email is returned.
SELECT DISTINCT ON (email) *
FROM customers
ORDER BY email, registered_at DESC NULLS LAST, customer_id DESC;
Поиск дублей с подсчётом
-- Emails shared by more than one customer, with the duplicate ids listed in
-- registration order (customer_id breaks ties).
-- NULL emails are excluded: GROUP BY would merge all of them into one group
-- and report it as a duplicate.
SELECT
    email,
    COUNT(*) AS cnt,
    array_agg(customer_id ORDER BY registered_at, customer_id) AS ids
FROM customers
WHERE email IS NOT NULL
GROUP BY email
HAVING COUNT(*) > 1
ORDER BY cnt DESC, email;
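SCD Type 2 через SQL
Slowly Changing Dimensions Type 2: при изменении отслеживаемых атрибутов текущая версия строки закрывается (valid_to, is_current = false) и добавляется новая, поэтому полная история изменений сохраняется.
Применение нового snapshot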
-- Apply the current state of customers to dim_customer as SCD Type 2.
-- Both statements run in one transaction, so readers never see a customer
-- whose old version is already closed but whose new version is missing.
BEGIN;

-- 1. Close the current version of every customer whose tracked attributes
--    (city, region) changed. IS DISTINCT FROM is NULL-safe: a plain != is
--    UNKNOWN when either side is NULL and would miss a change from/to NULL.
UPDATE dim_customer AS dc
SET valid_to   = CURRENT_DATE,
    is_current = false
FROM customers AS c
WHERE c.customer_id = dc.customer_id
  AND dc.is_current = true
  AND (dc.city IS DISTINCT FROM c.city
       OR dc.region IS DISTINCT FROM c.region);

-- 2. Insert a new current version for every customer without one: brand-new
--    customers and those closed in step 1. valid_from of the new row equals
--    valid_to of the closed row, i.e. half-open [valid_from, valid_to)
--    intervals; DATE '9999-12-31' marks an open-ended version.
INSERT INTO dim_customer (customer_id, name, city, region, segment, valid_from, valid_to, is_current)
SELECT
    c.customer_id,
    c.name,
    c.city,
    c.region,
    -- Segment by total order revenue; customers without orders have a NULL
    -- revenue and fall into 'New'.
    -- NOTE(review): cancelled orders are counted in revenue here — confirm.
    CASE
        WHEN total.revenue > 100000 THEN 'VIP'
        WHEN total.revenue > 10000  THEN 'Regular'
        ELSE 'New'
    END AS segment,
    CURRENT_DATE,
    DATE '9999-12-31',
    true
FROM customers AS c
LEFT JOIN (
    SELECT customer_id, SUM(total_amount) AS revenue
    FROM orders
    GROUP BY customer_id
) AS total USING (customer_id)
-- NOT EXISTS instead of NOT IN: NOT IN returns no rows at all if the
-- subquery yields a NULL customer_id.
WHERE NOT EXISTS (
    SELECT 1
    FROM dim_customer AS cur
    WHERE cur.customer_id = c.customer_id
      AND cur.is_current = true
);

COMMIT;
Запрос к SCD2: состояние на дату
-- Dimension row of customer 42 that was valid on 2025-06-15 (e.g. to read
-- the customer's city on that date). The version interval is half-open:
-- valid_from is inclusive, valid_to is exclusive.
SELECT *
FROM dim_customer
WHERE customer_id = 42
  AND '2025-06-15' >= valid_from
  AND '2025-06-15' <  valid_to;
Gap-and-Island: поиск непрерывных последовательностей
Задача
Найти непрерывные серии дней активности каждого клиента (streak) длиной от 3 дней.
-- Streaks: runs of consecutive calendar days on which a customer placed at
-- least one order; only streaks of 3 or more days are reported.
WITH daily_activity AS (
    -- One row per customer per active day. The ::date cast is a no-op when
    -- order_date is already DATE; for a timestamp it collapses intraday
    -- orders into one day (and date - integer below requires a DATE).
    SELECT DISTINCT
        customer_id,
        order_date::date AS order_date
    FROM orders
),
islands AS (
    SELECT
        customer_id,
        order_date,
        -- Subtracting the row number from the date gives the same value for
        -- every day of an unbroken run and a new value after any skipped day.
        -- ROW_NUMBER() is bigint and there is no date - bigint operator,
        -- hence ::int.
        order_date - ROW_NUMBER() OVER (
            PARTITION BY customer_id ORDER BY order_date
        )::int AS island_id
    FROM daily_activity
)
SELECT
    customer_id,
    MIN(order_date) AS streak_start,
    MAX(order_date) AS streak_end,
    COUNT(*)        AS streak_days
FROM islands
GROUP BY customer_id, island_id
HAVING COUNT(*) >= 3  -- streaks of at least 3 days
-- Tie-breakers make the output order deterministic for equal-length streaks.
ORDER BY streak_days DESC, customer_id, streak_start;
Ключевая идея: для дат, идущих подряд без пропусков, date - ROW_NUMBER() даёт одинаковое значение — оно и служит идентификатором «острова».
Running Total и Cumulative Metrics
-- Monthly revenue and its running (cumulative) total, excluding cancelled
-- orders.
-- NOTE(review): <> also filters out rows whose status is NULL — confirm.
SELECT
    DATE_TRUNC('month', order_date) AS month,
    SUM(total_amount)               AS monthly_revenue,
    -- The window is evaluated over the grouped rows: the inner SUM is one
    -- month's revenue, the outer SUM accumulates it in month order. Months
    -- are unique after GROUP BY, so a ROWS frame equals the default frame.
    SUM(SUM(total_amount)) OVER (
        ORDER BY DATE_TRUNC('month', order_date)
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_revenue
FROM orders
WHERE status <> 'cancelled'
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY month;
Moving Average
-- 7-day moving average of the daily order count, over calendar days.
-- Days without orders are filled in as 0. A ROWS BETWEEN 6 PRECEDING window
-- over GROUP BY order_date alone would skip such days and average over seven
-- *active* days, which may span far more than a week.
-- The first 6 days of the range average over fewer than 7 days.
WITH daily AS (
    -- Order count per calendar day (::date is a no-op if order_date is DATE).
    SELECT
        order_date::date AS order_date,
        COUNT(*)         AS daily_orders
    FROM orders
    GROUP BY order_date::date
),
calendar AS (
    -- Every day from the first to the last order date; empty if there are
    -- no orders (generate_series over NULL bounds yields no rows).
    SELECT gs.calendar_day::date AS order_date
    FROM (
        SELECT MIN(order_date) AS first_day, MAX(order_date) AS last_day
        FROM daily
    ) AS bounds
    CROSS JOIN LATERAL generate_series(
        bounds.first_day, bounds.last_day, INTERVAL '1 day'
    ) AS gs(calendar_day)
)
SELECT
    cal.order_date,
    COALESCE(d.daily_orders, 0) AS daily_orders,
    AVG(COALESCE(d.daily_orders, 0)) OVER (
        ORDER BY cal.order_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS avg_7d
FROM calendar AS cal
LEFT JOIN daily AS d
    ON d.order_date = cal.order_date
ORDER BY cal.order_date;
FILTER — условные агрегаты
-- Monthly order breakdown by status using FILTER conditional aggregates.
WITH monthly AS (
    SELECT
        DATE_TRUNC('month', order_date)                       AS month,
        COUNT(*)                                              AS total_orders,
        COUNT(*) FILTER (WHERE status = 'delivered')          AS delivered,
        COUNT(*) FILTER (WHERE status = 'cancelled')          AS cancelled,
        -- NULL (not 0) for a month without delivered orders.
        SUM(total_amount) FILTER (WHERE status = 'delivered') AS delivered_revenue
    FROM orders
    GROUP BY DATE_TRUNC('month', order_date)
)
SELECT
    month,
    total_orders,
    delivered,
    cancelled,
    delivered_revenue,
    -- total_orders is at least 1 in every group, so no division by zero.
    ROUND(100.0 * cancelled / total_orders, 1) AS cancel_rate_pct
FROM monthly
ORDER BY month;
Что запомнить
- Сессионизация — `LAG()` → gap → `SUM(CASE ...) OVER (...)` — нарастающий счётчик сессий
- Когорты — `DATE_TRUNC('month', registered_at)` + JOIN на активность → retention по месяцам
- Воронки — `FILTER (WHERE ...)` для подсчёта конверсии по шагам
- Дедупликация — `ROW_NUMBER()` и `DISTINCT ON` (PostgreSQL) — два основных подхода
- SCD Type 2 — `valid_from`/`valid_to` + `is_current` для отслеживания истории
- Gap-and-Island — `date - ROW_NUMBER()` даёт одинаковое значение для последовательных дат
- FILTER — чище и быстрее, чем `CASE WHEN`, для условных агрегатов (PostgreSQL 9.4+)