Model Serving Altyapısı
Rehber Model Serving 05 · Deployment

A/B Test & Canary
LLM'de Güvenli Yayın.

Blue/green, canary ve shadow deployment stratejileri. İstatistiksel anlamlılık, LLM'e özgü metrikler ve FastAPI + NGINX ile production-grade canary kurulumu.

00 Model Deployment Stratejileri

Yeni bir model versiyonunu production'a almak, tam geçiş yapılmadan önce riskin kontrollü biçimde test edilmesini gerektirir.

Her deployment stratejisinin farklı risk/hız tradeoff'u vardır:

StratejiNasıl ÇalışırRiskİdeal Senaryo
RecreateEski kapat, yeni aç — downtime varÇok yüksekDev/test ortamı
Rolling UpdatePodları birer birer değiştirOrtaStateless servisler
Blue/Greenİki ortam; switch anındaDüşük — hızlı rollbackKritik servisler
CanaryKüçük yüzde trafiği yeni versiyonaÇok düşükYüksek riskli model değişikliği
ShadowYeni model paralel çalışır, response kullanılmazSıfırTahmin kalitesi ön validasyon
ML İÇİN CANARY ÖNEMİ

Yazılım deploymentlarında bug'lar determinisktir — test edilince bulunur. ML modellerinde ise production drift, distribution shift veya edge case failure'lar yalnızca canlı trafik üzerinde ortaya çıkabilir. Bu yüzden canary deployment ML servisleri için yazılım deploymentından çok daha kritiktir.

01 Traffic Splitting — Ağırlıklı Yönlendirme

NGINX veya Envoy ile gelen trafiğin belirli bir yüzdesi yeni model versiyonuna yönlendirilir.

Traffic splitting için iki temel yaklaşım vardır: (1) Stochastic splitting — her istek bağımsız olarak olasılıkla yönlendirilir; istatistiksel olarak doğru ama aynı kullanıcı farklı versiyonları görebilir. (2) User-based splitting — user_id hash'ine göre kullanıcı kalıcı olarak bir versiyona atanır; tutarlı deneyim sağlar, A/B analizi temizdir.

nginx_canary.conf
upstream model_v1 {
    server model-v1:8000 weight=90;   # %90 traffic → stable
}

upstream model_v2 {
    server model-v2:8000 weight=10;   # %10 traffic → canary
}

# Ağırlıklı upstream split
upstream model_backend {
    server model-v1:8000 weight=90;
    server model-v2:8000 weight=10;
}

server {
    listen 80;

    location /v1/inference {
        proxy_pass http://model_backend;
        proxy_set_header X-Request-ID $request_id;
        proxy_set_header Host $host;

        # Hangi backend'e gittiğini header'a ekle
        add_header X-Model-Version $upstream_addr always;
    }
}
user_hash_splitter.py
import hashlib

def get_model_version(user_id: str, canary_pct: float = 0.1) -> str:
    """
    Kullanıcıyı deterministic olarak bir model versiyonuna ata.
    Aynı user_id her zaman aynı versiyonu görür.
    canary_pct: 0.0–1.0 arasında canary yüzdesi
    """
    h     = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    bucket = (h % 100) / 100.0   # 0.00 – 0.99
    return "v2-canary" if bucket < canary_pct else "v1-stable"

# Test: 1000 kullanıcı
users   = [f"user_{i}" for i in range(1000)]
canary  = sum(1 for u in users if get_model_version(u) == "v2-canary")
print(f"Canary yüzdesi: {canary/1000:.1%}")  # ~%10

02 İstatistiksel Anlamlılık

A/B test sonuçlarının güvenilir olabilmesi için istatistiksel anlamlılık, güç analizi ve yeterli örnek büyüklüğü gerekir.

A/B test değerlendirmesinin temel kavramları:

p-valueH₀ (fark yok) varsayımı altında gözlemlenen etkinin olasılığı. p < 0.05 genellikle anlamlı kabul edilir; ML servislerde p < 0.01 daha güvenli.
MDE (Minimum Detectable Effect)Tespit etmek istediğimiz en küçük fark. %0.5'lik iyileşme tespit için %5'lik iyileşmeden çok daha fazla veri gerekir.
Güç (Power)Gerçek bir etkiyi tespit etme olasılığı. Genellikle 0.80 (yani %20 false negative riski) hedeflenir.
Örnek büyüklüğüMDE, α (significance level) ve β (power) verildiğinde hesaplanır. statsmodels ile otomatik hesap mümkün.
ab_stats.py
from scipy import stats
import numpy as np
from statsmodels.stats.power import TTestIndPower

def required_sample_size(mde=0.02, baseline=0.5, alpha=0.05, power=0.80):
    """
    A/B test için gerekli örnek büyüklüğü.
    mde     : minimum tespit edilebilir etki (oran farkı)
    baseline: kontrol grubu dönüşüm oranı
    """
    effect_size = mde / np.sqrt(baseline * (1 - baseline))
    analysis    = TTestIndPower()
    n = analysis.solve_power(
        effect_size=effect_size, alpha=alpha, power=power, ratio=1.0)
    return int(np.ceil(n))

def run_ab_test(control_conversions, control_n,
               treatment_conversions, treatment_n, alpha=0.05):
    """İki oranlı z-test ile A/B test anlamlılığı."""
    p_c = control_conversions / control_n
    p_t = treatment_conversions / treatment_n
    p_pool = (control_conversions + treatment_conversions) / (control_n + treatment_n)
    se    = np.sqrt(p_pool * (1 - p_pool) * (1/control_n + 1/treatment_n))
    z     = (p_t - p_c) / se
    p_val = 2 * (1 - stats.norm.cdf(abs(z)))
    uplift = (p_t - p_c) / p_c * 100

    print(f"Control  : {p_c:.4f} ({control_n} örnek)")
    print(f"Treatment: {p_t:.4f} ({treatment_n} örnek)")
    print(f"Uplift   : {uplift:+.2f}%")
    print(f"z-score  : {z:.3f}   p-value: {p_val:.4f}")
    print(f"Anlamlı  : {'EVET' if p_val < alpha else 'HAYIR'} (α={alpha})")
    return p_val < alpha

# Gerekli örnek büyüklüğü
n = required_sample_size(mde=0.02, baseline=0.30)
print(f"Her grup için gerekli n: {n}")

# Simüle edilmiş sonuç analizi
run_ab_test(
    control_conversions=300, control_n=1000,
    treatment_conversions=330, treatment_n=1000)

03 LLM-Specific Metrikler

LLM deployment'larında klasik web metriklerinin yanı sıra dil modeline özgü performans göstergeleri izlenmelidir.

MetrikTanımHedef
TTFT (Time to First Token)İstek gönderiminden ilk token çıkışına kadar geçen süre< 500ms (interactive)
TPS (Tokens/sec)Saniyede üretilen token sayısı — generation hızı> 30 t/s (kullanılabilir UX)
E2E Latency P9999. yüzdelik uçtan uca yanıt süresiSLA'ya göre, genellikle < 5s
Token/s throughputSistem genelinde saniyede toplam token üretimiMaliyet optimizasyonu için maksimize
Error rateTimeout, OOM, model hatası oranı< 0.1%
Output qualityROUGE, BERTScore veya LLM-as-judge skoruYeni model eskiden düşük olmamalı
llm_metrics_collector.py
import time, statistics
from dataclasses import dataclass, field
from typing import List
from collections import deque

@dataclass
class LLMRequestMetrics:
    ttft_ms: float         # time to first token
    tps: float             # tokens per second
    e2e_ms: float          # end-to-end latency
    n_tokens: int          # generated token sayısı
    model_version: str     # "v1" veya "v2"
    success: bool = True

class MetricsAggregator:
    def __init__(self, window=1000):
        self.window   = window
        self.by_model = {}   # version → deque

    def record(self, m: LLMRequestMetrics):
        v = m.model_version
        if v not in self.by_model:
            self.by_model[v] = deque(maxlen=self.window)
        self.by_model[v].append(m)

    def summary(self, version: str):
        data = list(self.by_model.get(version, []))
        if not data: return {}
        ttfts = [m.ttft_ms for m in data if m.success]
        tpss  = [m.tps      for m in data if m.success]
        e2es  = [m.e2e_ms   for m in data if m.success]
        errs  = sum(1 for m in data if not m.success)
        return {
            "n"          : len(data),
            "error_rate" : errs / len(data),
            "ttft_p50"   : statistics.median(ttfts) if ttfts else None,
            "ttft_p99"   : sorted(ttfts)[int(len(ttfts)*0.99)-1] if ttfts else None,
            "tps_mean"   : statistics.mean(tpss) if tpss else None,
            "e2e_p99"    : sorted(e2es)[int(len(e2es)*0.99)-1] if e2es else None,
        }

04 Shadow Mode

Shadow mode, yeni modeli gerçek trafiğe paralel olarak çalıştırır ama kullanıcıya yeni modelin yanıtını göndermez.

Shadow mode, zero-risk validation için idealdir: tüm production trafiği yeni modele iletilir, yanıtlar kaydedilir ve analiz edilir, ancak kullanıcı yalnızca eski modelden yanıt alır. Bu sayede yeni modelin kalitesi, latency'si ve hata oranı production koşullarında ölçülür.

shadow_mode_proxy.py
import asyncio, httpx, json, time
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

MODEL_V1  = "http://model-v1:8000"  # production model
MODEL_V2  = "http://model-v2:8000"  # shadow model
shadow_log: list = []


async def call_shadow(client, path, payload):
    """Shadow modele fire-and-forget çağrı; yanıt loglanır."""
    t0 = time.perf_counter()
    try:
        resp = await client.post(MODEL_V2 + path, json=payload, timeout=30.0)
        shadow_log.append({
            "status" : resp.status_code,
            "latency": (time.perf_counter() - t0) * 1000,
            "body"   : resp.json() if resp.status_code == 200 else None,
        })
    except Exception as e:
        shadow_log.append({"status": "error", "error": str(e)})


@app.post("/{path:path}")
async def proxy(path: str, request: Request):
    payload = await request.json()

    async with httpx.AsyncClient() as client:
        # 1. Production model — kullanıcıya dönen yanıt
        prod_resp = await client.post(
            MODEL_V1 + "/" + path, json=payload, timeout=30.0)

        # 2. Shadow model — fire-and-forget, yanıt kullanılmaz
        asyncio.create_task(call_shadow(client, "/" + path, payload))

    return JSONResponse(content=prod_resp.json(), status_code=prod_resp.status_code)


@app.get("/shadow/stats")
def shadow_stats():
    if not shadow_log: return {"count": 0}
    ok      = [l for l in shadow_log if l.get("status") == 200]
    lats    = [l["latency"] for l in ok]
    return {
        "total"      : len(shadow_log),
        "success_pct": len(ok) / len(shadow_log) * 100,
        "lat_p50_ms" : sorted(lats)[len(lats)//2] if lats else None,
        "lat_p99_ms" : sorted(lats)[int(len(lats)*0.99)] if lats else None,
    }

05 Feature Flags — Model Gating

Feature flag'ler, model değişikliklerini code deployment'tan bağımsız olarak kontrol eder; gerçek zamanlı açma/kapama ve gradual rollout imkânı sunar.

Feature flag'ler (feature toggle) ile model switching, deployment pipeline'ını değiştirmek yerine bir API çağrısı veya config güncellemesiyle yapılır. Örneğin LaunchDarkly ve Unleash açık kaynaklı alternatifleri ile tüm kullanıcılar için anında açma/kapama, belirli kullanıcı segmentleri için targeting ve percentage rollout mümkündür.

feature_flag_router.py
import redis, json, hashlib
from fastapi import FastAPI, Header
from typing import Optional

app = FastAPI()
rds = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_flag(flag_name: str) -> dict:
    """Redis'ten flag konfigürasyonunu oku."""
    raw = rds.get(f"flag:{flag_name}")
    if not raw: return {"enabled": False, "pct": 0}
    return json.loads(raw)

def user_in_rollout(user_id: str, flag_name: str) -> bool:
    """Kullanıcı canary rollout yüzdesine dahil mi?"""
    flag = get_flag(flag_name)
    if not flag["enabled"]: return False
    seed    = f"{user_id}:{flag_name}"
    bucket  = (int(hashlib.md5(seed.encode()).hexdigest(), 16) % 100)
    return bucket < flag.get("pct", 0)

@app.post("/predict")
async def predict(payload: dict, x_user_id: Optional[str] = Header(None)):
    use_v2 = user_id and user_in_rollout(x_user_id, "model_v2_rollout")
    version = "v2" if use_v2 else "v1"
    # model_registry[version].predict(payload)
    return {"version": version, "result": "..."}

# Flag ayarlama: %5 canary başlat
# rds.set("flag:model_v2_rollout", json.dumps({"enabled": True, "pct": 5}))

06 Rollback Stratejisi

Otomatik rollback, model degradation'ını insan müdahalesi olmadan tespit edip önceki versiyona döner.

Rollback tetikleyicileri: error rate ani artışı, latency P99 eşik aşımı, model output kalite skorunun düşmesi (LLM-as-judge), iş metriği regresyonu (dönüşüm oranı, tıklama oranı).

auto_rollback.py
import time, statistics, redis, json
from collections import deque

class AutoRollbackGuard:
    """
    Canary model için otomatik rollback guard.
    Metrikler eşiği aşarsa traffic splitting geri alınır.
    """
    def __init__(self, rds_client,
                 error_threshold=0.02,
                 p99_threshold_ms=3000,
                 window=200):
        self.rds             = rds_client
        self.error_threshold = error_threshold
        self.p99_threshold   = p99_threshold_ms
        self.errors          = deque(maxlen=window)
        self.latencies       = deque(maxlen=window)
        self.rolled_back     = False

    def record(self, success: bool, latency_ms: float):
        self.errors.append(0 if success else 1)
        self.latencies.append(latency_ms)
        self._check()

    def _check(self):
        if self.rolled_back or len(self.errors) < 50: return

        error_rate = sum(self.errors) / len(self.errors)
        lats_sorted = sorted(self.latencies)
        p99 = lats_sorted[int(len(lats_sorted) * 0.99)]

        if error_rate > self.error_threshold:
            self._rollback(f"Error rate {error_rate:.2%} > {self.error_threshold:.2%}")
        elif p99 > self.p99_threshold:
            self._rollback(f"P99 latency {p99:.0f}ms > {self.p99_threshold}ms")

    def _rollback(self, reason: str):
        self.rolled_back = True
        # NGINX ağırlığını sıfırla
        self.rds.set("canary:weight", "0")
        self.rds.set("canary:status", "rolled_back")
        event = {"ts": time.time(), "reason": reason}
        self.rds.lpush("rollback:log", json.dumps(event))
        print(f"ROLLBACK: {reason}")

07 Online A/B Test için Logging ve Analiz

A/B test'in geçerliliği, hangi kullanıcının hangi versiyonu gördüğünü ve sonuçlarını eksiksiz kayıt altına almaya bağlıdır.

İyi bir A/B test logging'i şunları içermelidir: request_id (istek), user_id (kullanıcı), model_version (hangi model), timestamp, response latency, output quality score, downstream conversion event (kullanıcı aksiyonu).

ab_logger.py
import json, time, uuid
from dataclasses import dataclass, asdict
from typing import Optional
import pandas as pd

@dataclass
class ABEvent:
    request_id    : str
    user_id       : str
    model_version : str
    latency_ms    : float
    output_tokens : int
    quality_score : Optional[float] = None  # LLM-as-judge skoru
    converted     : Optional[bool]  = None  # downstream conversion
    ts            : float = 0.0
    def __post_init__(self):
        if not self.ts: self.ts = time.time()

class ABAnalyzer:
    def __init__(self):
        self.events = []

    def log(self, event: ABEvent):
        self.events.append(asdict(event))

    def report(self):
        if not self.events: return
        df = pd.DataFrame(self.events)
        summary = df.groupby("model_version").agg({
            "latency_ms"    : ["mean", lambda x: x.quantile(0.99)],
            "output_tokens" : "mean",
            "quality_score" : "mean",
            "converted"     : "mean",
            "request_id"    : "count"
        })
        summary.columns = ["lat_mean","lat_p99","tokens_mean",
                           "quality","conv_rate","n"]
        print(summary.to_string())
        return summary

# Simüle edilmiş kullanım
analyzer = ABAnalyzer()
import random, numpy as np
rng = np.random.default_rng(0)

for i in range(500):
    ver = "v2" if i % 5 == 0 else "v1"
    ev  = ABEvent(
        request_id    = str(uuid.uuid4()),
        user_id       = f"u{i%100}",
        model_version = ver,
        latency_ms    = rng.normal(400 if ver=="v1" else 380, 80),
        output_tokens = int(rng.normal(120, 30)),
        quality_score = float(rng.uniform(0.7, 0.95)),
        converted     = bool(rng.random() < (0.12 if ver=="v1" else 0.15))
    )
    analyzer.log(ev)

analyzer.report()

08 Pratik: FastAPI + NGINX ile Canary Deployment

İki FastAPI model servisini Docker Compose'da başlatıp NGINX ile ağırlıklı traffic splitting yapılandırması.

docker-compose.yml
version: "3.9"
services:

  model-v1:
    image: python:3.11-slim
    volumes: ["./app:/app"]
    working_dir: /app
    command: uvicorn model_v1:app --host 0.0.0.0 --port 8000
    environment:
      MODEL_VERSION: v1
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 10s
      retries: 3

  model-v2:
    image: python:3.11-slim
    volumes: ["./app:/app"]
    working_dir: /app
    command: uvicorn model_v2:app --host 0.0.0.0 --port 8000
    environment:
      MODEL_VERSION: v2

  nginx:
    image: nginx:alpine
    ports: ["80:80"]
    volumes: ["./nginx.conf:/etc/nginx/nginx.conf:ro"]
    depends_on: [model-v1, model-v2]

  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
app/model_v1.py (ve model_v2.py)
import os, time
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
VERSION = os.getenv("MODEL_VERSION", "v1")

class InferenceRequest(BaseModel):
    text: str
    user_id: str = "anon"

@app.get("/health")
def health(): return {"status": "ok", "version": VERSION}

@app.post("/predict")
def predict(req: InferenceRequest):
    t0 = time.perf_counter()
    # Gerçek uygulamada: model.generate(req.text)
    result = f"[{VERSION}] '{req.text}' için yanıt"
    lat    = (time.perf_counter() - t0) * 1000
    return {
        "version" : VERSION,
        "result"  : result,
        "latency" : round(lat, 2)
    }
nginx.conf — canary split (%10 v2)
events { worker_connections 1024; }

http {
    upstream backend {
        server model-v1:8000 weight=90;   # %90 → stable
        server model-v2:8000 weight=10;   # %10 → canary
    }

    log_format ab_log '$remote_addr "$request" $status '
                      '$upstream_addr $request_time';

    server {
        listen 80;
        access_log /var/log/nginx/ab.log ab_log;

        location /predict {
            proxy_pass         http://backend;
            proxy_set_header   X-Real-IP $remote_addr;
            add_header         X-Served-By $upstream_addr always;
        }

        location /health {
            proxy_pass http://backend;
        }
    }
}
terminal — test ve oran doğrulama
# Servisleri başlat
docker-compose up -d

# 100 istek gönder ve hangi versiyondan kaç tane geldiğini say
for i in $(seq 1 100); do
  curl -s -X POST http://localhost/predict \
    -H "Content-Type: application/json" \
    -d '{"text":"test","user_id":"u1"}' | grep -o '"version":"v[12]"'
done | sort | uniq -c

# Beklenen çıktı: ~90 adet v1, ~10 adet v2

# NGINX log'dan hangi backend'in çağrıldığını gör
docker exec nginx tail -20 /var/log/nginx/ab.log
BEKLENEN SONUÇ

100 istekte yaklaşık 90'ı model-v1'den, 10'u model-v2'den yanıt alır. NGINX weight düzenlenerek canary yüzdesi artırılır: 10 → 25 → 50 → 100 şeklinde kademeli rollout yapılır. Her aşama en az birkaç saat izlenir; metrikler stabil kalırsa bir sonraki aşamaya geçilir.