00 Feature Store Nedir?
Feature store, ML feature'larının merkezi deposudur. Training ve serving sırasında aynı feature hesaplamasının kullanılmasını garanti eder.
Training/serving skew problemi: model eğitiminde kullanılan feature ile production'da hesaplanan feature farklı olduğunda model performansı dramatik düşer. Örneğin eğitimde "son 7 günlük ortalama sipariş değeri" pandas ile hesaplanırken, serving'de aynı metrik SQL ile farklı şekilde hesaplanıyorsa skew oluşur.
Sorun Training: pandas groupby → 7 günlük ortalama = 42.5 Sorun Serving: SQL AVG() → 7 günlük ortalama = 39.1 ← FARK! Çözüm Feature Store → tek tanım, iki ortamda da aynı değer
Point-in-time correctness: Bir modeli geçmiş tarihlerdeki verilerle yeniden eğitirken, her training örneği için yalnızca o tarihe kadar bilinen feature değerleri kullanılmalıdır. Bu kural ihlal edilirse data leakage oluşur.
| Feature Store Olmadan | Feature Store İle |
|---|---|
| Her takım kendi feature hesaplama kodunu yazar | Merkezi feature registry — bir kez tanımla, her yerde kullan |
| Training/serving skew riski yüksek | Aynı feature pipeline her iki ortamda da çalışır |
| Point-in-time correctness manuel sağlanır | Otomatik time-travel query desteği |
| Feature yeniden kullanımı zor | Feature keşif ve yeniden kullanım araçları |
01 Feast Mimarisi
Feast açık kaynak feature store'un üç temel bileşeni: registry, offline store ve online store.
01 Tanım → Entity, FeatureView, DataSource Python ile tanımla 02 feast apply → Registry güncelle (S3/GCS/yerel) 03 feast materialize → Offline → Online store kopyala 04 Training → get_historical_features() — point-in-time correct 05 Serving → get_online_features() — düşük latency
02 Feast Kurulumu ve Feature View Tanımı
Yerel Parquet + SQLite ile geliştirme ortamı, production'da S3 + Redis kombinasyonu.
pip install feast[redis,aws]
feast init user_behavior_store
cd user_behavior_store
# Dizin yapısı:
# feature_store.yaml — konfigürasyon
# data/ — yerel parquet (dev)
# feature_views.py — feature tanımları
project: user_behavior_store
registry: s3://my-feast-bucket/registry.pb
provider: aws
online_store:
type: redis
connection_string: "redis:6379"
offline_store:
type: file # geliştirme için; production'da bigquery veya redshift
entity_key_serialization_version: 2
from datetime import timedelta
from feast import (
Entity, FeatureView, Field, FileSource, ValueType
)
from feast.types import Float32, Int64, String
# ── Entity tanımı ───────────────────────────────────────────────────
user = Entity(
name="user_id",
value_type=ValueType.INT64,
description="Kullanıcı benzersiz kimliği",
)
# ── Data Source ─────────────────────────────────────────────────────
user_events_source = FileSource(
path="s3://feature-bucket/user_events/*.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
# ── Feature View ────────────────────────────────────────────────────
user_behavior_fv = FeatureView(
name="user_behavior",
entities=[user],
ttl=timedelta(days=7),
schema=[
Field(name="total_purchases_7d", dtype=Int64),
Field(name="avg_order_value_7d", dtype=Float32),
Field(name="unique_products_7d", dtype=Int64),
Field(name="days_since_last_order", dtype=Int64),
Field(name="preferred_category", dtype=String),
],
online=True,
source=user_events_source,
tags={"team": "ml-platform", "owner": "data-science"},
)
# ── Registry'ye kaydet ──────────────────────────────────────────────
# Terminal: feast apply
03 Batch Feature Materializasyonu
Offline store'dan online store'a feature kopyalama. İncrementel materializasyon ile yalnızca yeni veriler kopyalanır.
from feast import FeatureStore
from datetime import datetime, timedelta, timezone
store = FeatureStore(repo_path=".")
# ── Tam materializasyon (ilk kez veya backfill) ─────────────────────
store.materialize(
start_date=datetime(2026, 1, 1, tzinfo=timezone.utc),
end_date=datetime(2026, 4, 12, tzinfo=timezone.utc),
)
# ── İncrementel (her gün cron ile çalıştır) ─────────────────────────
store.materialize_incremental(
end_date=datetime.now(tz=timezone.utc),
)
# Airflow ile günlük schedule:
# feast materialize-incremental $(date -u +%Y-%m-%dT%H:%M:%S)
ttl=timedelta(days=7) ile 7 günden eski feature'lar online store'da geçersiz sayılır. Online store'da yer kaplamaz. TTL, modelin stale feature'lara karşı direncini ayarlar.
04 Online Serving
Düşük latency ile production'da feature okuma. Redis online store ile P99 < 10ms hedeflenebilir.
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path=".")
def get_user_features(user_ids: list[int]) -> pd.DataFrame:
"""Verilen user_id listesi için online feature'ları çek."""
entity_rows = [{"user_id": uid} for uid in user_ids]
feature_vector = store.get_online_features(
features=[
"user_behavior:total_purchases_7d",
"user_behavior:avg_order_value_7d",
"user_behavior:unique_products_7d",
"user_behavior:days_since_last_order",
"user_behavior:preferred_category",
],
entity_rows=entity_rows,
)
return feature_vector.to_df()
# ── FastAPI ile entegrasyon ─────────────────────────────────────────
from fastapi import FastAPI
from pydantic import BaseModel
import numpy as np, joblib
app = FastAPI()
model = joblib.load("churn_model.pkl")
class PredictRequest(BaseModel):
user_ids: list[int]
@app.post("/predict/churn")
async def predict_churn(req: PredictRequest):
features_df = get_user_features(req.user_ids)
feature_cols = [
"total_purchases_7d", "avg_order_value_7d",
"unique_products_7d", "days_since_last_order",
]
X = features_df[feature_cols].fillna(0).values
probs = model.predict_proba(X)[:, 1]
return {
"predictions": [
{"user_id": uid, "churn_prob": float(p)}
for uid, p in zip(req.user_ids, probs)
]
}
05 Time-Travel Query (as_of)
Model yeniden eğitimi sırasında data leakage'ı önlemek için her training örneği o tarihteki feature değerleriyle eşleştirilmelidir.
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path=".")
# ── Entity DataFrame: her satırda user_id + gözlem zamanı ───────────
entity_df = pd.DataFrame({
"user_id": [101, 102, 103, 104, 105],
"event_timestamp": pd.to_datetime([
"2026-01-15", "2026-02-03", "2026-02-20",
"2026-03-10", "2026-04-01",
]),
"label": [1, 0, 1, 0, 1], # churn etiketi
})
# ── get_historical_features: her satır için as_of zamanı kullanır ───
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"user_behavior:total_purchases_7d",
"user_behavior:avg_order_value_7d",
"user_behavior:unique_products_7d",
"user_behavior:days_since_last_order",
],
).to_df()
print(training_df.head())
# Her satır, event_timestamp tarihindeki feature değerlerini içerir
# Bu point-in-time correctness garantisidir — data leakage yok
# ── Model eğitimi ────────────────────────────────────────────────────
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
import joblib
feature_cols = [
"total_purchases_7d", "avg_order_value_7d",
"unique_products_7d", "days_since_last_order",
]
X = training_df[feature_cols].fillna(0)
y = training_df["label"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
clf = GradientBoostingClassifier(n_estimators=100, max_depth=4)
clf.fit(X_train, y_train)
joblib.dump(clf, "churn_model.pkl")
06 Feature Reuse ve Keşif
Feature store'un en büyük değerlerinden biri: bir takımın ürettiği feature'ı diğer takımlar bulup yeniden kullanabilir.
from feast import FeatureStore
store = FeatureStore(repo_path=".")
# ── Tüm feature view'ları listele ───────────────────────────────────
for fv in store.list_feature_views():
print(f"FeatureView: {fv.name}")
print(f" Entity: {fv.entities}")
print(f" TTL: {fv.ttl}")
print(f" Tags: {fv.tags}")
for field in fv.schema:
print(f" - {field.name}: {field.dtype}")
# ── Belirli bir feature view'ı detaylandır ──────────────────────────
fv = store.get_feature_view("user_behavior")
print(f"Owner: {fv.tags.get('owner', 'bilinmiyor')}")
print(f"Kaynak: {fv.source.path}")
# ── Data source üzerindeki feature'ları bul ─────────────────────────
for ds in store.list_data_sources():
print(f"DataSource: {ds.name} — {ds.timestamp_field}")
description parametresi doldurun. Feast UI (feast ui komutu) veya Amundsen/DataHub gibi data catalog araçlarıyla feature registry entegre edilebilir.
07 Hopsworks ve Tecton Karşılaştırma
Farklı feature store çözümlerinin güçlü ve zayıf yanları.
| Kriter | Feast | Hopsworks | Tecton |
|---|---|---|---|
| Lisans | Apache 2.0 (açık kaynak) | AGPL / Enterprise | SaaS (kapalı kaynak) |
| Deployment | Self-hosted (k8s, yerel) | Self-hosted / Cloud | Fully managed SaaS |
| Streaming | Sınırlı (Kafka connector) | Native streaming (Flink) | Native streaming |
| Feature Transform | Dışarıda yapılır | Dahili Spark/Python | Dahili (declarative) |
| Monitoring | Harici entegrasyon gerekir | Dahili drift detection | Dahili monitoring |
| Maliyet | Ücretsiz (altyapı maliyeti) | Orta | Yüksek |
| Uygun Senaryo | Başlangıç, maliyet odaklı | On-premise, veri gizliliği | Enterprise, tam yönetimli |
08 Feature Monitoring ve Practical
Drift detection, freshness kontrolü ve Feast + model eğitimi + gerçek zamanlı servis bütünleşik pipeline.
Feature Monitoring
import numpy as np
from scipy import stats
from feast import FeatureStore
import pandas as pd
store = FeatureStore(repo_path=".")
def compute_psi(expected: np.ndarray, actual: np.ndarray, buckets: int = 10) -> float:
"""Population Stability Index — PSI < 0.1: stabil, 0.1–0.25: değişim var, > 0.25: kritik"""
bins = np.percentile(expected, np.linspace(0, 100, buckets + 1))
bins[0] = -np.inf
bins[-1] = np.inf
expected_pcts = np.histogram(expected, bins=bins)[0] / len(expected)
actual_pcts = np.histogram(actual, bins=bins)[0] / len(actual)
expected_pcts = np.clip(expected_pcts, 1e-4, None)
actual_pcts = np.clip(actual_pcts, 1e-4, None)
return np.sum((actual_pcts - expected_pcts) * np.log(actual_pcts / expected_pcts))
def monitor_feature_drift(
feature_name: str,
baseline_df: pd.DataFrame,
current_df: pd.DataFrame,
threshold: float = 0.25,
):
psi = compute_psi(baseline_df[feature_name].dropna().values,
current_df[feature_name].dropna().values)
status = "OK" if psi < 0.1 else ("UYARI" if psi < threshold else "KRİTİK")
print(f"{feature_name}: PSI={psi:.4f} [{status}]")
if status == "KRİTİK":
alert_slack(f"Feature drift alarm: {feature_name} PSI={psi:.4f}")
return psi
def alert_slack(message: str):
import requests
requests.post(
"https://hooks.slack.com/services/YOUR/WEBHOOK",
json={"text": message}
)