Veri Mühendisliği
RehberVeri Mühendisliği01 · Pipeline

Veri Pipeline
ETL, Lakehouse & Polars.

ETL vs ELT paradigmasından Airflow DAG'larına, Delta Lake lakehouse mimarisinden Polars lazy evaluation'a ve Spark optimizasyonlarına kapsamlı bir bakış.

00 ETL vs ELT — Paradigma Farkı

İki paradigma aynı veriyi hareket ettirir ama farklı noktalarda dönüştürür. Modern bulut veri ambarları ELT'yi mümkün kıldı; ancak ikisinin doğru kullanım alanları hâlâ farklıdır.

ETL (Extract-Transform-Load): Veri kaynaktan çekilir, bir ara işlem katmanında (genellikle bir sunucu veya Spark cluster'ı) temizlenir/dönüştürülür, ardından hedef sisteme yüklenir. Klasik veri ambarı mimarilerinin (Teradata, IBM Netezza) temel yöntemidir.

ELT (Extract-Load-Transform): Ham veri önce hedef sisteme (BigQuery, Snowflake, Redshift) yüklenir, dönüşüm işlemi hedef sistemin hesaplama gücüyle yapılır. dbt (data build tool) bu paradigmanın standart aracıdır. Modern bulut veri ambarlarının ölçeklenebilir SQL motoru sayesinde güçlenmiştir.

KriterETLELT
Dönüşüm YeriKaynak ile hedef arasındaHedef sistemde
Ham VeriHedefte saklanmazHam halde saklanır
AraçlarInformatica, SSIS, Sparkdbt, Dataform, BigQuery
Güçlü Olduğu YerPII masking, kısıtlı storageHızlı analitik, iterasyon
Zayıf Olduğu YerBüyük veri, yavaş iterasyonHedefte depolama maliyeti
Ne zaman ETL? Kişisel veri (PII) içeren kayıtların hedef sisteme taşınmadan önce maskelenmesi/şifrelenmesi gerekiyorsa ETL zorunludur. Ayrıca hedef sistemin yeterli hesaplama gücü yoksa ara dönüşüm katmanı gerekir.

01 Batch Pipeline Mimarisi — Airflow & Prefect

Airflow DAG'ları ile günlük batch pipeline'lar; Prefect ile modern Python-first orkestrasyon.

Apache Airflow, DAG (Directed Acyclic Graph) tabanlı iş akışı orkestrasyon aracıdır. Her adım bir Operator'dür; bağımlılıklar >> operatörüyle tanımlanır. Airflow Scheduler, belirlenen schedule_interval'da DAG'ı tetikler.

dags/daily_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import polars as pl

DEFAULT_ARGS = {
    "owner": "data-eng",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["alerts@company.com"],
}

def extract(**context):
    # S3'ten parquet oku
    ds = context["ds"]  # YYYY-MM-DD
    df = pl.read_parquet(f"s3://raw-bucket/events/date={ds}/*.parquet")
    context["ti"].xcom_push(key="row_count", value=len(df))
    df.write_parquet(f"/tmp/extract_{ds}.parquet")

def transform(**context):
    ds = context["ds"]
    df = pl.read_parquet(f"/tmp/extract_{ds}.parquet")
    df = (
        df
        .filter(pl.col("event_type").is_in(["click", "purchase", "view"]))
        .with_columns([
            pl.col("ts").cast(pl.Datetime).alias("event_time"),
            (pl.col("revenue").fill_null(0.0)).alias("revenue"),
        ])
        .drop_nulls(subset=["user_id", "event_type"])
    )
    df.write_parquet(f"/tmp/transform_{ds}.parquet")

def load_to_delta(**context):
    from deltalake import write_deltalake
    ds = context["ds"]
    df = pl.read_parquet(f"/tmp/transform_{ds}.parquet")
    write_deltalake(
        "s3://curated-bucket/events",
        df.to_arrow(),
        mode="append",
        partition_by=["event_type"],
    )

with DAG(
    dag_id="daily_events_pipeline",
    default_args=DEFAULT_ARGS,
    schedule_interval="@daily",
    start_date=days_ago(1),
    catchup=False,
    tags=["events", "production"],
) as dag:

    t_extract   = PythonOperator(task_id="extract",    python_callable=extract,        provide_context=True)
    t_transform = PythonOperator(task_id="transform",   python_callable=transform,      provide_context=True)
    t_load      = PythonOperator(task_id="load_delta",  python_callable=load_to_delta,  provide_context=True)

    t_extract >> t_transform >> t_load

Prefect, Python dekoratörleriyle çalışan modern alternatiftir. Airflow'un XML/YAML karmaşıklığı olmadan saf Python kodu yazar, dağıtım için prefect deploy yeterlidir.

flows/events_flow.py — Prefect
from prefect import flow, task
from prefect.tasks import exponential_backoff
import polars as pl

@task(retries=3, retry_delay_seconds=exponential_backoff(backoff_factor=2))
def extract_events(date: str) -> pl.DataFrame:
    return pl.read_parquet(f"s3://raw-bucket/events/date={date}/*.parquet")

@task
def transform_events(df: pl.DataFrame) -> pl.DataFrame:
    return (
        df.filter(pl.col("event_type") != "bot")
          .with_columns(pl.col("ts").cast(pl.Datetime))
    )

@task
def load_events(df: pl.DataFrame, date: str):
    from deltalake import write_deltalake
    write_deltalake("s3://curated-bucket/events", df.to_arrow(), mode="append")

@flow(name="events-pipeline", log_prints=True)
def events_pipeline(date: str = "2026-01-01"):
    raw  = extract_events(date)
    clean = transform_events(raw)
    load_events(clean, date)
    print(f"Pipeline tamamlandı: {len(clean)} satır yüklendi.")

if __name__ == "__main__":
    events_pipeline(date="2026-04-12")

02 Data Lakehouse — Delta Lake, Iceberg, Hudi

Object storage üzerinde ACID transaction desteği sağlayan açık tablo formatları data lake ve veri ambarı dünyasını birleştirdi.

Klasik data lake (S3/GCS üzerinde parquet dosyaları) ölçeklenebilir ve ucuzdu ancak ACID garantisi yoktu: paralel yazılar veriyi bozabilir, kısmi başarısız yazılar kalıntı bırakır, schema evolution riskli, time-travel imkânsızdı. Lakehouse mimarisi bu sorunları çözer.

FormatŞirketGüçlü YanıEkosistem
Delta LakeDatabricks / Linux FoundationSpark entegrasyonu, DML (UPDATE/DELETE/MERGE)PySpark, Delta-rs (Rust/Python)
Apache IcebergNetflix / ApacheHidden partitioning, time travel, engine-agnosticSpark, Flink, Trino, DuckDB
Apache HudiUber / ApacheStreaming upserts, near-realtime ingestionSpark, Flink, Presto
lakehouse_demo.py — Delta Lake (Python delta-rs)
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

TABLE_PATH = "s3://my-bucket/silver/users"
STORAGE_OPTIONS = {"AWS_REGION": "us-east-1", "AWS_S3_ALLOW_UNSAFE_RENAME": "true"}

# ── İlk yazma ──────────────────────────────────────────────────────
schema = pa.schema([
    pa.field("user_id",   pa.int64()),
    pa.field("name",      pa.string()),
    pa.field("country",   pa.string()),
    pa.field("created_at", pa.timestamp("us")),
])
data = pa.table({
    "user_id":   [1, 2, 3],
    "name":      ["Alice", "Bob", "Carol"],
    "country":   ["TR", "DE", "US"],
    "created_at": pa.array([1_700_000_000_000_000] * 3, type=pa.timestamp("us")),
})
write_deltalake(TABLE_PATH, data, schema=schema,
               mode="overwrite", partition_by=["country"],
               storage_options=STORAGE_OPTIONS)

# ── UPSERT (MERGE) ──────────────────────────────────────────────────
dt = DeltaTable(TABLE_PATH, storage_options=STORAGE_OPTIONS)
new_data = pa.table({
    "user_id":   [2, 4],
    "name":      ["Bobby", "Dave"],
    "country":   ["DE", "FR"],
    "created_at": pa.array([1_710_000_000_000_000] * 2, type=pa.timestamp("us")),
})
(
    dt.merge(
        source=new_data,
        predicate="s.user_id = t.user_id",
        source_alias="s",
        target_alias="t",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)

# ── Time travel ─────────────────────────────────────────────────────
dt_v0 = DeltaTable(TABLE_PATH, version=0, storage_options=STORAGE_OPTIONS)
print("Version 0:", dt_v0.to_pandas())
Hangi format seçilmeli? Databricks/Spark ağırlıklı ekosistemler için Delta Lake en olgun seçimdir. Engine-agnostic (Trino, Flink, DuckDB) ortamlar için Iceberg tercih edilir. Near-realtime streaming upsert ihtiyacı varsa Hudi değerlendirilebilir.

03 Polars ile Hızlı Veri Dönüşümü

Rust üzerine inşa edilmiş Polars, pandas'ın 5–20 katı hızında çalışır. Lazy evaluation sayesinde sorgu planı optimize edilir, bellek tüketimi azaltılır.

Eager vs Lazypl.read_csv() eager (anında çalışır). pl.scan_csv() lazy (sorgu planı oluşturur, .collect() ile çalıştırır).
scan_parquetParquet dosyalarını lazy okur. Predicate pushdown ve projection pushdown otomatik uygulanır.
Expressionspl.col(), pl.lit(), pl.when().then().otherwise() ile kolon bazlı işlemler.
group_by + aggParallel execution ile pandas groupby'dan çok daha hızlı aggregation.
joinhow="inner/left/outer/semi/anti/cross" — hash join paralel çalışır.
polars_pipeline.py
import polars as pl
from datetime import date

# ── Lazy pipeline (scan, filter, aggregate) ────────────────────────
events = (
    pl.scan_parquet("s3://bucket/events/**/*.parquet")
    .filter(
        (pl.col("event_date") >= date(2026, 1, 1)) &
        (pl.col("event_type").is_in(["purchase", "add_to_cart"]))
    )
    .with_columns([
        (pl.col("revenue") * pl.col("quantity")).alias("total_value"),
        pl.col("user_id").cast(pl.Utf8).alias("user_id_str"),
    ])
)

# ── Aggregation ────────────────────────────────────────────────────
user_stats = (
    events
    .group_by(["user_id", "event_date"])
    .agg([
        pl.col("total_value").sum().alias("daily_revenue"),
        pl.col("event_type").count().alias("event_count"),
        pl.col("product_id").n_unique().alias("unique_products"),
    ])
    .sort(["user_id", "event_date"])
)

# ── Join ile kullanıcı profili ─────────────────────────────────────
users = pl.scan_parquet("s3://bucket/users/*.parquet").select(["user_id", "country", "tier"])

result = (
    user_stats
    .join(users, on="user_id", how="left")
    .filter(pl.col("tier") == "premium")
    .collect(streaming=True)  # büyük dataset için streaming mode
)
print(result.head())

# ── Window function (rolling 7 gün ortalama) ───────────────────────
rolling = (
    result
    .sort(["user_id", "event_date"])
    .with_columns(
        pl.col("daily_revenue")
          .rolling_mean(window_size=7)
          .over("user_id")
          .alias("rev_7d_avg")
    )
)

04 Spark Temel API

PySpark DataFrame API, transformation vs action ayrımı ve doğru partitioning stratejisi Spark performansının temelidir.

Spark'ta tüm işlemler lazy değerlendirilir. filter(), select(), join() gibi işlemler yalnızca bir transformation (execution planına ekleme) yapar. count(), show(), write() gibi işlemler ise action'dır ve planı fiziksel olarak çalıştırır.

spark_job.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = (
    SparkSession.builder
    .appName("UserFeaturePipeline")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.adaptive.enabled", "true")   # AQE — Adaptive Query Execution
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

# ── Okuma ─────────────────────────────────────────────────────────
events = spark.read.format("delta").load("s3://bucket/silver/events")
users  = spark.read.format("parquet").load("s3://bucket/bronze/users")

# ── Transformations (lazy) ─────────────────────────────────────────
enriched = (
    events
    .filter(F.col("event_type") == "purchase")
    .withColumn("revenue_usd", F.col("revenue") * F.col("fx_rate"))
    .join(users.select("user_id", "country", "tier"), on="user_id", how="left")
    .groupBy("user_id", "country")
    .agg(
        F.sum("revenue_usd").alias("total_revenue"),
        F.count("*").alias("purchase_count"),
        F.avg("revenue_usd").alias("avg_order_value"),
    )
)

# ── Action — Delta'ya yaz ─────────────────────────────────────────
(
    enriched
    .repartition(50, "country")   # partition pruning için
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("country")
    .save("s3://bucket/gold/user_features")
)

spark.stop()
Shuffle sınırı: Varsayılan spark.sql.shuffle.partitions=200 küçük veri için çok fazla, büyük veri için az olabilir. Kural: her partition yaklaşık 128 MB olmalı. AQE (Adaptive Query Execution) açıksa Spark bu değeri otomatik ayarlar.

05 Data Quality — Great Expectations & Soda

Veri kalitesi kontrolleri pipeline'a entegre edilmezse çöp giren çöp çıkar. Great Expectations ve Soda iki popüler seçenektir.

gx_suite.py — Great Expectations
import great_expectations as gx
import pandas as pd

context = gx.get_context()

# ── Datasource tanımla ──────────────────────────────────────────────
datasource = context.sources.add_or_update_pandas(name="events_ds")
asset = datasource.add_dataframe_asset(name="events")
batch_request = asset.build_batch_request(dataframe=pd.read_parquet("events.parquet"))

# ── Expectation Suite ───────────────────────────────────────────────
suite = context.add_or_update_expectation_suite("events_suite")
validator = context.get_validator(batch_request=batch_request, expectation_suite=suite)

validator.expect_column_to_exist("user_id")
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("revenue", min_value=0, max_value=100_000)
validator.expect_column_values_to_be_in_set("event_type", ["click", "purchase", "view"])
validator.expect_column_pair_values_to_be_equal("order_id", "order_id_copy")
validator.expect_table_row_count_to_be_between(min_value=1000, max_value=10_000_000)

validator.save_expectation_suite()

# ── Checkpoint çalıştır ─────────────────────────────────────────────
checkpoint = context.add_or_update_checkpoint(
    name="events_checkpoint",
    validator=validator,
)
result = checkpoint.run()
if not result["success"]:
    raise ValueError("Data quality check başarısız! Pipeline durduruluyor.")

06 Pipeline Test Stratejisi

Veri pipeline'larını test etmek yazılım testinden farklıdır: veri bağımlılığı, yan etkiler ve non-determinism zorluğu vardır.

01 Unit Test       → Saf transformation fonksiyonları küçük sabit veriyle test et
02 Integration    → Gerçek kaynak/hedef sistemlere karşı (Docker Compose ile yerel)
03 Contract Test  → Kaynak schema'nın beklenen sözleşmeye uyduğunu doğrula
04 Data Quality   → Great Expectations / Soda ile beklentileri otomatik kontrol et
05 E2E Test       → Küçük fixture dataset ile tüm pipeline'ı uçtan uca çalıştır
test_transform.py — pytest
import polars as pl
import pytest
from pipeline.transforms import clean_events, aggregate_daily

@pytest.fixture
def raw_events():
    return pl.DataFrame({
        "user_id":    [1, 2, 2, 3, None],
        "event_type": ["click", "purchase", "bot", "purchase", "view"],
        "revenue":    [0.0, 49.99, 0.0, 120.0, 0.0],
    })

def test_clean_removes_bot_events(raw_events):
    result = clean_events(raw_events)
    assert "bot" not in result["event_type"].to_list()

def test_clean_drops_null_user_id(raw_events):
    result = clean_events(raw_events)
    assert result["user_id"].null_count() == 0

def test_aggregate_daily_revenue(raw_events):
    clean = clean_events(raw_events)
    agg   = aggregate_daily(clean)
    user2_revenue = agg.filter(pl.col("user_id") == 2)["total_revenue"][0]
    assert user2_revenue == pytest.approx(49.99, rel=1e-3)

07 Cost Optimization

Partition pruning, Z-ordering ve compaction ile S3 okuma maliyeti ve Spark compute süresi dramatik biçimde düşürülebilir.

Partition PruningParquet/Delta dosyalarını partition kolona göre dizinle; sorgu yalnızca ilgili klasörlere bakar. event_date veya country gibi sık filtrenen kolonları seç.
Z-Ordering (Delta)OPTIMIZE komutu ile sık birlikte filtrenen birden fazla kolonu (user_id + event_date) Z-order'a göre birlikte sırala, data skipping artırılır.
Small File CompactionStreaming yazıları çok sayıda küçük dosya üretir. OPTIMIZE / VACUUM ile birleştir ve gereksiz versiyonları temizle.
Columnar ProjectionSELECT * yerine yalnızca gereken kolonları seç; Parquet column-pruning ile okuma maliyeti kolun sayısıyla orantılı azalır.
Spot / PreemptibleBatch Spark job'larında Spot Instance kullan (%70'e kadar tasarruf). Checkpointing ile yeniden başlatmayı destekle.
optimize_delta.sql — Delta Lake optimize komutları
-- Z-ORDER ile optimize (Spark SQL / Databricks)
OPTIMIZE delta.`s3://bucket/gold/user_features`
ZORDER BY (user_id, event_date);

-- 7 günden eski versiyon dosyalarını temizle
VACUUM delta.`s3://bucket/gold/user_features` RETAIN 168 HOURS;

-- Partition statistics güncelle
ANALYZE TABLE user_features COMPUTE STATISTICS FOR ALL COLUMNS;

08 Practical — S3 → Delta Lake → Feature Tablosu

Airflow DAG ile tam uçtan uca pipeline: S3'ten ham event verisi çek, Delta Lake silver tablosuna yaz, feature engineering sonucu gold feature tablosuna yükle.

01 S3 raw         → günlük parquet dosyaları (events/date=YYYY-MM-DD/)
02 Extract        → Polars ile oku, boyut kontrol et
03 Quality Check  → Great Expectations ile schema & range doğrula
04 Transform      → temizle, tip dönüştür, null yönet
05 Silver Delta   → temiz ham veri, partitionBy(event_date)
06 Feature Eng    → 7 günlük pencere aggregation (Spark)
07 Gold Delta     → user_features tablosu, Z-ORDER(user_id)
08 Feast Materialize → online feature store güncelle
dags/full_pipeline.py
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from datetime import timedelta

@dag(
    dag_id="s3_to_feature_table",
    schedule_interval="0 3 * * *",   # her gün 03:00
    start_date=days_ago(1),
    default_args={"retries": 2, "retry_delay": timedelta(minutes=10)},
    catchup=False,
)
def pipeline():

    @task
    def extract(ds=None):
        import polars as pl
        df = pl.scan_parquet(f"s3://raw/events/date={ds}/*.parquet").collect()
        assert len(df) > 0, "Boş dataset!"
        return df.write_ipc(None)  # Arrow IPC bytes — XCom için serialize

    @task
    def validate(ipc_bytes: bytes):
        import polars as pl, io
        df = pl.read_ipc(io.BytesIO(ipc_bytes))
        required_cols = {"user_id", "event_type", "revenue", "ts"}
        assert required_cols.issubset(set(df.columns)), "Eksik kolon!"
        assert df["revenue"].min() >= 0, "Negatif revenue!"
        return ipc_bytes

    @task
    def write_silver(ipc_bytes: bytes, ds=None):
        import polars as pl, io
        from deltalake import write_deltalake
        df = pl.read_ipc(io.BytesIO(ipc_bytes))
        write_deltalake("s3://curated/silver/events", df.to_arrow(),
                        mode="append", partition_by=["event_date"])

    @task
    def build_features(ds=None):
        from pyspark.sql import SparkSession
        import pyspark.sql.functions as F
        spark = SparkSession.builder.getOrCreate()
        df = spark.read.format("delta").load("s3://curated/silver/events")
        features = (
            df.groupBy("user_id")
              .agg(
                F.sum("revenue").alias("ltv"),
                F.count("*").alias("total_events"),
                F.countDistinct("event_date").alias("active_days"),
              )
        )
        features.write.format("delta").mode("overwrite").save("s3://curated/gold/user_features")
        spark.stop()

    raw = extract()
    validated = validate(raw)
    write_silver(validated) >> build_features()

pipeline()
XCom limiti: Airflow XCom varsayılan olarak metadata DB'de saklar (genellikle PostgreSQL). Büyük DataFrameler için XCom backend olarak S3 veya custom XCom kullanın, ya da veriye dosya path'iyle referans edin.