00 Federated Learning Nedir? — Merkezi vs Dağıtık Eğitim
Federated Learning, veriyi asla merkezi sunucuya göndermeden her cihazda yerel eğitim yapıp yalnızca model güncellemelerini paylaşır.
Klasik makine öğrenmesi sürecinde tüm veri bir sunucuda toplanır ve model bu merkezi küme üzerinde eğitilir. Bu yaklaşım kritik problemler doğurur: hastane verileri HIPAA/KVKK kapsamındadır, telefon kullanım verileri kişisel bilgi içerir, bankacılık verileri müşteri gizliliğini gerektirir. Federated Learning (FL), 2017'de Google tarafından önerilen çözümdür: model ağırlıkları veya gradyanlar paylaşılır; ham veri asla cihazı terk etmez.
FL'nin temel akışı: sunucu global modeli seçilen istemcilere gönderir; her istemci yerel verisiyle birkaç epoch eğitim yapar; istemciler yalnızca model güncellemelerini geri gönderir; sunucu güncellemeleri toplar. Bu döngü iletişim turları boyunca tekrarlanır. Google Gboard yazılım tahmini, Apple siri kişiselleştirmesi ve tıbbi görüntü analizinde gerçek kullanım örnekleri mevcuttur.
| Özellik | Merkezi Eğitim | Federated Learning |
|---|---|---|
| Veri konumu | Merkezi sunucu | İstemci cihazlar |
| Gizlilik | Düşük | Yüksek |
| Bant genişliği | Veri boyutu (GB) | Model boyutu (MB) |
| Heterojenlik | IID | Non-IID (sorun) |
| Ölçeklenebilirlik | Merkezi GPU cluster | Milyonlarca uç cihaz |
FL, gizliliği garanti etmez — gradyanlardan orijinal veri kısmen geri üretilebilir (gradient inversion attack). Gerçek gizlilik garantisi için Differential Privacy veya Secure Aggregation gereklidir.
01 FedAvg Algoritması — Matematiksel Formülasyon
McMahan et al. 2017'de yayımlanan FedAvg, veri miktarıyla ağırlıklı model ortalaması alarak FL'nin temel algoritmasını tanımlar.
FedAvg'ın optimize ettiği hedef: min_θ F(θ) = Σ_k (n_k/n) F_k(θ). Burada F_k istemci k'nın yerel kayıp fonksiyonu, n_k yerel veri miktarı. Her iletişim turunda: sunucu C_t istemci alt kümesi seçer; her istemci k, θ'dan başlayarak E epoch SGD çalıştırır; sonuçlar n_k/n ağırlıklarıyla ortalaması alınır.
FedAvg'ın teorik analizi: IID verilerle yeteri kadar tur sonunda global minimuma yakınsama kanıtlanmıştır. Ancak non-IID veride her istemci kendi yerel optimumuna doğru çekildiğinden client drift problemi ortaya çıkar ve global model bozulur. Pratik parametreler: C (seçim fraksiyonu, tipik 0.1), E (yerel epochs, tipik 1-5), B (batch size).
import numpy as np
from copy import deepcopy
from typing import List, Tuple
def fedavg(
server_params: List[np.ndarray],
client_updates: List[Tuple[List[np.ndarray], int]]
) -> List[np.ndarray]:
"""
FedAvg agregasyonu.
client_updates: [(params_k, n_k), ...] listesi
"""
total_samples = sum(n for _, n in client_updates)
aggregated = [np.zeros_like(p) for p in server_params]
for client_params, n_k in client_updates:
weight = n_k / total_samples
for i, param in enumerate(client_params):
aggregated[i] += weight * param
return aggregated
# ── Simülasyon ────────────────────────────────────────────────
def simulate_fedavg(n_clients=10, rounds=20, frac=0.5):
# Basit lineer model: W ∈ ℝ^{100×1}
server_W = np.zeros((100, 1))
# Farklı boyutlarda istemci veri setleri
client_data = [
(np.random.randn(np.random.randint(50, 200), 100),
np.random.randn(np.random.randint(50, 200), 1))
for _ in range(n_clients)
]
for rnd in range(rounds):
n_sel = max(1, int(n_clients * frac))
selected = np.random.choice(n_clients, n_sel, replace=False)
updates = []
for k in selected:
X_k, y_k = client_data[k]
local_W = deepcopy(server_W)
# 5 epoch SGD
for _ in range(5):
grad = X_k.T @ (X_k @ local_W - y_k) / len(y_k)
local_W -= 0.01 * grad
updates.append(([local_W], len(y_k)))
server_W = fedavg([server_W], updates)[0]
loss = np.mean((client_data[0][0] @ server_W - client_data[0][1])**2)
if rnd % 5 == 0:
print(f"Round {rnd+1:3d} | MSE: {loss:.4f}")
simulate_fedavg()
02 Flower Mimarisi — Server, Client, Strategy
Flower, framework-agnostik FL kütüphanesidir; PyTorch, TensorFlow veya scikit-learn gibi herhangi bir ML kütüphanesiyle çalışır.
Flower'ın üç temel bileşeni vardır. Server: iletişimi yönetir, stratejiyi çalıştırır, round döngüsünü kontrol eder. Client: NumPyClient veya Client arayüzünü uygular; get_parameters, set_parameters, fit, evaluate metotları zorunludur. Strategy: agregasyon mantığı; FedAvg, FedProx, FedAdam, QFedAvg, FaultTolerantFedAvg gibi hazır stratejiler veya Strategy arayüzünden özel implementasyon.
import flwr as fl
import torch
import torch.nn as nn
import numpy as np
from collections import OrderedDict
from typing import List, Tuple, Dict
class FlowerClient(fl.client.NumPyClient):
def __init__(self, model: nn.Module,
train_loader, val_loader,
device: str = "cpu"):
self.model = model.to(device)
self.train_loader = train_loader
self.val_loader = val_loader
self.device = device
self.criterion = nn.CrossEntropyLoss()
def get_parameters(self, config: Dict) -> List[np.ndarray]:
self.model.eval()
return [val.cpu().numpy()
for _, val in self.model.state_dict().items()]
def set_parameters(self, parameters: List[np.ndarray]) -> None:
params_dict = zip(
self.model.state_dict().keys(), parameters
)
state_dict = OrderedDict({
k: torch.tensor(v) for k, v in params_dict
})
self.model.load_state_dict(state_dict, strict=True)
def fit(self, parameters, config):
self.set_parameters(parameters)
epochs = int(config.get("local_epochs", 1))
lr = float(config.get("lr", 0.01))
opt = torch.optim.SGD(self.model.parameters(), lr=lr, momentum=0.9)
self.model.train()
for _ in range(epochs):
for X, y in self.train_loader:
X, y = X.to(self.device), y.to(self.device)
opt.zero_grad()
self.criterion(self.model(X), y).backward()
opt.step()
n = len(self.train_loader.dataset)
return self.get_parameters(config={}), n, {}
def evaluate(self, parameters, config):
self.set_parameters(parameters)
self.model.eval()
loss = correct = total = 0
with torch.no_grad():
for X, y in self.val_loader:
X, y = X.to(self.device), y.to(self.device)
logits = self.model(X)
loss += self.criterion(logits, y).item()
correct += (logits.argmax(1) == y).sum().item()
total += len(y)
return loss / len(self.val_loader), total, {"accuracy": correct/total}
03 İlk Flower Simülasyonu — FashionMNIST ile 10 Client
Flower'ın simülasyon modu, gerçek ağa ihtiyaç duymadan tek makinede çoklu istemci simüle eder; başlangıç için idealdir.
Flower simülasyonu (flwr.simulation.start_simulation) gerçek gRPC bağlantısı kurmaz; her istemciyi aynı süreçte çalıştırır ve Ray ile paralelleştirebilir. 10 istemci, 5 tur FedAvg ile FashionMNIST üzerinde temel FL deneyi aşağıda gösterilmiştir.
import flwr as fl
import torch, torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import numpy as np
from collections import OrderedDict
# ── Basit CNN ────────────────────────────────────────────────
class CNN(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Conv2d(1, 32, 3), nn.ReLU(),
nn.Conv2d(32, 64, 3), nn.ReLU(),
nn.AdaptiveAvgPool2d((4, 4)),
nn.Flatten(),
nn.Linear(64 * 4 * 4, 128), nn.ReLU(),
nn.Linear(128, 10),
)
def forward(self, x): return self.net(x)
# ── FashionMNIST IID partition ────────────────────────────────
NUM_CLIENTS = 10
transform = transforms.Compose([transforms.ToTensor()])
trainset = datasets.FashionMNIST(".", train=True, download=True, transform=transform)
testset = datasets.FashionMNIST(".", train=False, download=True, transform=transform)
# Veriyi eşit böl
partition_size = len(trainset) // NUM_CLIENTS
partitions = [
Subset(trainset, range(i * partition_size, (i+1) * partition_size))
for i in range(NUM_CLIENTS)
]
# ── Client factory ───────────────────────────────────────────
def client_fn(cid: str) -> fl.client.NumPyClient:
model = CNN()
client_id = int(cid)
train_loader = DataLoader(partitions[client_id], batch_size=32, shuffle=True)
val_loader = DataLoader(testset, batch_size=64)
return FlowerClient(model, train_loader, val_loader)
# ── Strateji: FedAvg ──────────────────────────────────────────
strategy = fl.server.strategy.FedAvg(
fraction_fit=0.5,
fraction_evaluate=0.3,
min_fit_clients=5,
min_evaluate_clients=3,
min_available_clients=NUM_CLIENTS,
on_fit_config_fn=lambda r: {"local_epochs": 2},
)
# ── Simülasyonu başlat ───────────────────────────────────────
history = fl.simulation.start_simulation(
client_fn=client_fn,
num_clients=NUM_CLIENTS,
config=fl.server.ServerConfig(num_rounds=10),
strategy=strategy,
client_resources={"num_cpus": 1, "num_gpus": 0},
)
print(f"Son tur loss: {history.losses_distributed[-1]}")
print(f"Son tur accuracy: {history.metrics_distributed['accuracy'][-1]}")
04 Non-IID Veri Sorunu — Dirichlet Dağılımı & Stratified Sampling
Gerçek FL sistemlerinde istemci verileri IID değildir; bir telefon kullanıcısı belirli sınıflara ağırlıklı veriye sahiptir. Bu durum FedAvg performansını önemli ölçüde düşürür.
Dirichlet dağılımı, Non-IID partition simüle etmek için standart yöntemdir. α parametresi heterojenliği kontrol eder: α → ∞ IID'ye, α → 0 her istemcide tek sınıfa yaklaşır. α = 0.5 orta derece non-IID için yaygın kullanımdır. Non-IID'nin etkisi: yerel model kendi sınıflarına overfit olur, FedAvg agregasyonu bunu dengelemeye çalışır ama performans düşer.
import numpy as np
from torch.utils.data import Subset
from torchvision import datasets, transforms
def dirichlet_partition(dataset, n_clients: int, alpha: float = 0.5,
seed: int = 42):
"""
Dirichlet dağılımıyla Non-IID partition.
alpha düşük → daha heterojen
"""
rng = np.random.default_rng(seed)
labels = np.array([dataset[i][1] for i in range(len(dataset))])
n_classes = int(labels.max()) + 1
# Her sınıf için indeksleri topla
class_indices = [np.where(labels == c)[0] for c in range(n_classes)]
client_indices = [[] for _ in range(n_clients)]
for c in range(n_classes):
rng.shuffle(class_indices[c])
# Dirichlet dağılımı: her sınıf istemciler arasında nasıl bölünür?
proportions = rng.dirichlet(np.repeat(alpha, n_clients))
cumulative = np.cumsum(proportions * len(class_indices[c])).astype(int)
splits = np.split(class_indices[c], cumulative[:-1])
for k, split in enumerate(splits):
client_indices[k].extend(split.tolist())
return [Subset(dataset, idx) for idx in client_indices]
# ── Kullanım ────────────────────────────────────────────────
trainset = datasets.FashionMNIST(".", train=True, download=True,
transform=transforms.ToTensor())
# IID (alpha büyük)
iid_partitions = dirichlet_partition(trainset, n_clients=10, alpha=100)
# Non-IID (alpha küçük)
noniid_partitions = dirichlet_partition(trainset, n_clients=10, alpha=0.3)
# Sınıf dağılımını göster
import matplotlib.pyplot as plt
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
for title, parts, ax in zip(["IID (α=100)", "Non-IID (α=0.3)"],
[iid_partitions, noniid_partitions], axes):
dist = np.zeros((10, 10))
for k, part in enumerate(parts):
for _, label in part:
dist[k][label] += 1
ax.imshow(dist, aspect="auto", cmap="Blues")
ax.set_title(title)
plt.tight_layout()
05 Differential Privacy — Gaussian Mechanism & clip_grad
Differential Privacy (DP), gradyanlar üzerine matematiksel gürültü ekleyerek tek bir istemcinin varlığını gizler ve gizlilik garantisi sağlar.
(ε, δ)-Differential Privacy: bir algoritma, herhangi iki komşu veritabanı için çıktı dağılımları arasındaki fark ε ve δ parametreleriyle sınırlandırılmışsa DP sağlar. FL bağlamında: her istemci gradyanlarını clipping (L2 normunu C ile sınırla) ve Gaussian gürültü (σ × C standart sapmayla N(0,1) ekle) adımlarından geçirir. Flower, DifferentialPrivacyClientSideAdaptiveClipping stratejisiyle yerleşik DP desteği sunar.
import torch, numpy as np
import flwr as fl
# ── Gradient clipping + Gaussian gürültü ────────────────────
def dp_noise(params: list, clip_C: float = 1.0,
noise_mult: float = 0.1) -> list:
"""
clip_C: L2 norm sınırı
noise_mult: σ = noise_mult × clip_C
"""
# 1. Global L2 norm hesapla
total_norm = np.sqrt(sum(np.sum(p**2) for p in params))
# 2. Clip (scale factor)
scale = min(1.0, clip_C / (total_norm + 1e-8))
clipped = [p * scale for p in params]
# 3. Gaussian gürültü ekle
sigma = noise_mult * clip_C
noised = [p + np.random.normal(0, sigma, p.shape)
for p in clipped]
return noised
# ── DP-aware FlowerClient ─────────────────────────────────────
class DPFlowerClient(FlowerClient):
def __init__(self, *args, clip_C=1.0, noise_mult=0.1, **kwargs):
super().__init__(*args, **kwargs)
self.clip_C = clip_C
self.noise_mult = noise_mult
def fit(self, parameters, config):
new_params, n_samples, metrics = super().fit(parameters, config)
# Gradient farkı = new - old params → clip + noise
delta = [n - o for n, o in zip(new_params, parameters)]
dp_delta = dp_noise(delta, self.clip_C, self.noise_mult)
dp_params = [o + d for o, d in zip(parameters, dp_delta)]
return dp_params, n_samples, metrics
# ── Flower DP strateji (yerleşik) ────────────────────────────
from flwr.server.strategy import DifferentialPrivacyClientSideAdaptiveClipping
dp_strategy = DifferentialPrivacyClientSideAdaptiveClipping(
strategy=fl.server.strategy.FedAvg(
fraction_fit=0.5, min_fit_clients=5
),
noise_multiplier=0.1,
num_sampled_clients=5,
initial_clipping_norm=1.0,
)
DP gürültüsü arttıkça gizlilik artar ama model doğruluğu düşer. noise_mult = 0.01 ile başlayın ve ε değerini (privacy budget) Opacus'un accountant'ı ile ölçün. Tipik hedef: ε < 10, δ < 1/n_data_points.
06 Secure Aggregation — HE ve Secret Sharing Temelleri
Secure Aggregation, sunucunun bireysel model güncellemelerini görmeden toplamı hesaplamasına olanak tanır; kriptografik gizlilik sağlar.
Gradient inversion saldırıları, sunucunun bireysel gradyanlardan orijinal veriyi kısmen geri üretebileceğini göstermiştir. Secure Aggregation, bunu önler: sunucu yalnızca toplamı öğrenir, hiçbir istemcinin gönderisini ayrı ayrı göremez. İki temel yaklaşım vardır: Secret Sharing (Shamir's Secret Sharing) — her istemci gönderisini parçalara böler, her parça farklı tarafa gider; Homomorphic Encryption (HE) — şifreli değerler üzerinde toplama yapılır, sunucu şifreleme anahtarına sahip değildir.
import numpy as np
# ── Basit maskeli toplama (pairwise masking) ─────────────────
# İstemci i ve j, aralarında paylaşılan maske r_ij kullanır:
# i gönderir: g_i + r_ij, j gönderir: g_j - r_ij
# Toplam: (g_i + r_ij) + (g_j - r_ij) = g_i + g_j ← r iptal olur
def pairwise_masked_sum(gradients: list, seed: int = 0) -> np.ndarray:
"""
n istemci için basit mask-based secure aggregation simülasyonu.
"""
n = len(gradients)
dim = gradients[0].shape
rng = np.random.default_rng(seed)
masked = [g.copy() for g in gradients]
for i in range(n):
for j in range(i + 1, n):
# i ve j arasında paylaşılan rastgele maske
mask = rng.normal(0, 1, dim)
masked[i] += mask
masked[j] -= mask
# Sunucu: maskeli değerleri topla
total = sum(masked) # = sum(gradients) + tüm maskeler iptal
return total / n
# Doğrulama
grads = [np.random.randn(10) for _ in range(5)]
true_avg = np.mean(grads, axis=0)
secure_avg = pairwise_masked_sum(grads)
print(f"Fark (sıfır olmalı): {np.max(np.abs(true_avg - secure_avg)):.8f}")
# ── Tenseal ile HE toplama ────────────────────────────────────
# pip install tenseal
try:
import tenseal as ts
ctx = ts.context(ts.SCHEME_TYPE.CKKS, poly_modulus_degree=8192,
coeff_mod_bit_sizes=[60, 40, 40, 60])
ctx.generate_galois_keys()
ctx.global_scale = 2**40
g1 = ts.ckks_vector(ctx, [1.0, 2.0, 3.0])
g2 = ts.ckks_vector(ctx, [4.0, 5.0, 6.0])
encrypted_sum = g1 + g2 # şifreli toplama
result = encrypted_sum.decrypt()
print(f"HE toplama: {result}") # [5, 7, 9]
except ImportError:
print("tenseal kurulu değil: pip install tenseal")
07 Personalized FL — FedProx, pFedMe & Local Fine-tuning
Tek global model tüm istemciler için optimal değildir; kişiselleştirilmiş FL her istemciye adaptif model sağlar.
FedProx, FedAvg'ı yerel kayıp fonksiyonuna proksimal terim ekleyerek genişletir: F_k(θ) + (μ/2)||θ - θ_global||². Bu terim yerel modelin globalden çok uzaklaşmasını engeller ve client drift'i azaltır. pFedMe, her istemci için ayrı kişisel parametre öğrenir ve Moreau-Yosida düzenlileştirmesiyle global modelle bağ kurar. Local Fine-tuning (Meta-Learning ile): global model, birkaç gradient adımıyla hızlı kişiselleştirmeye imkan tanıyacak şekilde MAML benzeri meta-objective ile eğitilir.
import torch, torch.nn as nn
import flwr as fl
import numpy as np
from collections import OrderedDict
# ── FedProx istemcisi ────────────────────────────────────────
class FedProxClient(fl.client.NumPyClient):
def __init__(self, model, train_loader, val_loader,
mu: float = 0.01):
self.model = model
self.train_loader = train_loader
self.val_loader = val_loader
self.mu = mu
self.global_params = None
def get_parameters(self, config):
return [v.cpu().numpy()
for v in self.model.state_dict().values()]
def set_parameters(self, params):
state = OrderedDict({
k: torch.tensor(v)
for k, v in zip(self.model.state_dict().keys(), params)
})
self.model.load_state_dict(state)
self.global_params = [p.clone().detach()
for p in self.model.parameters()]
def fit(self, parameters, config):
self.set_parameters(parameters)
opt = torch.optim.SGD(self.model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()
self.model.train()
for _ in range(3):
for X, y in self.train_loader:
opt.zero_grad()
loss = criterion(self.model(X), y)
# FedProx proximal term
prox = sum(
((p - g)**2).sum()
for p, g in zip(self.model.parameters(),
self.global_params)
)
(loss + (self.mu / 2) * prox).backward()
opt.step()
n = len(self.train_loader.dataset)
return self.get_parameters(config={}), n, {}
def evaluate(self, parameters, config):
self.set_parameters(parameters)
self.model.eval()
criterion = nn.CrossEntropyLoss()
loss = correct = total = 0
with torch.no_grad():
for X, y in self.val_loader:
out = self.model(X)
loss += criterion(out, y).item()
correct += (out.argmax(1) == y).sum().item()
total += len(y)
return loss, total, {"accuracy": correct/total}
08 Üretim — Gerçek Cihazlar: Android/Raspberry Pi & TFLite
Simülasyondan üretime geçiş, Android cihazlar veya Raspberry Pi gibi sınırlı donanım üzerinde Flower istemcisi ve TFLite modeliyle gerçekleştirilir.
Gerçek cihaz FL'de ek zorluklar ortaya çıkar: sistem heterojenliği — farklı donanım gücü, batarya ve bant genişliği; müsaitlik — cihazlar her zaman çevrimiçi değil; model boyutu — mobil cihazlara büyük model gönderilemez. Flower, gRPC üzerinden gerçek cihaz bağlantısını destekler. TFLite modelleri quantization ile boyutunu 2-4x küçültür.
# Raspberry Pi / edge device Flower istemcisi
# pip install flwr tflite-runtime numpy
import flwr as fl
import numpy as np
import tflite_runtime.interpreter as tflite
class EdgeClient(fl.client.NumPyClient):
def __init__(self, model_path: str, data_path: str):
self.model_path = model_path
self.X_train, self.y_train = np.load(data_path, allow_pickle=True)
def _get_interpreter(self):
interp = tflite.Interpreter(model_path=self.model_path)
interp.allocate_tensors()
return interp
def get_parameters(self, config):
interp = self._get_interpreter()
tensors = []
for detail in interp.get_tensor_details():
if "weight" in detail["name"]:
tensors.append(interp.tensor(detail["index"])())
return tensors
def fit(self, parameters, config):
# TFLite on-device training (TF Lite Training API)
# Basit örnek: gradient update simülasyonu
n = len(self.X_train)
print(f"Yerel eğitim: {n} örnek")
return parameters, n, {"platform": "raspberry_pi"}
def evaluate(self, parameters, config):
interp = self._get_interpreter()
input_idx = interp.get_input_details()[0]["index"]
output_idx = interp.get_output_details()[0]["index"]
correct = 0
for x, y in zip(self.X_train, self.y_train):
interp.set_tensor(input_idx, x[np.newaxis])
interp.invoke()
pred = interp.get_tensor(output_idx).argmax()
correct += (pred == y)
acc = correct / len(self.y_train)
return 0.0, len(self.y_train), {"accuracy": acc}
# Sunucuya bağlan
# python raspberry_pi_client.py --server_address 192.168.1.100:8080
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--server_address", default="127.0.0.1:8080")
args = parser.parse_args()
fl.client.start_client(
server_address=args.server_address,
client=EdgeClient("model.tflite", "local_data.npy"),
)
Gerçek FL deployment checklist: (1) TLS ile şifreli iletişim. (2) Cihaz kimlik doğrulama. (3) Model boyutu sıkıştırma (quantization). (4) Async FL — çevrimdışı cihazlar beklenmez. (5) Merkezi izleme — her tur metriklerini kaydedin. Flower'ın SuperLink/SuperNode mimarisi bu ölçeklenebilir deployment için tasarlanmıştır.