Перейти к содержанию

S3 и MinIO

Зачем это DE?

Объектное хранилище — фундамент Data Lake. S3 (или совместимый MinIO) — место, куда складываются сырые данные, промежуточные файлы и финальные витрины. boto3, форматы файлов, lifecycle-правила — повседневные инструменты DE.


Объектное хранилище vs файловая система

Параметр Файловая система Объектное хранилище
Структура Иерархия каталогов Плоское пространство ключей (prefix = «папка»)
Доступ Путь /data/file.csv Бакет + ключ s3://bucket/data/file.csv
Протокол POSIX (read, write, seek) HTTP API (GET, PUT, DELETE)
Масштабируемость Ограничена диском/NFS Практически бесконечная
Изменение файла Можно дописать в середину Только полная перезапись

MinIO — S3-совместимое хранилище в Docker

Запуск

YAML
# docker-compose.yml
services:
  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"   # S3 API
      - "9001:9001"   # Web UI
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    volumes:
      - minio_data:/data

volumes:
  minio_data:
Bash
docker compose up -d minio
# Web UI: http://localhost:9001
# S3 API: http://localhost:9000

Создание бакета через mc (MinIO Client)

Bash
# Установка
curl -O https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc

# Настройка
mc alias set local http://localhost:9000 minioadmin minioadmin

# Операции
mc mb local/raw-data           # создать бакет
mc ls local/raw-data           # список объектов
mc cp file.csv local/raw-data/ # загрузить файл
mc cat local/raw-data/file.csv # прочитать

boto3 — Python SDK для S3

Подключение

Python
import boto3

# AWS S3
s3 = boto3.client("s3")

# MinIO (S3-совместимый)
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    region_name="us-east-1"
)

Основные операции

Python
# Загрузить файл
s3.upload_file("orders.csv", "raw-data", "orders/2025/01/orders.csv")

# Скачать
s3.download_file("raw-data", "orders/2025/01/orders.csv", "/tmp/orders.csv")

# Загрузить объект в память
response = s3.get_object(Bucket="raw-data", Key="orders/2025/01/orders.csv")
content = response["Body"].read().decode("utf-8")

# Записать из памяти
import io
buffer = io.BytesIO(b"col1,col2\n1,a\n2,b")
s3.upload_fileobj(buffer, "raw-data", "test.csv")

# Список объектов
response = s3.list_objects_v2(Bucket="raw-data", Prefix="orders/2025/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])

Пагинация (> 1000 объектов)

Python
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="raw-data", Prefix="events/"):
    for obj in page.get("Contents", []):
        print(obj["Key"])

Загрузка больших файлов (multipart)

Python
from boto3.s3.transfer import TransferConfig

config = TransferConfig(
    multipart_threshold=100 * 1024 * 1024,  # файлы > 100 MB — multipart
    multipart_chunksize=50 * 1024 * 1024,    # чанки по 50 MB
    max_concurrency=10
)

s3.upload_file("huge_file.parquet", "raw-data", "huge_file.parquet",
               Config=config)

Структура Data Lake

Партиционированный layout

Text Only
s3://data-lake/
├── raw/                          # Bronze: сырые данные as-is
│   ├── orders/
│   │   ├── year=2025/month=01/day=15/
│   │   │   └── orders_20250115.parquet
│   │   └── year=2025/month=01/day=16/
│   │       └── orders_20250116.parquet
│   └── events/
│       └── dt=2025-01-15/
│           ├── part-00000.parquet
│           └── part-00001.parquet
├── cleaned/                      # Silver: очищенные данные
│   └── orders/
│       └── year=2025/month=01/
│           └── orders_cleaned.parquet
└── analytics/                    # Gold: витрины
    └── daily_revenue/
        └── date=2025-01-15/
            └── revenue.parquet

Hive-style partitioning

Python
# Запись с партиционированием через pandas + pyarrow
import pyarrow as pa
import pyarrow.parquet as pq

df["year"] = df["order_date"].dt.year
df["month"] = df["order_date"].dt.month

table = pa.Table.from_pandas(df)
pq.write_to_dataset(
    table,
    root_path="s3://data-lake/raw/orders/",
    partition_cols=["year", "month"],
    filesystem=s3_filesystem  # pyarrow.fs.S3FileSystem
)
# Создаст: s3://data-lake/raw/orders/year=2025/month=01/part-0.parquet

Форматы файлов для Data Lake

Формат Тип Сжатие Чтение столбцов Когда использовать
CSV Строковый Нет (gzip внешне) ❌ всё целиком Обмен с внешними системами
Parquet Колоночный Snappy/ZSTD ✅ только нужные Аналитика, DWH, Spark
Avro Строковый Snappy/Deflate ❌ целиком Kafka, потоковая запись
ORC Колоночный ZLIB/Snappy Hive, старые Hadoop-кластеры
JSON Lines Строковый Нет Логи, API-ответы

Parquet — выбор по умолчанию

Для 99% аналитических задач Parquet + Snappy. Колоночное хранение + сжатие → в 5-10 раз меньше CSV.

Чтение Parquet из S3

Python
import pandas as pd

# Через pandas + s3fs
df = pd.read_parquet("s3://raw-data/orders/year=2025/month=01/",
                     storage_options={
                         "endpoint_url": "http://localhost:9000",
                         "key": "minioadmin",
                         "secret": "minioadmin"
                     })

# Через pyarrow (быстрее для больших данных)
import pyarrow.parquet as pq
import pyarrow.fs as fs

s3 = fs.S3FileSystem(
    endpoint_override="localhost:9000",
    access_key="minioadmin",
    secret_key="minioadmin",
    scheme="http"
)

dataset = pq.ParquetDataset("raw-data/orders/", filesystem=s3)
table = dataset.read(columns=["order_id", "total_amount"])  # только нужные столбцы
df = table.to_pandas()

Lifecycle-правила

Автоматическое управление жизненным циклом объектов:

Python
# Через boto3
s3.put_bucket_lifecycle_configuration(
    Bucket="raw-data",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "delete-old-raw",
                "Filter": {"Prefix": "raw/"},
                "Status": "Enabled",
                "Expiration": {"Days": 90}  # удалить через 90 дней
            },
            {
                "ID": "archive-cleaned",
                "Filter": {"Prefix": "cleaned/"},
                "Status": "Enabled",
                "Transitions": [
                    {"Days": 30, "StorageClass": "STANDARD_IA"},  # дешёвое хранение через 30 дней
                    {"Days": 180, "StorageClass": "GLACIER"}       # архив через 180 дней
                ]
            }
        ]
    }
)

Паттерны для DE

Идемпотентная загрузка

Python
def upload_daily_data(date: str, df: pd.DataFrame):
    """Загружает данные за день — перезаписывает если уже есть."""
    key = f"cleaned/orders/date={date}/data.parquet"
    buffer = io.BytesIO()
    df.to_parquet(buffer, index=False, engine="pyarrow")
    buffer.seek(0)
    s3.upload_fileobj(buffer, "data-lake", key)

Проверка свежести данных

Python
def check_freshness(bucket, prefix, max_age_hours=24):
    """Проверяет, что последний файл не старше N часов."""
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    if "Contents" not in response:
        raise ValueError(f"Нет файлов в {prefix}")

    latest = max(response["Contents"], key=lambda x: x["LastModified"])
    age = datetime.now(timezone.utc) - latest["LastModified"]

    if age.total_seconds() > max_age_hours * 3600:
        raise ValueError(f"Данные устарели: {age}")

Что запомнить

Компонент Для чего Когда
MinIO S3 в Docker для разработки Локальная разработка, CI
boto3 Python SDK для S3 Все операции с S3
Parquet Колоночный формат Аналитика, DWH
Partitioning Организация файлов по дате/ключу Быстрый доступ к подмножеству
Lifecycle Автоудаление/архивация Управление стоимостью хранения

Проверь себя


Источники