00 Model Deployment Stratejileri
Yeni bir model versiyonunu production'a almak, tam geçiş yapılmadan önce riskin kontrollü biçimde test edilmesini gerektirir.
Her deployment stratejisinin farklı risk/hız tradeoff'u vardır:
Yazılım deploymentlarında bug'lar determinisktir — test edilince bulunur. ML modellerinde ise production drift, distribution shift veya edge case failure'lar yalnızca canlı trafik üzerinde ortaya çıkabilir. Bu yüzden canary deployment ML servisleri için yazılım deploymentından çok daha kritiktir.
01 Traffic Splitting — Ağırlıklı Yönlendirme
NGINX veya Envoy ile gelen trafiğin belirli bir yüzdesi yeni model versiyonuna yönlendirilir.
Traffic splitting için iki temel yaklaşım vardır: (1) Stochastic splitting — her istek bağımsız olarak olasılıkla yönlendirilir; istatistiksel olarak doğru ama aynı kullanıcı farklı versiyonları görebilir. (2) User-based splitting — user_id hash'ine göre kullanıcı kalıcı olarak bir versiyona atanır; tutarlı deneyim sağlar, A/B analizi temizdir.
upstream model_v1 {
server model-v1:8000 weight=90; # %90 traffic → stable
}
upstream model_v2 {
server model-v2:8000 weight=10; # %10 traffic → canary
}
# Ağırlıklı upstream split
upstream model_backend {
server model-v1:8000 weight=90;
server model-v2:8000 weight=10;
}
server {
listen 80;
location /v1/inference {
proxy_pass http://model_backend;
proxy_set_header X-Request-ID $request_id;
proxy_set_header Host $host;
# Hangi backend'e gittiğini header'a ekle
add_header X-Model-Version $upstream_addr always;
}
}
import hashlib
def get_model_version(user_id: str, canary_pct: float = 0.1) -> str:
"""
Kullanıcıyı deterministic olarak bir model versiyonuna ata.
Aynı user_id her zaman aynı versiyonu görür.
canary_pct: 0.0–1.0 arasında canary yüzdesi
"""
h = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
bucket = (h % 100) / 100.0 # 0.00 – 0.99
return "v2-canary" if bucket < canary_pct else "v1-stable"
# Test: 1000 kullanıcı
users = [f"user_{i}" for i in range(1000)]
canary = sum(1 for u in users if get_model_version(u) == "v2-canary")
print(f"Canary yüzdesi: {canary/1000:.1%}") # ~%10
02 İstatistiksel Anlamlılık
A/B test sonuçlarının güvenilir olabilmesi için istatistiksel anlamlılık, güç analizi ve yeterli örnek büyüklüğü gerekir.
A/B test değerlendirmesinin temel kavramları:
from scipy import stats
import numpy as np
from statsmodels.stats.power import TTestIndPower
def required_sample_size(mde=0.02, baseline=0.5, alpha=0.05, power=0.80):
"""
A/B test için gerekli örnek büyüklüğü.
mde : minimum tespit edilebilir etki (oran farkı)
baseline: kontrol grubu dönüşüm oranı
"""
effect_size = mde / np.sqrt(baseline * (1 - baseline))
analysis = TTestIndPower()
n = analysis.solve_power(
effect_size=effect_size, alpha=alpha, power=power, ratio=1.0)
return int(np.ceil(n))
def run_ab_test(control_conversions, control_n,
treatment_conversions, treatment_n, alpha=0.05):
"""İki oranlı z-test ile A/B test anlamlılığı."""
p_c = control_conversions / control_n
p_t = treatment_conversions / treatment_n
p_pool = (control_conversions + treatment_conversions) / (control_n + treatment_n)
se = np.sqrt(p_pool * (1 - p_pool) * (1/control_n + 1/treatment_n))
z = (p_t - p_c) / se
p_val = 2 * (1 - stats.norm.cdf(abs(z)))
uplift = (p_t - p_c) / p_c * 100
print(f"Control : {p_c:.4f} ({control_n} örnek)")
print(f"Treatment: {p_t:.4f} ({treatment_n} örnek)")
print(f"Uplift : {uplift:+.2f}%")
print(f"z-score : {z:.3f} p-value: {p_val:.4f}")
print(f"Anlamlı : {'EVET' if p_val < alpha else 'HAYIR'} (α={alpha})")
return p_val < alpha
# Gerekli örnek büyüklüğü
n = required_sample_size(mde=0.02, baseline=0.30)
print(f"Her grup için gerekli n: {n}")
# Simüle edilmiş sonuç analizi
run_ab_test(
control_conversions=300, control_n=1000,
treatment_conversions=330, treatment_n=1000)
03 LLM-Specific Metrikler
LLM deployment'larında klasik web metriklerinin yanı sıra dil modeline özgü performans göstergeleri izlenmelidir.
import time, statistics
from dataclasses import dataclass, field
from typing import List
from collections import deque
@dataclass
class LLMRequestMetrics:
ttft_ms: float # time to first token
tps: float # tokens per second
e2e_ms: float # end-to-end latency
n_tokens: int # generated token sayısı
model_version: str # "v1" veya "v2"
success: bool = True
class MetricsAggregator:
def __init__(self, window=1000):
self.window = window
self.by_model = {} # version → deque
def record(self, m: LLMRequestMetrics):
v = m.model_version
if v not in self.by_model:
self.by_model[v] = deque(maxlen=self.window)
self.by_model[v].append(m)
def summary(self, version: str):
data = list(self.by_model.get(version, []))
if not data: return {}
ttfts = [m.ttft_ms for m in data if m.success]
tpss = [m.tps for m in data if m.success]
e2es = [m.e2e_ms for m in data if m.success]
errs = sum(1 for m in data if not m.success)
return {
"n" : len(data),
"error_rate" : errs / len(data),
"ttft_p50" : statistics.median(ttfts) if ttfts else None,
"ttft_p99" : sorted(ttfts)[int(len(ttfts)*0.99)-1] if ttfts else None,
"tps_mean" : statistics.mean(tpss) if tpss else None,
"e2e_p99" : sorted(e2es)[int(len(e2es)*0.99)-1] if e2es else None,
}
04 Shadow Mode
Shadow mode, yeni modeli gerçek trafiğe paralel olarak çalıştırır ama kullanıcıya yeni modelin yanıtını göndermez.
Shadow mode, zero-risk validation için idealdir: tüm production trafiği yeni modele iletilir, yanıtlar kaydedilir ve analiz edilir, ancak kullanıcı yalnızca eski modelden yanıt alır. Bu sayede yeni modelin kalitesi, latency'si ve hata oranı production koşullarında ölçülür.
import asyncio, httpx, json, time
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
MODEL_V1 = "http://model-v1:8000" # production model
MODEL_V2 = "http://model-v2:8000" # shadow model
shadow_log: list = []
async def call_shadow(client, path, payload):
"""Shadow modele fire-and-forget çağrı; yanıt loglanır."""
t0 = time.perf_counter()
try:
resp = await client.post(MODEL_V2 + path, json=payload, timeout=30.0)
shadow_log.append({
"status" : resp.status_code,
"latency": (time.perf_counter() - t0) * 1000,
"body" : resp.json() if resp.status_code == 200 else None,
})
except Exception as e:
shadow_log.append({"status": "error", "error": str(e)})
@app.post("/{path:path}")
async def proxy(path: str, request: Request):
payload = await request.json()
async with httpx.AsyncClient() as client:
# 1. Production model — kullanıcıya dönen yanıt
prod_resp = await client.post(
MODEL_V1 + "/" + path, json=payload, timeout=30.0)
# 2. Shadow model — fire-and-forget, yanıt kullanılmaz
asyncio.create_task(call_shadow(client, "/" + path, payload))
return JSONResponse(content=prod_resp.json(), status_code=prod_resp.status_code)
@app.get("/shadow/stats")
def shadow_stats():
if not shadow_log: return {"count": 0}
ok = [l for l in shadow_log if l.get("status") == 200]
lats = [l["latency"] for l in ok]
return {
"total" : len(shadow_log),
"success_pct": len(ok) / len(shadow_log) * 100,
"lat_p50_ms" : sorted(lats)[len(lats)//2] if lats else None,
"lat_p99_ms" : sorted(lats)[int(len(lats)*0.99)] if lats else None,
}
05 Feature Flags — Model Gating
Feature flag'ler, model değişikliklerini code deployment'tan bağımsız olarak kontrol eder; gerçek zamanlı açma/kapama ve gradual rollout imkânı sunar.
Feature flag'ler (feature toggle) ile model switching, deployment pipeline'ını değiştirmek yerine bir API çağrısı veya config güncellemesiyle yapılır. Örneğin LaunchDarkly ve Unleash açık kaynaklı alternatifleri ile tüm kullanıcılar için anında açma/kapama, belirli kullanıcı segmentleri için targeting ve percentage rollout mümkündür.
import redis, json, hashlib
from fastapi import FastAPI, Header
from typing import Optional
app = FastAPI()
rds = redis.Redis(host="localhost", port=6379, decode_responses=True)
def get_flag(flag_name: str) -> dict:
"""Redis'ten flag konfigürasyonunu oku."""
raw = rds.get(f"flag:{flag_name}")
if not raw: return {"enabled": False, "pct": 0}
return json.loads(raw)
def user_in_rollout(user_id: str, flag_name: str) -> bool:
"""Kullanıcı canary rollout yüzdesine dahil mi?"""
flag = get_flag(flag_name)
if not flag["enabled"]: return False
seed = f"{user_id}:{flag_name}"
bucket = (int(hashlib.md5(seed.encode()).hexdigest(), 16) % 100)
return bucket < flag.get("pct", 0)
@app.post("/predict")
async def predict(payload: dict, x_user_id: Optional[str] = Header(None)):
use_v2 = user_id and user_in_rollout(x_user_id, "model_v2_rollout")
version = "v2" if use_v2 else "v1"
# model_registry[version].predict(payload)
return {"version": version, "result": "..."}
# Flag ayarlama: %5 canary başlat
# rds.set("flag:model_v2_rollout", json.dumps({"enabled": True, "pct": 5}))
06 Rollback Stratejisi
Otomatik rollback, model degradation'ını insan müdahalesi olmadan tespit edip önceki versiyona döner.
Rollback tetikleyicileri: error rate ani artışı, latency P99 eşik aşımı, model output kalite skorunun düşmesi (LLM-as-judge), iş metriği regresyonu (dönüşüm oranı, tıklama oranı).
import time, statistics, redis, json
from collections import deque
class AutoRollbackGuard:
"""
Canary model için otomatik rollback guard.
Metrikler eşiği aşarsa traffic splitting geri alınır.
"""
def __init__(self, rds_client,
error_threshold=0.02,
p99_threshold_ms=3000,
window=200):
self.rds = rds_client
self.error_threshold = error_threshold
self.p99_threshold = p99_threshold_ms
self.errors = deque(maxlen=window)
self.latencies = deque(maxlen=window)
self.rolled_back = False
def record(self, success: bool, latency_ms: float):
self.errors.append(0 if success else 1)
self.latencies.append(latency_ms)
self._check()
def _check(self):
if self.rolled_back or len(self.errors) < 50: return
error_rate = sum(self.errors) / len(self.errors)
lats_sorted = sorted(self.latencies)
p99 = lats_sorted[int(len(lats_sorted) * 0.99)]
if error_rate > self.error_threshold:
self._rollback(f"Error rate {error_rate:.2%} > {self.error_threshold:.2%}")
elif p99 > self.p99_threshold:
self._rollback(f"P99 latency {p99:.0f}ms > {self.p99_threshold}ms")
def _rollback(self, reason: str):
self.rolled_back = True
# NGINX ağırlığını sıfırla
self.rds.set("canary:weight", "0")
self.rds.set("canary:status", "rolled_back")
event = {"ts": time.time(), "reason": reason}
self.rds.lpush("rollback:log", json.dumps(event))
print(f"ROLLBACK: {reason}")
07 Online A/B Test için Logging ve Analiz
A/B test'in geçerliliği, hangi kullanıcının hangi versiyonu gördüğünü ve sonuçlarını eksiksiz kayıt altına almaya bağlıdır.
İyi bir A/B test logging'i şunları içermelidir: request_id (istek), user_id (kullanıcı), model_version (hangi model), timestamp, response latency, output quality score, downstream conversion event (kullanıcı aksiyonu).
import json, time, uuid
from dataclasses import dataclass, asdict
from typing import Optional
import pandas as pd
@dataclass
class ABEvent:
request_id : str
user_id : str
model_version : str
latency_ms : float
output_tokens : int
quality_score : Optional[float] = None # LLM-as-judge skoru
converted : Optional[bool] = None # downstream conversion
ts : float = 0.0
def __post_init__(self):
if not self.ts: self.ts = time.time()
class ABAnalyzer:
def __init__(self):
self.events = []
def log(self, event: ABEvent):
self.events.append(asdict(event))
def report(self):
if not self.events: return
df = pd.DataFrame(self.events)
summary = df.groupby("model_version").agg({
"latency_ms" : ["mean", lambda x: x.quantile(0.99)],
"output_tokens" : "mean",
"quality_score" : "mean",
"converted" : "mean",
"request_id" : "count"
})
summary.columns = ["lat_mean","lat_p99","tokens_mean",
"quality","conv_rate","n"]
print(summary.to_string())
return summary
# Simüle edilmiş kullanım
analyzer = ABAnalyzer()
import random, numpy as np
rng = np.random.default_rng(0)
for i in range(500):
ver = "v2" if i % 5 == 0 else "v1"
ev = ABEvent(
request_id = str(uuid.uuid4()),
user_id = f"u{i%100}",
model_version = ver,
latency_ms = rng.normal(400 if ver=="v1" else 380, 80),
output_tokens = int(rng.normal(120, 30)),
quality_score = float(rng.uniform(0.7, 0.95)),
converted = bool(rng.random() < (0.12 if ver=="v1" else 0.15))
)
analyzer.log(ev)
analyzer.report()
08 Pratik: FastAPI + NGINX ile Canary Deployment
İki FastAPI model servisini Docker Compose'da başlatıp NGINX ile ağırlıklı traffic splitting yapılandırması.
version: "3.9"
services:
model-v1:
image: python:3.11-slim
volumes: ["./app:/app"]
working_dir: /app
command: uvicorn model_v1:app --host 0.0.0.0 --port 8000
environment:
MODEL_VERSION: v1
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 10s
retries: 3
model-v2:
image: python:3.11-slim
volumes: ["./app:/app"]
working_dir: /app
command: uvicorn model_v2:app --host 0.0.0.0 --port 8000
environment:
MODEL_VERSION: v2
nginx:
image: nginx:alpine
ports: ["80:80"]
volumes: ["./nginx.conf:/etc/nginx/nginx.conf:ro"]
depends_on: [model-v1, model-v2]
redis:
image: redis:7-alpine
ports: ["6379:6379"]
import os, time
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
VERSION = os.getenv("MODEL_VERSION", "v1")
class InferenceRequest(BaseModel):
text: str
user_id: str = "anon"
@app.get("/health")
def health(): return {"status": "ok", "version": VERSION}
@app.post("/predict")
def predict(req: InferenceRequest):
t0 = time.perf_counter()
# Gerçek uygulamada: model.generate(req.text)
result = f"[{VERSION}] '{req.text}' için yanıt"
lat = (time.perf_counter() - t0) * 1000
return {
"version" : VERSION,
"result" : result,
"latency" : round(lat, 2)
}
events { worker_connections 1024; }
http {
upstream backend {
server model-v1:8000 weight=90; # %90 → stable
server model-v2:8000 weight=10; # %10 → canary
}
log_format ab_log '$remote_addr "$request" $status '
'$upstream_addr $request_time';
server {
listen 80;
access_log /var/log/nginx/ab.log ab_log;
location /predict {
proxy_pass http://backend;
proxy_set_header X-Real-IP $remote_addr;
add_header X-Served-By $upstream_addr always;
}
location /health {
proxy_pass http://backend;
}
}
}
# Servisleri başlat
docker-compose up -d
# 100 istek gönder ve hangi versiyondan kaç tane geldiğini say
for i in $(seq 1 100); do
curl -s -X POST http://localhost/predict \
-H "Content-Type: application/json" \
-d '{"text":"test","user_id":"u1"}' | grep -o '"version":"v[12]"'
done | sort | uniq -c
# Beklenen çıktı: ~90 adet v1, ~10 adet v2
# NGINX log'dan hangi backend'in çağrıldığını gör
docker exec nginx tail -20 /var/log/nginx/ab.log
100 istekte yaklaşık 90'ı model-v1'den, 10'u model-v2'den yanıt alır. NGINX weight düzenlenerek canary yüzdesi artırılır: 10 → 25 → 50 → 100 şeklinde kademeli rollout yapılır. Her aşama en az birkaç saat izlenir; metrikler stabil kalırsa bir sonraki aşamaya geçilir.