00 MLOps Nedir?
MLOps, makine öğrenmesi modellerini güvenilir ve tekrarlanabilir şekilde üretime taşımak için DevOps pratiklerini ML yaşam döngüsüne uygular.
Klasik yazılım deployment'ında tek bir bağımlılık vardır: kod. Bir ML modelini dağıtırken ise üç bağımlılık birbirinden ayrılamaz: kod (model mimarisi, inference mantığı), veri (eğitim seti, preprocessing pipeline), model ağırlıkları (checkpoint, quantization durumu). Bu üçlü bağımlılık, geleneksel CI/CD araçlarının ML için yetersiz kalmasının temel sebebidir.
ML Lifecycle
01 Veri → toplama, temizleme, etiketleme, versiyon 02 Eğitim → hyperparameter search, experiment tracking 03 Validasyon → offline metrikler, business metrikler 04 Deployment → packaging, serving, A/B test 05 Monitoring → drift, latency, kalite izleme 06 Retraining → trigger belirleme, pipeline yeniden çalıştırma
MLOps Olgunluk Seviyeleri
| Seviye | Özellik | Araçlar |
|---|---|---|
| Level 0 | Tamamen manuel. Notebook'tan kopyala yapıştır. No reproducibility. | Jupyter, pickle |
| Level 1 | Eğitim pipeline'ı otomatik. Model deployment hâlâ manuel. | MLflow, DVC |
| Level 2 | CI/CD pipeline var. Deploy otomatik ama retraining trigger manuel. | GitHub Actions, Docker |
| Level 3 | Tam otomasyon. Drift → retraining trigger → canary deploy → monitoring döngüsü. | Kubeflow, Vertex AI, SageMaker |
MLOps Araç Seti
01 FastAPI ile Inference Endpoint
Modeli REST API olarak sunmanın en hızlı ve production-ready yolu: async FastAPI, Pydantic validasyon, health check ve batch inference.
FastAPI, Python'un en performanslı web framework'lerinden biridir. Async destek, otomatik OpenAPI dokümantasyon ve Pydantic entegrasyonu ile ML inference endpoint'i yazmayı minimalist tutar. uvicorn ile saniyeler içinde production-grade servis çalıştırabilirsin.
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from contextlib import asynccontextmanager
import torch, time
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from prometheus_client import Counter, Histogram, generate_latest
from starlette.responses import Response
import logging
logger = logging.getLogger(__name__)
# ── Prometheus metrikleri ──────────────────────────────────────────
REQUEST_COUNT = Counter("inference_requests_total", "Toplam istek", ["status"])
LATENCY = Histogram("inference_latency_ms", "Latency (ms)", buckets=[10,50,100,250,500,1000])
# ── Pydantic şemaları ──────────────────────────────────────────────
class PredictRequest(BaseModel):
text: str = Field(..., min_length=1, max_length=512, description="Sınıflandırılacak metin")
top_k: int = Field(default=1, ge=1, le=5)
class BatchPredictRequest(BaseModel):
texts: list[str] = Field(..., min_length=1, max_length=32)
top_k: int = Field(default=1, ge=1, le=5)
class PredictResponse(BaseModel):
label: str
score: float
latency_ms: float
class BatchPredictResponse(BaseModel):
predictions: list[PredictResponse]
total_latency_ms: float
# ── Model yükleme (lifespan) ───────────────────────────────────────
model_store: dict = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# startup
model_name = "distilbert-base-uncased-finetuned-sst-2-english"
logger.info(f"Model yükleniyor: {model_name}")
model_store["tokenizer"] = AutoTokenizer.from_pretrained(model_name)
model_store["model"] = AutoModelForSequenceClassification.from_pretrained(model_name)
model_store["model"].eval()
if torch.cuda.is_available():
model_store["model"] = model_store["model"].cuda()
logger.info("Model hazır.")
yield
# shutdown
model_store.clear()
app = FastAPI(title="ML Inference API", version="1.0.0", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
# ── Endpoints ─────────────────────────────────────────────────────
@app.get("/health")
async def health():
model_loaded = "model" in model_store
if not model_loaded:
raise HTTPException(status_code=503, detail="Model henüz yüklenmedi")
return {"status": "ok", "model": "loaded"}
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain")
@app.post("/predict", response_model=PredictResponse)
async def predict(req: PredictRequest):
t0 = time.perf_counter()
try:
tokenizer = model_store["tokenizer"]
model = model_store["model"]
inputs = tokenizer(req.text, return_tensors="pt", truncation=True, max_length=512)
if next(model.parameters()).is_cuda:
inputs = {k: v.cuda() for k, v in inputs.items()}
with torch.no_grad():
logits = model(**inputs).logits
probs = torch.nn.functional.softmax(logits, dim=-1)[0]
idx = int(probs.argmax())
label = model.config.id2label[idx]
score = float(probs[idx])
latency_ms = (time.perf_counter() - t0) * 1000
LATENCY.observe(latency_ms)
REQUEST_COUNT.labels(status="ok").inc()
return PredictResponse(label=label, score=score, latency_ms=round(latency_ms, 2))
except Exception as e:
REQUEST_COUNT.labels(status="error").inc()
raise HTTPException(status_code=500, detail=str(e))
@app.post("/predict/batch", response_model=BatchPredictResponse)
async def predict_batch(req: BatchPredictRequest):
t0 = time.perf_counter()
results = []
tokenizer = model_store["tokenizer"]
model = model_store["model"]
inputs = tokenizer(
req.texts, return_tensors="pt", padding=True,
truncation=True, max_length=512,
)
if next(model.parameters()).is_cuda:
inputs = {k: v.cuda() for k, v in inputs.items()}
with torch.no_grad():
logits = model(**inputs).logits
probs_all = torch.nn.functional.softmax(logits, dim=-1)
for probs in probs_all:
idx = int(probs.argmax())
results.append(PredictResponse(
label=model.config.id2label[idx],
score=float(probs[idx]),
latency_ms=0,
))
total_ms = (time.perf_counter() - t0) * 1000
return BatchPredictResponse(predictions=results, total_latency_ms=round(total_ms, 2))
# Geliştirme modunda çalıştır
uvicorn serve:app --host 0.0.0.0 --port 8000 --reload
# Üretim için (workers = 2 × CPU + 1)
uvicorn serve:app --host 0.0.0.0 --port 8000 --workers 4 --no-access-log
02 Model Serialization
Modeli doğru formatta kaydetmek, farklı ortamlarda çalıştırabilmek ve inference hızını iyileştirmek için kritiktir.
Bir PyTorch modelini pickle ile kaydetmek başlangıç için işe yarar ama üretimde sorunludur: Python versiyonuna bağımlıdır, güvenlik açığı içerebilir ve farklı framework'lerle kullanılamaz. Production deployment için üç temel format vardır.
Format Karşılaştırması
| Format | Boyut | Python Gerekli? | Hız | Framework |
|---|---|---|---|---|
| state_dict (.pt) | 1x | Evet | Baseline | Yalnızca PyTorch |
| TorchScript (.pt) | 1x | Hayır | +%10–20 | PyTorch C++ |
| ONNX (.onnx) | ~0.9x | Hayır | +%20–50 | Her framework |
| TensorRT (.trt) | ~0.5x | Hayır | +%200–400 | NVIDIA GPU only |
PyTorch → TorchScript → ONNX
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from pathlib import Path
MODEL_NAME = "distilbert-base-uncased-finetuned-sst-2-english"
OUT_DIR = Path("./model_artifacts")
OUT_DIR.mkdir(exist_ok=True)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
model.eval()
# ── 1. state_dict (en basit, PyTorch only) ────────────────────────
torch.save(model.state_dict(), OUT_DIR / "model_state_dict.pt")
# ── 2. TorchScript (graph capture) ────────────────────────────────
try:
dummy_input = tokenizer("örnek metin", return_tensors="pt")
scripted = torch.jit.trace(
model,
(dummy_input["input_ids"], dummy_input["attention_mask"]),
)
scripted.save(str(OUT_DIR / "model_scripted.pt"))
print("TorchScript kaydedildi.")
except Exception as e:
print(f"TorchScript başarısız: {e}")
# ── 3. ONNX export ────────────────────────────────────────────────
dummy = tokenizer("örnek metin", return_tensors="pt", max_length=128, truncation=True)
torch.onnx.export(
model,
(dummy["input_ids"], dummy["attention_mask"]),
str(OUT_DIR / "model.onnx"),
input_names=["input_ids", "attention_mask"],
output_names=["logits"],
dynamic_axes={
"input_ids": {0: "batch", 1: "seq_len"},
"attention_mask": {0: "batch", 1: "seq_len"},
"logits": {0: "batch"},
},
opset_version=17,
do_constant_folding=True,
)
print("ONNX kaydedildi.")
# ── 4. ONNX Runtime ile inference doğrulama ────────────────────────
import onnxruntime as ort
import numpy as np
sess = ort.InferenceSession(
str(OUT_DIR / "model.onnx"),
providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
)
ort_inputs = {
"input_ids": dummy["input_ids"].numpy(),
"attention_mask": dummy["attention_mask"].numpy(),
}
ort_logits = sess.run(["logits"], ort_inputs)[0]
print(f"ONNX çıktı: {ort_logits}")
# HuggingFace Optimum ile daha kolay ONNX export
# from optimum.onnxruntime import ORTModelForSequenceClassification
# ort_model = ORTModelForSequenceClassification.from_pretrained(MODEL_NAME, export=True)
# ort_model.save_pretrained(str(OUT_DIR / "optimum_onnx"))
03 Docker ile Konteynerleştirme
Modeli taşınabilir ve tekrarlanabilir kılmak için Docker; GPU desteği için NVIDIA CUDA base image ve çok aşamalı build.
Docker, inference servisini "benim makinemde çalışıyor" tuzağından çıkarır. Base image seçimi kritiktir: python:3.11-slim minimal CPU ortamı sağlar; GPU gerekiyorsa nvidia/cuda:12.3.0-cudnn9-runtime-ubuntu22.04 veya pytorch/pytorch:2.2.0-cuda12.1-cudnn8-runtime kullanılmalıdır.
CPU Dockerfile
# ── Stage 1: builder ──────────────────────────────────────────────
FROM python:3.11-slim AS builder
WORKDIR /build
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc g++ && \
rm -rf /var/lib/apt/lists/*
# Önce sadece requirements kopyala → layer cache optimizasyonu
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# ── Stage 2: runtime ──────────────────────────────────────────────
FROM python:3.11-slim AS runtime
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
MODEL_DIR=/app/model_artifacts \
PORT=8000
# Builder'dan kurulu paketleri kopyala
COPY --from=builder /install /usr/local
# Uygulama kodunu kopyala
COPY serve.py .
COPY model_artifacts/ ./model_artifacts/
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
CMD python -c "import httpx; httpx.get('http://localhost:${PORT}/health').raise_for_status()"
EXPOSE $PORT
CMD ["uvicorn", "serve:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
GPU Dockerfile
FROM pytorch/pytorch:2.2.0-cuda12.1-cudnn8-runtime
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
NVIDIA_VISIBLE_DEVICES=all \
NVIDIA_DRIVER_CAPABILITIES=compute,utility
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY serve.py model_artifacts/ ./
CMD ["uvicorn", "serve:app", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose
version: "3.9"
services:
inference:
build:
context: .
dockerfile: Dockerfile
ports: ["8000:8000"]
environment:
- PORT=8000
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
deploy:
resources:
limits:
cpus: "2.0"
memory: "4G"
inference-gpu:
build:
context: .
dockerfile: Dockerfile.gpu
ports: ["8001:8000"]
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
.dockerignore dosyasına __pycache__, .git, *.ipynb, tests/, .env ekle. Model ağırlıkları büyükse Docker layer'a koymak yerine başlangıçta S3/GCS'den indir; bu image boyutunu önemli ölçüde azaltır.
04 Triton Inference Server
NVIDIA Triton, yüksek throughput GPU serving için tasarlanmış production-grade inference sunucusudur; dynamic batching ve concurrent execution ile GPU'yu maksimize eder.
Triton Inference Server, NVIDIA'nın açık kaynak model sunucusudur. PyTorch, ONNX, TensorRT, TensorFlow ve OpenVINO modellerini aynı sunucu üzerinde çalıştırabilir. HTTP ve gRPC endpoint'leri otomatik açar; dynamic batching ile düşük batch size'ları otomatik birleştirir.
Model Repository Yapısı
model_repository/
├── sentiment_onnx/
│ ├── config.pbtxt
│ └── 1/
│ └── model.onnx
└── sentiment_ensemble/
├── config.pbtxt
└── 1/
└── (ensemble — dosya yok)
name: "sentiment_onnx"
backend: "onnxruntime"
max_batch_size: 32
# Dynamic batching: 100ms içinde gelen istekleri birleştir
dynamic_batching {
preferred_batch_size: [ 4, 8, 16 ]
max_queue_delay_microseconds: 100000
}
input [
{
name: "input_ids"
data_type: TYPE_INT64
dims: [ -1 ] # dinamik sequence length
},
{
name: "attention_mask"
data_type: TYPE_INT64
dims: [ -1 ]
}
]
output [
{
name: "logits"
data_type: TYPE_FP32
dims: [ 2 ]
}
]
instance_group [
{
count: 2 # 2 eşzamanlı model instance
kind: KIND_GPU
gpus: [ 0 ]
}
]
Triton ile Sunucu Başlatma
# Docker ile Triton başlat
docker run --gpus all --rm \
-p 8000:8000 -p 8001:8001 -p 8002:8002 \
-v $(pwd)/model_repository:/models \
nvcr.io/nvidia/tritonserver:24.03-py3 \
tritonserver --model-repository=/models \
--strict-model-config=false \
--log-verbose=1
# Sağlık kontrolü
curl http://localhost:8000/v2/health/ready
Python Client
import numpy as np
import tritonclient.http as httpclient
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
client = httpclient.InferenceServerClient(url="localhost:8000")
def triton_predict(texts: list[str]) -> list[str]:
enc = tokenizer(
texts, padding=True, truncation=True,
max_length=128, return_tensors="np",
)
input_ids = httpclient.InferInput(
"input_ids", enc["input_ids"].shape, "INT64"
)
input_ids.set_data_from_numpy(enc["input_ids"].astype(np.int64))
attention_mask = httpclient.InferInput(
"attention_mask", enc["attention_mask"].shape, "INT64"
)
attention_mask.set_data_from_numpy(enc["attention_mask"].astype(np.int64))
output = httpclient.InferRequestedOutput("logits")
response = client.infer(
"sentiment_onnx",
inputs=[input_ids, attention_mask],
outputs=[output],
)
logits = response.as_numpy("logits") # (batch, 2)
labels = ["NEGATIVE", "POSITIVE"]
return [labels[np.argmax(row)] for row in logits]
if __name__ == "__main__":
results = triton_predict(["Bu harika!", "Çok kötüydü."])
print(results) # ['POSITIVE', 'NEGATIVE']
05 Model Registry — MLflow
MLflow, deney takibinden model registry'ye tek bir açık kaynak platformda ML lifecycle'ı kapsar.
MLflow dört bileşenden oluşur: Tracking (deney ve metrik kaydı), Projects (ortam tanımı), Models (model paketleme ve serving), Registry (versiyon ve lifecycle yönetimi). Model Registry ile modeller Staging → Production akışında ilerler; her versiyon metadata ile birlikte saklanır.
pip install mlflow
# MLflow UI başlat (SQLite backend)
mlflow server \
--backend-store-uri sqlite:///mlflow.db \
--default-artifact-root ./mlruns \
--host 0.0.0.0 --port 5000
Eğitim ve Logging
import mlflow
import mlflow.pytorch
from mlflow.models import infer_signature
import torch, numpy as np
from transformers import AutoModelForSequenceClassification, AutoTokenizer
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("sentiment-classifier")
with mlflow.start_run(run_name="distilbert-finetune-v3") as run:
# Parametreleri logla
params = {
"model_name": "distilbert-base-uncased",
"learning_rate": 2e-5,
"epochs": 3,
"batch_size": 32,
"max_length": 128,
}
mlflow.log_params(params)
# Eğitim döngüsü simülasyonu
for epoch in range(3):
train_loss = 0.8 - epoch * 0.2 + np.random.uniform(-0.05, 0.05)
val_acc = 0.85 + epoch * 0.03
mlflow.log_metrics({
"train_loss": train_loss,
"val_accuracy": val_acc,
}, step=epoch)
# Son metrikler
mlflow.log_metric("test_f1", 0.934)
mlflow.log_metric("test_accuracy", 0.921)
# Model kaydet
model = AutoModelForSequenceClassification.from_pretrained(
"distilbert-base-uncased-finetuned-sst-2-english"
)
tokenizer = AutoTokenizer.from_pretrained(
"distilbert-base-uncased-finetuned-sst-2-english"
)
# İmza tanımla: input/output şeması
dummy_input = np.array([[101, 2023, 2003, 102]])
dummy_output = np.array([[0.1, 0.9]])
signature = infer_signature(dummy_input, dummy_output)
mlflow.pytorch.log_model(
pytorch_model=model,
artifact_path="model",
signature=signature,
registered_model_name="SentimentClassifier",
)
print(f"Run ID: {run.info.run_id}")
# ── Model versiyonunu Staging'e al ────────────────────────────────
client = mlflow.MlflowClient()
latest = client.get_latest_versions("SentimentClassifier", stages=["None"])[0]
client.transition_model_version_stage(
name="SentimentClassifier",
version=latest.version,
stage="Staging",
archive_existing_versions=False,
)
print(f"Versiyon {latest.version} Staging'e alındı.")
# ── Production'a geçiş (CI/CD tetikler) ──────────────────────────
def promote_to_production(model_name: str, version: str):
client.transition_model_version_stage(
name=model_name,
version=version,
stage="Production",
archive_existing_versions=True, # eski production'ı arşivle
)
print(f"v{version} Production'a alındı.")
MLflow Model Registry production'a geçişi otomatik yapmaz; bir insan onayı veya CI gate gerektirir. mlflow.get_model_version_download_uri() ile modeli artifact store'dan çekip Triton veya FastAPI sunucuna konuşlandır.
06 CI/CD Pipeline
Model deployment'ı el ile yapmak hata üretir; GitHub Actions ile lint → test → build → push → deploy akışını tamamen otomatize et.
ML CI/CD pipeline'ı klasik yazılım pipeline'ından farklıdır: kod testleri yanında model validasyon gate'i eklemek gerekir. Yeni model belirli metrik eşiğini geçemezse deploy bloklanmalıdır.
name: ML Deploy Pipeline
on:
push:
branches: [main]
pull_request:
branches: [main]
env:
REGISTRY: ghcr.io
IMAGE_NAME: $/inference
jobs:
lint-and-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Python kur
uses: actions/setup-python@v5
with:
python-version: "3.11"
cache: pip
- name: Bağımlılıkları yükle
run: pip install -r requirements.txt -r requirements-dev.txt
- name: Ruff lint
run: ruff check . --output-format=github
- name: Mypy tip kontrolü
run: mypy serve.py --ignore-missing-imports
- name: Pytest
run: pytest tests/ -v --tb=short
model-validation:
runs-on: ubuntu-latest
needs: lint-and-test
steps:
- uses: actions/checkout@v4
- name: Model indir (MLflow artifact)
env:
MLFLOW_TRACKING_URI: $
run: |
pip install mlflow
python scripts/download_model.py --stage Production
- name: Validasyon gate
run: |
python scripts/validate_model.py \
--model-path ./model_artifacts \
--min-accuracy 0.90 \
--max-latency-ms 200
# Bu adım başarısız olursa pipeline durur
build-and-push:
runs-on: ubuntu-latest
needs: model-validation
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- name: Docker login (GHCR)
uses: docker/login-action@v3
with:
registry: $
username: $
password: $
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
with:
images: $/$
tags: |
type=sha,prefix=sha-
type=raw,value=latest
- name: Build ve push
uses: docker/build-push-action@v5
with:
context: .
push: true
tags: $
cache-from: type=gha
cache-to: type=gha,mode=max
deploy-canary:
runs-on: ubuntu-latest
needs: build-and-push
environment: production
steps:
- name: Canary deploy (%5 trafik)
run: |
echo "Canary deploy: $"
# kubectl set image deploy/inference inference=$IMAGE_TAG
# kubectl scale deploy/inference-canary --replicas=1
Validasyon Script
import argparse, sys, time
import numpy as np
from pathlib import Path
def validate(model_path: str, min_accuracy: float, max_latency_ms: float):
import onnxruntime as ort
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
sess = ort.InferenceSession(f"{model_path}/model.onnx")
# Smoke test verisi
test_cases = [
("I love this product!", "POSITIVE"),
("This is terrible.", "NEGATIVE"),
("Amazing quality!", "POSITIVE"),
("Waste of money.", "NEGATIVE"),
]
correct, latencies = 0, []
for text, expected in test_cases:
enc = tokenizer(text, return_tensors="np", max_length=128, truncation=True)
t0 = time.perf_counter()
logits = sess.run(["logits"], {
"input_ids": enc["input_ids"].astype(np.int64),
"attention_mask": enc["attention_mask"].astype(np.int64),
})[0]
latency = (time.perf_counter() - t0) * 1000
latencies.append(latency)
pred = ["NEGATIVE", "POSITIVE"][np.argmax(logits[0])]
if pred == expected:
correct += 1
accuracy = correct / len(test_cases)
p95_latency = np.percentile(latencies, 95)
print(f"Accuracy: {accuracy:.2%} (min: {min_accuracy:.2%})")
print(f"P95 Latency: {p95_latency:.1f}ms (max: {max_latency_ms}ms)")
if accuracy < min_accuracy:
print(f"HATA: Accuracy eşiği aşılamadı.", file=sys.stderr)
sys.exit(1)
if p95_latency > max_latency_ms:
print(f"HATA: Latency eşiği aşıldı.", file=sys.stderr)
sys.exit(1)
print("Validasyon başarılı.")
if __name__ == "__main__":
p = argparse.ArgumentParser()
p.add_argument("--model-path", required=True)
p.add_argument("--min-accuracy", type=float, default=0.90)
p.add_argument("--max-latency-ms", type=float, default=200.0)
args = p.parse_args()
validate(args.model_path, args.min_accuracy, args.max_latency_ms)
07 Canary Deployment
Yeni modeli tüm trafiğe açmak yerine kademeli rollout ile riskleri minimize et; sorun çıkarsa saniyeler içinde geri al.
Canary deployment, yeni versiyonu önce küçük bir trafik dilimine (%5) göndererek hata oranı ve latency'yi izleme stratejisidir. Metrikler sağlıklıysa trafik artırılır (%25 → %50 → %100). Sorun çıkarsa yalnızca canary rollback yapılır; tüm kullanıcılar etkilenmez.
Nginx ile Traffic Splitting
upstream inference_stable {
server inference-stable:8000 weight=95;
}
upstream inference_canary {
server inference-canary:8000 weight=5;
}
# Split map: 5% canary, 95% stable
split_clients "${request_id}" $backend {
5% inference_canary;
* inference_stable;
}
server {
listen 80;
location /predict {
proxy_pass http://$backend;
proxy_set_header X-Deployment-Version $backend;
proxy_set_header Host $host;
proxy_connect_timeout 5s;
proxy_read_timeout 30s;
}
location /health {
proxy_pass http://inference_stable;
}
}
Kademeli Rollout Script
#!/bin/bash
# Canary kademeli rollout: 5% → 25% → 50% → 100%
CANARY_IMAGE="ghcr.io/org/inference:sha-abc123"
STABLE_IMAGE="ghcr.io/org/inference:sha-def456"
ERROR_THRESHOLD="0.02" # %2 hata oranı eşiği
LATENCY_P95_THRESHOLD="2000" # 2000ms
check_metrics() {
local weight=$1
echo "Canary ağırlığı: ${weight}% — metrikler izleniyor (5 dakika)..."
sleep 300
# Prometheus'tan metrik sorgula
ERROR_RATE=$(curl -s "http://prometheus:9090/api/v1/query?query=\
rate(inference_requests_total{status='error',version='canary'}[5m])/\
rate(inference_requests_total{version='canary'}[5m])" \
| jq -r '.data.result[0].value[1] // "0"')
if (( $(echo "$ERROR_RATE > $ERROR_THRESHOLD" | bc -l) )); then
echo "ROLLBACK: Hata oranı yüksek ($ERROR_RATE > $ERROR_THRESHOLD)"
return 1
fi
return 0
}
rollout() {
for weight in 5 25 50 100; do
echo "Canary trafik: ${weight}%"
# docker service update ile ağırlık güncelle (Swarm)
# kubectl ile Kubernetes'te replicas ayarla
docker service update \
--replicas-max-per-node 1 \
--label-add "canary.weight=${weight}" \
inference_canary
if ! check_metrics $weight; then
echo "Rollback yapılıyor..."
docker service scale inference_canary=0
exit 1
fi
echo "${weight}% geçti, devam ediliyor."
done
echo "Tam rollout tamamlandı!"
}
rollout
Canary deployment sırasında iki model versiyonu aynı anda çalışır. Kullanıcı oturumlarının tutarlı bir versiyona yönlendirilmesi için sticky session (kullanıcı ID hash'ine göre) kullan; aksi halde aynı kullanıcı farklı model davranışlarıyla karşılaşır.
08 A/B Testing
İki model versiyonundan hangisinin gerçekten daha iyi olduğunu istatistiksel olarak kanıtla; varsayıma değil veriye dayan.
Canary deployment ile A/B test kavramsal olarak benzerdir ama amaçları farklıdır: canary risk yönetimini hedefler, A/B test ise iki versiyonun belirli bir metrik üzerindeki etkisini karşılaştırır. A/B testte kullanıcılar deterministik olarak gruplara atanır; istatistiksel anlamlılık hesaplanır.
Kullanıcı Atama Mantığı
import hashlib
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
import json
from pathlib import Path
class Variant(Enum):
CONTROL = "A" # mevcut model
TREATMENT = "B" # yeni model
@dataclass
class ABEvent:
user_id: str
variant: str
metric_name: str
metric_value: float
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
def assign_variant(
user_id: str,
experiment_name: str,
control_ratio: float = 0.5,
) -> Variant:
"""Kullanıcıyı deterministik olarak gruba ata (hash tabanlı)."""
key = f"{experiment_name}:{user_id}"
digest = hashlib.md5(key.encode()).hexdigest()
# Hash'in ilk 8 hex karakterini [0, 1) aralığına dönüştür
bucket = int(digest[:8], 16) / 0xFFFFFFFF
return Variant.CONTROL if bucket < control_ratio else Variant.TREATMENT
class ABExperiment:
def __init__(self, name: str, log_path: str = "ab_events.jsonl"):
self.name = name
self.log_path = Path(log_path)
self._events: list[ABEvent] = []
def get_variant(self, user_id: str) -> Variant:
return assign_variant(user_id, self.name)
def log_metric(self, user_id: str, metric_name: str, value: float):
variant = self.get_variant(user_id)
event = ABEvent(
user_id=user_id,
variant=variant.value,
metric_name=metric_name,
metric_value=value,
)
self._events.append(event)
with self.log_path.open("a") as f:
f.write(json.dumps(event.__dict__) + "\n")
return event
İstatistiksel Anlamlılık Testi
import numpy as np
from scipy import stats
from dataclasses import dataclass
import json
from pathlib import Path
@dataclass
class ABResult:
n_control: int
n_treatment: int
mean_control: float
mean_treatment: float
relative_lift: float
p_value: float
is_significant: bool
confidence_interval: tuple[float, float]
def analyze_experiment(
log_path: str,
metric_name: str,
alpha: float = 0.05,
) -> ABResult:
"""JSONL log dosyasından A/B test sonucunu analiz et."""
control, treatment = [], []
with Path(log_path).open() as f:
for line in f:
event = json.loads(line)
if event["metric_name"] != metric_name:
continue
if event["variant"] == "A":
control.append(event["metric_value"])
else:
treatment.append(event["metric_value"])
ctrl = np.array(control)
treat = np.array(treatment)
# Welch's t-test (eşit varyans varsayımı yok)
t_stat, p_value = stats.ttest_ind(treat, ctrl, equal_var=False)
# %95 güven aralığı (treatment - control fark)
diff = treat.mean() - ctrl.mean()
se = np.sqrt(treat.var(ddof=1)/len(treat) + ctrl.var(ddof=1)/len(ctrl))
ci = stats.norm.interval(0.95, loc=diff, scale=se)
lift = (treat.mean() - ctrl.mean()) / ctrl.mean() * 100
return ABResult(
n_control=len(ctrl),
n_treatment=len(treat),
mean_control=round(float(ctrl.mean()), 4),
mean_treatment=round(float(treat.mean()), 4),
relative_lift=round(lift, 2),
p_value=round(float(p_value), 4),
is_significant=p_value < alpha,
confidence_interval=(round(ci[0], 4), round(ci[1], 4)),
)
def minimum_sample_size(
baseline_rate: float,
mde: float, # Minimum Detectable Effect
alpha: float = 0.05,
power: float = 0.80,
) -> int:
"""Her grup için minimum örneklem boyutunu hesapla."""
z_alpha = stats.norm.ppf(1 - alpha / 2)
z_beta = stats.norm.ppf(power)
p1 = baseline_rate
p2 = baseline_rate * (1 + mde)
p_bar = (p1 + p2) / 2
n = ((z_alpha * np.sqrt(2 * p_bar * (1 - p_bar))
+ z_beta * np.sqrt(p1*(1-p1) + p2*(1-p2))) / (p2 - p1)) ** 2
return int(np.ceil(n))
# Kullanım örneği
if __name__ == "__main__":
# Minimum örneklem: baseline %20 CTR, %10 iyileşme beklentisi
n = minimum_sample_size(baseline_rate=0.20, mde=0.10)
print(f"Her grup için minimum örneklem: {n} kullanıcı")
# Test (gerçek logdan)
# result = analyze_experiment("ab_events.jsonl", "satisfaction_score")
# print(result)
A/B testi istatistiksel anlamlılığa ulaşmadan sonlandırma peeking problemidir ve yanlış pozitif üretir. Minimum örneklem boyutuna ulaşmadan sonucu yorumlama. Ayrıca Bonferroni düzeltmesi: birden fazla metrik test ediyorsan alpha / n_metrics kullan.
MLOps Araçlarını Birbirine Bağlamak
01 MLflow → yeni model eğit, metrikleri kaydet 02 CI/CD Gate → accuracy & latency eşiği geç 03 Docker Build → image oluştur, registry'e push 04 Canary → %5 trafikle canlıya al 05 A/B Test → istatistiksel anlamlılık için n gün bekle 06 Full Rollout → kazanan modeli %100'e çıkar 07 OTel + Grafana → canlıda izle, drift alarm kur