DE-нагрузки
Kubernetes: DE-нагрузки¶
Kubernetes отлично подходит для дата-инженерных задач: Jobs для batch-обработки, CronJobs для расписания, Spark operator для распределённых вычислений, KubernetesPodOperator для изоляции Airflow-задач.
Job — одноразовая задача¶
Job запускает Pod, ждёт завершения, и не перезапускает (в отличие от Deployment).
YAML
apiVersion: batch/v1
kind: Job
metadata:
name: etl-load-20250315
spec:
backoffLimit: 3 # Макс 3 попытки при ошибке
activeDeadlineSeconds: 3600 # Таймаут 1 час
ttlSecondsAfterFinished: 86400 # Удалить через 24 часа после завершения
template:
spec:
restartPolicy: Never
containers:
- name: loader
image: myregistry/etl-loader:v2
command: ["python", "load.py", "--date", "2025-03-15"]
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-secret
key: url
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1"
Параллельные Job'ы¶
YAML
spec:
parallelism: 5 # 5 Pod'ов одновременно
completions: 20 # Всего 20 задач
completionMode: Indexed # Каждый Pod получает индекс (0-19)
Python
# В контейнере
import os
index = int(os.environ["JOB_COMPLETION_INDEX"])
# Pod 0 обрабатывает partition_0, Pod 1 — partition_1, ...
CronJob — задача по расписанию¶
YAML
apiVersion: batch/v1
kind: CronJob
metadata:
name: etl-daily-sync
spec:
schedule: "0 3 * * *" # Каждый день в 03:00 UTC
concurrencyPolicy: Forbid # Не запускать если предыдущий ещё работает
successfulJobsHistoryLimit: 3 # Хранить 3 последних успешных Job'а
failedJobsHistoryLimit: 5
startingDeadlineSeconds: 600 # Если не стартовал за 10 мин — пропустить
jobTemplate:
spec:
backoffLimit: 2
template:
spec:
restartPolicy: Never
containers:
- name: sync
image: myregistry/etl-sync:latest
command: ["python", "sync.py"]
| concurrencyPolicy | Описание |
|---|---|
Allow |
Несколько Job'ов одновременно (default) |
Forbid |
Пропустить если предыдущий ещё работает |
Replace |
Остановить предыдущий, запустить новый |
Spark on Kubernetes¶
Spark Operator¶
Bash
helm install spark-operator spark-operator/spark-operator \
--namespace spark --create-namespace
YAML
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: etl-transform
spec:
type: Python
mode: cluster
image: myregistry/spark-etl:v3
mainApplicationFile: local:///app/transform.py
sparkVersion: "3.5.0"
driver:
cores: 1
memory: "2g"
serviceAccount: spark
executor:
cores: 2
instances: 4
memory: "4g"
restartPolicy:
type: OnFailure
onFailureRetries: 2
arguments:
- "--date"
- "2025-03-15"
Spark operator автоматически создаёт driver Pod и executor Pod'ы, управляет lifecycle.
spark-submit на K8s (без оператора)¶
Bash
spark-submit \
--master k8s://https://k8s-api:6443 \
--deploy-mode cluster \
--conf spark.kubernetes.container.image=myregistry/spark:3.5 \
--conf spark.executor.instances=4 \
--conf spark.executor.memory=4g \
local:///app/transform.py
KubernetesPodOperator (Airflow)¶
Каждая Airflow-задача запускается в отдельном Pod — полная изоляция.
Python
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
transform = KubernetesPodOperator(
task_id="transform_orders",
name="transform-orders",
namespace="data-platform",
image="myregistry/etl-transform:v2",
cmds=["python", "transform.py"],
arguments=["--date", "{{ ds }}"],
env_vars={
"DATABASE_URL": "{{ conn.etl_db.get_uri() }}",
},
resources={
"request_memory": "1Gi",
"request_cpu": "500m",
"limit_memory": "2Gi",
},
is_delete_operator_pod=True, # Удалить Pod после завершения
get_logs=True, # Показать логи в Airflow UI
startup_timeout_seconds=300,
)
Преимущества:
- Разные Docker-образы для разных задач (Python 3.9, Spark, dbt)
- Изоляция зависимостей — нет конфликтов pip-пакетов
- Независимое масштабирование ресурсов на задачу
HPA — автомасштабирование¶
Horizontal Pod Autoscaler автоматически увеличивает/уменьшает количество Pod'ов.
YAML
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: backend-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: backend
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70 # Масштабировать при CPU > 70%
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
Init-контейнеры¶
Init-контейнер запускается до основного. Полезно для миграций, проверки зависимостей.
YAML
spec:
initContainers:
- name: wait-for-db
image: busybox:1.36
command: ["sh", "-c", "until nc -z db-svc 5432; do sleep 2; done"]
- name: run-migrations
image: myregistry/backend:v2
command: ["alembic", "upgrade", "head"]
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-secret
key: url
containers:
- name: backend
image: myregistry/backend:v2
Resource Quotas¶
Ограничение ресурсов на namespace — защита от «убежавшего» пайплайна.
YAML
apiVersion: v1
kind: ResourceQuota
metadata:
name: data-platform-quota
namespace: data-platform
spec:
hard:
requests.cpu: "20"
requests.memory: "40Gi"
limits.cpu: "40"
limits.memory: "80Gi"
pods: "50"
Проверь себя¶
Источники¶
- Kubernetes: Jobs — batch-задачи
- Kubernetes: CronJob — расписание
- Spark Operator — Spark на K8s
- Airflow: KubernetesPodOperator — изолированные задачи
- Kubernetes: HPA — автомасштабирование