00 Batch vs Streaming
Batch processing verinin tamamını bekler; streaming ise veri geldiği anda işler. Hangi yaklaşımın ne zaman kullanılacağını belirlemek sistem tasarımının temelidir.
Batch işleme, büyük veri kümelerini topluca işler: nightly model retraining, haftalık rapor üretimi, büyük ETL dönüşümleri. Basit, debug edilebilir ve yüksek throughput sunar. Ancak gecikme kaçınılmazdır; en iyi durumda bir sonraki batch'e kadar beklenir.
Streaming işleme, olaylar geldiği anda milisaniyeler içinde işler. Gerçek zamanlı fraud detection, canlı öneri sistemleri, IoT sensör anomali tespiti gibi gecikmenin iş sonucunu doğrudan etkilediği senaryolarda zorunludur.
"Gecikme toleransınız nedir?" sorusu mimari seçimini belirler. Saatler: pure batch. Dakikalar: micro-batch (Spark Structured Streaming). Saniyeler veya altı: gerçek streaming (Flink, Kafka Streams).
01 Kafka Temelleri
Apache Kafka, dağıtık, yüksek verimli, dayanıklı bir mesaj kuyruğu ve event streaming platformudur.
Kafka'nın temel kavramları: Topic — mantıksal mesaj kanalı. Partition — topic'in paralel bölümü; her partition sıralı, değiştirilemez log'dur. Consumer Group — aynı topic'i farklı bölümlerden okuyan consumer koleksiyonu; paralel okuma sağlar. Offset — partition içindeki mesaj konumu; consumer'ın nerede kaldığını belirler.
# Docker Compose ile hızlı kurulum
cat > docker-compose.yml << EOF
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on: [zookeeper]
ports: ["9092:9092"]
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
EOF
docker-compose up -d
# Topic oluştur
docker exec kafka kafka-topics --create \
--bootstrap-server localhost:9092 \
--topic ml-events \
--partitions 6 \
--replication-factor 1
from kafka import KafkaProducer, KafkaConsumer
import json, time
# Producer: ML olaylarını Kafka'ya gönder
producer = KafkaProducer(
bootstrap_servers=["localhost:9092"],
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8") if k else None
)
for i in range(100):
event = {
"user_id": f"user_{i % 20}",
"features": [i * 0.1, i * 0.2, i * 0.3],
"timestamp": time.time()
}
producer.send("ml-events", key=event["user_id"], value=event)
producer.flush()
print("100 olay gönderildi")
# Consumer: olayları oku ve işle
consumer = KafkaConsumer(
"ml-events",
bootstrap_servers=["localhost:9092"],
group_id="ml-inference-group",
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
auto_offset_reset="earliest",
enable_auto_commit=True
)
for msg in consumer:
event = msg.value
# Gerçek uygulamada: model.predict(event["features"])
print(f"Partition:{msg.partition} Offset:{msg.offset} user:{event['user_id']}")
02 Flink ile Stateful Stream Processing
Apache Flink, tam olarak bir kez işleme semantiği ve zengin windowing desteğiyle üretim sınıfı stateful streaming sağlar.
Flink'in ML kullanım senaryoları: (1) Feature engineering — window aggregations ile rolling mean, std, count. (2) Anomali tespiti — öğrenilen modellerle gerçek zamanlı puanlama. (3) Model outputs'u sink'e yazma — Redis, Elasticsearch, veritabanı. (4) Concept drift tespiti — istatistik pencerelerinin karşılaştırılması.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
KafkaSource, KafkaOffsetsInitializer)
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.functions import MapFunction
import json
class MLInferenceMap(MapFunction):
"""Her Kafka mesajı için model inference uygular."""
def open(self, runtime_context):
# Model yükleme — her task worker'da bir kez yapılır
import pickle
with open("model.pkl", "rb") as f:
self.model = pickle.load(f)
def map(self, value):
event = json.loads(value)
feats = event["features"]
score = self.model.predict_proba([feats])[0][1]
result = {**event, "score": float(score)}
return json.dumps(result)
def build_pipeline():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
source = KafkaSource.builder() \
.set_bootstrap_servers("localhost:9092") \
.set_topics("ml-events") \
.set_group_id("flink-ml-group") \
.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
stream = env.from_source(
source, WatermarkStrategy.no_watermarks(), "kafka-source")
scored_stream = stream.map(MLInferenceMap())
# Sonuçları stdout'a yaz (prod'da Kafka sink veya Redis)
scored_stream.print()
env.execute("ML Inference Pipeline")
if __name__ == "__main__":
build_pipeline()
03 Bytewax — Python-Native Stream ML
Bytewax, Rust çekirdeği üzerine inşa edilmiş Python-native stream processing kütüphanesidir; ML pipeline'ları için Flink'e göre çok daha düşük giriş eşiği sunar.
Flink güçlü ama JVM ekosistemi gerektirir; Python ile çalışmak ek boilerplate ve sınırlı performans demektir. Bytewax tamamen Python'da yazılır, Rust ile hızlandırılır ve dataflow API'si ML pipeline'ları için optimize edilmiştir.
pip install bytewax kafka-python
from bytewax.dataflow import Dataflow
from bytewax.inputs import KafkaInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.execution import run_main
import json, numpy as np
# Basit anomali skoru: z-score tabanlı
running_stats = {"mean": 0.0, "var": 1.0, "count": 0}
def parse_event(key_bytes, value_bytes):
return "all", json.loads(value_bytes)
def score_event(event):
feats = np.array(event["features"])
val = feats.mean()
# Online Welford mean/var güncelleme
running_stats["count"] += 1
n = running_stats["count"]
delta = val - running_stats["mean"]
running_stats["mean"] += delta / n
running_stats["var"] = (running_stats["var"] * (n-1) + delta**2) / n
z_score = (val - running_stats["mean"]) / (running_stats["var"]**0.5 + 1e-9)
return {**event, "z_score": round(float(z_score), 3)}
def filter_anomaly(event):
return abs(event["z_score"]) > 2.5 # 2.5σ anomali eşiği
flow = Dataflow()
flow.input("kafka-in", KafkaInputConfig(
brokers=["localhost:9092"], topic="ml-events"))
flow.map(lambda x: parse_event(*x))
flow.map(lambda kv: (kv[0], score_event(kv[1])))
flow.filter(lambda kv: filter_anomaly(kv[1]))
flow.map(lambda kv: (kv[0], json.dumps(kv[1])))
flow.output("stdout", StdOutputConfig())
run_main(flow)
04 Online Learning — River ve Vowpal Wabbit
Online learning algoritmaları, her veri noktasını gördükten hemen sonra modeli günceller; sürekli değişen dağılımlar için batch retraining döngüsünden kaçınır.
River: Python'da online ML için kapsamlı kütüphane. Incremental decision tree, Hoeffding Tree, online linear modeller, drift detector'lar — hepsi tek API ile. Her örnek: model.learn_one(x, y) ile güncellenir, model.predict_one(x) ile tahmin döner.
Vowpal Wabbit (VW): Microsoft Research'ün online learning kütüphanesi. Milyarlarca örnekle çalışabilir, hashing trick ile çok yüksek boyutlu feature'ları işler. Reklam tıklama tahmini gibi large-scale online senaryolarda endüstri standardıdır.
from river import linear_model, preprocessing, metrics, stream, datasets
# Phishing website detection — online binary classification
dataset = datasets.Phishing()
model = preprocessing.StandardScaler() | linear_model.LogisticRegression(
optimizer=__import__("river.optim", fromlist=["SGD"]).SGD(lr=0.01))
metric = metrics.Accuracy()
rolling = metrics.Rolling(metrics.Accuracy(), window_size=500)
for i, (x, y) in enumerate(dataset):
y_pred = model.predict_one(x)
metric.update(y, y_pred)
rolling.update(y, y_pred)
model.learn_one(x, y) # önce predict, sonra learn (prequential evaluation)
if (i + 1) % 500 == 0:
print(f"[{i+1:5d}] Kümülatif:{metric.get():.3f} Rolling-500:{rolling.get():.3f}")
print(f"Final accuracy: {metric.get():.4f}")
05 Feature Freshness — Online Feature Store
Feature freshness, bir modelin inference sırasında kullandığı feature'ların ne kadar güncel olduğunu tanımlar; stale feature'lar prediction kalitesini doğrudan düşürür.
Klasik offline feature store'lar (Hive, S3) saatlik veya günlük batch güncellemelerle çalışır. Gerçek zamanlı uygulamalar için bu gecikme kabul edilemez. Online feature store, streaming pipeline'dan gelen güncel feature'ları milisaniyeler içinde sunabilmelidir.
import redis, json, time
from typing import Dict, Any
class OnlineFeatureStore:
"""Redis tabanlı online feature store. TTL ile otomatik stale temizleme."""
def __init__(self, host="localhost", port=6379, default_ttl=3600):
self.r = redis.Redis(host=host, port=port, decode_responses=True)
self.default_ttl = default_ttl
def write(self, entity_id: str, features: Dict[str, Any], ttl: int = None):
key = f"feat:{entity_id}"
payload = {**features, "_ts": time.time()}
self.r.setex(key, ttl or self.default_ttl, json.dumps(payload))
def read(self, entity_id: str) -> Dict[str, Any]:
key = f"feat:{entity_id}"
data = self.r.get(key)
if data is None:
return {}
return json.loads(data)
def freshness_seconds(self, entity_id: str) -> float:
feat = self.read(entity_id)
if not feat: return float("inf")
return time.time() - feat["_ts"]
# Kullanım örneği
fs = OnlineFeatureStore()
fs.write("user_123", {"age": 28, "avg_spend_7d": 142.5, "session_count": 7})
feats = fs.read("user_123")
print(f"Feature freshness: {fs.freshness_seconds('user_123'):.2f}s")
print(f"Features: {feats}")
06 Concept Drift Tespiti
Concept drift, veri dağılımı veya hedef ilişkisinin zamanla değişmesidir; tespit edilmezse model performansı sessizce bozulur.
Drift türleri: Data drift (covariate shift) — girdi dağılımı P(X) değişir; model aynı ama input farklı. Concept drift — P(Y|X) değişir; aynı özellikler artık farklı sonuçla ilişkilidir. Label drift — P(Y) değişir; sınıf dengesizliği kayar.
from river import drift
import numpy as np
# ADWIN (Adaptive Windowing) — hızlı drift tespiti
adwin = drift.ADWIN(delta=0.002)
# Page-Hinkley — gradual drift için
ph = drift.PageHinkley(min_instances=30, delta=0.005, threshold=50)
# DDM (Drift Detection Method) — hata oranı tabanlı
ddm = drift.DDM(warm_start=500)
# Simüle edilmiş stream: ilk 500 normal, sonra drift
rng = np.random.default_rng(42)
errors = []
for i in range(1000):
# 500. adımdan sonra hata oranı artar (concept drift simülasyonu)
error_prob = 0.1 if i < 500 else 0.4
error = int(rng.random() < error_prob)
errors.append(error)
adwin.update(error)
ddm.update(error)
if adwin.drift_detected:
print(f"[ADWIN] Drift tespit edildi! Adım: {i}")
if ddm.drift_detected:
print(f"[DDM] Drift tespit edildi! Adım: {i}")
print(f"İlk 500 adım hata oranı: {sum(errors[:500])/500:.3f}")
print(f"Son 500 adım hata oranı: {sum(errors[500:])/500:.3f}")
07 Lambda ve Kappa Mimarileri
Lambda ve Kappa, büyük ölçekli veri sistemlerinin nasıl organize edilmesi gerektiğine dair iki farklı mimari paradigmadır.
Lambda Mimarisi: Batch layer + Speed layer + Serving layer. Batch layer tüm veriyi yeniden işleyerek doğruluk sağlar; speed layer en son verileri düşük gecikmeyle işler; serving layer ikisini birleştirir. Güçlü ama iki farklı kod tabanı yönetimi gerektirir (batch + stream).
Kappa Mimarisi (Jay Kreps): Speed layer yeterlidir — her şeyi streaming ile işle, yeniden işleme gerektiğinde aynı pipeline'ı baştan çalıştır. Tek kod tabanı, daha basit operasyon. Ancak Kafka'da uzun süreli saklama veya başka bir immutable log gerektirir.
Modern sistemlerde Lambda ve Kappa ayrımı bulanıklaşıyor. Apache Flink'in unified batch+stream API'si, Delta Lake ve Iceberg'ün ACID stream ingestion desteği ile "streaming-first" yaklaşımı ön plana çıkıyor; batch recompute bir Flink job ile de yapılabiliyor.
08 Pratik: Kafka → Flink → Redis Online Inference Pipeline
Uçtan uca örnek: Kafka'dan gelen olayları Flink ile işleyip skorlayarak Redis'e yazma.
# Servisler
docker-compose up -d # kafka + zookeeper
docker run -d -p 6379:6379 redis:7-alpine
# Python bağımlılıkları
pip install kafka-python redis bytewax river scikit-learn
import threading, json, time, random
from kafka import KafkaProducer, KafkaConsumer
import redis
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler
import numpy as np
# ── Global bileşenler ─────────────────────────────────────
rds = redis.Redis(host="localhost", port=6379, decode_responses=True)
clf = SGDClassifier(loss="log_loss", warm_start=True)
scaler = StandardScaler()
rng = np.random.default_rng(42)
# Modeli önceden ısıt
X_init = rng.normal(0, 1, (200, 4))
y_init = rng.integers(0, 2, 200)
scaler.fit(X_init)
clf.fit(scaler.transform(X_init), y_init)
# ── 1. Producer Thread ───────────────────────────────────
def produce_events(n=500, interval=0.05):
p = KafkaProducer(
bootstrap_servers=["localhost:9092"],
value_serializer=lambda v: json.dumps(v).encode())
for i in range(n):
event = {
"user_id": f"u{i % 50}",
"features": rng.normal(0, 1, 4).tolist(),
"label": int(rng.integers(0, 2)),
"ts": time.time()
}
p.send("ml-events", event)
time.sleep(interval)
p.flush(); print("Producer tamamlandı")
# ── 2. Consumer / Inference Thread ───────────────────────
def consume_and_score():
c = KafkaConsumer(
"ml-events",
bootstrap_servers=["localhost:9092"],
group_id="online-inference",
value_deserializer=lambda v: json.loads(v.decode()),
auto_offset_reset="earliest"
)
processed = 0
for msg in c:
event = msg.value
X = np.array(event["features"]).reshape(1, -1)
Xs = scaler.transform(X)
score = clf.predict_proba(Xs)[0][1]
# Sonucu Redis'e yaz (TTL: 5 dakika)
key = f"score:{event['user_id']}"
rds.setex(key, 300, json.dumps({
"score": round(float(score), 4),
"ts": event["ts"]
}))
# Online güncelleme (partial_fit)
clf.partial_fit(Xs, [event["label"]], classes=[0, 1])
processed += 1
if processed % 50 == 0:
print(f"İşlendi: {processed} | Son skor: {score:.3f}")
if processed >= 500: break
print("Consumer tamamlandı")
# Pipeline'ı başlat
t_prod = threading.Thread(target=produce_events)
t_cons = threading.Thread(target=consume_and_score)
t_prod.start(); t_cons.start()
t_prod.join(); t_cons.join()
# Redis'ten bir kullanıcı skoru oku
sample = rds.get("score:u0")
if sample:
print(f"u0 son skoru: {json.loads(sample)}")
Pipeline çalıştırıldığında producer 0.05s aralıklarla olay gönderir, consumer her olayı skorlar, Redis'e yazar ve modeli partial_fit ile günceller. 500 olay sonunda Redis'te her kullanıcı için güncel skor bulunur. Gerçek production'da Flink veya Bytewax consumer yerine kullanılır; bu örnek end-to-end kavramı gösterir.