Конкурентность и параллелизм¶
Зачем это DE?¶
Data Engineer работает с I/O: API, базы данных, файлы. Конкурентность позволяет не ждать каждый ответ последовательно, а запускать десятки запросов параллельно. Разница — минуты vs часы.
Три модели: threading, multiprocessing, asyncio¶
Text Only
┌──────────────────┬──────────────┬────────────────┬──────────────┐
│ │ threading │ multiprocessing │ asyncio │
├──────────────────┼──────────────┼────────────────┼──────────────┤
│ Параллелизм │ ❌ (GIL) │ ✅ (процессы) │ ❌ (1 поток) │
│ Конкурентность │ ✅ │ ✅ │ ✅ │
│ I/O-bound │ ✅ хорошо │ ⚠️ избыточно │ ✅ отлично │
│ CPU-bound │ ❌ бесполезно│ ✅ отлично │ ❌ бесполезно │
│ Память на задачу │ ~8 KB (поток)│ ~30 MB (процесс)│ ~1 KB (корутина)│
│ Типичное кол-во │ 10-50 │ CPU cores │ 1000+ │
└──────────────────┴──────────────┴────────────────┴──────────────┘
GIL — Global Interpreter Lock¶
GIL не даёт нескольким потокам выполнять Python-байткод одновременно. Потоки переключаются, но не работают параллельно.
Исключение: GIL отпускается при I/O (сеть, диск, sleep). Поэтому threading помогает для I/O-bound задач.
threading — параллельный I/O¶
ThreadPoolExecutor¶
Python
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
urls = [f"https://api.example.com/page/{i}" for i in range(100)]
def fetch(url):
resp = requests.get(url, timeout=10)
return resp.json()
# 20 потоков — 20 запросов одновременно
with ThreadPoolExecutor(max_workers=20) as pool:
futures = {pool.submit(fetch, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
data = future.result()
process(data)
except Exception as e:
print(f"Ошибка {url}: {e}")
Практический пример: параллельная выгрузка из нескольких БД¶
Python
from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import create_engine, text
sources = {
"crm": "postgresql://crm_host/crm",
"billing": "postgresql://billing_host/billing",
"logs": "postgresql://logs_host/logs",
}
def extract_from(name, dsn):
engine = create_engine(dsn)
with engine.connect() as conn:
rows = conn.execute(text("SELECT * FROM daily_stats")).fetchall()
engine.dispose()
return name, rows
with ThreadPoolExecutor(max_workers=len(sources)) as pool:
futures = [pool.submit(extract_from, name, dsn)
for name, dsn in sources.items()]
for future in as_completed(futures):
name, rows = future.result()
print(f"{name}: {len(rows)} строк")
multiprocessing — настоящий параллелизм¶
Для CPU-bound задач (парсинг, трансформации, сериализация) — каждый процесс имеет свой GIL.
ProcessPoolExecutor¶
Python
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
files = ["chunk_001.parquet", "chunk_002.parquet", ..., "chunk_100.parquet"]
def transform_chunk(path):
df = pd.read_parquet(path)
df["revenue"] = df["quantity"] * df["price"]
df = df[df["revenue"] > 0]
return df
# Обрабатываем файлы на всех ядрах
with ProcessPoolExecutor() as pool: # max_workers = cpu_count()
results = list(pool.map(transform_chunk, files))
final_df = pd.concat(results, ignore_index=True)
Сериализация через pickle
Данные между процессами передаются через pickle. Нельзя передавать соединения БД, лямбды, генераторы. Передавай пути к файлам или сериализуемые данные.
Когда НЕ использовать multiprocessing¶
- Задач мало (overhead на создание процесса > пользы)
- Данные огромные (сериализация в pickle медленная)
- Нужен shared state (используй
multiprocessing.Managerили Redis)
asyncio — тысячи одновременных I/O¶
Основы: async/await¶
Python
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.json()
async def main():
urls = [f"https://api.example.com/page/{i}" for i in range(1000)]
async with aiohttp.ClientSession() as session:
# Запускаем ВСЕ 1000 запросов «одновременно»
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
print(f"Ошибка: {result}")
else:
process(result)
asyncio.run(main())
Семафор — ограничение конкурентности¶
Python
async def fetch_with_limit(sem, session, url):
async with sem: # не более N одновременных запросов
async with session.get(url) as resp:
return await resp.json()
async def main():
sem = asyncio.Semaphore(50) # макс 50 одновременных
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(sem, session, url) for url in urls]
return await asyncio.gather(*tasks)
asyncpg — асинхронная работа с PostgreSQL¶
Python
import asyncpg
async def load_data():
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/dwh",
min_size=5, max_size=20
)
async with pool.acquire() as conn:
# Быстрый SELECT
rows = await conn.fetch("SELECT * FROM orders WHERE status = $1",
"delivered")
# Массовая вставка — быстрее psycopg2
await conn.copy_records_to_table(
"staging_orders",
records=[(1, "Alice", 100.0), (2, "Bob", 200.0)],
columns=["id", "name", "amount"]
)
await pool.close()
Сравнение на реальной задаче¶
Задача: скачать 1000 страниц API и записать в БД.
Python
# 1. Последовательно — ~500 сек (0.5 сек на запрос)
for url in urls:
data = requests.get(url).json()
insert_to_db(data)
# 2. ThreadPoolExecutor(20) — ~25 сек
with ThreadPoolExecutor(20) as pool:
results = list(pool.map(fetch, urls))
bulk_insert(results)
# 3. asyncio + aiohttp — ~10 сек
async def main():
sem = asyncio.Semaphore(50)
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(*[
fetch_with_limit(sem, session, url) for url in urls
])
bulk_insert(results)
| Подход | Время | Ускорение |
|---|---|---|
| Последовательно | 500 сек | 1x |
| threading (20) | 25 сек | 20x |
| asyncio (50) | 10 сек | 50x |
Паттерн: Producer-Consumer¶
Python
import asyncio
from asyncio import Queue
async def producer(queue: Queue, urls: list[str]):
"""Извлекает данные и кладёт в очередь."""
async with aiohttp.ClientSession() as session:
for url in urls:
data = await fetch(session, url)
await queue.put(data)
await queue.put(None) # сигнал завершения
async def consumer(queue: Queue, conn):
"""Забирает из очереди и пишет в БД пачками."""
batch = []
while True:
item = await queue.get()
if item is None:
break
batch.append(item)
if len(batch) >= 100:
await bulk_insert(conn, batch)
batch.clear()
if batch:
await bulk_insert(conn, batch)
async def main():
queue = Queue(maxsize=200) # backpressure
await asyncio.gather(
producer(queue, urls),
consumer(queue, conn)
)
Что запомнить¶
| Задача | Решение | Почему |
|---|---|---|
| 100 API-запросов | asyncio + aiohttp |
Минимальный overhead, максимальная конкурентность |
| Параллельная выгрузка из 5 БД | ThreadPoolExecutor |
Простота, драйверы БД блокирующие |
| Трансформация 100 Parquet-файлов | ProcessPoolExecutor |
CPU-bound, нужен реальный параллелизм |
| API-сервер с высокой нагрузкой | asyncio + asyncpg |
Тысячи соединений, неблокирующий I/O |
| Простой скрипт, 10 файлов | Последовательно | Overhead параллелизма > выигрыша |