00 Neden Veri Versiyonlama?
Kod versiyonlama çözülmüş bir problem; veri versiyonlama hâlâ ML'in en büyük acı noktalarından biri.
Bir modeli 6 ay sonra yeniden üretmeye çalıştığınızda şu sorularla karşılaşırsınız: "Hangi veri setiyle eğittim?", "O sırada feature engineering kodu ne durumdaydı?", "Preprocessing hangi parametrelerle çalışmıştı?" Bu soruların yanıtını veri versiyonlaması olmadan bulmak imkânsızdır.
| Problem | DVC Çözümü |
|---|---|
| Git büyük dosyaları depolayamaz | DVC pointer dosyaları (.dvc) Git'te, gerçek veri remote'ta |
| Dataset audit trail yok | Her dvc commit hash'i veri snapshot'ı temsil eder |
| Pipeline adımları arasındaki bağımlılık belirsiz | dvc.yaml ile deps/outs açıkça tanımlanır |
| Deney parametreleri kaybolur | dvc params diff ile karşılaştırma |
| Data lake üzerinde branch/merge yok | LakeFS ile S3/GCS üzerinde Git-like işlemler |
01 DVC Kurulumu ve Temel Komutlar
DVC, mevcut Git reposuna eklenir. Temel iş akışı git komutlarına paralel tasarlanmıştır.
# Kurulum
pip install "dvc[s3]" # S3 remote için
# pip install "dvc[gs]" # GCS remote için
# pip install "dvc[azure]" # Azure Blob için
# pip install "dvc[ssh]" # SSH remote için
# Git repo içinde başlat
git init my-ml-project
cd my-ml-project
dvc init
git add .dvc .dvcignore
git commit -m "DVC başlatıldı"
# Veri ekle (büyük dosyayı DVC takibine al)
dvc add data/train.csv
# Oluşturulur: data/train.csv.dvc (pointer) ve data/.gitignore
git add data/train.csv.dvc data/.gitignore
git commit -m "Eğitim verisi v1 eklendi"
# Remote'a yükle
dvc push
# Başka makinede çek
git clone https://github.com/user/my-ml-project
dvc pull
# Belirli bir versiyona git
git checkout v1.0
dvc checkout # veriyi de git versiyonuyla eşleştir
# Veri durumunu kontrol et
dvc status
dvc diff HEAD~1 # önceki commit ile fark
data/train.csv.dvc dosyası içeriği: MD5 hash, boyut ve remote path bilgisi içerir. Bu küçük pointer dosyası Git'te takip edilirken gerçek veri remote storage'da kalır.
outs:
- md5: a3b4c5d6e7f8...
size: 524288000
nfiles: null
path: train.csv
02 DVC Remote Storage
DVC remote, büyük dosyaların gerçekten saklandığı yerdir. S3, GCS, Azure, SSH ve yerel dosya sistemi desteklenir.
# S3 remote
dvc remote add myremote s3://my-dvc-bucket/datasets
dvc remote modify myremote region us-east-1
dvc remote modify myremote profile default # AWS profile
# Google Cloud Storage
dvc remote add gcs-remote gs://my-gcs-bucket/dvc-store
# Azure Blob Storage
dvc remote add azure-remote azure://mycontainer/dvc
dvc remote modify azure-remote account_name myaccount
# Varsayılan remote ayarla
dvc remote default myremote
# .dvc/config dosyasına kaydedilir — git commit!
git add .dvc/config
git commit -m "DVC S3 remote konfigürasyonu"
[core]
remote = myremote
['remote "myremote"']
url = s3://my-dvc-bucket/datasets
region = us-east-1
03 dvc.yaml ile Pipeline
dvc.yaml, pipeline adımlarını, bağımlılıklarını ve çıktılarını tanımlar. Makefile'a benzer ancak veri farkındalığına sahiptir.
stages:
prepare:
cmd: python src/prepare.py
deps:
- src/prepare.py
- data/raw/images/ # dizin bağımlılığı
params:
- prepare.split_ratio
- prepare.random_seed
outs:
- data/prepared/train/
- data/prepared/val/
- data/prepared/test/
featurize:
cmd: python src/featurize.py
deps:
- src/featurize.py
- data/prepared/train/
params:
- featurize.max_features
- featurize.ngram_range
outs:
- data/features/train.npz
- data/features/val.npz
train:
cmd: python src/train.py
deps:
- src/train.py
- data/features/train.npz
- data/features/val.npz
params:
- train.n_estimators
- train.max_depth
- train.learning_rate
outs:
- models/model.pkl
metrics:
- metrics/train_metrics.json:
cache: false
evaluate:
cmd: python src/evaluate.py
deps:
- src/evaluate.py
- models/model.pkl
- data/features/val.npz
metrics:
- metrics/eval_metrics.json:
cache: false
plots:
- plots/confusion_matrix.json:
cache: false
prepare:
split_ratio: 0.2
random_seed: 42
featurize:
max_features: 5000
ngram_range: [1, 2]
train:
n_estimators: 100
max_depth: 6
learning_rate: 0.1
# Tüm pipeline'ı çalıştır (değişen adımlar atlanır)
dvc repro
# Belirli bir adımdan itibaren çalıştır
dvc repro train
# DAG görselleştir
dvc dag
# Metrics göster
dvc metrics show
dvc metrics diff main
04 DVC Experiment Tracking
MLflow veya W&B gibi ağır araçlara gerek kalmadan Git ve DVC ile deney takibi.
# Yeni deney çalıştır (otomatik branch/stash oluşturur)
dvc exp run --name exp-lr-0.01 \
--set-param train.learning_rate=0.01
dvc exp run --name exp-lr-0.05 \
--set-param train.learning_rate=0.05
dvc exp run --name exp-depth-8 \
--set-param train.max_depth=8
# Tüm deneyleri karşılaştır
dvc exp show
# Metrics ve params farkını göster
dvc params diff exp-lr-0.01 exp-depth-8
# En iyi deneyi kalıcı hale getir
dvc exp apply exp-depth-8
git add .
git commit -m "En iyi deney: max_depth=8, acc=0.923"
# Deneyleri push et (başkaları paylaşabilir)
dvc exp push origin exp-depth-8
import json, yaml, joblib
from pathlib import Path
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
import numpy as np
# Params yükle
with open("params.yaml") as f:
params = yaml.safe_load(f)["train"]
# Veri yükle
X_train = np.load("data/features/train.npz")["X"]
y_train = np.load("data/features/train.npz")["y"]
X_val = np.load("data/features/val.npz")["X"]
y_val = np.load("data/features/val.npz")["y"]
# Model eğit
clf = GradientBoostingClassifier(
n_estimators=params["n_estimators"],
max_depth=params["max_depth"],
learning_rate=params["learning_rate"],
random_state=42,
)
clf.fit(X_train, y_train)
# Metrikleri kaydet
preds = clf.predict(X_val)
probs = clf.predict_proba(X_val)[:, 1]
metrics = {
"accuracy": accuracy_score(y_val, preds),
"f1": f1_score(y_val, preds, average="weighted"),
"auc_roc": roc_auc_score(y_val, probs),
}
Path("metrics").mkdir(exist_ok=True)
with open("metrics/train_metrics.json", "w") as f:
json.dump(metrics, f, indent=2)
# Modeli kaydet
Path("models").mkdir(exist_ok=True)
joblib.dump(clf, "models/model.pkl")
print(metrics)
05 LakeFS — Git Semantics for Data Lakes
LakeFS, S3/GCS/Azure Blob üzerinde Git benzeri branch, commit, merge ve revert işlemleri sağlar.
LakeFS, object storage önüne bir proxy katmanı olarak oturur. Mevcut Spark, Presto, Flink kod tabanı değiştirilmeden kullanılmaya devam eder — yalnızca endpoint URL güncellenir (s3:// yerine lakefs://).
Git main ← feature/new-data ← hotfix/schema-fix LakeFS main ← experiment/v2 ← staging/etl-test Commit → değişmez snapshot (hash tabanlı) Branch → izole geliştirme ortamı (veri kopyalanmaz, pointer) Merge → atomik merge, conflict detection Revert → anlık geri alma
docker run -p 8000:8000 -p 8001:8001 \
-e LAKEFS_BLOCKSTORE_TYPE=local \
-e LAKEFS_AUTH_ENCRYPT_SECRET_KEY=some-secret \
treeverse/lakefs run
# Python client
pip install lakefs
import lakefs
# ── Client konfigürasyonu ────────────────────────────────────────────
clt = lakefs.client.Client(
host="http://localhost:8000",
username="admin",
password="admin-secret",
)
# ── Repository oluştur ───────────────────────────────────────────────
repo = lakefs.Repository("ml-datasets", client=clt).create(
storage_namespace="s3://my-bucket/lakefs-store",
default_branch="main",
)
# ── Branch oluştur ───────────────────────────────────────────────────
branch = repo.branch("experiment/new-features").create(source_reference="main")
# ── Dosya yükle ──────────────────────────────────────────────────────
with branch.object("data/processed/train_v2.parquet").writer() as writer:
writer.write(open("train_v2.parquet", "rb").read())
# ── Commit ───────────────────────────────────────────────────────────
branch.commit(
message="Yeni feature engineering v2 eklendi",
metadata={"author": "data-eng", "rows": "1234567"},
)
06 LakeFS Branch & Merge Workflow
ETL test, veri kalite kontrol ve production merge iş akışı.
import lakefs
import polars as pl
from deltalake import write_deltalake
repo = lakefs.Repository("ml-datasets")
# ── ETL'i izole branch'te çalıştır ──────────────────────────────────
etl_branch = repo.branch("etl/daily-load-2026-04-12").create(source_reference="main")
try:
# ETL çalıştır — lakefs:// endpoint kullan
df = pl.read_parquet("s3://raw-bucket/events/2026-04-12/*.parquet")
df_clean = df.filter(pl.col("revenue") >= 0)
# LakeFS branch'e yaz (S3 uyumlu endpoint)
write_deltalake(
"s3://ml-datasets/etl/daily-load-2026-04-12/data/events",
df_clean.to_arrow(),
storage_options={
"AWS_ENDPOINT_URL": "http://localhost:8000",
"AWS_ACCESS_KEY_ID": "admin",
"AWS_SECRET_ACCESS_KEY": "admin-secret",
}
)
# Veri kalite kontrolü
assert len(df_clean) > 10_000, "Yeterli veri yok!"
assert df_clean["user_id"].null_count() == 0, "Null user_id var!"
# Başarılı — main'e merge et
etl_branch.commit(message="ETL 2026-04-12 tamamlandı")
merge_result = etl_branch.merge_into("main")
print(f"Merge başarılı: {merge_result}")
except Exception as e:
# Başarısız — branch'i sil, main etkilenmez
etl_branch.delete()
raise RuntimeError(f"ETL başarısız, değişiklikler geri alındı: {e}")
# ── Diff (hangi dosyalar değişti?) ─────────────────────────────────
for diff in repo.ref("main").diff(other_ref="main~1"):
print(f"{diff.type}: {diff.path}")
# ── Belirli versiyona revert ─────────────────────────────────────────
# Bozuk veri gelirse önceki commit'e dön:
# repo.branch("main").revert(reference="~3")
07 Data Contracts — Schema Registry
Producer ve consumer arasındaki schema anlaşması. Confluent Schema Registry, Avro ve Protobuf ile schema evrim kuralları.
{
"type": "record",
"name": "UserEvent",
"namespace": "com.company.events",
"doc": "Kullanıcı davranış olayı",
"fields": [
{"name": "user_id", "type": "long", "doc": "Kullanıcı ID"},
{"name": "event_type", "type": "string", "doc": "Olay türü"},
{"name": "ts", "type": "long", "logicalType": "timestamp-millis"},
{"name": "revenue", "type": ["null", "double"], "default": null},
{"name": "product_id", "type": ["null", "string"], "default": null}
]
}
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
client = SchemaRegistryClient({"url": "http://schema-registry:8081"})
# ── Schema kaydet ────────────────────────────────────────────────────
with open("schema/user_event.avsc") as f:
schema_str = f.read()
schema = Schema(schema_str, "AVRO")
schema_id = client.register_schema("user-events-value", schema)
print(f"Schema ID: {schema_id}")
# ── Uyumluluk kontrolü ───────────────────────────────────────────────
new_schema_str = """... yeni alan eklenmiş schema ..."""
compat = client.test_compatibility(
subject_name="user-events-value",
schema=Schema(new_schema_str, "AVRO")
)
print(f"Uyumlu mu? {compat}")
# ── Uyumluluk politikası ayarla ──────────────────────────────────────
# BACKWARD: yeni consumer eski mesajları okuyabilir (yeni alanlar optional)
# FORWARD: eski consumer yeni mesajları okuyabilir
# FULL: her ikisi de
client.update_compatibility("BACKWARD", subject_name="user-events-value")
08 Schema Evolution ve Practical
Değişmez schema kuralları ve DVC ile image classification dataset versiyonlama pipeline.
Schema Evolution Kuralları
| Değişiklik | BACKWARD | FORWARD | FULL |
|---|---|---|---|
| Opsiyonel alan ekle (default var) | Uyumlu | Uyumlu | Uyumlu |
| Zorunlu alan ekle (default yok) | Uyumsuz | Uyumlu | Uyumsuz |
| Alan sil (opsiyonel) | Uyumlu | Uyumsuz | Uyumsuz |
| Alan tipini değiştir | Uyumsuz | Uyumsuz | Uyumsuz |
| Alan adını değiştir | Uyumsuz | Uyumsuz | Uyumsuz |
# ── Proje yapısı ─────────────────────────────────────────────────────
# image-classifier/
# data/raw/ ← ham görüntüler (DVC ile takip edilir)
# data/processed/ ← boyutlandırılmış, augmented (DVC çıktısı)
# models/ ← eğitilmiş model (DVC çıktısı)
# params.yaml ← hyperparameters (Git ile takip)
# dvc.yaml ← pipeline tanımı (Git ile takip)
# v1: 10k görüntü, ResNet18
dvc add data/raw/images_v1/
git add data/raw/images_v1.dvc
git commit -m "Dataset v1: 10k görüntü eklendi"
git tag dataset-v1
dvc push
# v2: 25k görüntü, ek sınıflar
dvc add data/raw/images_v2/
git add data/raw/images_v2.dvc
git commit -m "Dataset v2: 25k görüntü, 3 yeni sınıf"
git tag dataset-v2
dvc push
# v1 verisiyle model reproducibility
git checkout dataset-v1
dvc checkout # veriyi de v1'e döndürür
dvc repro # pipeline'ı v1 veri + o zamanki params ile çalıştır