Veri Mühendisliği
RehberVeri Mühendisliği04 · Depolama

Veri Versiyonlama
DVC & LakeFS.

ML reproducibility için DVC ile veri ve pipeline versiyonlama, LakeFS ile data lake üzerinde Git semantiği ve data contract yönetimi.

00 Neden Veri Versiyonlama?

Kod versiyonlama çözülmüş bir problem; veri versiyonlama hâlâ ML'in en büyük acı noktalarından biri.

Bir modeli 6 ay sonra yeniden üretmeye çalıştığınızda şu sorularla karşılaşırsınız: "Hangi veri setiyle eğittim?", "O sırada feature engineering kodu ne durumdaydı?", "Preprocessing hangi parametrelerle çalışmıştı?" Bu soruların yanıtını veri versiyonlaması olmadan bulmak imkânsızdır.

ProblemDVC Çözümü
Git büyük dosyaları depolayamazDVC pointer dosyaları (.dvc) Git'te, gerçek veri remote'ta
Dataset audit trail yokHer dvc commit hash'i veri snapshot'ı temsil eder
Pipeline adımları arasındaki bağımlılık belirsizdvc.yaml ile deps/outs açıkça tanımlanır
Deney parametreleri kaybolurdvc params diff ile karşılaştırma
Data lake üzerinde branch/merge yokLakeFS ile S3/GCS üzerinde Git-like işlemler
DVC vs LakeFS farkı: DVC, ML proje tabanlıdır — bir Git repoyla birlikte çalışır, ML pipeline'larını ve dataset'leri versiyonlar. LakeFS, data lake katmanına oturur — tüm S3/GCS bucket'ını versiyonlu bir depo gibi yönetir; birden fazla takımın ve pipeline'ın kullandığı veri gölü için uygundur.

01 DVC Kurulumu ve Temel Komutlar

DVC, mevcut Git reposuna eklenir. Temel iş akışı git komutlarına paralel tasarlanmıştır.

terminal — DVC kurulum ve başlatma
# Kurulum
pip install "dvc[s3]"          # S3 remote için
# pip install "dvc[gs]"       # GCS remote için
# pip install "dvc[azure]"    # Azure Blob için
# pip install "dvc[ssh]"      # SSH remote için

# Git repo içinde başlat
git init my-ml-project
cd my-ml-project
dvc init
git add .dvc .dvcignore
git commit -m "DVC başlatıldı"
terminal — temel DVC iş akışı
# Veri ekle (büyük dosyayı DVC takibine al)
dvc add data/train.csv
# Oluşturulur: data/train.csv.dvc (pointer) ve data/.gitignore
git add data/train.csv.dvc data/.gitignore
git commit -m "Eğitim verisi v1 eklendi"

# Remote'a yükle
dvc push

# Başka makinede çek
git clone https://github.com/user/my-ml-project
dvc pull

# Belirli bir versiyona git
git checkout v1.0
dvc checkout          # veriyi de git versiyonuyla eşleştir

# Veri durumunu kontrol et
dvc status
dvc diff HEAD~1       # önceki commit ile fark

data/train.csv.dvc dosyası içeriği: MD5 hash, boyut ve remote path bilgisi içerir. Bu küçük pointer dosyası Git'te takip edilirken gerçek veri remote storage'da kalır.

data/train.csv.dvc — pointer dosyası örneği
outs:
- md5: a3b4c5d6e7f8...
  size: 524288000
  nfiles: null
  path: train.csv

02 DVC Remote Storage

DVC remote, büyük dosyaların gerçekten saklandığı yerdir. S3, GCS, Azure, SSH ve yerel dosya sistemi desteklenir.

terminal — remote tanımı
# S3 remote
dvc remote add myremote s3://my-dvc-bucket/datasets
dvc remote modify myremote region us-east-1
dvc remote modify myremote profile default   # AWS profile

# Google Cloud Storage
dvc remote add gcs-remote gs://my-gcs-bucket/dvc-store

# Azure Blob Storage
dvc remote add azure-remote azure://mycontainer/dvc
dvc remote modify azure-remote account_name myaccount

# Varsayılan remote ayarla
dvc remote default myremote

# .dvc/config dosyasına kaydedilir — git commit!
git add .dvc/config
git commit -m "DVC S3 remote konfigürasyonu"
.dvc/config — oluşan konfigürasyon
[core]
    remote = myremote
['remote "myremote"']
    url = s3://my-dvc-bucket/datasets
    region = us-east-1

03 dvc.yaml ile Pipeline

dvc.yaml, pipeline adımlarını, bağımlılıklarını ve çıktılarını tanımlar. Makefile'a benzer ancak veri farkındalığına sahiptir.

dvc.yaml
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw/images/         # dizin bağımlılığı
    params:
      - prepare.split_ratio
      - prepare.random_seed
    outs:
      - data/prepared/train/
      - data/prepared/val/
      - data/prepared/test/

  featurize:
    cmd: python src/featurize.py
    deps:
      - src/featurize.py
      - data/prepared/train/
    params:
      - featurize.max_features
      - featurize.ngram_range
    outs:
      - data/features/train.npz
      - data/features/val.npz

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features/train.npz
      - data/features/val.npz
    params:
      - train.n_estimators
      - train.max_depth
      - train.learning_rate
    outs:
      - models/model.pkl
    metrics:
      - metrics/train_metrics.json:
          cache: false

  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/model.pkl
      - data/features/val.npz
    metrics:
      - metrics/eval_metrics.json:
          cache: false
    plots:
      - plots/confusion_matrix.json:
          cache: false
params.yaml
prepare:
  split_ratio: 0.2
  random_seed: 42

featurize:
  max_features: 5000
  ngram_range: [1, 2]

train:
  n_estimators: 100
  max_depth: 6
  learning_rate: 0.1
terminal — pipeline çalıştırma
# Tüm pipeline'ı çalıştır (değişen adımlar atlanır)
dvc repro

# Belirli bir adımdan itibaren çalıştır
dvc repro train

# DAG görselleştir
dvc dag

# Metrics göster
dvc metrics show
dvc metrics diff main

04 DVC Experiment Tracking

MLflow veya W&B gibi ağır araçlara gerek kalmadan Git ve DVC ile deney takibi.

terminal — experiment workflow
# Yeni deney çalıştır (otomatik branch/stash oluşturur)
dvc exp run --name exp-lr-0.01 \
  --set-param train.learning_rate=0.01

dvc exp run --name exp-lr-0.05 \
  --set-param train.learning_rate=0.05

dvc exp run --name exp-depth-8 \
  --set-param train.max_depth=8

# Tüm deneyleri karşılaştır
dvc exp show

# Metrics ve params farkını göster
dvc params diff exp-lr-0.01 exp-depth-8

# En iyi deneyi kalıcı hale getir
dvc exp apply exp-depth-8
git add .
git commit -m "En iyi deney: max_depth=8, acc=0.923"

# Deneyleri push et (başkaları paylaşabilir)
dvc exp push origin exp-depth-8
src/train.py — metrics kaydetme
import json, yaml, joblib
from pathlib import Path
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
import numpy as np

# Params yükle
with open("params.yaml") as f:
    params = yaml.safe_load(f)["train"]

# Veri yükle
X_train = np.load("data/features/train.npz")["X"]
y_train = np.load("data/features/train.npz")["y"]
X_val   = np.load("data/features/val.npz")["X"]
y_val   = np.load("data/features/val.npz")["y"]

# Model eğit
clf = GradientBoostingClassifier(
    n_estimators=params["n_estimators"],
    max_depth=params["max_depth"],
    learning_rate=params["learning_rate"],
    random_state=42,
)
clf.fit(X_train, y_train)

# Metrikleri kaydet
preds = clf.predict(X_val)
probs = clf.predict_proba(X_val)[:, 1]
metrics = {
    "accuracy": accuracy_score(y_val, preds),
    "f1":       f1_score(y_val, preds, average="weighted"),
    "auc_roc":  roc_auc_score(y_val, probs),
}
Path("metrics").mkdir(exist_ok=True)
with open("metrics/train_metrics.json", "w") as f:
    json.dump(metrics, f, indent=2)

# Modeli kaydet
Path("models").mkdir(exist_ok=True)
joblib.dump(clf, "models/model.pkl")
print(metrics)

05 LakeFS — Git Semantics for Data Lakes

LakeFS, S3/GCS/Azure Blob üzerinde Git benzeri branch, commit, merge ve revert işlemleri sağlar.

LakeFS, object storage önüne bir proxy katmanı olarak oturur. Mevcut Spark, Presto, Flink kod tabanı değiştirilmeden kullanılmaya devam eder — yalnızca endpoint URL güncellenir (s3:// yerine lakefs://).

Git      main ← feature/new-data ← hotfix/schema-fix
LakeFS   main ← experiment/v2    ← staging/etl-test

Commit   → değişmez snapshot (hash tabanlı)
Branch   → izole geliştirme ortamı (veri kopyalanmaz, pointer)
Merge    → atomik merge, conflict detection
Revert   → anlık geri alma
terminal — LakeFS kurulum (Docker)
docker run -p 8000:8000 -p 8001:8001 \
  -e LAKEFS_BLOCKSTORE_TYPE=local \
  -e LAKEFS_AUTH_ENCRYPT_SECRET_KEY=some-secret \
  treeverse/lakefs run

# Python client
pip install lakefs
lakefs_client.py
import lakefs

# ── Client konfigürasyonu ────────────────────────────────────────────
clt = lakefs.client.Client(
    host="http://localhost:8000",
    username="admin",
    password="admin-secret",
)

# ── Repository oluştur ───────────────────────────────────────────────
repo = lakefs.Repository("ml-datasets", client=clt).create(
    storage_namespace="s3://my-bucket/lakefs-store",
    default_branch="main",
)

# ── Branch oluştur ───────────────────────────────────────────────────
branch = repo.branch("experiment/new-features").create(source_reference="main")

# ── Dosya yükle ──────────────────────────────────────────────────────
with branch.object("data/processed/train_v2.parquet").writer() as writer:
    writer.write(open("train_v2.parquet", "rb").read())

# ── Commit ───────────────────────────────────────────────────────────
branch.commit(
    message="Yeni feature engineering v2 eklendi",
    metadata={"author": "data-eng", "rows": "1234567"},
)

06 LakeFS Branch & Merge Workflow

ETL test, veri kalite kontrol ve production merge iş akışı.

lakefs_workflow.py — ETL test ve merge
import lakefs
import polars as pl
from deltalake import write_deltalake

repo = lakefs.Repository("ml-datasets")

# ── ETL'i izole branch'te çalıştır ──────────────────────────────────
etl_branch = repo.branch("etl/daily-load-2026-04-12").create(source_reference="main")

try:
    # ETL çalıştır — lakefs:// endpoint kullan
    df = pl.read_parquet("s3://raw-bucket/events/2026-04-12/*.parquet")
    df_clean = df.filter(pl.col("revenue") >= 0)

    # LakeFS branch'e yaz (S3 uyumlu endpoint)
    write_deltalake(
        "s3://ml-datasets/etl/daily-load-2026-04-12/data/events",
        df_clean.to_arrow(),
        storage_options={
            "AWS_ENDPOINT_URL": "http://localhost:8000",
            "AWS_ACCESS_KEY_ID": "admin",
            "AWS_SECRET_ACCESS_KEY": "admin-secret",
        }
    )

    # Veri kalite kontrolü
    assert len(df_clean) > 10_000, "Yeterli veri yok!"
    assert df_clean["user_id"].null_count() == 0, "Null user_id var!"

    # Başarılı — main'e merge et
    etl_branch.commit(message="ETL 2026-04-12 tamamlandı")
    merge_result = etl_branch.merge_into("main")
    print(f"Merge başarılı: {merge_result}")

except Exception as e:
    # Başarısız — branch'i sil, main etkilenmez
    etl_branch.delete()
    raise RuntimeError(f"ETL başarısız, değişiklikler geri alındı: {e}")

# ── Diff (hangi dosyalar değişti?) ─────────────────────────────────
for diff in repo.ref("main").diff(other_ref="main~1"):
    print(f"{diff.type}: {diff.path}")

# ── Belirli versiyona revert ─────────────────────────────────────────
# Bozuk veri gelirse önceki commit'e dön:
# repo.branch("main").revert(reference="~3")

07 Data Contracts — Schema Registry

Producer ve consumer arasındaki schema anlaşması. Confluent Schema Registry, Avro ve Protobuf ile schema evrim kuralları.

Data ContractVeri üreticisi ve tüketicisi arasında imzalanan schema anlaşması. Hangi alanlar var, tipleri neler, hangi alanlar zorunlu — değişmez kurallar.
Schema RegistrySchema versiyonlarının merkezi deposu. Confluent Schema Registry Kafka mesajları için; AWS Glue Schema Registry S3/Kinesis için kullanılır.
AvroJSON tabanlı schema tanımı, binary serialization. Kafka ile doğal entegrasyon. Schema evolution kuralları strict.
ProtobufGoogle'ın binary format. Field number tabanlı evrim — eski consumer yeni field'ları görmezden gelir.
schema/user_event.avsc — Avro schema
{
  "type": "record",
  "name": "UserEvent",
  "namespace": "com.company.events",
  "doc": "Kullanıcı davranış olayı",
  "fields": [
    {"name": "user_id",     "type": "long",   "doc": "Kullanıcı ID"},
    {"name": "event_type",  "type": "string", "doc": "Olay türü"},
    {"name": "ts",          "type": "long",   "logicalType": "timestamp-millis"},
    {"name": "revenue",     "type": ["null", "double"], "default": null},
    {"name": "product_id",  "type": ["null", "string"], "default": null}
  ]
}
schema_registry.py — Confluent Schema Registry
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

client = SchemaRegistryClient({"url": "http://schema-registry:8081"})

# ── Schema kaydet ────────────────────────────────────────────────────
with open("schema/user_event.avsc") as f:
    schema_str = f.read()

schema = Schema(schema_str, "AVRO")
schema_id = client.register_schema("user-events-value", schema)
print(f"Schema ID: {schema_id}")

# ── Uyumluluk kontrolü ───────────────────────────────────────────────
new_schema_str = """... yeni alan eklenmiş schema ..."""
compat = client.test_compatibility(
    subject_name="user-events-value",
    schema=Schema(new_schema_str, "AVRO")
)
print(f"Uyumlu mu? {compat}")

# ── Uyumluluk politikası ayarla ──────────────────────────────────────
# BACKWARD: yeni consumer eski mesajları okuyabilir (yeni alanlar optional)
# FORWARD:  eski consumer yeni mesajları okuyabilir
# FULL:     her ikisi de
client.update_compatibility("BACKWARD", subject_name="user-events-value")

08 Schema Evolution ve Practical

Değişmez schema kuralları ve DVC ile image classification dataset versiyonlama pipeline.

Schema Evolution Kuralları

DeğişiklikBACKWARDFORWARDFULL
Opsiyonel alan ekle (default var)UyumluUyumluUyumlu
Zorunlu alan ekle (default yok)UyumsuzUyumluUyumsuz
Alan sil (opsiyonel)UyumluUyumsuzUyumsuz
Alan tipini değiştirUyumsuzUyumsuzUyumsuz
Alan adını değiştirUyumsuzUyumsuzUyumsuz
terminal — DVC ile image classification dataset
# ── Proje yapısı ─────────────────────────────────────────────────────
# image-classifier/
#   data/raw/         ← ham görüntüler (DVC ile takip edilir)
#   data/processed/   ← boyutlandırılmış, augmented (DVC çıktısı)
#   models/           ← eğitilmiş model (DVC çıktısı)
#   params.yaml       ← hyperparameters (Git ile takip)
#   dvc.yaml          ← pipeline tanımı (Git ile takip)

# v1: 10k görüntü, ResNet18
dvc add data/raw/images_v1/
git add data/raw/images_v1.dvc
git commit -m "Dataset v1: 10k görüntü eklendi"
git tag dataset-v1
dvc push

# v2: 25k görüntü, ek sınıflar
dvc add data/raw/images_v2/
git add data/raw/images_v2.dvc
git commit -m "Dataset v2: 25k görüntü, 3 yeni sınıf"
git tag dataset-v2
dvc push

# v1 verisiyle model reproducibility
git checkout dataset-v1
dvc checkout        # veriyi de v1'e döndürür
dvc repro           # pipeline'ı v1 veri + o zamanki params ile çalıştır
Reproducibility Garantisi: Her Git commit hash'i + DVC commit hash'i birlikte tam bir ML deney kaydıdır. Git hash kodu ve pipeline tanımını; DVC hash veriye ve model ağırlıklarına işaret eder. Bu ikilinin saklanması gelecekte tam reproduciblity sağlar.