Veri Mühendisliği
RehberVeri Mühendisliği02 · Depolama

Feature Store
Feast & Online/Offline.

Training/serving skew sorununu çöz, Feast ile feature view tanımla, batch materializasyondan online serving'e, time-travel query'den feature monitoring'e tam rehber.

00 Feature Store Nedir?

Feature store, ML feature'larının merkezi deposudur. Training ve serving sırasında aynı feature hesaplamasının kullanılmasını garanti eder.

Training/serving skew problemi: model eğitiminde kullanılan feature ile production'da hesaplanan feature farklı olduğunda model performansı dramatik düşer. Örneğin eğitimde "son 7 günlük ortalama sipariş değeri" pandas ile hesaplanırken, serving'de aynı metrik SQL ile farklı şekilde hesaplanıyorsa skew oluşur.

Sorun  Training:  pandas groupby → 7 günlük ortalama = 42.5
Sorun  Serving:   SQL AVG() → 7 günlük ortalama = 39.1  ← FARK!
Çözüm  Feature Store → tek tanım, iki ortamda da aynı değer

Point-in-time correctness: Bir modeli geçmiş tarihlerdeki verilerle yeniden eğitirken, her training örneği için yalnızca o tarihe kadar bilinen feature değerleri kullanılmalıdır. Bu kural ihlal edilirse data leakage oluşur.

Feature Store OlmadanFeature Store İle
Her takım kendi feature hesaplama kodunu yazarMerkezi feature registry — bir kez tanımla, her yerde kullan
Training/serving skew riski yüksekAynı feature pipeline her iki ortamda da çalışır
Point-in-time correctness manuel sağlanırOtomatik time-travel query desteği
Feature yeniden kullanımı zorFeature keşif ve yeniden kullanım araçları

01 Feast Mimarisi

Feast açık kaynak feature store'un üç temel bileşeni: registry, offline store ve online store.

RegistryFeature tanımlarının (FeatureView, Entity, DataSource) merkezi depo. Yerel dosya, S3, GCS veya SQL DB olabilir.
Offline StoreTarihsel feature verilerinin tutulduğu yer. BigQuery, Redshift, Spark, DuckDB, Parquet (yerel) desteği var. Training için kullanılır.
Online StoreDüşük latency (milisaniye) ile son feature değerlerinin sunulduğu yer. Redis, DynamoDB, SQLite, Datastore desteği var. Serving için kullanılır.
MaterializationOffline store'dan online store'a feature değerlerini kopyalama işlemi. feast materialize-incremental ile çalışır.
Feature ServerREST API ile online feature serving. feast serve ile başlatılır.
01 Tanım          → Entity, FeatureView, DataSource Python ile tanımla
02 feast apply    → Registry güncelle (S3/GCS/yerel)
03 feast materialize → Offline → Online store kopyala
04 Training       → get_historical_features() — point-in-time correct
05 Serving        → get_online_features() — düşük latency

02 Feast Kurulumu ve Feature View Tanımı

Yerel Parquet + SQLite ile geliştirme ortamı, production'da S3 + Redis kombinasyonu.

terminal
pip install feast[redis,aws]
feast init user_behavior_store
cd user_behavior_store
# Dizin yapısı:
# feature_store.yaml  — konfigürasyon
# data/               — yerel parquet (dev)
# feature_views.py    — feature tanımları
feature_store.yaml
project: user_behavior_store
registry: s3://my-feast-bucket/registry.pb
provider: aws
online_store:
  type: redis
  connection_string: "redis:6379"
offline_store:
  type: file   # geliştirme için; production'da bigquery veya redshift
entity_key_serialization_version: 2
feature_views.py
from datetime import timedelta
from feast import (
    Entity, FeatureView, Field, FileSource, ValueType
)
from feast.types import Float32, Int64, String

# ── Entity tanımı ───────────────────────────────────────────────────
user = Entity(
    name="user_id",
    value_type=ValueType.INT64,
    description="Kullanıcı benzersiz kimliği",
)

# ── Data Source ─────────────────────────────────────────────────────
user_events_source = FileSource(
    path="s3://feature-bucket/user_events/*.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

# ── Feature View ────────────────────────────────────────────────────
user_behavior_fv = FeatureView(
    name="user_behavior",
    entities=[user],
    ttl=timedelta(days=7),
    schema=[
        Field(name="total_purchases_7d",   dtype=Int64),
        Field(name="avg_order_value_7d",   dtype=Float32),
        Field(name="unique_products_7d",   dtype=Int64),
        Field(name="days_since_last_order", dtype=Int64),
        Field(name="preferred_category",    dtype=String),
    ],
    online=True,
    source=user_events_source,
    tags={"team": "ml-platform", "owner": "data-science"},
)

# ── Registry'ye kaydet ──────────────────────────────────────────────
# Terminal: feast apply

03 Batch Feature Materializasyonu

Offline store'dan online store'a feature kopyalama. İncrementel materializasyon ile yalnızca yeni veriler kopyalanır.

materialize.py
from feast import FeatureStore
from datetime import datetime, timedelta, timezone

store = FeatureStore(repo_path=".")

# ── Tam materializasyon (ilk kez veya backfill) ─────────────────────
store.materialize(
    start_date=datetime(2026, 1, 1, tzinfo=timezone.utc),
    end_date=datetime(2026, 4, 12, tzinfo=timezone.utc),
)

# ── İncrementel (her gün cron ile çalıştır) ─────────────────────────
store.materialize_incremental(
    end_date=datetime.now(tz=timezone.utc),
)

# Airflow ile günlük schedule:
# feast materialize-incremental $(date -u +%Y-%m-%dT%H:%M:%S)
TTL Ayarı: FeatureView'da ttl=timedelta(days=7) ile 7 günden eski feature'lar online store'da geçersiz sayılır. Online store'da yer kaplamaz. TTL, modelin stale feature'lara karşı direncini ayarlar.

04 Online Serving

Düşük latency ile production'da feature okuma. Redis online store ile P99 < 10ms hedeflenebilir.

inference_service.py
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path=".")

def get_user_features(user_ids: list[int]) -> pd.DataFrame:
    """Verilen user_id listesi için online feature'ları çek."""
    entity_rows = [{"user_id": uid} for uid in user_ids]

    feature_vector = store.get_online_features(
        features=[
            "user_behavior:total_purchases_7d",
            "user_behavior:avg_order_value_7d",
            "user_behavior:unique_products_7d",
            "user_behavior:days_since_last_order",
            "user_behavior:preferred_category",
        ],
        entity_rows=entity_rows,
    )
    return feature_vector.to_df()

# ── FastAPI ile entegrasyon ─────────────────────────────────────────
from fastapi import FastAPI
from pydantic import BaseModel
import numpy as np, joblib

app = FastAPI()
model = joblib.load("churn_model.pkl")

class PredictRequest(BaseModel):
    user_ids: list[int]

@app.post("/predict/churn")
async def predict_churn(req: PredictRequest):
    features_df = get_user_features(req.user_ids)
    feature_cols = [
        "total_purchases_7d", "avg_order_value_7d",
        "unique_products_7d", "days_since_last_order",
    ]
    X = features_df[feature_cols].fillna(0).values
    probs = model.predict_proba(X)[:, 1]
    return {
        "predictions": [
            {"user_id": uid, "churn_prob": float(p)}
            for uid, p in zip(req.user_ids, probs)
        ]
    }

05 Time-Travel Query (as_of)

Model yeniden eğitimi sırasında data leakage'ı önlemek için her training örneği o tarihteki feature değerleriyle eşleştirilmelidir.

training_dataset.py
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path=".")

# ── Entity DataFrame: her satırda user_id + gözlem zamanı ───────────
entity_df = pd.DataFrame({
    "user_id": [101, 102, 103, 104, 105],
    "event_timestamp": pd.to_datetime([
        "2026-01-15", "2026-02-03", "2026-02-20",
        "2026-03-10", "2026-04-01",
    ]),
    "label": [1, 0, 1, 0, 1],  # churn etiketi
})

# ── get_historical_features: her satır için as_of zamanı kullanır ───
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_behavior:total_purchases_7d",
        "user_behavior:avg_order_value_7d",
        "user_behavior:unique_products_7d",
        "user_behavior:days_since_last_order",
    ],
).to_df()

print(training_df.head())
# Her satır, event_timestamp tarihindeki feature değerlerini içerir
# Bu point-in-time correctness garantisidir — data leakage yok

# ── Model eğitimi ────────────────────────────────────────────────────
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
import joblib

feature_cols = [
    "total_purchases_7d", "avg_order_value_7d",
    "unique_products_7d", "days_since_last_order",
]
X = training_df[feature_cols].fillna(0)
y = training_df["label"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
clf = GradientBoostingClassifier(n_estimators=100, max_depth=4)
clf.fit(X_train, y_train)
joblib.dump(clf, "churn_model.pkl")

06 Feature Reuse ve Keşif

Feature store'un en büyük değerlerinden biri: bir takımın ürettiği feature'ı diğer takımlar bulup yeniden kullanabilir.

feature_discovery.py
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# ── Tüm feature view'ları listele ───────────────────────────────────
for fv in store.list_feature_views():
    print(f"FeatureView: {fv.name}")
    print(f"  Entity: {fv.entities}")
    print(f"  TTL: {fv.ttl}")
    print(f"  Tags: {fv.tags}")
    for field in fv.schema:
        print(f"    - {field.name}: {field.dtype}")

# ── Belirli bir feature view'ı detaylandır ──────────────────────────
fv = store.get_feature_view("user_behavior")
print(f"Owner: {fv.tags.get('owner', 'bilinmiyor')}")
print(f"Kaynak: {fv.source.path}")

# ── Data source üzerindeki feature'ları bul ─────────────────────────
for ds in store.list_data_sources():
    print(f"DataSource: {ds.name} — {ds.timestamp_field}")
Feature Documentation: Her FeatureView ve Field için description parametresi doldurun. Feast UI (feast ui komutu) veya Amundsen/DataHub gibi data catalog araçlarıyla feature registry entegre edilebilir.

07 Hopsworks ve Tecton Karşılaştırma

Farklı feature store çözümlerinin güçlü ve zayıf yanları.

KriterFeastHopsworksTecton
LisansApache 2.0 (açık kaynak)AGPL / EnterpriseSaaS (kapalı kaynak)
DeploymentSelf-hosted (k8s, yerel)Self-hosted / CloudFully managed SaaS
StreamingSınırlı (Kafka connector)Native streaming (Flink)Native streaming
Feature TransformDışarıda yapılırDahili Spark/PythonDahili (declarative)
MonitoringHarici entegrasyon gerekirDahili drift detectionDahili monitoring
MaliyetÜcretsiz (altyapı maliyeti)OrtaYüksek
Uygun SenaryoBaşlangıç, maliyet odaklıOn-premise, veri gizliliğiEnterprise, tam yönetimli

08 Feature Monitoring ve Practical

Drift detection, freshness kontrolü ve Feast + model eğitimi + gerçek zamanlı servis bütünleşik pipeline.

Feature Monitoring

Data DriftFeature dağılımının zaman içinde değişmesi. KL divergence, PSI (Population Stability Index) veya Kolmogorov-Smirnov testi ile tespit.
FreshnessOnline store'daki feature değerlerinin ne kadar güncel olduğu. TTL aşılmış entity'ler stale sayılır.
Missing Rateget_online_features() çağrısında null dönen entity yüzdesi. Yüksekse materializasyon veya kaynak sorun yaşıyor.
feature_monitor.py — drift detection
import numpy as np
from scipy import stats
from feast import FeatureStore
import pandas as pd

store = FeatureStore(repo_path=".")

def compute_psi(expected: np.ndarray, actual: np.ndarray, buckets: int = 10) -> float:
    """Population Stability Index — PSI < 0.1: stabil, 0.1–0.25: değişim var, > 0.25: kritik"""
    bins = np.percentile(expected, np.linspace(0, 100, buckets + 1))
    bins[0]  = -np.inf
    bins[-1] =  np.inf
    expected_pcts = np.histogram(expected, bins=bins)[0] / len(expected)
    actual_pcts   = np.histogram(actual,   bins=bins)[0] / len(actual)
    expected_pcts = np.clip(expected_pcts, 1e-4, None)
    actual_pcts   = np.clip(actual_pcts,   1e-4, None)
    return np.sum((actual_pcts - expected_pcts) * np.log(actual_pcts / expected_pcts))

def monitor_feature_drift(
    feature_name: str,
    baseline_df: pd.DataFrame,
    current_df: pd.DataFrame,
    threshold: float = 0.25,
):
    psi = compute_psi(baseline_df[feature_name].dropna().values,
                       current_df[feature_name].dropna().values)
    status = "OK" if psi < 0.1 else ("UYARI" if psi < threshold else "KRİTİK")
    print(f"{feature_name}: PSI={psi:.4f} [{status}]")
    if status == "KRİTİK":
        alert_slack(f"Feature drift alarm: {feature_name} PSI={psi:.4f}")
    return psi

def alert_slack(message: str):
    import requests
    requests.post(
        "https://hooks.slack.com/services/YOUR/WEBHOOK",
        json={"text": message}
    )
End-to-End Özet: 1) feature_views.py ile feature tanımla → 2) feast apply → 3) feast materialize-incremental (günlük cron) → 4) get_historical_features() ile training set oluştur → 5) modeli eğit → 6) get_online_features() ile serving → 7) PSI monitörü ile drift takip et.