Перейти к содержанию

DE-нагрузки

Kubernetes: DE-нагрузки

Kubernetes отлично подходит для дата-инженерных задач: Jobs для batch-обработки, CronJobs для расписания, Spark operator для распределённых вычислений, KubernetesPodOperator для изоляции Airflow-задач.


Job — одноразовая задача

Job запускает Pod, ждёт завершения, и не перезапускает (в отличие от Deployment).

YAML
apiVersion: batch/v1
kind: Job
metadata:
  name: etl-load-20250315
spec:
  backoffLimit: 3                # Макс 3 попытки при ошибке
  activeDeadlineSeconds: 3600    # Таймаут 1 час
  ttlSecondsAfterFinished: 86400 # Удалить через 24 часа после завершения
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: loader
          image: myregistry/etl-loader:v2
          command: ["python", "load.py", "--date", "2025-03-15"]
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1"

Параллельные Job'ы

YAML
spec:
  parallelism: 5          # 5 Pod'ов одновременно
  completions: 20          # Всего 20 задач
  completionMode: Indexed  # Каждый Pod получает индекс (0-19)
Python
# В контейнере
import os
index = int(os.environ["JOB_COMPLETION_INDEX"])
# Pod 0 обрабатывает partition_0, Pod 1 — partition_1, ...

CronJob — задача по расписанию

YAML
apiVersion: batch/v1
kind: CronJob
metadata:
  name: etl-daily-sync
spec:
  schedule: "0 3 * * *"           # Каждый день в 03:00 UTC
  concurrencyPolicy: Forbid        # Не запускать если предыдущий ещё работает
  successfulJobsHistoryLimit: 3    # Хранить 3 последних успешных Job'а
  failedJobsHistoryLimit: 5
  startingDeadlineSeconds: 600     # Если не стартовал за 10 мин — пропустить
  jobTemplate:
    spec:
      backoffLimit: 2
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: sync
              image: myregistry/etl-sync:latest
              command: ["python", "sync.py"]
concurrencyPolicy Описание
Allow Несколько Job'ов одновременно (default)
Forbid Пропустить если предыдущий ещё работает
Replace Остановить предыдущий, запустить новый

Spark on Kubernetes

Spark Operator

Bash
helm install spark-operator spark-operator/spark-operator \
    --namespace spark --create-namespace
YAML
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: etl-transform
spec:
  type: Python
  mode: cluster
  image: myregistry/spark-etl:v3
  mainApplicationFile: local:///app/transform.py
  sparkVersion: "3.5.0"
  driver:
    cores: 1
    memory: "2g"
    serviceAccount: spark
  executor:
    cores: 2
    instances: 4
    memory: "4g"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 2
  arguments:
    - "--date"
    - "2025-03-15"

Spark operator автоматически создаёт driver Pod и executor Pod'ы, управляет lifecycle.

spark-submit на K8s (без оператора)

Bash
spark-submit \
    --master k8s://https://k8s-api:6443 \
    --deploy-mode cluster \
    --conf spark.kubernetes.container.image=myregistry/spark:3.5 \
    --conf spark.executor.instances=4 \
    --conf spark.executor.memory=4g \
    local:///app/transform.py

KubernetesPodOperator (Airflow)

Каждая Airflow-задача запускается в отдельном Pod — полная изоляция.

Python
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

transform = KubernetesPodOperator(
    task_id="transform_orders",
    name="transform-orders",
    namespace="data-platform",
    image="myregistry/etl-transform:v2",
    cmds=["python", "transform.py"],
    arguments=["--date", "{{ ds }}"],
    env_vars={
        "DATABASE_URL": "{{ conn.etl_db.get_uri() }}",
    },
    resources={
        "request_memory": "1Gi",
        "request_cpu": "500m",
        "limit_memory": "2Gi",
    },
    is_delete_operator_pod=True,   # Удалить Pod после завершения
    get_logs=True,                  # Показать логи в Airflow UI
    startup_timeout_seconds=300,
)

Преимущества:

  • Разные Docker-образы для разных задач (Python 3.9, Spark, dbt)
  • Изоляция зависимостей — нет конфликтов pip-пакетов
  • Независимое масштабирование ресурсов на задачу

HPA — автомасштабирование

Horizontal Pod Autoscaler автоматически увеличивает/уменьшает количество Pod'ов.

YAML
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: backend-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: backend
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70    # Масштабировать при CPU > 70%
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

Init-контейнеры

Init-контейнер запускается до основного. Полезно для миграций, проверки зависимостей.

YAML
spec:
  initContainers:
    - name: wait-for-db
      image: busybox:1.36
      command: ["sh", "-c", "until nc -z db-svc 5432; do sleep 2; done"]

    - name: run-migrations
      image: myregistry/backend:v2
      command: ["alembic", "upgrade", "head"]
      env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-secret
              key: url

  containers:
    - name: backend
      image: myregistry/backend:v2

Resource Quotas

Ограничение ресурсов на namespace — защита от «убежавшего» пайплайна.

YAML
apiVersion: v1
kind: ResourceQuota
metadata:
  name: data-platform-quota
  namespace: data-platform
spec:
  hard:
    requests.cpu: "20"
    requests.memory: "40Gi"
    limits.cpu: "40"
    limits.memory: "80Gi"
    pods: "50"

Проверь себя


Источники