00 Model-Based vs Model-Free Tradeoff
Model-free algoritmalar güvenilir ama veri açtır; model-based yöntemler verimlidir ama model hatalarına karşı hassastır.
PPO ve DQN gibi model-free yöntemler, sadece deneyimden (s,a,r,s') öğrenir. Ortamın nasıl çalıştığına dair herhangi bir model tutmazlar. Bu yaklaşım robusttır — hatalı model yoktur — ama son derece sample-verimsizdir. Atari'de uzman insan seviyesine ulaşmak için tipik olarak 200 milyon frame gerekir; bir insan birkaç saatte aynı noktaya ulaşır.
Model-based RL, ortamın bir world model'ini öğrenir: P(s'|s,a) ve R(s,a). Bu model, gerçek ortam etkileşimi olmadan "hayal edilen" deneyimler üretebilir. Avantajları: dramatik olarak daha az gerçek etkileşim, planlama kapasitesi, öğrenilen dinamiklerden genelleme.
01 World Model Nedir?
World model, ortamın dinamiklerini ve ödül fonksiyonunu parametrik biçimde temsil eden bir sinir ağıdır.
Bir world model iki temel bileşenden oluşur:
Opsiyonel olarak termination model de eklenebilir: d_ψ(s) → {0,1}. Bu üçlü birlikte, gerçek ortamı simüle eden bir "hayal ortamı" oluşturur.
World model öğrenme: gerçek deneyimlerden (s, a, r, s') oluşan bir veri kümesi üzerinde supervised learning ile eğitilir. Sonra bu model, policy iyileştirme için hayal edilen veri üretmekte kullanılır.
import torch
import torch.nn as nn
import torch.nn.functional as F
class DeterministicWorldModel(nn.Module):
"""
Basit deterministik dünya modeli.
Input : (state, action)
Output: (next_state, reward)
"""
def __init__(self, state_dim, action_dim, hidden=256):
super().__init__()
self.dynamics = nn.Sequential(
nn.Linear(state_dim + action_dim, hidden), nn.SiLU(),
nn.Linear(hidden, hidden), nn.SiLU(),
nn.Linear(hidden, state_dim)
)
self.reward = nn.Sequential(
nn.Linear(state_dim + action_dim, hidden), nn.SiLU(),
nn.Linear(hidden, hidden // 2), nn.SiLU(),
nn.Linear(hidden // 2, 1)
)
def forward(self, state, action):
x = torch.cat([state, action], dim=-1)
next_state = state + self.dynamics(x) # residual bağlantı
reward = self.reward(x).squeeze(-1)
return next_state, reward
def train_world_model(model, optimizer, replay_buffer, batch_size=256):
"""Gerçek deneyimlerden world model eğit."""
s, a, r, s2, _ = replay_buffer.sample(batch_size)
s_pred, r_pred = model(s, a)
dyn_loss = F.mse_loss(s_pred, s2)
rew_loss = F.mse_loss(r_pred, r)
loss = dyn_loss + rew_loss
optimizer.zero_grad()
loss.backward()
optimizer.step()
return dyn_loss.item(), rew_loss.item()
02 Dyna-Q — Model + Planning + Learning
Dyna-Q, gerçek deneyim ve model-tabanlı simülasyonu tek döngüde birleştirir.
Sutton'ın 1991'de tanımladığı Dyna mimarisi, birbirini tamamlayan üç bileşenden oluşur: (1) Gerçek ortamla etkileşim, (2) bu deneyimlerden model öğrenme, (3) öğrenilen modelden simüle edilmiş deneyimler üretip Q-learning ile güncelleme.
Her adımda k kez "planlama" yapılır: mevcut model ile hayal edilen (s,a,r,s') tuple'ları üretilir ve Q-tablosu bu hayali deneyimlerle de güncellenir. k büyüdükçe daha hızlı öğrenilir — ama daha fazla hesaplama gerekir.
import gymnasium as gym
import numpy as np
from collections import defaultdict
import random
class DynaQAgent:
def __init__(self, n_states, n_actions, alpha=0.1,
gamma=0.99, eps=0.1, k_planning=5):
self.Q = np.zeros((n_states, n_actions))
# Model: (s,a) → (r, s') geçmişini sakla
self.model = {} # {(s,a): (r, s_next)}
self.visited = [] # ziyaret edilen (s,a) çiftleri
self.alpha = alpha
self.gamma = gamma
self.eps = eps
self.k_planning = k_planning
def act(self, state):
if np.random.random() < self.eps:
return np.random.randint(self.Q.shape[1])
return np.argmax(self.Q[state])
def update(self, s, a, r, s2, done):
# 1. Gerçek deneyimden Q-learning güncellemesi
td_target = r + self.gamma * np.max(self.Q[s2]) * (1-done)
self.Q[s, a] += self.alpha * (td_target - self.Q[s, a])
# 2. Model güncelleme (deterministik tablo tabanlı)
self.model[(s, a)] = (r, s2)
if (s, a) not in self.visited:
self.visited.append((s, a))
# 3. Planlama: k kez hayali deneyim üret
for _ in range(self.k_planning):
if not self.visited:
break
s_sim, a_sim = random.choice(self.visited)
r_sim, s2_sim = self.model[(s_sim, a_sim)]
td_t = r_sim + self.gamma * np.max(self.Q[s2_sim])
self.Q[s_sim, a_sim] += self.alpha * (td_t - self.Q[s_sim, a_sim])
# FrozenLake ile Dyna-Q test
env = gym.make('FrozenLake-v1', is_slippery=False)
agent = DynaQAgent(n_states=16, n_actions=4, k_planning=10)
rewards = []
for ep in range(2000):
s, _ = env.reset()
total = 0
while True:
a = agent.act(s)
s2, r, done, trunc, _ = env.step(a)
agent.update(s, a, r, s2, done)
s = s2; total += r
if done or trunc: break
rewards.append(total)
import numpy as np
print(f"Son 500 ep win rate: {np.mean(rewards[-500:]):.3f}")
# Dyna-Q (k=10) saf Q-learning'den çok daha hızlı converge eder
03 MuZero Mimarisi
MuZero, ortam dinamiklerini öğrenilen latent uzayda temsil eder — oyun kuralları olmadan AlphaZero kalitesine ulaşır.
DeepMind'ın 2019 Nature makalesinde tanımlanan MuZero, three-network mimarisine dayanır:
MuZero'nun özgünlüğü: gerçek ortam kuralarını hiç kullanmaz. Tüm planlama, öğrenilen latent space'te yapılır. Bu, Chess, Go, Atari ve Mancala gibi çok farklı problemleri tek mimariye sığdırır.
import torch
import torch.nn as nn
class RepresentationNetwork(nn.Module):
"""h_θ: gözlem → latent state"""
def __init__(self, obs_dim, latent_dim=256):
super().__init__()
self.net = nn.Sequential(
nn.Linear(obs_dim, 512), nn.ReLU(),
nn.Linear(512, latent_dim), nn.Tanh() # normalize
)
def forward(self, obs):
return self.net(obs)
class DynamicsNetwork(nn.Module):
"""g_θ: (latent_state, action) → (next_latent, reward)"""
def __init__(self, latent_dim, n_actions):
super().__init__()
self.action_embed = nn.Embedding(n_actions, 64)
self.net = nn.Sequential(
nn.Linear(latent_dim + 64, 512), nn.ReLU(),
nn.Linear(512, 512), nn.ReLU()
)
self.next_state = nn.Sequential(nn.Linear(512, latent_dim), nn.Tanh())
self.reward = nn.Linear(512, 601) # categorical reward
def forward(self, latent, action):
a_emb = self.action_embed(action)
x = torch.cat([latent, a_emb], dim=-1)
feat = self.net(x)
return self.next_state(feat), self.reward(feat)
class PredictionNetwork(nn.Module):
"""f_θ: latent_state → (policy, value)"""
def __init__(self, latent_dim, n_actions):
super().__init__()
self.shared = nn.Sequential(
nn.Linear(latent_dim, 512), nn.ReLU(),
nn.Linear(512, 512), nn.ReLU()
)
self.policy = nn.Linear(512, n_actions) # prior for MCTS
self.value = nn.Linear(512, 601) # categorical value
def forward(self, latent):
feat = self.shared(latent)
return self.policy(feat), self.value(feat)
04 MCTS ile Learned Model
Monte Carlo Tree Search, öğrenilen world model ile birleşince güçlü bir planlama aracına dönüşür.
Klasik MCTS (AlphaGo, AlphaZero) gerçek ortam simülatörünü kullanır. MuZero'da ise MCTS, öğrenilen dynamics network üzerinde çalışır. Her MCTS simülasyonu, gerçek ortam yerine latent state geçişleri kullanır.
MCTS döngüsü 4 adımdan oluşur: Selection (en umut verici node'u seç, UCB skoru), Expansion (yeni node oluştur, prediction network ile prior al), Simulation (dynamics network ile rollout), Backpropagation (değerleri geriye yay).
import math
import numpy as np
class MCTSNode:
def __init__(self, prior: float):
self.prior = prior # P(s,a) — prediction network'ten
self.visit_count = 0
self.value_sum = 0.0
self.children = {} # {action: MCTSNode}
self.hidden_state = None # latent state
self.reward = 0.0
@property
def value(self):
if self.visit_count == 0: return 0
return self.value_sum / self.visit_count
def ucb_score(self, parent_visit, c_puct=1.25):
"""UCB skoru: değer tahmini + keşif bonusu."""
u = (c_puct * self.prior *
math.sqrt(parent_visit) / (1 + self.visit_count))
return self.value + u
def run_mcts(root_hidden, pred_net, dyn_net, n_simulations=50, n_actions=4):
"""Basit MCTS — learned model üzerinde."""
import torch
# Root node
with torch.no_grad():
policy_logits, value = pred_net(root_hidden)
priors = torch.softmax(policy_logits, dim=-1).squeeze().numpy()
root = MCTSNode(prior=1.0)
root.hidden_state = root_hidden
for a in range(n_actions):
root.children[a] = MCTSNode(prior=float(priors[a]))
for _ in range(n_simulations):
node = root
path = [root]
# Selection: en yüksek UCB'yi seç
while node.children and all(c.visit_count > 0
for c in node.children.values()):
action = max(node.children,
key=lambda a: node.children[a].ucb_score(node.visit_count))
node = node.children[action]
path.append(node)
# Expansion + Simulation
with torch.no_grad():
a_tensor = torch.tensor([list(node.children.keys())[0]])
next_h, _ = dyn_net(path[-2].hidden_state, a_tensor)
_, v = pred_net(next_h)
value_est = v.item()
# Backprop
for n in path:
n.visit_count += 1
n.value_sum += value_est
# En çok ziyaret edilen aksiyon
best_action = max(root.children, key=lambda a: root.children[a].visit_count)
return best_action
05 Dreamer — Latent Dynamics
Dreamer, tüm öğrenmeyi latent hayal edilmiş diziler üzerinde yapar — gerçek ortamla minimum etkileşimle.
Hafner ve arkadaşlarının Dreamer serisinde (v1 2019, v2 2020, v3 2023), agent ham piksel gözlemlerinden latent temsiller öğrenir ve policy'yi tamamen bu "hayal edilen" deneyimlerle günceller. Gerçek ortam yalnızca world model eğitimi için kullanılır.
Temel fikir: latent state s_t deterministik (h_t, RNN ile hesaplanan) ve stokastik (z_t, Gaussian örnekleme) bileşenlerden oluşur. Bu RSSM (Recurrent State-Space Model) yapısı, geçmiş bilgiyi deterministik h_t'de taşırken belirsizliği z_t'de modeller.
h_t = f(h_{t-1}, z_{t-1}, a_{t-1}) → deterministik (GRU)
z_t ~ q(z_t | h_t, o_t) → stokastik, gözlemden posterior
z_t ~ p(z_t | h_t) → stokastik, hayal için prior
ô_t = dec(h_t, z_t) → gözlem rekonstrüksiyonu
import torch
import torch.nn as nn
from torch.distributions import Normal
class RSSM(nn.Module):
"""
Basitleştirilmiş Recurrent State-Space Model.
h: deterministik hidden state (GRU)
z: stokastik latent state (Gaussian)
"""
def __init__(self, obs_dim, action_dim, h_dim=256, z_dim=32):
super().__init__()
self.h_dim = h_dim
self.z_dim = z_dim
# Deterministik geçiş: GRU
self.cell = nn.GRUCell(z_dim + action_dim, h_dim)
# Prior: p(z_t | h_t)
self.prior_net = nn.Sequential(
nn.Linear(h_dim, h_dim), nn.ELU(),
nn.Linear(h_dim, z_dim * 2) # mean, log_std
)
# Posterior: q(z_t | h_t, o_t)
self.posterior_net = nn.Sequential(
nn.Linear(h_dim + obs_dim, h_dim), nn.ELU(),
nn.Linear(h_dim, z_dim * 2)
)
def posterior_step(self, h, obs, action):
"""Gözlem ile posterior z örnekle."""
# Posterior: z ~ q(z|h,o)
post_out = self.posterior_net(torch.cat([h, obs], dim=-1))
post_mean, post_log_std = post_out.chunk(2, dim=-1)
post_std = post_log_std.exp().clamp(0.1, 10)
z = post_mean + post_std * torch.randn_like(post_mean)
# Bir sonraki h
h_next = self.cell(torch.cat([z, action], dim=-1), h)
return h_next, z, Normal(post_mean, post_std)
def imagine_step(self, h, action):
"""Gözlem olmadan prior z ile hayal et."""
prior_out = self.prior_net(h)
prior_mean, prior_log_std = prior_out.chunk(2, dim=-1)
prior_std = prior_log_std.exp().clamp(0.1, 10)
z = prior_mean + prior_std * torch.randn_like(prior_mean)
h_next = self.cell(torch.cat([z, action], dim=-1), h)
return h_next, z
06 PlaNet ve RSSM
PlaNet, planlama için latent uzayda trajektori optimizasyonu yapar — CEM ile.
Hafner ve ark.'nın 2019 tarihli PlaNet'i, RSSM tabanlı world model öğrenir ve planlama için Random Shooting veya CEM (Cross Entropy Method) kullanır. Agent, her adımda latent uzayda birçok olası aksiyon dizisi hayal eder, tahmini kümülatif ödülleri hesaplar ve en iyi diziyi seçer.
import torch
import numpy as np
def cem_plan(rssm, reward_model, h_init, z_init, horizon=12,
n_candidates=1000, n_elite=100, n_iters=10,
action_dim=1, action_low=-1.0, action_high=1.0):
"""
CEM (Cross Entropy Method) ile latent uzayda planlama.
En umut verici horizon-adımlık aksiyon dizisini bul.
"""
# Başlangıç dağılım parametreleri
mean = torch.zeros(horizon, action_dim)
std = torch.ones(horizon, action_dim)
for iteration in range(n_iters):
# Aday aksiyon dizileri örnekle: (N, T, action_dim)
actions = mean + std * torch.randn(n_candidates, horizon, action_dim)
actions = actions.clamp(action_low, action_high)
# Her aday için hayal rollout yap
returns = torch.zeros(n_candidates)
with torch.no_grad():
for i in range(n_candidates):
h = h_init.clone()
z = z_init.clone()
total_r = 0.0
for t in range(horizon):
a = actions[i, t].unsqueeze(0)
h, z = rssm.imagine_step(h, a)
r = reward_model(torch.cat([h, z], dim=-1))
total_r += (0.99 ** t) * r.item()
returns[i] = total_r
# Elite örnekleri seç (en yüksek return'ler)
elite_idx = returns.topk(n_elite).indices
elite_acts = actions[elite_idx]
# Dağılım güncelle
mean = elite_acts.mean(dim=0)
std = elite_acts.std(dim=0) + 1e-5
return mean[0] # İlk adımın en iyi aksiyonu
07 Imagination Rollouts
Dreamer-v2 ve v3'te policy, tamamen hayal edilen rollout'lar üzerinde PPO/actor-critic ile güncellenir.
World model bir kez eğitildiğinde, policy gradients hesaplanmak için gerçek ortama ihtiyaç kalmaz. RSSM'in imagine_step fonksiyonu ile H adım ileri sarılır, her adımda reward ve value tahmin edilir:
import torch
def imagine_trajectory(rssm, actor, reward_model, value_model,
h_init, z_init, horizon=15, gamma=0.99, lam=0.95):
"""
Latent uzayda H-adım hayal rollout.
Tüm hesaplama differentiable → doğrudan policy gradient.
"""
states = [] # (h,z) çiftleri
rewards = []
values = []
log_probs = []
h, z = h_init, z_init
for t in range(horizon):
state = torch.cat([h, z], dim=-1)
# Actor: aksiyon örnekle
action_dist = actor(state)
action = action_dist.rsample() # reparameterization
log_prob = action_dist.log_prob(action)
# Reward tahmini
r = reward_model(state).squeeze()
# Value tahmini (bootstrap)
v = value_model(state).squeeze()
states.append(state)
rewards.append(r)
values.append(v)
log_probs.append(log_prob)
# Hayal adımı (differentiable)
h, z = rssm.imagine_step(h, action)
# Lambda-returns hesapla
next_v = value_model(torch.cat([h, z], dim=-1)).squeeze()
targets = []
last = next_v
for r, v in zip(reversed(rewards), reversed(values)):
last = r + gamma * ((1-lam)*v + lam*last)
targets.insert(0, last)
targets = torch.stack(targets)
# Policy loss + value loss
values_t = torch.stack(values)
actor_loss = -(torch.stack(log_probs) * (targets - values_t.detach())).mean()
critic_loss = ((values_t - targets.detach()) ** 2).mean()
return actor_loss, critic_loss
08 Sample Efficiency Karşılaştırma
Model-based yöntemler, aynı performans için 10-100x daha az gerçek etkileşim gerektirir.
Gerçek robotik uygulamalar veya pahalı simülasyonlar için model-based tercih edilir. Atari ve oyun ortamları için model-free (özellikle PPO/SAC) genellikle yeterlidir — simülasyon ucuzdur. Hybrid yaklaşım: Dyna-Q tarzı model yardımıyla model-free'yi hızlandır.
09 Pratik: Pendulum MPC
Gymnasium Pendulum-v1 üzerinde neural dynamics model ve Model Predictive Control ile kontrol.
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
class DynamicsModel(nn.Module):
"""Pendulum dynamics: (obs, action) → next_obs"""
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(3 + 1, 256), nn.SiLU(),
nn.Linear(256, 256), nn.SiLU(),
nn.Linear(256, 3)
)
def forward(self, obs, action):
x = torch.cat([obs, action], dim=-1)
return obs + self.net(x) # residual
def pendulum_reward(obs, action):
"""Pendulum-v1 reward: -(theta^2 + 0.1*theta_dot^2 + 0.001*a^2)"""
cos_th, sin_th, th_dot = obs[..., 0], obs[..., 1], obs[..., 2]
theta = torch.atan2(sin_th, cos_th)
a = action[..., 0]
cost = theta**2 + 0.1 * th_dot**2 + 0.001 * a**2
return -cost
def random_shooting_mpc(model, obs, horizon=20, n_samples=500):
"""Random shooting MPC: en iyi aksiyon dizisini bul."""
obs_t = torch.FloatTensor(obs).unsqueeze(0).expand(n_samples, -1)
actions = torch.FloatTensor(n_samples, horizon, 1).uniform_(-2, 2)
total_r = torch.zeros(n_samples)
with torch.no_grad():
curr_obs = obs_t.clone()
for t in range(horizon):
a_t = actions[:, t, :]
r_t = pendulum_reward(curr_obs, a_t)
total_r += (0.99**t) * r_t
curr_obs = model(curr_obs, a_t)
best_idx = total_r.argmax()
return actions[best_idx, 0, 0].item()
# ── Ana eğitim döngüsü ─────────────────────────────────────
env = gym.make('Pendulum-v1')
model = DynamicsModel()
opt = optim.Adam(model.parameters(), lr=1e-3)
buffer = deque(maxlen=50_000)
ep_rewards = []
for ep in range(200):
obs, _ = env.reset()
total_r = 0
use_mpc = ep >= 30 # İlk 30 ep: rastgele veri topla
for step in range(200):
if use_mpc:
a_val = random_shooting_mpc(model, obs)
else:
a_val = env.action_space.sample()[0]
action = np.array([a_val])
obs2, r, done, trunc, _ = env.step(action)
buffer.append((obs, a_val, obs2))
total_r += r; obs = obs2
# Model güncelleme
if len(buffer) >= 256:
batch = random.sample(buffer, 256)
s_b = torch.FloatTensor([b[0] for b in batch])
a_b = torch.FloatTensor([[b[1]] for b in batch])
s2_b = torch.FloatTensor([b[2] for b in batch])
pred_s2 = model(s_b, a_b)
loss = nn.functional.mse_loss(pred_s2, s2_b)
opt.zero_grad(); loss.backward(); opt.step()
if done or trunc: break
ep_rewards.append(total_r)
if (ep + 1) % 10 == 0:
print(f"Ep {ep+1:3d} | Reward: {total_r:7.1f} | MPC: {use_mpc}")
env.close()
# MPC devreye girdikten sonra reward -1000'den ~-200'e iyileşir
30 episode rastgele veri toplama sonrası MPC devreye girer. Episode reward -1000'den -200 ile -400 arasına iyileşir. Model daha fazla veriyle güçlendikçe MPC performansı artar. Gerçek RL ile rekabetçi, ama çok daha az gerçek etkileşim gerektirir.