Перейти к содержанию

Конкурентность и параллелизм

Зачем это DE?

Data Engineer работает с I/O: API, базы данных, файлы. Конкурентность позволяет не ждать каждый ответ последовательно, а запускать десятки запросов параллельно. Разница — минуты vs часы.


Три модели: threading, multiprocessing, asyncio

Text Only
┌──────────────────┬──────────────┬────────────────┬──────────────┐
│                  │  threading   │ multiprocessing │   asyncio    │
├──────────────────┼──────────────┼────────────────┼──────────────┤
│ Параллелизм      │ ❌ (GIL)     │ ✅ (процессы)   │ ❌ (1 поток)  │
│ Конкурентность   │ ✅            │ ✅              │ ✅            │
│ I/O-bound        │ ✅ хорошо    │ ⚠️ избыточно   │ ✅ отлично    │
│ CPU-bound        │ ❌ бесполезно│ ✅ отлично      │ ❌ бесполезно │
│ Память на задачу │ ~8 KB (поток)│ ~30 MB (процесс)│ ~1 KB (корутина)│
│ Типичное кол-во  │ 10-50        │ CPU cores      │ 1000+         │
└──────────────────┴──────────────┴────────────────┴──────────────┘

GIL — Global Interpreter Lock

GIL не даёт нескольким потокам выполнять Python-байткод одновременно. Потоки переключаются, но не работают параллельно.

Исключение: GIL отпускается при I/O (сеть, диск, sleep). Поэтому threading помогает для I/O-bound задач.


threading — параллельный I/O

ThreadPoolExecutor

Python
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

urls = [f"https://api.example.com/page/{i}" for i in range(100)]

def fetch(url):
    resp = requests.get(url, timeout=10)
    return resp.json()

# 20 потоков — 20 запросов одновременно
with ThreadPoolExecutor(max_workers=20) as pool:
    futures = {pool.submit(fetch, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            data = future.result()
            process(data)
        except Exception as e:
            print(f"Ошибка {url}: {e}")

Практический пример: параллельная выгрузка из нескольких БД

Python
from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import create_engine, text

sources = {
    "crm": "postgresql://crm_host/crm",
    "billing": "postgresql://billing_host/billing",
    "logs": "postgresql://logs_host/logs",
}

def extract_from(name, dsn):
    engine = create_engine(dsn)
    with engine.connect() as conn:
        rows = conn.execute(text("SELECT * FROM daily_stats")).fetchall()
    engine.dispose()
    return name, rows

with ThreadPoolExecutor(max_workers=len(sources)) as pool:
    futures = [pool.submit(extract_from, name, dsn)
               for name, dsn in sources.items()]
    for future in as_completed(futures):
        name, rows = future.result()
        print(f"{name}: {len(rows)} строк")

multiprocessing — настоящий параллелизм

Для CPU-bound задач (парсинг, трансформации, сериализация) — каждый процесс имеет свой GIL.

ProcessPoolExecutor

Python
from concurrent.futures import ProcessPoolExecutor
import pandas as pd

files = ["chunk_001.parquet", "chunk_002.parquet", ..., "chunk_100.parquet"]

def transform_chunk(path):
    df = pd.read_parquet(path)
    df["revenue"] = df["quantity"] * df["price"]
    df = df[df["revenue"] > 0]
    return df

# Обрабатываем файлы на всех ядрах
with ProcessPoolExecutor() as pool:  # max_workers = cpu_count()
    results = list(pool.map(transform_chunk, files))

final_df = pd.concat(results, ignore_index=True)

Сериализация через pickle

Данные между процессами передаются через pickle. Нельзя передавать соединения БД, лямбды, генераторы. Передавай пути к файлам или сериализуемые данные.

Когда НЕ использовать multiprocessing

  • Задач мало (overhead на создание процесса > пользы)
  • Данные огромные (сериализация в pickle медленная)
  • Нужен shared state (используй multiprocessing.Manager или Redis)

asyncio — тысячи одновременных I/O

Основы: async/await

Python
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.json()

async def main():
    urls = [f"https://api.example.com/page/{i}" for i in range(1000)]

    async with aiohttp.ClientSession() as session:
        # Запускаем ВСЕ 1000 запросов «одновременно»
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    for result in results:
        if isinstance(result, Exception):
            print(f"Ошибка: {result}")
        else:
            process(result)

asyncio.run(main())

Семафор — ограничение конкурентности

Python
async def fetch_with_limit(sem, session, url):
    async with sem:  # не более N одновременных запросов
        async with session.get(url) as resp:
            return await resp.json()

async def main():
    sem = asyncio.Semaphore(50)  # макс 50 одновременных
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(sem, session, url) for url in urls]
        return await asyncio.gather(*tasks)

asyncpg — асинхронная работа с PostgreSQL

Python
import asyncpg

async def load_data():
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/dwh",
        min_size=5, max_size=20
    )

    async with pool.acquire() as conn:
        # Быстрый SELECT
        rows = await conn.fetch("SELECT * FROM orders WHERE status = $1",
                                "delivered")

        # Массовая вставка — быстрее psycopg2
        await conn.copy_records_to_table(
            "staging_orders",
            records=[(1, "Alice", 100.0), (2, "Bob", 200.0)],
            columns=["id", "name", "amount"]
        )

    await pool.close()

Сравнение на реальной задаче

Задача: скачать 1000 страниц API и записать в БД.

Python
# 1. Последовательно — ~500 сек (0.5 сек на запрос)
for url in urls:
    data = requests.get(url).json()
    insert_to_db(data)

# 2. ThreadPoolExecutor(20) — ~25 сек
with ThreadPoolExecutor(20) as pool:
    results = list(pool.map(fetch, urls))
bulk_insert(results)

# 3. asyncio + aiohttp — ~10 сек
async def main():
    sem = asyncio.Semaphore(50)
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[
            fetch_with_limit(sem, session, url) for url in urls
        ])
    bulk_insert(results)
Подход Время Ускорение
Последовательно 500 сек 1x
threading (20) 25 сек 20x
asyncio (50) 10 сек 50x

Паттерн: Producer-Consumer

Python
import asyncio
from asyncio import Queue

async def producer(queue: Queue, urls: list[str]):
    """Извлекает данные и кладёт в очередь."""
    async with aiohttp.ClientSession() as session:
        for url in urls:
            data = await fetch(session, url)
            await queue.put(data)
    await queue.put(None)  # сигнал завершения

async def consumer(queue: Queue, conn):
    """Забирает из очереди и пишет в БД пачками."""
    batch = []
    while True:
        item = await queue.get()
        if item is None:
            break
        batch.append(item)
        if len(batch) >= 100:
            await bulk_insert(conn, batch)
            batch.clear()
    if batch:
        await bulk_insert(conn, batch)

async def main():
    queue = Queue(maxsize=200)  # backpressure
    await asyncio.gather(
        producer(queue, urls),
        consumer(queue, conn)
    )

Что запомнить

Задача Решение Почему
100 API-запросов asyncio + aiohttp Минимальный overhead, максимальная конкурентность
Параллельная выгрузка из 5 БД ThreadPoolExecutor Простота, драйверы БД блокирующие
Трансформация 100 Parquet-файлов ProcessPoolExecutor CPU-bound, нужен реальный параллелизм
API-сервер с высокой нагрузкой asyncio + asyncpg Тысячи соединений, неблокирующий I/O
Простой скрипт, 10 файлов Последовательно Overhead параллелизма > выигрыша

Проверь себя


Источники