00 Multi-GPU Serving Strategies
Data parallelism, model parallelism and tensor parallelism — which strategy should you use, and when?
There are several core strategies for serving a large model or a high request load across multiple GPUs. The right choice depends on model size, load profile, and hardware topology.
| Strategy | What Is Split? | Best For | Difficulty |
|---|---|---|---|
| Data Parallelism | Data (batches) | Model fits on one GPU, load is high | Low — independent copies |
| Tensor Parallelism | Weight matrices | Model does not fit on one GPU | Medium — requires all-reduce |
| Pipeline Parallelism | Model layers | Very deep, many-layer models | High — bubble overhead |
| Expert Parallelism | MoE experts | Mixture-of-Experts models | High — load balancing |
Decision 1: Model < single-GPU memory? → Data parallelism is enough
Decision 2: Model fits on 1-4 GPUs? → Tensor parallelism (tp_size=2/4)
Decision 3: Model needs 4+ GPUs? → TP + PP combination
Decision 4: MoE model? → Add expert parallelism
Tensor parallelism performs low-latency all-reduce over NVLink between GPUs within the same node. Pipeline parallelism, on the other hand, is more tolerant of inter-node bandwidth. This is why large clusters commonly combine TP (intra-node) with PP (inter-node).
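As a rough sketch of the decision flow above, the helper below maps model size and GPU count to vLLM-style parallelism settings. choose_parallelism is a hypothetical function for illustration only, not part of vLLM or Ray.

def choose_parallelism(model_mem_gb: float, gpu_mem_gb: float, n_gpus: int,
                       is_moe: bool = False) -> dict:
    """Illustrative mapping of the decision flow to engine arguments."""
    # Decision 1: model fits on a single GPU (with ~10% headroom) → replicate it (data parallelism)
    if model_mem_gb < 0.9 * gpu_mem_gb:
        return {"tensor_parallel_size": 1, "pipeline_parallel_size": 1, "replicas": n_gpus}
    # Decisions 2-3: shard weights inside a node (TP), add pipeline stages beyond 4 GPUs (PP)
    tp = min(4, n_gpus)
    pp = max(1, n_gpus // tp)
    plan = {"tensor_parallel_size": tp, "pipeline_parallel_size": pp, "replicas": 1}
    if is_moe:
        plan["expert_parallel"] = True  # Decision 4: MoE → add expert parallelism
    return plan

# 140 GB of weights on 8x 80 GB GPUs → TP=4, PP=2
print(choose_parallelism(model_mem_gb=140, gpu_mem_gb=80, n_gpus=8))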
01 Ray Serve Architecture
Deployments, replicas and handles — the basic building blocks of Ray Serve.
Ray Serve is a model serving library built on top of Ray. A deployment represents a unit of service, a replica is a single copy of that service, and a handle is the reference used to call it from other deployments.
pip install "ray[serve]"
pip install "ray[serve,air]"   # with Ray AIR (recommended for ML workloads)
# Start the Ray cluster (head node)
ray start --head --port=6379 --dashboard-port=8265
# Add a worker node
ray start --address=HEAD_IP:6379 --num-gpus=4
import ray
from ray import serve
from ray.serve.handle import DeploymentHandle
import numpy as np
ray.init(address="auto")  # connect to the existing cluster
@serve.deployment(
name="echo-service",
num_replicas=2,
ray_actor_options={"num_cpus": 1.0, "num_gpus": 0.5},
)
class EchoService:
def __init__(self):
self.call_count = 0
async def __call__(self, request):
self.call_count += 1
body = await request.json()
return {"echo": body, "calls": self.call_count}
# Deploy
serve.run(EchoService.bind(), route_prefix="/echo")
# Test
import requests
resp = requests.post("http://localhost:8000/echo", json={"hello": "world"})
print(resp.json())
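The handle concept from the description above is not exercised by EchoService, so here is a minimal composition sketch: an ingress deployment calling a second deployment through a DeploymentHandle. Preprocessor and Ingress are illustrative names; the awaited .remote() call follows the Ray Serve 2.x handle API.

from ray import serve
from ray.serve.handle import DeploymentHandle

@serve.deployment
class Preprocessor:
    def clean(self, text: str) -> str:
        return text.strip().lower()

@serve.deployment
class Ingress:
    def __init__(self, preprocessor: DeploymentHandle):
        self.preprocessor = preprocessor  # handle to the downstream deployment

    async def __call__(self, request):
        body = await request.json()
        # .remote() returns a response object that can be awaited for the result
        cleaned = await self.preprocessor.clean.remote(body["text"])
        return {"cleaned": cleaned}

# Bind the downstream deployment into the ingress and deploy both as a separate app
serve.run(Ingress.bind(Preprocessor.bind()), name="pipeline", route_prefix="/pipeline")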
02 LLM Deployment with Ray Serve
Scalable LLM serving with vLLM + Ray Serve — each replica gets its own GPU.
import ray
from ray import serve
from vllm import LLM, SamplingParams
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Optional
ray.init()
app = FastAPI()
class GenerateRequest(BaseModel):
prompt: str
max_tokens: int = 512
temperature: float = 0.7
top_p: float = 0.95
stop: Optional[List[str]] = None
@serve.deployment(
name="llm-service",
num_replicas=2, # 2 replica = 2 GPU
    ray_actor_options={
        "num_gpus": 1,          # 1 GPU per replica
"num_cpus": 4,
"memory": 20 * 1024**3, # 20 GB RAM
},
max_ongoing_requests=100,
graceful_shutdown_timeout_s=30,
)
@serve.ingress(app)
class LLMService:
def __init__(self):
import torch
self.llm = LLM(
model="meta-llama/Meta-Llama-3-8B-Instruct",
tensor_parallel_size=1,
gpu_memory_utilization=0.90,
max_model_len=4096,
dtype="bfloat16",
enable_prefix_caching=True,
)
        print(f"LLM ready — GPU: {torch.cuda.current_device()}")
@app.post("/generate")
async def generate(self, request: GenerateRequest):
sampling = SamplingParams(
temperature=request.temperature,
top_p=request.top_p,
max_tokens=request.max_tokens,
stop=request.stop or [],
)
outputs = self.llm.generate([request.prompt], sampling)
return {
"text": outputs[0].outputs[0].text,
"tokens": len(outputs[0].outputs[0].token_ids),
}
# Deploy
serve.run(LLMService.bind(), route_prefix="/llm")
from ray.serve.config import HTTPOptions, gRPCOptions
# Update the replica count (zero-downtime)
serve.run(
LLMService.options(num_replicas=4).bind(),
route_prefix="/llm",
)
# Query deployment status
status = serve.status()
for name, dep in status.applications.items():
print(f"{name}: {dep.status}")
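A quick client-side check against the deployment above, assuming the Serve proxy listens on the default port 8000 (route_prefix /llm plus the /generate route):

import requests

resp = requests.post(
    "http://localhost:8000/llm/generate",
    json={"prompt": "Explain the KV cache in one paragraph.", "max_tokens": 128},
)
print(resp.json()["text"])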
03 Model Sharding — Pipeline Parallelism
Spread a 70B model across 8 GPUs — Megatron-LM-style layer partitioning and a micro-batch pipeline.
In pipeline parallelism, the model's layers are split into stages. Each stage runs on a different group of GPUs. Data flows between the stages as micro-batches, which keeps the GPUs busy back-to-back instead of sitting idle.
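The reason micro-batches matter: with p pipeline stages and m micro-batches, a naive schedule leaves GPUs idle for roughly (p − 1) / (m + p − 1) of the time, the pipeline "bubble" mentioned in the table above. A small sketch of that estimate:

def pipeline_bubble_fraction(num_stages: int, num_micro_batches: int) -> float:
    """Approximate idle fraction of a naive (GPipe-style) pipeline schedule."""
    return (num_stages - 1) / (num_micro_batches + num_stages - 1)

# More micro-batches shrink the bubble for a fixed number of stages
for m in (1, 4, 16, 64):
    print(f"stages=2, micro_batches={m}: bubble = {pipeline_bubble_fraction(2, m):.1%}")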
from vllm import LLM
# 8 GPU: 4 TP × 2 PP
llm = LLM(
model="meta-llama/Meta-Llama-3-70B-Instruct",
tensor_parallel_size=4, # 4 GPU per PP stage
pipeline_parallel_size=2, # 2 stages = 8 GPU total
max_model_len=4096,
gpu_memory_utilization=0.95,
dtype="bfloat16",
    distributed_executor_backend="ray",  # distributed execution via Ray
)
import torch
import torch.distributed as dist
from torch.distributed.pipeline.sync import Pipe
def build_pipeline_model(model_config, pp_size: int):
    """
    Split the model into pp_size pipeline stages.
    Each stage is assigned to its own GPU.
    """
    layers = build_transformer_layers(model_config)  # assumed helper that builds the transformer layers
n_layers = len(layers)
layers_per_stage = n_layers // pp_size
stages = []
for i in range(pp_size):
start = i * layers_per_stage
end = start + layers_per_stage if i < pp_size - 1 else n_layers
stage_layers = torch.nn.Sequential(*layers[start:end])
stages.append(stage_layers.to(f"cuda:{i}"))
    # chunks = number of micro-batches each input batch is split into;
    # more chunks → smaller pipeline bubble, at the cost of more scheduling overhead
    num_micro_batches = 4
    pipe_model = Pipe(
        torch.nn.Sequential(*stages),
        chunks=num_micro_batches,
    )
return pipe_model
import ray
from ray import serve
@serve.deployment(
num_replicas=1,
    ray_actor_options={"num_gpus": 0},  # coordinator: no GPU needed
)
class PipelineCoordinator:
def __init__(self, model_handle):
self.model = model_handle
async def __call__(self, request):
data = await request.json()
return await self.model.infer.remote(data["prompt"])
@serve.deployment(
num_replicas=1,
    ray_actor_options={"num_gpus": 8},  # all GPUs go to this replica
)
class LargeModelShard:
def __init__(self):
        # vLLM internally spreads the model over the 8 GPUs with pipeline parallelism
from vllm import LLM
self.llm = LLM(
model="meta-llama/Meta-Llama-3-70B-Instruct",
tensor_parallel_size=4,
pipeline_parallel_size=2,
distributed_executor_backend="ray",
)
def infer(self, prompt: str):
from vllm import SamplingParams
outputs = self.llm.generate([prompt], SamplingParams(max_tokens=256))
return outputs[0].outputs[0].text
shard = LargeModelShard.bind()
coordinator = PipelineCoordinator.bind(shard)
serve.run(coordinator, route_prefix="/generate")
04 Autoscaling Policies
Ray Serve autoscaling — growing and shrinking the replica set based on in-flight request load via target_num_ongoing_requests.
from ray import serve
from ray.serve.config import AutoscalingConfig
@serve.deployment(
name="autoscaled-llm",
autoscaling_config=AutoscalingConfig(
min_replicas=1,
max_replicas=8,
initial_replicas=2,
        # Target: at most 5 concurrent requests per replica
        target_num_ongoing_requests=5,
        # Scale up: add replicas if the target is exceeded for 15 s
        upscale_delay_s=15,
        # Scale down: remove replicas if load stays below the target for 300 s
        downscale_delay_s=300,
        # Smoothing: dampen abrupt scaling decisions
        upscale_smoothing_factor=0.5,
        downscale_smoothing_factor=1.0,
        # Metrics lookback window
look_back_period_s=30,
),
ray_actor_options={"num_gpus": 1},
max_ongoing_requests=10,
)
class AutoscaledLLM:
def __init__(self):
from vllm import LLM
self.llm = LLM(model="meta-llama/Meta-Llama-3-8B-Instruct",
gpu_memory_utilization=0.85)
async def __call__(self, request):
from vllm import SamplingParams
body = await request.json()
out = self.llm.generate([body["prompt"]], SamplingParams(max_tokens=256))
return {"text": out[0].outputs[0].text}
import ray
import time
ray.init(address="auto")
def monitor_replicas(deployment_name: str, duration_s: int = 60):
start = time.time()
while time.time() - start < duration_s:
status = serve.status()
for app_name, app_status in status.applications.items():
for dep_name, dep_status in app_status.deployments.items():
if deployment_name in dep_name:
print(f"[{time.strftime('%H:%M:%S')}] "
f"Replicas: {dep_status.replica_states}")
time.sleep(5)
monitor_replicas("autoscaled-llm", duration_s=120)
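For intuition, the autoscaler's target works out to roughly desired replicas ≈ ceil(total in-flight requests / target_num_ongoing_requests), clamped to the min/max bounds. The sketch below is a simplified model of that decision, not Ray's actual implementation:

import math

def desired_replicas(total_ongoing_requests: int, target_per_replica: int,
                     min_replicas: int, max_replicas: int) -> int:
    """Back-of-the-envelope version of the Serve autoscaling decision."""
    raw = math.ceil(total_ongoing_requests / target_per_replica)
    return max(min_replicas, min(max_replicas, raw))

# 35 in-flight requests with a target of 5 per replica → 7 replicas (within 1..8)
print(desired_replicas(35, target_per_replica=5, min_replicas=1, max_replicas=8))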
05 Spot Instance Strategy
Preemption handling, checkpointing strategy and cost optimization with a hybrid spot/on-demand cluster.
Spot (preemptible) instances are 60-90% cheaper than on-demand, but they can be reclaimed at any moment. For LLM serving, the strategy is to keep a guaranteed minimum capacity on on-demand instances and absorb load spikes with spot.
cluster_name: llm-cluster
head_node_type: head.r6i.4xlarge
available_node_types:
  # On-demand: guaranteed baseline capacity
worker.ondemand.a100:
node_config:
InstanceType: p4d.24xlarge # 8x A100
ImageId: ami-0abcdef123456
resources:
CPU: 96
GPU: 8
memory: 1099511627776 # 1TB
min_workers: 1
max_workers: 2
  # Spot: cost savings
worker.spot.a10g:
node_config:
InstanceType: g5.12xlarge # 4x A10G
ImageId: ami-0abcdef123456
InstanceMarketOptions:
MarketType: spot
SpotOptions:
MaxPrice: "4.50"
SpotInstanceType: persistent
resources:
CPU: 48
GPU: 4
memory: 192937051136
min_workers: 0
max_workers: 8
spot: true
worker_start_ray_commands:
- ray start --address=$RAY_HEAD_IP:6379 \
--num-gpus=$NUM_GPUS \
--resources='{"spot": 1}'
from ray import serve
@serve.deployment(
    autoscaling_config={
        "min_replicas": 2,          # keep at least 2 replicas on on-demand capacity
"max_replicas": 10,
"target_num_ongoing_requests": 5,
},
    ray_actor_options={
        "num_gpus": 1,
        "resources": {},            # accepts either spot or on-demand nodes
},
)
class FaultTolerantLLM:
def __init__(self):
        import signal
        # Take a checkpoint when SIGTERM is received (spot preemption warning)
        signal.signal(signal.SIGTERM, self._graceful_shutdown)
from vllm import LLM
self.llm = LLM(
model="meta-llama/Meta-Llama-3-8B-Instruct",
gpu_memory_utilization=0.85,
)
self.request_count = 0
    def _graceful_shutdown(self, signum, frame):
        """Spot preemption — clean shutdown."""
        import json
state = {"request_count": self.request_count}
with open("/tmp/checkpoint.json", "w") as f:
json.dump(state, f)
        print(f"Graceful shutdown — processed {self.request_count} requests.")
async def __call__(self, request):
self.request_count += 1
body = await request.json()
from vllm import SamplingParams
out = self.llm.generate([body["prompt"]], SamplingParams(max_tokens=256))
return {"text": out[0].outputs[0].text, "req_id": self.request_count}
On AWS, an 8× A100 p4d.24xlarge runs at roughly $32/hour on-demand, while spot capacity goes for $8-12/hour. With a 2 on-demand + 6 spot configuration, about 80% of the load can be served on spot, which works out to roughly 65% cost savings. Guaranteed on-demand capacity should always be kept for critical model inference.
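A back-of-the-envelope version of that cost comparison; the hourly prices here are illustrative assumptions, and actual savings depend on the instance mix and current spot market prices:

# Illustrative hourly prices: on-demand GPU node vs. spot GPU node
ON_DEMAND_NODE_PER_HOUR = 32.0
SPOT_NODE_PER_HOUR = 10.0

def hybrid_cost(on_demand_nodes: int, spot_nodes: int, hours: float = 1.0) -> float:
    """Hourly cost of a hybrid spot/on-demand GPU fleet."""
    return (on_demand_nodes * ON_DEMAND_NODE_PER_HOUR + spot_nodes * SPOT_NODE_PER_HOUR) * hours

all_on_demand = hybrid_cost(8, 0)
hybrid = hybrid_cost(2, 6)
print(f"all on-demand: ${all_on_demand:.0f}/h, 2+6 hybrid: ${hybrid:.0f}/h, "
      f"savings: {1 - hybrid / all_on_demand:.0%}")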
06 GPU Memory Optimization
Minimize VRAM usage with activation checkpointing, gradient offloading and mixed precision.
import torch
import torch.nn as nn
from torch.utils.checkpoint import checkpoint_sequential
class OptimizedTransformerLayer(nn.Module):
def __init__(self, d_model: int, n_heads: int, ffn_dim: int):
super().__init__()
self.attention = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
self.ffn = nn.Sequential(
nn.Linear(d_model, ffn_dim),
nn.GELU(),
nn.Linear(ffn_dim, d_model),
)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
def forward(self, x):
        # Activation checkpointing (applied via checkpoint_sequential below) skips storing
        # these activations in the forward pass and recomputes them during backward → VRAM savings
attn_out, _ = self.attention(x, x, x)
x = self.norm1(x + attn_out)
ffn_out = self.ffn(x)
return self.norm2(x + ffn_out)
class LargeTransformer(nn.Module):
def __init__(self, n_layers=32, d_model=4096, n_heads=32, ffn_dim=16384):
super().__init__()
self.layers = nn.ModuleList([
OptimizedTransformerLayer(d_model, n_heads, ffn_dim)
for _ in range(n_layers)
])
def forward(self, x):
        # checkpoint_sequential: split the layers into segments and checkpoint each segment
        segments = 4  # 4 segments → roughly 75% less activation VRAM, ~30% slower
return checkpoint_sequential(self.layers, segments, x)
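A quick way to see the effect of checkpoint_sequential is to measure peak VRAM for a scaled-down configuration; this sketch reuses the classes above and assumes a single CUDA GPU is available:

import torch

model = LargeTransformer(n_layers=8, d_model=1024, n_heads=16, ffn_dim=4096).cuda()
x = torch.randn(2, 512, 1024, device="cuda", requires_grad=True)

torch.cuda.reset_peak_memory_stats()
out = model(x)            # forward runs through checkpoint_sequential
out.sum().backward()      # checkpointed activations are recomputed here
print(f"peak VRAM: {torch.cuda.max_memory_allocated() / 1e9:.2f} GB")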
import deepspeed
ds_config = {
"bf16": {"enabled": True},
"zero_optimization": {
"stage": 3,
"offload_optimizer": {
"device": "cpu",
"pin_memory": True,
},
"offload_param": {
"device": "cpu",
"pin_memory": True,
},
"overlap_comm": True,
"contiguous_gradients": True,
"reduce_bucket_size": 5e7,
"stage3_prefetch_bucket_size": 5e7,
"stage3_param_persistence_threshold": 1e5,
},
"gradient_accumulation_steps": 4,
"train_micro_batch_size_per_gpu": 2,
}
# Wrap the model (e.g. the LargeTransformer above) with the DeepSpeed engine
model_engine, optimizer, _, _ = deepspeed.initialize(
model=model,
model_parameters=model.parameters(),
config=ds_config,
)
| Technique | VRAM Savings | Speed Impact | When to Use |
|---|---|---|---|
| FP16/BF16 | 50% | 20-50% faster | Always |
| Activation Checkpointing | 60-80% | 20-30% slower | Large models |
| Gradient Offload (CPU) | 30-50% | 40-60% slower | Very tight VRAM budgets |
| ZeRO Stage 3 | 85%+ | 30-50% slower | Multi-node training |
| Flash Attention 2 | 30% | 30-60% faster | Long context |
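For the Flash Attention row, PyTorch exposes fused attention through F.scaled_dot_product_attention, which dispatches to a FlashAttention-style kernel when the hardware and dtype allow it; vLLM uses equivalent fused kernels internally. A minimal sketch in BF16:

import torch
import torch.nn.functional as F

# (batch, heads, seq_len, head_dim) in BF16, as the fused kernels expect
q = torch.randn(2, 16, 4096, 64, device="cuda", dtype=torch.bfloat16)
k = torch.randn_like(q)
v = torch.randn_like(q)

# Fused, memory-efficient attention: no (seq_len x seq_len) score matrix is materialized
out = F.scaled_dot_product_attention(q, k, v, is_causal=True)
print(out.shape, out.dtype)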
07 NCCL and Communication Backends
NCCL for GPU-to-GPU communication — all-reduce, all-gather and broadcast operations.
NCCL (NVIDIA Collective Communications Library) is the library that optimizes tensor communication on multi-GPU systems. PyTorch distributed, vLLM and DeepSpeed all use NCCL under the hood.
import torch
import torch.distributed as dist
import os
def setup_nccl(rank: int, world_size: int):
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "12355"
    os.environ["NCCL_DEBUG"] = "INFO"  # for debugging
os.environ["NCCL_SOCKET_IFNAME"] = "eth0" # network interface
dist.init_process_group(
backend="nccl",
rank=rank,
world_size=world_size,
)
torch.cuda.set_device(rank)
def all_reduce_example(rank: int, world_size: int):
setup_nccl(rank, world_size)
    # Each GPU creates its own tensor
tensor = torch.ones(1024, 1024, device=f"cuda:{rank}") * (rank + 1)
print(f"Rank {rank}: tensor sum before = {tensor.sum().item()}")
    # All-reduce: sum the tensors across all GPUs
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
print(f"Rank {rank}: tensor sum after = {tensor.sum().item()}")
    # All-gather: gather the tensors from all GPUs
gathered = [torch.zeros(1024, 1024, device=f"cuda:{rank}") for _ in range(world_size)]
local = torch.randn(1024, 1024, device=f"cuda:{rank}")
dist.all_gather(gathered, local)
dist.destroy_process_group()
# Run with: torchrun --nproc_per_node=4 script.py
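The functions above take rank and world_size explicitly; under torchrun these come from environment variables set by the launcher for each process. A minimal entrypoint sketch:

import os

if __name__ == "__main__":
    # torchrun sets RANK and WORLD_SIZE (and LOCAL_RANK) for every spawned process
    rank = int(os.environ.get("RANK", "0"))
    world_size = int(os.environ.get("WORLD_SIZE", "1"))
    all_reduce_example(rank, world_size)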
# With NVLink — the fastest path
export NCCL_P2P_DISABLE=0
export NCCL_SHM_DISABLE=0
# Without NVLink, use InfiniBand
export NCCL_IB_DISABLE=0
export NCCL_IB_HCA=mlx5_0
# Socket tuning for large messages
export NCCL_SOCKET_NTHREADS=8
export NCCL_NSOCKS_PERTHREAD=4
# Debug logging
export NCCL_DEBUG=WARN # INFO, WARN, ERROR
# NCCL bandwidth test
./nccl-tests/build/all_reduce_perf \
-b 8 -e 512M -f 2 -g 4
08 Profiling: NVTX, Nsight, dcgm-exporter
Find GPU utilization, memory bandwidth and kernel hotspots — continuous monitoring with Prometheus.
import torch
import nvtx # pip install nvtx
@nvtx.annotate("tokenization", color="blue")
def tokenize_batch(texts, tokenizer):
return tokenizer(texts, padding=True, truncation=True,
return_tensors="pt", max_length=512)
@nvtx.annotate("model_forward", color="green")
def model_forward(model, inputs):
with torch.no_grad():
return model(**inputs)
@nvtx.annotate("postprocess", color="red")
def postprocess(outputs):
return outputs.logits.softmax(-1).cpu().numpy()
# Capture a profile with NVTX:
# nsys profile --trace cuda,nvtx python script.py
# Full trace: CUDA + NVTX + OS events
nsys profile \
--trace=cuda,nvtx,osrt \
--output=profile_report \
--force-overwrite=true \
python inference_script.py
# Open the report in the GUI
nsys-ui profile_report.nsys-rep
# CLI summary
nsys stats profile_report.nsys-rep \
--report gputrace \
--format csv \
--output gputrace.csv
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: dcgm-exporter
namespace: monitoring
spec:
selector:
matchLabels:
app: dcgm-exporter
template:
metadata:
labels:
app: dcgm-exporter
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9400"
spec:
containers:
- name: dcgm-exporter
image: nvcr.io/nvidia/k8s/dcgm-exporter:3.2.6-3.1.9-ubuntu22.04
ports:
- containerPort: 9400
env:
- name: DCGM_EXPORTER_KUBERNETES_GPU_ID_TYPE
value: uid
securityContext:
privileged: true
resources:
limits:
nvidia.com/gpu: 1
| Metric | DCGM Field | Target |
|---|---|---|
| GPU Utilization | DCGM_FI_DEV_GPU_UTIL | > 80% |
| Memory Utilization | DCGM_FI_DEV_MEM_COPY_UTIL | > 70% |
| VRAM Used | DCGM_FI_DEV_FB_USED | Depends on the model's footprint |
| SM Clock | DCGM_FI_DEV_SM_CLOCK | Close to boost clock |
| NVLink Bandwidth | DCGM_FI_PROF_NVLINK_TX_BYTES | ~0.8 × hardware limit |
| PCIe Bandwidth | DCGM_FI_PROF_PCIE_TX_BYTES | < 16 GB/s |
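Once dcgm-exporter is scraped by Prometheus, the fields in the table can be pulled programmatically; the sketch below assumes a Prometheus server at localhost:9090 and uses its standard /api/v1/query HTTP API:

import requests

PROM_URL = "http://localhost:9090/api/v1/query"  # assumed Prometheus endpoint

def gpu_utilization() -> list:
    """Fetch per-GPU utilization from the dcgm-exporter metric in the table above."""
    query = "avg by (gpu) (DCGM_FI_DEV_GPU_UTIL)"
    resp = requests.get(PROM_URL, params={"query": query}, timeout=10)
    resp.raise_for_status()
    return resp.json()["data"]["result"]

for series in gpu_utilization():
    print(f"GPU {series['metric'].get('gpu')}: {series['value'][1]} % utilization")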
09 Hands-On: 70B Model Pipeline Parallelism with Ray Serve
Deploy the 70B Llama-3 model on an 8-GPU cluster with pipeline + tensor parallelism and run an autoscaling demo.
import ray
from ray import serve
from vllm import LLM, SamplingParams
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional, List
import asyncio
ray.init(address="auto")
app = FastAPI(title="70B LLM Service")
class ChatRequest(BaseModel):
messages: List[dict]
max_tokens: int = 512
temperature: float = 0.7
stream: bool = False
@serve.deployment(
name="llama3-70b",
    num_replicas=1,  # 1 replica = 8 GPUs (uses the entire node)
ray_actor_options={
"num_gpus": 8,
"num_cpus": 16,
"memory": 200 * 1024**3, # 200 GB RAM
},
max_ongoing_requests=50,
graceful_shutdown_timeout_s=120,
health_check_period_s=30,
)
@serve.ingress(app)
class Llama3_70B:
def __init__(self):
self.llm = LLM(
model="meta-llama/Meta-Llama-3-70B-Instruct",
tensor_parallel_size=4,
pipeline_parallel_size=2,
distributed_executor_backend="ray",
gpu_memory_utilization=0.95,
max_model_len=8192,
dtype="bfloat16",
enable_prefix_caching=True,
enforce_eager=False,
)
self.tokenizer = self.llm.get_tokenizer()
        print("Llama-3 70B ready — TP=4, PP=2")
@app.post("/chat")
async def chat(self, request: ChatRequest):
prompt = self.tokenizer.apply_chat_template(
request.messages,
tokenize=False,
add_generation_prompt=True,
)
params = SamplingParams(
temperature=request.temperature,
max_tokens=request.max_tokens,
)
outputs = self.llm.generate([prompt], params)
text = outputs[0].outputs[0].text
tokens = len(outputs[0].outputs[0].token_ids)
return {"content": text, "usage": {"completion_tokens": tokens}}
@app.get("/health")
def health(self):
return {"status": "ok", "model": "llama-3-70b"}
serve.run(Llama3_70B.bind(), route_prefix="/")
import asyncio, aiohttp, time, statistics
async def load_test(target_rps: int = 5, duration_s: int = 60):
url = "http://localhost:8000/chat"
    payload = {
        "messages": [{"role": "user", "content": "Explain async programming in Python."}],
"max_tokens": 256,
"temperature": 0.7,
}
latencies = []
errors = 0
interval = 1.0 / target_rps
async def req(session, req_id):
start = time.perf_counter()
try:
async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(60)) as r:
result = await r.json()
latencies.append(time.perf_counter() - start)
except Exception as e:
nonlocal errors
errors += 1
async with aiohttp.ClientSession() as session:
start = time.time()
tasks = []
req_id = 0
while time.time() - start < duration_s:
tasks.append(asyncio.create_task(req(session, req_id)))
req_id += 1
await asyncio.sleep(interval)
await asyncio.gather(*tasks)
    print(f"Total: {req_id} requests, {errors} errors")
    print(f"P50: {statistics.median(latencies):.2f}s")
    print(f"P95: {sorted(latencies)[int(0.95*len(latencies))]:.2f}s")
    print(f"P99: {sorted(latencies)[int(0.99*len(latencies))]:.2f}s")
asyncio.run(load_test(target_rps=3, duration_s=120))
With 8× A100 80GB (TP=4, PP=2), Llama-3 70B typically delivers 300-500 tokens/s of throughput and a 2-5 s TTFT (p50). Since autoscaling requires a roughly 15-minute warm-up per replica, it should be tested with gradual load increases. At low load, the spot instance preemption rate stays below 5%.