Tüm rehberler
RehberYapay Zeka13 · Uygulama

MLOps
& Model Deployment.

Modeli notebook'tan üretime taşı. FastAPI ile inference endpoint, Docker ile konteynerleştirme, Triton Inference Server ile GPU serving. Model registry, versiyonlama ve canary deployment.

00 MLOps Nedir?

MLOps, makine öğrenmesi modellerini güvenilir ve tekrarlanabilir şekilde üretime taşımak için DevOps pratiklerini ML yaşam döngüsüne uygular.

Klasik yazılım deployment'ında tek bir bağımlılık vardır: kod. Bir ML modelini dağıtırken ise üç bağımlılık birbirinden ayrılamaz: kod (model mimarisi, inference mantığı), veri (eğitim seti, preprocessing pipeline), model ağırlıkları (checkpoint, quantization durumu). Bu üçlü bağımlılık, geleneksel CI/CD araçlarının ML için yetersiz kalmasının temel sebebidir.

ML Lifecycle

01 Veri          → toplama, temizleme, etiketleme, versiyon
02 Eğitim        → hyperparameter search, experiment tracking
03 Validasyon    → offline metrikler, business metrikler
04 Deployment    → packaging, serving, A/B test
05 Monitoring    → drift, latency, kalite izleme
06 Retraining    → trigger belirleme, pipeline yeniden çalıştırma

MLOps Olgunluk Seviyeleri

SeviyeÖzellikAraçlar
Level 0Tamamen manuel. Notebook'tan kopyala yapıştır. No reproducibility.Jupyter, pickle
Level 1Eğitim pipeline'ı otomatik. Model deployment hâlâ manuel.MLflow, DVC
Level 2CI/CD pipeline var. Deploy otomatik ama retraining trigger manuel.GitHub Actions, Docker
Level 3Tam otomasyon. Drift → retraining trigger → canary deploy → monitoring döngüsü.Kubeflow, Vertex AI, SageMaker

MLOps Araç Seti

Experiment TrackingMLflow, Weights & Biases, Neptune — deney parametrelerini ve metrikleri logla.
Data VersioningDVC, LakeFS — veri setlerini git gibi versiyonla, büyük dosyaları uzak storage'da tut.
Model RegistryMLflow Model Registry, HuggingFace Hub — model versiyonları ve lifecycle yönetimi.
ServingFastAPI, Triton, TorchServe, BentoML — modeli REST veya gRPC API olarak sun.
OrchestrationKubeflow Pipelines, Prefect, Airflow — pipeline adımlarını sırala ve bağımlılıkları yönet.
Feature StoreFeast, Tecton — eğitim ve inference için tutarlı feature hesaplama.

01 FastAPI ile Inference Endpoint

Modeli REST API olarak sunmanın en hızlı ve production-ready yolu: async FastAPI, Pydantic validasyon, health check ve batch inference.

FastAPI, Python'un en performanslı web framework'lerinden biridir. Async destek, otomatik OpenAPI dokümantasyon ve Pydantic entegrasyonu ile ML inference endpoint'i yazmayı minimalist tutar. uvicorn ile saniyeler içinde production-grade servis çalıştırabilirsin.

serve.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from contextlib import asynccontextmanager
import torch, time
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from prometheus_client import Counter, Histogram, generate_latest
from starlette.responses import Response
import logging

logger = logging.getLogger(__name__)

# ── Prometheus metrikleri ──────────────────────────────────────────
REQUEST_COUNT = Counter("inference_requests_total", "Toplam istek", ["status"])
LATENCY = Histogram("inference_latency_ms", "Latency (ms)", buckets=[10,50,100,250,500,1000])

# ── Pydantic şemaları ──────────────────────────────────────────────
class PredictRequest(BaseModel):
    text: str = Field(..., min_length=1, max_length=512, description="Sınıflandırılacak metin")
    top_k: int = Field(default=1, ge=1, le=5)

class BatchPredictRequest(BaseModel):
    texts: list[str] = Field(..., min_length=1, max_length=32)
    top_k: int = Field(default=1, ge=1, le=5)

class PredictResponse(BaseModel):
    label: str
    score: float
    latency_ms: float

class BatchPredictResponse(BaseModel):
    predictions: list[PredictResponse]
    total_latency_ms: float

# ── Model yükleme (lifespan) ───────────────────────────────────────
model_store: dict = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup
    model_name = "distilbert-base-uncased-finetuned-sst-2-english"
    logger.info(f"Model yükleniyor: {model_name}")
    model_store["tokenizer"] = AutoTokenizer.from_pretrained(model_name)
    model_store["model"] = AutoModelForSequenceClassification.from_pretrained(model_name)
    model_store["model"].eval()
    if torch.cuda.is_available():
        model_store["model"] = model_store["model"].cuda()
    logger.info("Model hazır.")
    yield
    # shutdown
    model_store.clear()

app = FastAPI(title="ML Inference API", version="1.0.0", lifespan=lifespan)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)

# ── Endpoints ─────────────────────────────────────────────────────
@app.get("/health")
async def health():
    model_loaded = "model" in model_store
    if not model_loaded:
        raise HTTPException(status_code=503, detail="Model henüz yüklenmedi")
    return {"status": "ok", "model": "loaded"}

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type="text/plain")

@app.post("/predict", response_model=PredictResponse)
async def predict(req: PredictRequest):
    t0 = time.perf_counter()
    try:
        tokenizer = model_store["tokenizer"]
        model     = model_store["model"]
        inputs = tokenizer(req.text, return_tensors="pt", truncation=True, max_length=512)
        if next(model.parameters()).is_cuda:
            inputs = {k: v.cuda() for k, v in inputs.items()}
        with torch.no_grad():
            logits = model(**inputs).logits
        probs = torch.nn.functional.softmax(logits, dim=-1)[0]
        idx   = int(probs.argmax())
        label = model.config.id2label[idx]
        score = float(probs[idx])
        latency_ms = (time.perf_counter() - t0) * 1000
        LATENCY.observe(latency_ms)
        REQUEST_COUNT.labels(status="ok").inc()
        return PredictResponse(label=label, score=score, latency_ms=round(latency_ms, 2))
    except Exception as e:
        REQUEST_COUNT.labels(status="error").inc()
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/predict/batch", response_model=BatchPredictResponse)
async def predict_batch(req: BatchPredictRequest):
    t0 = time.perf_counter()
    results = []
    tokenizer = model_store["tokenizer"]
    model     = model_store["model"]
    inputs = tokenizer(
        req.texts, return_tensors="pt", padding=True,
        truncation=True, max_length=512,
    )
    if next(model.parameters()).is_cuda:
        inputs = {k: v.cuda() for k, v in inputs.items()}
    with torch.no_grad():
        logits = model(**inputs).logits
    probs_all = torch.nn.functional.softmax(logits, dim=-1)
    for probs in probs_all:
        idx   = int(probs.argmax())
        results.append(PredictResponse(
            label=model.config.id2label[idx],
            score=float(probs[idx]),
            latency_ms=0,
        ))
    total_ms = (time.perf_counter() - t0) * 1000
    return BatchPredictResponse(predictions=results, total_latency_ms=round(total_ms, 2))
terminal
# Geliştirme modunda çalıştır
uvicorn serve:app --host 0.0.0.0 --port 8000 --reload

# Üretim için (workers = 2 × CPU + 1)
uvicorn serve:app --host 0.0.0.0 --port 8000 --workers 4 --no-access-log

02 Model Serialization

Modeli doğru formatta kaydetmek, farklı ortamlarda çalıştırabilmek ve inference hızını iyileştirmek için kritiktir.

Bir PyTorch modelini pickle ile kaydetmek başlangıç için işe yarar ama üretimde sorunludur: Python versiyonuna bağımlıdır, güvenlik açığı içerebilir ve farklı framework'lerle kullanılamaz. Production deployment için üç temel format vardır.

Format Karşılaştırması

FormatBoyutPython Gerekli?HızFramework
state_dict (.pt)1xEvetBaselineYalnızca PyTorch
TorchScript (.pt)1xHayır+%10–20PyTorch C++
ONNX (.onnx)~0.9xHayır+%20–50Her framework
TensorRT (.trt)~0.5xHayır+%200–400NVIDIA GPU only

PyTorch → TorchScript → ONNX

export_model.py
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from pathlib import Path

MODEL_NAME = "distilbert-base-uncased-finetuned-sst-2-english"
OUT_DIR    = Path("./model_artifacts")
OUT_DIR.mkdir(exist_ok=True)

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model     = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
model.eval()

# ── 1. state_dict (en basit, PyTorch only) ────────────────────────
torch.save(model.state_dict(), OUT_DIR / "model_state_dict.pt")

# ── 2. TorchScript (graph capture) ────────────────────────────────
try:
    dummy_input = tokenizer("örnek metin", return_tensors="pt")
    scripted = torch.jit.trace(
        model,
        (dummy_input["input_ids"], dummy_input["attention_mask"]),
    )
    scripted.save(str(OUT_DIR / "model_scripted.pt"))
    print("TorchScript kaydedildi.")
except Exception as e:
    print(f"TorchScript başarısız: {e}")

# ── 3. ONNX export ────────────────────────────────────────────────
dummy = tokenizer("örnek metin", return_tensors="pt", max_length=128, truncation=True)

torch.onnx.export(
    model,
    (dummy["input_ids"], dummy["attention_mask"]),
    str(OUT_DIR / "model.onnx"),
    input_names=["input_ids", "attention_mask"],
    output_names=["logits"],
    dynamic_axes={
        "input_ids":      {0: "batch", 1: "seq_len"},
        "attention_mask": {0: "batch", 1: "seq_len"},
        "logits":         {0: "batch"},
    },
    opset_version=17,
    do_constant_folding=True,
)
print("ONNX kaydedildi.")

# ── 4. ONNX Runtime ile inference doğrulama ────────────────────────
import onnxruntime as ort
import numpy as np

sess = ort.InferenceSession(
    str(OUT_DIR / "model.onnx"),
    providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
)
ort_inputs = {
    "input_ids":      dummy["input_ids"].numpy(),
    "attention_mask": dummy["attention_mask"].numpy(),
}
ort_logits = sess.run(["logits"], ort_inputs)[0]
print(f"ONNX çıktı: {ort_logits}")

# HuggingFace Optimum ile daha kolay ONNX export
# from optimum.onnxruntime import ORTModelForSequenceClassification
# ort_model = ORTModelForSequenceClassification.from_pretrained(MODEL_NAME, export=True)
# ort_model.save_pretrained(str(OUT_DIR / "optimum_onnx"))

03 Docker ile Konteynerleştirme

Modeli taşınabilir ve tekrarlanabilir kılmak için Docker; GPU desteği için NVIDIA CUDA base image ve çok aşamalı build.

Docker, inference servisini "benim makinemde çalışıyor" tuzağından çıkarır. Base image seçimi kritiktir: python:3.11-slim minimal CPU ortamı sağlar; GPU gerekiyorsa nvidia/cuda:12.3.0-cudnn9-runtime-ubuntu22.04 veya pytorch/pytorch:2.2.0-cuda12.1-cudnn8-runtime kullanılmalıdır.

CPU Dockerfile

Dockerfile
# ── Stage 1: builder ──────────────────────────────────────────────
FROM python:3.11-slim AS builder

WORKDIR /build
RUN apt-get update && apt-get install -y --no-install-recommends \
        gcc g++ && \
    rm -rf /var/lib/apt/lists/*

# Önce sadece requirements kopyala → layer cache optimizasyonu
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# ── Stage 2: runtime ──────────────────────────────────────────────
FROM python:3.11-slim AS runtime

WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    MODEL_DIR=/app/model_artifacts \
    PORT=8000

# Builder'dan kurulu paketleri kopyala
COPY --from=builder /install /usr/local

# Uygulama kodunu kopyala
COPY serve.py .
COPY model_artifacts/ ./model_artifacts/

RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser

HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD python -c "import httpx; httpx.get('http://localhost:${PORT}/health').raise_for_status()"

EXPOSE $PORT
CMD ["uvicorn", "serve:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]

GPU Dockerfile

Dockerfile.gpu
FROM pytorch/pytorch:2.2.0-cuda12.1-cudnn8-runtime

WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    NVIDIA_VISIBLE_DEVICES=all \
    NVIDIA_DRIVER_CAPABILITIES=compute,utility

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY serve.py model_artifacts/ ./

CMD ["uvicorn", "serve:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose

docker-compose.yml
version: "3.9"
services:
  inference:
    build:
      context: .
      dockerfile: Dockerfile
    ports: ["8000:8000"]
    environment:
      - PORT=8000
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
    deploy:
      resources:
        limits:
          cpus: "2.0"
          memory: "4G"

  inference-gpu:
    build:
      context: .
      dockerfile: Dockerfile.gpu
    ports: ["8001:8000"]
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
NOT

.dockerignore dosyasına __pycache__, .git, *.ipynb, tests/, .env ekle. Model ağırlıkları büyükse Docker layer'a koymak yerine başlangıçta S3/GCS'den indir; bu image boyutunu önemli ölçüde azaltır.

04 Triton Inference Server

NVIDIA Triton, yüksek throughput GPU serving için tasarlanmış production-grade inference sunucusudur; dynamic batching ve concurrent execution ile GPU'yu maksimize eder.

Triton Inference Server, NVIDIA'nın açık kaynak model sunucusudur. PyTorch, ONNX, TensorRT, TensorFlow ve OpenVINO modellerini aynı sunucu üzerinde çalıştırabilir. HTTP ve gRPC endpoint'leri otomatik açar; dynamic batching ile düşük batch size'ları otomatik birleştirir.

Model Repository Yapısı

model_repository/
model_repository/
├── sentiment_onnx/
│   ├── config.pbtxt
│   └── 1/
│       └── model.onnx
└── sentiment_ensemble/
    ├── config.pbtxt
    └── 1/
        └── (ensemble — dosya yok)
model_repository/sentiment_onnx/config.pbtxt
name: "sentiment_onnx"
backend: "onnxruntime"
max_batch_size: 32

# Dynamic batching: 100ms içinde gelen istekleri birleştir
dynamic_batching {
  preferred_batch_size: [ 4, 8, 16 ]
  max_queue_delay_microseconds: 100000
}

input [
  {
    name: "input_ids"
    data_type: TYPE_INT64
    dims: [ -1 ]   # dinamik sequence length
  },
  {
    name: "attention_mask"
    data_type: TYPE_INT64
    dims: [ -1 ]
  }
]

output [
  {
    name: "logits"
    data_type: TYPE_FP32
    dims: [ 2 ]
  }
]

instance_group [
  {
    count: 2               # 2 eşzamanlı model instance
    kind: KIND_GPU
    gpus: [ 0 ]
  }
]

Triton ile Sunucu Başlatma

terminal
# Docker ile Triton başlat
docker run --gpus all --rm \
  -p 8000:8000 -p 8001:8001 -p 8002:8002 \
  -v $(pwd)/model_repository:/models \
  nvcr.io/nvidia/tritonserver:24.03-py3 \
  tritonserver --model-repository=/models \
               --strict-model-config=false \
               --log-verbose=1

# Sağlık kontrolü
curl http://localhost:8000/v2/health/ready

Python Client

triton_client.py
import numpy as np
import tritonclient.http as httpclient
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
client = httpclient.InferenceServerClient(url="localhost:8000")

def triton_predict(texts: list[str]) -> list[str]:
    enc = tokenizer(
        texts, padding=True, truncation=True,
        max_length=128, return_tensors="np",
    )

    input_ids = httpclient.InferInput(
        "input_ids", enc["input_ids"].shape, "INT64"
    )
    input_ids.set_data_from_numpy(enc["input_ids"].astype(np.int64))

    attention_mask = httpclient.InferInput(
        "attention_mask", enc["attention_mask"].shape, "INT64"
    )
    attention_mask.set_data_from_numpy(enc["attention_mask"].astype(np.int64))

    output = httpclient.InferRequestedOutput("logits")
    response = client.infer(
        "sentiment_onnx",
        inputs=[input_ids, attention_mask],
        outputs=[output],
    )
    logits = response.as_numpy("logits")  # (batch, 2)
    labels = ["NEGATIVE", "POSITIVE"]
    return [labels[np.argmax(row)] for row in logits]

if __name__ == "__main__":
    results = triton_predict(["Bu harika!", "Çok kötüydü."])
    print(results)  # ['POSITIVE', 'NEGATIVE']

05 Model Registry — MLflow

MLflow, deney takibinden model registry'ye tek bir açık kaynak platformda ML lifecycle'ı kapsar.

MLflow dört bileşenden oluşur: Tracking (deney ve metrik kaydı), Projects (ortam tanımı), Models (model paketleme ve serving), Registry (versiyon ve lifecycle yönetimi). Model Registry ile modeller Staging → Production akışında ilerler; her versiyon metadata ile birlikte saklanır.

terminal
pip install mlflow

# MLflow UI başlat (SQLite backend)
mlflow server \
  --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root ./mlruns \
  --host 0.0.0.0 --port 5000

Eğitim ve Logging

train_and_log.py
import mlflow
import mlflow.pytorch
from mlflow.models import infer_signature
import torch, numpy as np
from transformers import AutoModelForSequenceClassification, AutoTokenizer

mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("sentiment-classifier")

with mlflow.start_run(run_name="distilbert-finetune-v3") as run:
    # Parametreleri logla
    params = {
        "model_name": "distilbert-base-uncased",
        "learning_rate": 2e-5,
        "epochs": 3,
        "batch_size": 32,
        "max_length": 128,
    }
    mlflow.log_params(params)

    # Eğitim döngüsü simülasyonu
    for epoch in range(3):
        train_loss = 0.8 - epoch * 0.2 + np.random.uniform(-0.05, 0.05)
        val_acc    = 0.85 + epoch * 0.03
        mlflow.log_metrics({
            "train_loss": train_loss,
            "val_accuracy": val_acc,
        }, step=epoch)

    # Son metrikler
    mlflow.log_metric("test_f1", 0.934)
    mlflow.log_metric("test_accuracy", 0.921)

    # Model kaydet
    model = AutoModelForSequenceClassification.from_pretrained(
        "distilbert-base-uncased-finetuned-sst-2-english"
    )
    tokenizer = AutoTokenizer.from_pretrained(
        "distilbert-base-uncased-finetuned-sst-2-english"
    )

    # İmza tanımla: input/output şeması
    dummy_input  = np.array([[101, 2023, 2003, 102]])
    dummy_output = np.array([[0.1, 0.9]])
    signature = infer_signature(dummy_input, dummy_output)

    mlflow.pytorch.log_model(
        pytorch_model=model,
        artifact_path="model",
        signature=signature,
        registered_model_name="SentimentClassifier",
    )
    print(f"Run ID: {run.info.run_id}")

# ── Model versiyonunu Staging'e al ────────────────────────────────
client = mlflow.MlflowClient()
latest = client.get_latest_versions("SentimentClassifier", stages=["None"])[0]
client.transition_model_version_stage(
    name="SentimentClassifier",
    version=latest.version,
    stage="Staging",
    archive_existing_versions=False,
)
print(f"Versiyon {latest.version} Staging'e alındı.")

# ── Production'a geçiş (CI/CD tetikler) ──────────────────────────
def promote_to_production(model_name: str, version: str):
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production",
        archive_existing_versions=True,  # eski production'ı arşivle
    )
    print(f"v{version} Production'a alındı.")
NOT

MLflow Model Registry production'a geçişi otomatik yapmaz; bir insan onayı veya CI gate gerektirir. mlflow.get_model_version_download_uri() ile modeli artifact store'dan çekip Triton veya FastAPI sunucuna konuşlandır.

06 CI/CD Pipeline

Model deployment'ı el ile yapmak hata üretir; GitHub Actions ile lint → test → build → push → deploy akışını tamamen otomatize et.

ML CI/CD pipeline'ı klasik yazılım pipeline'ından farklıdır: kod testleri yanında model validasyon gate'i eklemek gerekir. Yeni model belirli metrik eşiğini geçemezse deploy bloklanmalıdır.

.github/workflows/deploy.yml
name: ML Deploy Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: $/inference

jobs:
  lint-and-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Python kur
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: pip

      - name: Bağımlılıkları yükle
        run: pip install -r requirements.txt -r requirements-dev.txt

      - name: Ruff lint
        run: ruff check . --output-format=github

      - name: Mypy tip kontrolü
        run: mypy serve.py --ignore-missing-imports

      - name: Pytest
        run: pytest tests/ -v --tb=short

  model-validation:
    runs-on: ubuntu-latest
    needs: lint-and-test
    steps:
      - uses: actions/checkout@v4

      - name: Model indir (MLflow artifact)
        env:
          MLFLOW_TRACKING_URI: $
        run: |
          pip install mlflow
          python scripts/download_model.py --stage Production

      - name: Validasyon gate
        run: |
          python scripts/validate_model.py \
            --model-path ./model_artifacts \
            --min-accuracy 0.90 \
            --max-latency-ms 200
        # Bu adım başarısız olursa pipeline durur

  build-and-push:
    runs-on: ubuntu-latest
    needs: model-validation
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    permissions:
      contents: read
      packages: write
    steps:
      - uses: actions/checkout@v4

      - name: Docker login (GHCR)
        uses: docker/login-action@v3
        with:
          registry: $
          username: $
          password: $

      - name: Docker meta
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: $/$
          tags: |
            type=sha,prefix=sha-
            type=raw,value=latest

      - name: Build ve push
        uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: $
          cache-from: type=gha
          cache-to: type=gha,mode=max

  deploy-canary:
    runs-on: ubuntu-latest
    needs: build-and-push
    environment: production
    steps:
      - name: Canary deploy (%5 trafik)
        run: |
          echo "Canary deploy: $"
          # kubectl set image deploy/inference inference=$IMAGE_TAG
          # kubectl scale deploy/inference-canary --replicas=1

Validasyon Script

scripts/validate_model.py
import argparse, sys, time
import numpy as np
from pathlib import Path

def validate(model_path: str, min_accuracy: float, max_latency_ms: float):
    import onnxruntime as ort
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
    sess = ort.InferenceSession(f"{model_path}/model.onnx")

    # Smoke test verisi
    test_cases = [
        ("I love this product!", "POSITIVE"),
        ("This is terrible.",   "NEGATIVE"),
        ("Amazing quality!",     "POSITIVE"),
        ("Waste of money.",      "NEGATIVE"),
    ]

    correct, latencies = 0, []
    for text, expected in test_cases:
        enc = tokenizer(text, return_tensors="np", max_length=128, truncation=True)
        t0 = time.perf_counter()
        logits = sess.run(["logits"], {
            "input_ids": enc["input_ids"].astype(np.int64),
            "attention_mask": enc["attention_mask"].astype(np.int64),
        })[0]
        latency = (time.perf_counter() - t0) * 1000
        latencies.append(latency)
        pred = ["NEGATIVE", "POSITIVE"][np.argmax(logits[0])]
        if pred == expected:
            correct += 1

    accuracy = correct / len(test_cases)
    p95_latency = np.percentile(latencies, 95)

    print(f"Accuracy: {accuracy:.2%} (min: {min_accuracy:.2%})")
    print(f"P95 Latency: {p95_latency:.1f}ms (max: {max_latency_ms}ms)")

    if accuracy < min_accuracy:
        print(f"HATA: Accuracy eşiği aşılamadı.", file=sys.stderr)
        sys.exit(1)
    if p95_latency > max_latency_ms:
        print(f"HATA: Latency eşiği aşıldı.", file=sys.stderr)
        sys.exit(1)
    print("Validasyon başarılı.")

if __name__ == "__main__":
    p = argparse.ArgumentParser()
    p.add_argument("--model-path", required=True)
    p.add_argument("--min-accuracy", type=float, default=0.90)
    p.add_argument("--max-latency-ms", type=float, default=200.0)
    args = p.parse_args()
    validate(args.model_path, args.min_accuracy, args.max_latency_ms)

07 Canary Deployment

Yeni modeli tüm trafiğe açmak yerine kademeli rollout ile riskleri minimize et; sorun çıkarsa saniyeler içinde geri al.

Canary deployment, yeni versiyonu önce küçük bir trafik dilimine (%5) göndererek hata oranı ve latency'yi izleme stratejisidir. Metrikler sağlıklıysa trafik artırılır (%25 → %50 → %100). Sorun çıkarsa yalnızca canary rollback yapılır; tüm kullanıcılar etkilenmez.

Nginx ile Traffic Splitting

nginx.conf
upstream inference_stable {
    server inference-stable:8000 weight=95;
}

upstream inference_canary {
    server inference-canary:8000 weight=5;
}

# Split map: 5% canary, 95% stable
split_clients "${request_id}" $backend {
    5%   inference_canary;
    *    inference_stable;
}

server {
    listen 80;

    location /predict {
        proxy_pass http://$backend;
        proxy_set_header X-Deployment-Version $backend;
        proxy_set_header Host $host;
        proxy_connect_timeout 5s;
        proxy_read_timeout    30s;
    }

    location /health {
        proxy_pass http://inference_stable;
    }
}

Kademeli Rollout Script

rollout.sh
#!/bin/bash
# Canary kademeli rollout: 5% → 25% → 50% → 100%

CANARY_IMAGE="ghcr.io/org/inference:sha-abc123"
STABLE_IMAGE="ghcr.io/org/inference:sha-def456"
ERROR_THRESHOLD="0.02"   # %2 hata oranı eşiği
LATENCY_P95_THRESHOLD="2000"  # 2000ms

check_metrics() {
    local weight=$1
    echo "Canary ağırlığı: ${weight}% — metrikler izleniyor (5 dakika)..."
    sleep 300

    # Prometheus'tan metrik sorgula
    ERROR_RATE=$(curl -s "http://prometheus:9090/api/v1/query?query=\
rate(inference_requests_total{status='error',version='canary'}[5m])/\
rate(inference_requests_total{version='canary'}[5m])" \
        | jq -r '.data.result[0].value[1] // "0"')

    if (( $(echo "$ERROR_RATE > $ERROR_THRESHOLD" | bc -l) )); then
        echo "ROLLBACK: Hata oranı yüksek ($ERROR_RATE > $ERROR_THRESHOLD)"
        return 1
    fi
    return 0
}

rollout() {
    for weight in 5 25 50 100; do
        echo "Canary trafik: ${weight}%"
        # docker service update ile ağırlık güncelle (Swarm)
        # kubectl ile Kubernetes'te replicas ayarla
        docker service update \
            --replicas-max-per-node 1 \
            --label-add "canary.weight=${weight}" \
            inference_canary

        if ! check_metrics $weight; then
            echo "Rollback yapılıyor..."
            docker service scale inference_canary=0
            exit 1
        fi
        echo "${weight}% geçti, devam ediliyor."
    done
    echo "Tam rollout tamamlandı!"
}

rollout
DİKKAT

Canary deployment sırasında iki model versiyonu aynı anda çalışır. Kullanıcı oturumlarının tutarlı bir versiyona yönlendirilmesi için sticky session (kullanıcı ID hash'ine göre) kullan; aksi halde aynı kullanıcı farklı model davranışlarıyla karşılaşır.

08 A/B Testing

İki model versiyonundan hangisinin gerçekten daha iyi olduğunu istatistiksel olarak kanıtla; varsayıma değil veriye dayan.

Canary deployment ile A/B test kavramsal olarak benzerdir ama amaçları farklıdır: canary risk yönetimini hedefler, A/B test ise iki versiyonun belirli bir metrik üzerindeki etkisini karşılaştırır. A/B testte kullanıcılar deterministik olarak gruplara atanır; istatistiksel anlamlılık hesaplanır.

Kullanıcı Atama Mantığı

ab_assignment.py
import hashlib
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
import json
from pathlib import Path

class Variant(Enum):
    CONTROL   = "A"  # mevcut model
    TREATMENT = "B"  # yeni model

@dataclass
class ABEvent:
    user_id: str
    variant: str
    metric_name: str
    metric_value: float
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())

def assign_variant(
    user_id: str,
    experiment_name: str,
    control_ratio: float = 0.5,
) -> Variant:
    """Kullanıcıyı deterministik olarak gruba ata (hash tabanlı)."""
    key = f"{experiment_name}:{user_id}"
    digest = hashlib.md5(key.encode()).hexdigest()
    # Hash'in ilk 8 hex karakterini [0, 1) aralığına dönüştür
    bucket = int(digest[:8], 16) / 0xFFFFFFFF
    return Variant.CONTROL if bucket < control_ratio else Variant.TREATMENT

class ABExperiment:
    def __init__(self, name: str, log_path: str = "ab_events.jsonl"):
        self.name     = name
        self.log_path = Path(log_path)
        self._events: list[ABEvent] = []

    def get_variant(self, user_id: str) -> Variant:
        return assign_variant(user_id, self.name)

    def log_metric(self, user_id: str, metric_name: str, value: float):
        variant = self.get_variant(user_id)
        event = ABEvent(
            user_id=user_id,
            variant=variant.value,
            metric_name=metric_name,
            metric_value=value,
        )
        self._events.append(event)
        with self.log_path.open("a") as f:
            f.write(json.dumps(event.__dict__) + "\n")
        return event

İstatistiksel Anlamlılık Testi

ab_analysis.py
import numpy as np
from scipy import stats
from dataclasses import dataclass
import json
from pathlib import Path

@dataclass
class ABResult:
    n_control: int
    n_treatment: int
    mean_control: float
    mean_treatment: float
    relative_lift: float
    p_value: float
    is_significant: bool
    confidence_interval: tuple[float, float]

def analyze_experiment(
    log_path: str,
    metric_name: str,
    alpha: float = 0.05,
) -> ABResult:
    """JSONL log dosyasından A/B test sonucunu analiz et."""
    control, treatment = [], []

    with Path(log_path).open() as f:
        for line in f:
            event = json.loads(line)
            if event["metric_name"] != metric_name:
                continue
            if event["variant"] == "A":
                control.append(event["metric_value"])
            else:
                treatment.append(event["metric_value"])

    ctrl  = np.array(control)
    treat = np.array(treatment)

    # Welch's t-test (eşit varyans varsayımı yok)
    t_stat, p_value = stats.ttest_ind(treat, ctrl, equal_var=False)

    # %95 güven aralığı (treatment - control fark)
    diff = treat.mean() - ctrl.mean()
    se   = np.sqrt(treat.var(ddof=1)/len(treat) + ctrl.var(ddof=1)/len(ctrl))
    ci   = stats.norm.interval(0.95, loc=diff, scale=se)

    lift = (treat.mean() - ctrl.mean()) / ctrl.mean() * 100

    return ABResult(
        n_control=len(ctrl),
        n_treatment=len(treat),
        mean_control=round(float(ctrl.mean()), 4),
        mean_treatment=round(float(treat.mean()), 4),
        relative_lift=round(lift, 2),
        p_value=round(float(p_value), 4),
        is_significant=p_value < alpha,
        confidence_interval=(round(ci[0], 4), round(ci[1], 4)),
    )

def minimum_sample_size(
    baseline_rate: float,
    mde: float,         # Minimum Detectable Effect
    alpha: float = 0.05,
    power: float = 0.80,
) -> int:
    """Her grup için minimum örneklem boyutunu hesapla."""
    z_alpha = stats.norm.ppf(1 - alpha / 2)
    z_beta  = stats.norm.ppf(power)
    p1 = baseline_rate
    p2 = baseline_rate * (1 + mde)
    p_bar = (p1 + p2) / 2
    n = ((z_alpha * np.sqrt(2 * p_bar * (1 - p_bar))
          + z_beta * np.sqrt(p1*(1-p1) + p2*(1-p2))) / (p2 - p1)) ** 2
    return int(np.ceil(n))

# Kullanım örneği
if __name__ == "__main__":
    # Minimum örneklem: baseline %20 CTR, %10 iyileşme beklentisi
    n = minimum_sample_size(baseline_rate=0.20, mde=0.10)
    print(f"Her grup için minimum örneklem: {n} kullanıcı")

    # Test (gerçek logdan)
    # result = analyze_experiment("ab_events.jsonl", "satisfaction_score")
    # print(result)
NOT

A/B testi istatistiksel anlamlılığa ulaşmadan sonlandırma peeking problemidir ve yanlış pozitif üretir. Minimum örneklem boyutuna ulaşmadan sonucu yorumlama. Ayrıca Bonferroni düzeltmesi: birden fazla metrik test ediyorsan alpha / n_metrics kullan.

MLOps Araçlarını Birbirine Bağlamak

01 MLflow        → yeni model eğit, metrikleri kaydet
02 CI/CD Gate    → accuracy & latency eşiği geç
03 Docker Build  → image oluştur, registry'e push
04 Canary        → %5 trafikle canlıya al
05 A/B Test      → istatistiksel anlamlılık için n gün bekle
06 Full Rollout  → kazanan modeli %100'e çıkar
07 OTel + Grafana → canlıda izle, drift alarm kur