S3 и MinIO¶
Зачем это DE?¶
Объектное хранилище — фундамент Data Lake. S3 (или совместимый MinIO) — место, куда складываются сырые данные, промежуточные файлы и финальные витрины. boto3, форматы файлов, lifecycle-правила — повседневные инструменты DE.
Объектное хранилище vs файловая система¶
| Параметр | Файловая система | Объектное хранилище |
|---|---|---|
| Структура | Иерархия каталогов | Плоское пространство ключей (prefix = «папка») |
| Доступ | Путь /data/file.csv |
Бакет + ключ s3://bucket/data/file.csv |
| Протокол | POSIX (read, write, seek) | HTTP API (GET, PUT, DELETE) |
| Масштабируемость | Ограничена диском/NFS | Практически бесконечная |
| Изменение файла | Можно дописать в середину | Только полная перезапись |
MinIO — S3-совместимое хранилище в Docker¶
Запуск¶
YAML
# docker-compose.yml
services:
minio:
image: minio/minio:latest
command: server /data --console-address ":9001"
ports:
- "9000:9000" # S3 API
- "9001:9001" # Web UI
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
volumes:
- minio_data:/data
volumes:
minio_data:
Создание бакета через mc (MinIO Client)¶
Bash
# Установка
curl -O https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
# Настройка
mc alias set local http://localhost:9000 minioadmin minioadmin
# Операции
mc mb local/raw-data # создать бакет
mc ls local/raw-data # список объектов
mc cp file.csv local/raw-data/ # загрузить файл
mc cat local/raw-data/file.csv # прочитать
boto3 — Python SDK для S3¶
Подключение¶
Python
import boto3
# AWS S3
s3 = boto3.client("s3")
# MinIO (S3-совместимый)
s3 = boto3.client(
"s3",
endpoint_url="http://localhost:9000",
aws_access_key_id="minioadmin",
aws_secret_access_key="minioadmin",
region_name="us-east-1"
)
Основные операции¶
Python
# Загрузить файл
s3.upload_file("orders.csv", "raw-data", "orders/2025/01/orders.csv")
# Скачать
s3.download_file("raw-data", "orders/2025/01/orders.csv", "/tmp/orders.csv")
# Загрузить объект в память
response = s3.get_object(Bucket="raw-data", Key="orders/2025/01/orders.csv")
content = response["Body"].read().decode("utf-8")
# Записать из памяти
import io
buffer = io.BytesIO(b"col1,col2\n1,a\n2,b")
s3.upload_fileobj(buffer, "raw-data", "test.csv")
# Список объектов
response = s3.list_objects_v2(Bucket="raw-data", Prefix="orders/2025/")
for obj in response.get("Contents", []):
print(obj["Key"], obj["Size"])
Пагинация (> 1000 объектов)¶
Python
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="raw-data", Prefix="events/"):
for obj in page.get("Contents", []):
print(obj["Key"])
Загрузка больших файлов (multipart)¶
Python
from boto3.s3.transfer import TransferConfig
config = TransferConfig(
multipart_threshold=100 * 1024 * 1024, # файлы > 100 MB — multipart
multipart_chunksize=50 * 1024 * 1024, # чанки по 50 MB
max_concurrency=10
)
s3.upload_file("huge_file.parquet", "raw-data", "huge_file.parquet",
Config=config)
Структура Data Lake¶
Партиционированный layout¶
Text Only
s3://data-lake/
├── raw/ # Bronze: сырые данные as-is
│ ├── orders/
│ │ ├── year=2025/month=01/day=15/
│ │ │ └── orders_20250115.parquet
│ │ └── year=2025/month=01/day=16/
│ │ └── orders_20250116.parquet
│ └── events/
│ └── dt=2025-01-15/
│ ├── part-00000.parquet
│ └── part-00001.parquet
├── cleaned/ # Silver: очищенные данные
│ └── orders/
│ └── year=2025/month=01/
│ └── orders_cleaned.parquet
└── analytics/ # Gold: витрины
└── daily_revenue/
└── date=2025-01-15/
└── revenue.parquet
Hive-style partitioning¶
Python
# Запись с партиционированием через pandas + pyarrow
import pyarrow as pa
import pyarrow.parquet as pq
df["year"] = df["order_date"].dt.year
df["month"] = df["order_date"].dt.month
table = pa.Table.from_pandas(df)
pq.write_to_dataset(
table,
root_path="s3://data-lake/raw/orders/",
partition_cols=["year", "month"],
filesystem=s3_filesystem # pyarrow.fs.S3FileSystem
)
# Создаст: s3://data-lake/raw/orders/year=2025/month=01/part-0.parquet
Форматы файлов для Data Lake¶
| Формат | Тип | Сжатие | Чтение столбцов | Когда использовать |
|---|---|---|---|---|
| CSV | Строковый | Нет (gzip внешне) | ❌ всё целиком | Обмен с внешними системами |
| Parquet | Колоночный | Snappy/ZSTD | ✅ только нужные | Аналитика, DWH, Spark |
| Avro | Строковый | Snappy/Deflate | ❌ целиком | Kafka, потоковая запись |
| ORC | Колоночный | ZLIB/Snappy | ✅ | Hive, старые Hadoop-кластеры |
| JSON Lines | Строковый | Нет | ❌ | Логи, API-ответы |
Parquet — выбор по умолчанию
Для 99% аналитических задач Parquet + Snappy. Колоночное хранение + сжатие → в 5-10 раз меньше CSV.
Чтение Parquet из S3¶
Python
import pandas as pd
# Через pandas + s3fs
df = pd.read_parquet("s3://raw-data/orders/year=2025/month=01/",
storage_options={
"endpoint_url": "http://localhost:9000",
"key": "minioadmin",
"secret": "minioadmin"
})
# Через pyarrow (быстрее для больших данных)
import pyarrow.parquet as pq
import pyarrow.fs as fs
s3 = fs.S3FileSystem(
endpoint_override="localhost:9000",
access_key="minioadmin",
secret_key="minioadmin",
scheme="http"
)
dataset = pq.ParquetDataset("raw-data/orders/", filesystem=s3)
table = dataset.read(columns=["order_id", "total_amount"]) # только нужные столбцы
df = table.to_pandas()
Lifecycle-правила¶
Автоматическое управление жизненным циклом объектов:
Python
# Через boto3
s3.put_bucket_lifecycle_configuration(
Bucket="raw-data",
LifecycleConfiguration={
"Rules": [
{
"ID": "delete-old-raw",
"Filter": {"Prefix": "raw/"},
"Status": "Enabled",
"Expiration": {"Days": 90} # удалить через 90 дней
},
{
"ID": "archive-cleaned",
"Filter": {"Prefix": "cleaned/"},
"Status": "Enabled",
"Transitions": [
{"Days": 30, "StorageClass": "STANDARD_IA"}, # дешёвое хранение через 30 дней
{"Days": 180, "StorageClass": "GLACIER"} # архив через 180 дней
]
}
]
}
)
Паттерны для DE¶
Идемпотентная загрузка¶
Python
def upload_daily_data(date: str, df: pd.DataFrame):
"""Загружает данные за день — перезаписывает если уже есть."""
key = f"cleaned/orders/date={date}/data.parquet"
buffer = io.BytesIO()
df.to_parquet(buffer, index=False, engine="pyarrow")
buffer.seek(0)
s3.upload_fileobj(buffer, "data-lake", key)
Проверка свежести данных¶
Python
def check_freshness(bucket, prefix, max_age_hours=24):
"""Проверяет, что последний файл не старше N часов."""
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
if "Contents" not in response:
raise ValueError(f"Нет файлов в {prefix}")
latest = max(response["Contents"], key=lambda x: x["LastModified"])
age = datetime.now(timezone.utc) - latest["LastModified"]
if age.total_seconds() > max_age_hours * 3600:
raise ValueError(f"Данные устарели: {age}")
Что запомнить¶
| Компонент | Для чего | Когда |
|---|---|---|
| MinIO | S3 в Docker для разработки | Локальная разработка, CI |
| boto3 | Python SDK для S3 | Все операции с S3 |
| Parquet | Колоночный формат | Аналитика, DWH |
| Partitioning | Организация файлов по дате/ключу | Быстрый доступ к подмножеству |
| Lifecycle | Автоудаление/архивация | Управление стоимостью хранения |