00 What Is BentoML?
A framework-agnostic packaging and serving solution for ML models in production.
BentoML is an open-source framework for packaging and serving ML models in production. It exposes the same API regardless of whether the model comes from PyTorch, TensorFlow, scikit-learn, HuggingFace Transformers, or custom Python code. The goal is to standardize the notebook-to-production transition and make it reproducible.
The workflow, end to end:
1. Develop: train a model and save it with bentoml.save_model()
2. Define: write an API endpoint as a BentoML service
3. Package: bentoml build → a Bento (with all dependencies)
4. Containerize: bentoml containerize → a Docker image
5. Deploy: Kubernetes / BentoCloud / Lambda
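A minimal sketch of steps 1-2, using a hypothetical iris-demo model (not part of the examples below); only the framework namespace (bentoml.sklearn, bentoml.pytorch, bentoml.transformers, ...) changes per framework:

import bentoml
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier

# Step 1: train and save into the local model store
X, y = load_iris(return_X_y=True)
clf = RandomForestClassifier().fit(X, y)
bentoml.sklearn.save_model("iris-demo", clf)

# Step 2: retrieve it as a runner for a service definition
runner = bentoml.sklearn.get("iris-demo:latest").to_runner()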
| Feature | BentoML | FastAPI + manual | Triton |
|---|---|---|---|
| Framework support | Any | Any | Specific backends |
| Adaptive batching | Built-in | Manual | config.pbtxt |
| Model versioning | Built-in store | Manual | Directory layout |
| Docker build | One command | Hand-written Dockerfile | Official NVIDIA image |
| Monitoring | OpenTelemetry built in | Manual integration | Prometheus endpoint |
pip install bentoml
pip install "bentoml[io]"             # IO extras (pydantic, PIL, pandas)
pip install "bentoml[grpc]"           # gRPC support
pip install "bentoml[transformers]"   # HuggingFace integration
01 Service Definition
Define HTTP endpoints with a bentoml.Service and the @svc.api decorator: input/output schemas and runner integration.
import bentoml
from bentoml.io import Text, JSON
from pydantic import BaseModel
from typing import List

# Pydantic schemas
class SentimentRequest(BaseModel):
    texts: List[str]
    threshold: float = 0.5

class SentimentResponse(BaseModel):
    labels: List[str]
    scores: List[float]

# Runner: executes the model in an isolated process
sentiment_runner = bentoml.sklearn.get("sentiment-model:latest").to_runner()

# Service definition
svc = bentoml.Service("sentiment-service", runners=[sentiment_runner])

@svc.api(
    input=JSON(pydantic_model=SentimentRequest),
    output=JSON(pydantic_model=SentimentResponse),
    route="/predict",
)
async def predict(request: SentimentRequest) -> SentimentResponse:
    """Run sentiment analysis on a batch of texts."""
    # Dispatch to the runner asynchronously (adaptive batching kicks in)
    results = await sentiment_runner.predict.async_run(request.texts)
    labels = ["positive" if r > request.threshold else "negative" for r in results]
    return SentimentResponse(labels=labels, scores=results.tolist())

@svc.api(input=Text(), output=Text(), route="/health")
def health_check(_: str) -> str:
    return "OK"
bentoml serve service:svc --reload --port 3000

# Production mode (multiple API worker processes)
bentoml serve service:svc \
    --production \
    --port 3000 \
    --api-workers 4

# Smoke test
curl -X POST http://localhost:3000/predict \
    -H "Content-Type: application/json" \
    -d '{"texts": ["This product is great!", "It was terrible."]}'
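The same call from Python, as a minimal client sketch using requests:

import requests

resp = requests.post(
    "http://localhost:3000/predict",
    json={"texts": ["This product is great!", "It was terrible."], "threshold": 0.5},
    timeout=10,
)
resp.raise_for_status()
print(resp.json())   # e.g. {"labels": ["positive", "negative"], "scores": [...]}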
02 Model Store
BentoML's local model store: save, load, versioning, and metadata.
import bentoml
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Train a simple model
X = np.random.randn(1000, 10)
y = (X[:, 0] + X[:, 1] > 0).astype(int)
pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression(max_iter=200)),
])
pipeline.fit(X, y)

# Save into the BentoML store
saved_model = bentoml.sklearn.save_model(
    name="sentiment-model",
    model=pipeline,
    signatures={"predict": {"batchable": True, "batch_dim": 0}},
    labels={"team": "mlops", "stage": "production"},
    metadata={"accuracy": 0.934, "training_size": 1000},
    custom_objects={"threshold": 0.5},
)
print(f"Model saved: {saved_model.tag}")
# Model saved: sentiment-model:gy5dfr2ytwpk6drp
import bentoml

# Load the latest version
model = bentoml.sklearn.load_model("sentiment-model:latest")

# Load a specific version
model_v1 = bentoml.sklearn.load_model("sentiment-model:gy5dfr2ytwpk6drp")

# List stored models
models = bentoml.models.list()
for m in models:
    print(f"{m.tag} | {m.creation_time} | {m.labels}")

# Model metadata
info = bentoml.models.get("sentiment-model:latest")
print(info.metadata)
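The threshold stored under custom_objects in the save call above can also be read back from the store, which keeps model weights and serving constants together; a small sketch:

import bentoml

# Read back the custom object saved alongside the model
bento_model = bentoml.models.get("sentiment-model:latest")
print(bento_model.custom_objects["threshold"])   # 0.5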
# Save a HuggingFace model
from transformers import pipeline as hf_pipeline
hf_model = hf_pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
saved = bentoml.transformers.save_model("distilbert-sentiment", hf_model)
print(f"HF model saved: {saved.tag}")
bentoml models list
bentoml models get sentiment-model:latest
bentoml models delete sentiment-model:old_tag
bentoml models export sentiment-model:latest ./exported.bentomodel
bentoml models import ./exported.bentomodel
03 Adaptive Batching
Group incoming requests automatically: trade throughput against latency with max_batch_size and max_latency_ms.
Adaptive batching merges multiple concurrent requests into a single batch before they reach the model. BentoML manages this at the runner level: every async_run call lands in a batcher queue.
import bentoml
from bentoml.io import NumpyNdarray
import numpy as np

# Get the model as a runner; batching is configured here
runner = bentoml.sklearn.get("sentiment-model:latest").to_runner(
    max_batch_size=64,    # maximum batch size
    max_latency_ms=50,    # maximum wait time (ms)
)

svc = bentoml.Service("batching-demo", runners=[runner])

@svc.api(input=NumpyNdarray(shape=(-1, 10), dtype=np.float32),
         output=NumpyNdarray(dtype=np.float32))
async def predict(features: np.ndarray) -> np.ndarray:
    # Each call creates one runner request;
    # BentoML groups them into batches automatically
    return await runner.predict.async_run(features)
import asyncio
import time

import aiohttp
import numpy as np

async def test_throughput(concurrency: int, total: int):
    url = "http://localhost:3000/predict"
    data = np.random.randn(1, 10).astype(np.float32).tolist()
    sem = asyncio.Semaphore(concurrency)

    async def req(session):
        async with sem:
            async with session.post(url, json=data) as r:
                return await r.json()

    async with aiohttp.ClientSession() as session:
        start = time.perf_counter()
        await asyncio.gather(*[req(session) for _ in range(total)])
        elapsed = time.perf_counter() - start
    print(f"Concurrency={concurrency:3d} | {total/elapsed:6.0f} req/s")

async def main():
    for c in [1, 4, 8, 16, 32, 64]:
        await test_throughput(c, 200)

asyncio.run(main())
max_latency_ms should sit below your latency SLO; max_batch_size is bounded by GPU memory and model size. A combination of max_batch_size=32-64 with max_latency_ms=20-100 ms is typically a good starting point.
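A rough way to reason about the two knobs: within one wait window the batcher can collect at most arrival_rate × window requests, capped by max_batch_size. A quick estimate (the real dispatcher adapts its window, so treat this as a ballpark only):

def expected_batch_size(req_per_sec: float, max_latency_ms: float, cap: int) -> float:
    # Requests arriving inside the wait window form one batch, up to the cap
    return min(req_per_sec * max_latency_ms / 1000, cap)

for rate in [50, 200, 1000, 5000]:
    print(f"{rate} req/s -> batch ≈ {expected_batch_size(rate, 50, 64):.1f}")
# 50 -> 2.5, 200 -> 10.0, 1000 -> 50.0, 5000 -> 64.0 (capped)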
04 Runners: Async Dispatch and Multi-Instance
A runner executes the model in an isolated process, so the API layer and model scaling are tuned independently.
import bentoml
import torch
from bentoml.io import Image, JSON
from PIL.Image import Image as PILImage
from pydantic import BaseModel
from typing import List

# Separate runners for two different models
preprocessor_runner = bentoml.picklable_model.get("image-preprocessor:latest").to_runner(
    max_batch_size=128,
    max_latency_ms=10,
)
classifier_runner = bentoml.pytorch.get("resnet50-classifier:latest").to_runner(
    max_batch_size=32,
    max_latency_ms=30,
    # Independent scaling per GPU
    nvidia_gpu=1,
)

svc = bentoml.Service(
    "image-pipeline",
    runners=[preprocessor_runner, classifier_runner],
)

class ClassifyResponse(BaseModel):
    label: str
    confidence: float
    top5: List[dict]

@svc.api(input=Image(), output=JSON(pydantic_model=ClassifyResponse))
async def classify(image: PILImage) -> ClassifyResponse:
    # Sequential pipeline: two async_run calls
    tensor = await preprocessor_runner.preprocess.async_run(image)
    logits = await classifier_runner.forward.async_run(tensor)
    probs = torch.softmax(logits, dim=-1).squeeze(0)  # drop the batch dim
    top5_probs, top5_ids = probs.topk(5)
    return ClassifyResponse(
        label=f"class_{top5_ids[0].item()}",
        confidence=top5_probs[0].item(),
        top5=[{"id": int(i), "prob": float(p)} for i, p in zip(top5_ids, top5_probs)],
    )
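The classify endpoint above awaits its two runners sequentially because the classifier needs the preprocessor's output. When runner calls are independent, they can be dispatched concurrently with asyncio.gather; a sketch with two hypothetical ensemble models (the model-a and model-b tags are placeholders):

import asyncio
import bentoml
import numpy as np
from bentoml.io import NumpyNdarray

# Placeholder tags: assumes "model-a" and "model-b" exist in the model store
model_a = bentoml.sklearn.get("model-a:latest").to_runner()
model_b = bentoml.sklearn.get("model-b:latest").to_runner()

ens = bentoml.Service("ensemble-demo", runners=[model_a, model_b])

@ens.api(input=NumpyNdarray(), output=NumpyNdarray())
async def ensemble(features: np.ndarray) -> np.ndarray:
    # Independent runners: fan out in parallel instead of awaiting in sequence
    preds_a, preds_b = await asyncio.gather(
        model_a.predict.async_run(features),
        model_b.predict.async_run(features),
    )
    return (preds_a + preds_b) / 2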
import bentoml
from bentoml import Runnable
import numpy as np
import torch

class MyModelRunnable(Runnable):
    # Allow CPU fallback alongside GPU, matching the device logic below
    SUPPORTED_RESOURCES = ("nvidia.com/gpu", "cpu")
    SUPPORTS_CPU_MULTI_THREADING = True

    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = torch.jit.load("/models/my_model.pt").to(self.device)
        self.model.eval()

    @Runnable.method(batchable=True, batch_dim=0)
    def predict(self, input_tensor: np.ndarray) -> np.ndarray:
        with torch.no_grad():
            tensor = torch.from_numpy(input_tensor).to(self.device)
            output = self.model(tensor)
        return output.cpu().numpy()

# Create the runner
runner = bentoml.Runner(MyModelRunnable, name="my-model-runner",
                        max_batch_size=32, max_latency_ms=50)

svc = bentoml.Service("custom-runner-service", runners=[runner])

@svc.api(input=bentoml.io.NumpyNdarray(), output=bentoml.io.NumpyNdarray())
async def infer(data: np.ndarray) -> np.ndarray:
    return await runner.predict.async_run(data)
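For quick local testing, a runner can be initialized inside the current process instead of a separate worker; init_local is documented by BentoML for debugging and tests only. A sketch continuing from the runner above (the input shape is illustrative and must match whatever /models/my_model.pt expects):

import numpy as np

# Run the custom runnable in-process: no server, no dispatcher
runner.init_local()
out = runner.predict.run(np.random.randn(4, 10).astype(np.float32))
print(out.shape)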
05 Bento Build
Package all dependencies with bentofile.yaml: build a reproducible Bento archive.
service: "service:svc"   # module:service_variable
labels:
  owner: mlops-team
  stage: production
  project: sentiment-api
include:
  - "service.py"
  - "utils/*.py"
  - "config.yaml"
exclude:
  - "tests/"
  - "*.ipynb"
  - "__pycache__"
python:
  packages:
    - torch==2.2.0
    - transformers==4.39.0
    - scikit-learn==1.4.0
    - numpy>=1.24
    - pandas>=2.0
  # requirements_txt: "./requirements.txt"  # alternative to the packages list above
  lock_packages: true   # pin exact versions
models:
  - "sentiment-model:latest"
  - "distilbert-sentiment:gy5dfr2ytwpk6drp"
docker:
  # A custom base_image overrides BentoML's managed base images and is not
  # combined with cuda_version; pick one of the two.
  # base_image: "python:3.11-slim"
  cuda_version: "12.1"
  system_packages:
    - libgomp1
  env:
    - TOKENIZERS_PARALLELISM=false
    - TRANSFORMERS_CACHE=/app/cache
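The same build can also be driven from Python, which is convenient in CI; a minimal sketch using bentoml.bentos.build, whose keyword options mirror bentofile.yaml:

import bentoml

# Programmatic equivalent of `bentoml build`
bento = bentoml.bentos.build(
    service="service:svc",
    include=["service.py"],
    python={"packages": ["scikit-learn==1.4.0"]},
    labels={"owner": "mlops-team"},
    build_ctx=".",
)
print(bento.tag)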
cd /project
bentoml build

# On success, BentoML prints its ASCII banner followed by:
# Successfully built Bento(tag="sentiment-service:a1b2c3d4")
# List Bentos
bentoml list
# Bento details
bentoml get sentiment-service:latest
# Export a Bento
bentoml export sentiment-service:latest ./sentiment-service.bento
# Import a Bento
bentoml import ./sentiment-service.bento
06 Containerize: Docker Image
Turn a Bento into a Docker image with a single command: a CUDA-enabled, production-ready container.
bentoml containerize sentiment-service:latest \
    --image-tag myregistry/sentiment-service:v1.0 \
    --platform linux/amd64

# GPU-enabled (CUDA)
bentoml containerize sentiment-service:latest \
    --image-tag myregistry/sentiment-service:v1.0-gpu \
    --opt build-arg=BENTOML_SERVER_THREADS=4

# Run it
docker run --gpus all -p 3000:3000 \
    myregistry/sentiment-service:v1.0-gpu serve

# Push to a Docker registry
docker push myregistry/sentiment-service:v1.0
FROM nvcr.io/nvidia/cuda:12.1.0-cudnn8-runtime-ubuntu22.04

# System dependencies
RUN apt-get update && apt-get install -y \
    python3.11 python3-pip libgomp1 \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install BentoML and dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir bentoml -r requirements.txt

# Import the Bento archive
COPY sentiment-service.bento .
RUN bentoml import sentiment-service.bento

ENV BENTOML_HOME=/app/bentoml
EXPOSE 3000
CMD ["bentoml", "serve", "sentiment-service:latest", "--production", "--port", "3000"]
07 Deployment: BentoCloud, Kubernetes, Lambda
Deploy BentoML models to different targets: a Kubernetes manifest, the BentoCloud CLI, and a Lambda handler.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sentiment-service
  namespace: ml-serving
spec:
  replicas: 3
  selector:
    matchLabels:
      app: sentiment-service
  template:
    metadata:
      labels:
        app: sentiment-service
    spec:
      containers:
        - name: sentiment-service
          image: myregistry/sentiment-service:v1.0-gpu
          ports:
            - containerPort: 3000
          resources:
            requests:
              memory: "2Gi"
              cpu: "500m"
              nvidia.com/gpu: "1"
            limits:
              memory: "4Gi"
              cpu: "2000m"
              nvidia.com/gpu: "1"
          env:
            - name: BENTOML_SERVER_THREADS
              value: "4"
          readinessProbe:
            httpGet:
              path: /readyz
              port: 3000
            initialDelaySeconds: 30
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /livez
              port: 3000
            initialDelaySeconds: 60
            periodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:
  name: sentiment-service-svc
  namespace: ml-serving
spec:
  selector:
    app: sentiment-service
  ports:
    - protocol: TCP
      port: 80
      targetPort: 3000
  type: LoadBalancer
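A small post-rollout smoke test: poll /readyz through the LoadBalancer, then send one real request. The base URL is a placeholder for whatever external IP the Service gets (kubectl get svc -n ml-serving):

import time
import requests

BASE = "http://EXTERNAL-IP"   # placeholder: see kubectl get svc -n ml-serving

# Wait until the service reports ready, then fire one request
for _ in range(30):
    try:
        if requests.get(f"{BASE}/readyz", timeout=2).status_code == 200:
            break
    except requests.RequestException:
        pass
    time.sleep(5)

r = requests.post(f"{BASE}/predict", json={"texts": ["smoke test"]}, timeout=10)
print(r.status_code, r.json())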
import json

import bentoml

# Load the model at Lambda cold start
model = bentoml.sklearn.load_model("sentiment-model:latest")

def lambda_handler(event, context):
    """AWS Lambda entry point."""
    try:
        body = json.loads(event.get("body", "{}"))
        texts = body.get("texts", [])
        if not texts:
            return {"statusCode": 400, "body": json.dumps({"error": "texts required"})}
        # Inference (no GPU on Lambda; runs on CPU)
        # In a real project, feature extraction happens separately
        results = model.predict(texts)
        return {
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({
                "predictions": results.tolist(),
                "model_version": "latest",
            }),
        }
    except Exception as e:
        return {"statusCode": 500, "body": json.dumps({"error": str(e)})}
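The handler can be exercised locally with a fake API Gateway event before any packaging:

# Local test with a fake API Gateway event (no AWS involved)
fake_event = {"body": json.dumps({"texts": ["great", "awful"]})}
resp = lambda_handler(fake_event, context=None)
print(resp["statusCode"], resp["body"])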
pip install "bentoml[cloud]"

# Log in to BentoCloud
bentoml cloud login --api-token $BENTOCLOUD_TOKEN

# Deploy
bentoml deploy sentiment-service:latest \
    --name sentiment-prod \
    --scaling-min 1 \
    --scaling-max 10 \
    --instance-type gpu.t4.1

# Deployment status
bentoml deployment list
bentoml deployment get sentiment-prod
08 Monitoring: OpenTelemetry Integration
Request tracing, metrics collection, and Prometheus integration via BentoML's built-in OpenTelemetry support.
pip install "bentoml[tracing-otlp]"

bentoml serve service:svc \
    --production \
    --port 3000
# Prometheus metrics are enabled by default and served at /metrics

# OTel environment variables
export OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
export OTEL_SERVICE_NAME=sentiment-service
export OTEL_TRACES_SAMPLER=parentbased_traceidratio
export OTEL_TRACES_SAMPLER_ARG=0.1   # 10% sampling
import time

import bentoml
from bentoml.io import JSON
from opentelemetry import trace, metrics
from opentelemetry.trace import Status, StatusCode

# Get a tracer and a meter
tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)

# Custom metrics
request_counter = meter.create_counter("sentiment_requests_total")
inference_duration = meter.create_histogram("sentiment_inference_duration_ms")

runner = bentoml.transformers.get("distilbert-sentiment:latest").to_runner()
svc = bentoml.Service("monitored-service", runners=[runner])

@svc.api(input=JSON(), output=JSON())
async def predict(data: dict) -> dict:
    texts = data.get("texts", [])
    request_counter.add(len(texts), {"endpoint": "/predict"})

    with tracer.start_as_current_span("inference") as span:
        span.set_attribute("batch_size", len(texts))
        start = time.perf_counter()
        try:
            results = await runner.async_run(texts)
            duration = (time.perf_counter() - start) * 1000
            inference_duration.record(duration, {"status": "success"})
            span.set_status(Status(StatusCode.OK))
            return {"results": results, "latency_ms": duration}
        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            inference_duration.record(
                (time.perf_counter() - start) * 1000, {"status": "error"}
            )
            raise
| Metric | Type | Description |
|---|---|---|
| bentoml_request_total | Counter | Total HTTP request count |
| bentoml_request_duration_seconds | Histogram | Request latency distribution |
| bentoml_runner_request_total | Counter | Requests dispatched to runners |
| bentoml_runner_batch_size | Histogram | Adaptive batch size distribution |
| bentoml_marshal_dispatch_duration | Histogram | Batcher dispatch latency |
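To eyeball these series during a load test, fetch the Prometheus text format from /metrics and filter the bentoml_ names (assumes the server above is running locally):

import requests

# /metrics serves the standard Prometheus text exposition format
body = requests.get("http://localhost:3000/metrics", timeout=5).text
for line in body.splitlines():
    if line.startswith("bentoml_"):
        print(line)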
09 Hands-On: HuggingFace → BentoML → Docker → Kubernetes
An end-to-end pipeline for a sentiment model: save the model, write the service, build a Bento, build the Docker image, deploy to K8s.
import bentoml
from transformers import pipeline

# HuggingFace sentiment pipeline
hf_model = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
    device=-1,   # CPU (use device=0 for GPU)
)

# Save into the BentoML store
saved = bentoml.transformers.save_model(
    "distilbert-sentiment",
    hf_model,
    signatures={"__call__": {"batchable": True, "batch_dim": 0}},
    metadata={"base_model": "distilbert-base-uncased", "task": "sentiment"},
)
print(f"Saved: {saved.tag}")
import bentoml
from bentoml.io import JSON
from pydantic import BaseModel
from typing import List

class PredictRequest(BaseModel):
    texts: List[str]

class PredictResult(BaseModel):
    label: str
    score: float

runner = bentoml.transformers.get("distilbert-sentiment:latest").to_runner(
    max_batch_size=32,
    max_latency_ms=20,
)

svc = bentoml.Service("sentiment-prod", runners=[runner])

@svc.api(
    input=JSON(pydantic_model=PredictRequest),
    output=JSON(),
    route="/predict",
)
async def predict(req: PredictRequest):
    results = await runner.async_run(req.texts)
    return [{"label": r["label"], "score": round(r["score"], 4)} for r in results]
# Build the Bento
bentoml build

# Build the Docker image
bentoml containerize sentiment-prod:latest \
    --image-tag gcr.io/myproject/sentiment-prod:v1.0

# Smoke test locally
docker run -p 3000:3000 gcr.io/myproject/sentiment-prod:v1.0 serve
curl -s http://localhost:3000/predict \
    -H "Content-Type: application/json" \
    -d '{"texts":["It was a wonderful experience","A terrible product"]}'

# Push and deploy to Kubernetes
docker push gcr.io/myproject/sentiment-prod:v1.0
kubectl apply -f k8s/deployment.yaml
kubectl rollout status deployment/sentiment-service -n ml-serving

# HPA (Horizontal Pod Autoscaler)
kubectl autoscale deployment sentiment-service \
    --cpu-percent=70 \
    --min=2 --max=20 \
    -n ml-serving
With this pipeline, any HuggingFace model can reach a production Kubernetes environment in roughly 30 minutes. With adaptive batching a single instance typically handles 150-300 requests per second, and the HPA scales out automatically as load grows.