00 MQTT nedir ve neden
MQTT, dar bant genişliğinde, yüksek gecikmeli ve güvenilmez ağlarda çalışmak üzere tasarlanmış, makine-makine iletişim protokolüdür.
1999 yılında IBM'den Andy Stanford-Clark ve Arcom'dan Arlen Nipper, Suudi Arabistan'daki petrol boru hatlarını uydu bağlantısı üzerinden izlemek için MQTT'yi geliştirdi. O dönem uydu bağlantıları pahalı, düşük bant genişlikli ve sık kesintiliydi. Protokol, bu kısıtlar altında çalışabilmek için tasarlandı: minimum overhead, broker üzerinden dolaylı iletişim ve bağlantı kesintisine dayanıklılık.
MQTT 2013'te OASIS standardına alındı. Günümüzde milyonlarca IoT cihazı bu protokolü kullanır: akıllı ev sensörlerinden endüstriyel SCADA sistemlerine, araç takip sistemlerinden hastane ekipman izlemeye kadar.
Request-response vs Publish/Subscribe
| Özellik | HTTP (Request-Response) | MQTT (Pub/Sub) |
|---|---|---|
| Bağlantı modeli | Client → Server, tek yönlü | Client → Broker → Client, çift yönlü |
| Veri akışı | Client talep eder (pull) | Broker iter (push) |
| Yeni veri bildirimi | Polling veya long-polling | Anlık — client bekler, broker gönderir |
| Bağlantı sürekliliği | Her istek ayrı bağlantı (HTTP/1.1) veya multiplexed (HTTP/2) | Kalıcı TCP bağlantısı |
| Overhead | Büyük (header, verb, path, version...) | Çok küçük — minimum 2 byte fixed header |
| Gönderici/alıcı bilgisi | Client her iki tarafı bilir | Publisher, subscriber'ı hiç bilmez |
Bağlantı modeli
Sensör A ──publish──▶ BROKER ──route──▶ Dashboard (subscribe: sensors/#) Sensör B ──publish──▶ ──route──▶ Logger (subscribe: sensors/#) Aktüatör ──subscribe──▶ ◀──publish─ Kontrolcü (publish: cmd/actuator)
Portlar
MQTT, TCP üzerinde çalışır. UDP versiyonu yoktur. Protokolün güvenilirliği (QoS 1 ve QoS 2) TCP'nin sağladığı sıralı, kayıpsız byte akışı üzerine inşa edilmiştir. UDP'nin düşük gecikmesi gerekiyorsa CoAP protokolü değerlendirilebilir.
01 MQTT vs HTTP vs gRPC
Doğru protokol seçimi, sistemin bant genişliği tüketimini, gecikme süresini ve uygulama karmaşıklığını doğrudan etkiler.
| Kriter | MQTT 3.1.1 | HTTP/1.1 | gRPC (HTTP/2) |
|---|---|---|---|
| Header overhead | 2 byte minimum | Yüzlerce byte | Orta (binary framing) |
| Bağlantı | Kalıcı TCP | Her istek yeni (veya keep-alive) | Kalıcı HTTP/2 stream |
| Bildirim modeli | Push (broker iletir) | Pull (client polling) | Server streaming ile push |
| Bant genişliği | Çok düşük | Yüksek | Düşük-orta |
| Kesinti toleransı | Yüksek (QoS + session) | Düşük | Orta |
| Embedded uygunluğu | Çok yüksek | Düşük-orta | Düşük (proto derleyici gerekir) |
| Browser desteği | WebSocket ile | Doğal | gRPC-Web ile sınırlı |
| Schema/IDL | Yok — payload serbest | Yok (OpenAPI isteğe bağlı) | Zorunlu (.proto dosyası) |
HTTP polling vs MQTT push
Bir sıcaklık sensöründen 1 dakikada bir veri almak istediğinizi varsayın. HTTP polling ile client her dakika bağlantı kurar, header gönderir, yanıt bekler, bağlantıyı kapatır — her cycle için ~500 byte overhead. MQTT ile sensör sadece veri değiştiğinde yayın yapar; dashboard sürekli bağlı bekler ve sadece yeni mesaj geldiğinde birkaç byte alır.
MQTT 3.1.1 vs 5.0
| Özellik | MQTT 3.1.1 | MQTT 5.0 |
|---|---|---|
| Reason codes | Sadece başarı/hata | Detaylı hata kodları |
| Message expiry | Yok | Mesaj TTL ayarlanabilir |
| User properties | Yok | Anahtar-değer metadata |
| Shared subscriptions | Yok (broker uzantısı) | Standart |
| Request/response pattern | Manuel | Response topic + correlation data |
| Destek | Yaygın (paho default) | Yeni broker'larda mevcut |
paho-mqtt 2.0'da MQTT 5.0 desteği için CallbackAPIVersion.VERSION2 kullanılmalıdır. MQTT 3.1.1 için CallbackAPIVersion.VERSION1 veya eski API kullanılabilir. Bu rehberde her iki versiyonu da gösteren örnekler yer alır.
02 Broker ve client mimarisi
Broker, tüm client bağlantılarını yöneten merkezi bileşendir; subscription tablosunu tutar ve mesajları ilgili subscriber'lara iletir.
Broker'ın görevi
Publisher
publish("sensors/temp", "23.5") ──▶ BROKER
│
subscription tablosu
"sensors/temp" → [Client A, Client B]
"sensors/#" → [Logger, Dashboard]
│
┌─────────┴──────────┐
▼ ▼
Client A Logger
Popüler broker'lar
apt install mosquitto ile kurulur.Temel CLI araçları (mosquitto-clients)
# broker'a bağlan ve TÜM topic'leri dinle (wildcard #)
mosquitto_sub -h localhost -t "#" -v
# -v: topic adını da göster
# sadece belirli bir topic
mosquitto_sub -h localhost -t "sensors/temperature"
# mesaj yayınla
mosquitto_pub -h localhost -t "test/hello" -m "world"
# QoS 1 ile yayınla
mosquitto_pub -h localhost -t "sensors/temp" -m "23.5" -q 1
# retained mesaj
mosquitto_pub -h localhost -t "sensors/temp" -m "23.5" -r
# TLS ile bağlan (8883)
mosquitto_sub -h broker.example.com -p 8883 --cafile ca.crt -t "#"
Client ID önemi
Her client benzersiz bir Client ID ile bağlanmalıdır. Aynı Client ID ile ikinci bir client bağlanırsa broker ilk client'ı kopar (disconnect eder). Bu davranış MQTT spesifikasyonunda tanımlanmıştır; test ortamında sık yapılan bir hatadır. Rastgele ID üretmek için Python'da str(uuid.uuid4()) kullanılabilir.
03 Topic hiyerarşisi
Topic, mesajların yönlendirildiği adresi tanımlar — / ile ayrılmış seviyeli bir yol dizisidir.
Topic yapısı
Topic, forward slash (/) ile ayrılmış string seviyelerinden oluşur. Büyük/küçük harf duyarlıdır: sensors/Temperature ile sensors/temperature farklı topic'lerdir. Boşluk içerebilir ama önerilmez. Uzunluk 65535 UTF-8 byte ile sınırlıdır.
# iyi yapılandırılmış topic hiyerarşisi
home/bedroom/temperature
home/bedroom/humidity
home/living_room/temperature
sensors/device_001/status
factory/line_3/machine_7/vibration
cmd/device_001/set_led
Wildcard'lar
sensors/+/+/temperaturesensors/## sensors/+/temperature → sadece bir seviye
# eşleşir: sensors/bedroom/temperature
# eşleşmez: sensors/floor1/bedroom/temperature
sensors/+/temperature
# home/# → home altındaki her şey
# eşleşir: home/bedroom/temp, home/kitchen, home/living_room/a/b/c
home/#
# tüm status topic'leri (herhangi derinlikte)
mosquitto_sub -t "+/+/status"
mosquitto_sub -t "sensors/#"
$SYS topic'leri
$SYS/ ile başlayan topic'ler broker'ın kendisi tarafından yayınlanır. Broker istatistiklerini, bağlı client sayısını ve mesaj trafiğini içerir. Wildcard ile subscibe edilemez.
# broker istatistikleri
mosquitto_sub -h localhost -t "\$SYS/#" -v
# $SYS/broker/clients/connected → bağlı client sayısı
# $SYS/broker/messages/received → alınan mesaj sayısı
# $SYS/broker/load/bytes/received → gelen byte/sn
Geçerli ve geçersiz topic örnekleri
| Topic | Geçerli? | Açıklama |
|---|---|---|
sensors/temp | Geçerli | Standart iki seviyeli topic |
sensors/+/temp | Geçerli (subscribe) | + wildcard — publish için geçersiz |
sensors/# | Geçerli (subscribe) | # sonda — publish için geçersiz |
sensors/#/temp | Geçersiz | # sonda olmalı |
/sensors/temp | Geçerli ama önerilmez | Başta / — boş ilk seviye oluşturur |
$SYS/info | Özel | Broker iç topic'i — normal publish yapılamaz |
04 QoS seviyeleri
QoS (Quality of Service), publisher ile broker ve broker ile subscriber arasındaki mesaj teslim garantisini tanımlar.
QoS 0 — At most once
En hızlı, en hafif seçenek. Mesaj bir kez gönderilir, alındı onayı beklenmez. Ağ kesintisi veya broker restart'ında mesaj kaybolur. Telemetri verileri için uygundur: sıcaklık her 5 saniyede yayınlanıyorsa bir mesajın kaybolması önemli değildir.
QoS 0: Publisher ──PUBLISH──▶ Broker ──PUBLISH──▶ Subscriber
(fire & forget — ACK yok)
QoS 1 — At least once
Mesaj en az bir kez teslim edilmesi garanti edilir. Broker PUBACK göndermezse publisher yeniden dener. Bu nedenle duplicate mesaj mümkündür. Subscriber tarafında idempotent işleme gerekebilir.
QoS 1: Publisher ──PUBLISH (DUP=0)──▶ Broker ──PUBACK──▶ Publisher
│
(PUBACK gelmezse)
│
Publisher ──PUBLISH (DUP=1)──▶ Broker ──PUBACK──▶ Publisher
Broker ──PUBLISH──▶ Subscriber ──PUBACK──▶ Broker
QoS 2 — Exactly once
4-way handshake ile mesaj tam olarak bir kez teslim edilir. En yüksek güvenilirlik, en yüksek overhead. Finansal işlemler veya kritik komutlar için uygundur.
QoS 2: Publisher ──PUBLISH──▶ Broker ──PUBREC──▶ Publisher
Publisher ──PUBREL──▶ Broker ──PUBCOMP──▶ Publisher
Broker ──PUBLISH──▶ Subscriber ──PUBREC──▶ Broker
Broker ──PUBREL──▶ Subscriber ──PUBCOMP──▶ Broker
Ne zaman hangi QoS?
| Kullanım | Önerilen QoS | Neden |
|---|---|---|
| Sıcaklık / nem telemetrisi | QoS 0 | Kayıp tolere edilir, hız önemli |
| Durum bildirimi (online/offline) | QoS 1 | Teslim garantisi, duplicate tolere edilir |
| Aktüatör komutu (röle aç/kapat) | QoS 1 veya 2 | Komutun ulaşması kritik |
| Ödeme / kritik iş olayı | QoS 2 | Tam olarak bir kez — duplicate kabul edilemez |
Subscribe QoS ve Publish QoS
Subscriber, subscribe ederken bir QoS seviyesi belirtir. Broker, mesajı subscriber'a iletirken min(publish_qos, subscribe_qos) seviyesini kullanır. Publisher QoS 2 ile yayınlamış olsa bile subscriber QoS 0 ile subscribe ettiyse mesajı QoS 0 alır. Bu nedenle kritik mesajlar için hem publisher hem subscriber'ın QoS 1 veya 2 kullanması gerekir.
05 Retained messages
Retained mesaj, broker'ın bir topic için sakladığı "son bilinen değer"dir — yeni bir subscriber bağlandığında geçmişte ne yayınlandığını hemen öğrenir.
Nasıl çalışır?
Publisher retain=True ile bir mesaj yayınladığında broker bu mesajı topic için saklar. Sonradan o topic'e subscribe olan her yeni client bu mesajı bağlanır bağlanmaz alır — mesaj ne zaman gönderilmiş olursa olsun. Bu, "Last Known Value" pattern'inin temelidir.
T=0 Sensör ──retain PUBLISH "23.5"──▶ Broker (saklar)
T=5 Dashboard bağlanır, subscribe("sensors/temp")
T=5 Broker ──PUBLISH "23.5"──▶ Dashboard (anında — T=0'daki değer)
# retained mesaj yayınla
mosquitto_pub -h localhost -t "sensors/temp" -m "23.5" -r
# retained mesajı sil: boş payload ile retain=True
mosquitto_pub -h localhost -t "sensors/temp" -m "" -r
Kullanım senaryoları
devices/sensor001/status topic'inde "online" retained — yeni subscriber anında durumu bilir.config/device001 topic'inden mevcut konfigürasyonu alır.Retained mesajın QoS'u, yayınlandığı QoS seviyesidir. Broker mesajı saklarken QoS bilgisini de korur. Ancak çok sık değişen değerler için retain kullanmak broker üzerinde yük oluşturabilir — yalnızca gerçekten "son bilinen değer" semantiği gereken topic'lerde kullanın.
06 Last Will and Testament (LWT)
LWT, bir client beklenmedik biçimde bağlantıyı kaybettiğinde broker'ın otomatik olarak yayınladığı önceden tanımlanmış mesajdır.
LWT nasıl çalışır?
Client, broker'a bağlanırken (CONNECT paketi) bir "will" mesajı tanımlar: topic, payload, QoS ve retain bayrağı. Eğer client bağlantısını temiz kapatmadan (graceful disconnect olmadan) koparırsa broker bu mesajı otomatik olarak yayınlar. TCP bağlantısının kopması, keepalive timeout'u veya uygulama crash'i bu durumu tetikler.
CONNECT paketi: willTopic="devices/s1/status"
willMessage="offline"
willQoS=1, willRetain=True
│
▼
Normal çalışma...
│
(bağlantı kesildi — crash, timeout, ağ hatası)
│
▼
Broker ──PUBLISH "offline" retain──▶ devices/s1/status topic subscribers
LWT ile sistem sağlık monitörü
import paho.mqtt.client as mqtt
CLIENT_ID = "sensor_001"
WILL_TOPIC = f"devices/{CLIENT_ID}/status"
client = mqtt.Client(client_id=CLIENT_ID)
# bağlanmadan ÖNCE will tanımla
client.will_set(
topic=WILL_TOPIC,
payload="offline",
qos=1,
retain=True
)
client.connect("localhost", 1883)
# başarılı bağlantıda "online" yayınla (retained)
client.publish(WILL_TOPIC, "online", qos=1, retain=True)
LWT vs graceful disconnect
client.disconnect() çağrıldığında broker LWT mesajını yayınlamaz — normal (temiz) bağlantı kapanması olarak değerlendirir. LWT yalnızca beklenmedik kopmalarda tetiklenir. Graceful shutdown'da kendiniz "offline" mesajı yayınlamalısınız: client.publish(WILL_TOPIC, "offline", retain=True) ardından client.disconnect().
07 paho-mqtt Python client
paho-mqtt, Eclipse Foundation'ın Python MQTT client kütüphanesidir — MQTT 3.1.1 ve 5.0 desteği, callback tabanlı asenkron API.
Kurulum
pip install paho-mqtt
# paho-mqtt >= 2.0 önerilir
paho-mqtt 2.0 API farkı
import paho.mqtt.client as mqtt
# paho-mqtt >= 2.0: CallbackAPIVersion belirtmek zorunlu
client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
client_id="mydevice_001",
protocol=mqtt.MQTTv311 # veya MQTTv5
)
# paho-mqtt < 2.0 (eski API — deprecated)
client = mqtt.Client(client_id="mydevice_001")
Publisher: sensör simülatörü
import paho.mqtt.client as mqtt
import json
import time
import random
BROKER = "localhost"
PORT = 1883
TOPIC = "sensors/device_001"
CLIENT_ID = "publisher_001"
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code == 0:
print(f"Broker'a bağlandı: {BROKER}:{PORT}")
else:
print(f"Bağlantı hatası: {reason_code}")
client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
client_id=CLIENT_ID
)
client.on_connect = on_connect
# LWT: beklenmedik kopma durumunda "offline" yayınla
client.will_set("devices/device_001/status", "offline", qos=1, retain=True)
client.connect(BROKER, PORT, keepalive=60)
client.loop_start() # arka planda network döngüsü
# bağlantı bildir
client.publish("devices/device_001/status", "online", qos=1, retain=True)
try:
while True:
payload = json.dumps({
"temperature": round(random.uniform(18.0, 30.0), 2),
"humidity": round(random.uniform(40.0, 80.0), 2),
"pressure": round(random.uniform(1000.0, 1020.0), 2),
"timestamp": int(time.time())
})
result = client.publish(TOPIC, payload, qos=1)
result.wait_for_publish()
print(f"Yayınlandı → {TOPIC}: {payload}")
time.sleep(5)
except KeyboardInterrupt:
pass
finally:
client.publish("devices/device_001/status", "offline", qos=1, retain=True)
client.loop_stop()
client.disconnect()
Subscriber: mesaj dinle ve işle
import paho.mqtt.client as mqtt
import json
BROKER = "localhost"
PORT = 1883
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code == 0:
# bağlantı kurulunca subscribe — yeniden bağlanmada da çalışır
client.subscribe("sensors/#", qos=1)
print("Bağlandı, sensors/# dinleniyor")
def on_message(client, userdata, msg):
try:
data = json.loads(msg.payload.decode())
print(f"[{msg.topic}] sıcaklık={data['temperature']}°C "
f"nem={data['humidity']}% basınç={data['pressure']} hPa")
except (json.JSONDecodeError, KeyError) as e:
print(f"Parse hatası: {e} | ham: {msg.payload}")
def on_disconnect(client, userdata, flags, reason_code, properties):
print(f"Bağlantı kesildi: {reason_code}")
client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
client_id="subscriber_dashboard"
)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.connect(BROKER, PORT, keepalive=60)
client.loop_forever() # bloklar, Ctrl+C ile dur
loop_start() vs loop_forever()
TLS bağlantısı
import ssl
import paho.mqtt.client as mqtt
client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
client_id="tls_client_001"
)
# CA sertifikası ile doğrulama
client.tls_set(
ca_certs="/etc/ssl/certs/ca.crt",
certfile=None, # mTLS değil — sadece sunucu doğrulama
keyfile=None,
tls_version=ssl.PROTOCOL_TLS_CLIENT
)
# kullanıcı adı / şifre
client.username_pw_set("sensor_user", "güçlü_şifre")
client.connect("broker.example.com", 8883, keepalive=60)
client.loop_forever()
08 Mosquitto broker kurulumu ve TLS
Mosquitto, geliştirme ve küçük ölçekli üretim ortamları için en yaygın kullanılan açık kaynak MQTT broker'ıdır.
Kurulum
# Debian / Ubuntu
apt install mosquitto mosquitto-clients
# durum kontrol
systemctl status mosquitto
# verbose başlatma (test için)
mosquitto -c /etc/mosquitto/mosquitto.conf -v
Minimal konfigürasyon
# plaintext listener — yalnızca localhost
listener 1883 127.0.0.1
# anonymous bağlantıya izin ver (test ortamı)
allow_anonymous true
# log seviyesi
log_type all
log_dest file /var/log/mosquitto/mosquitto.log
# persistent session için mesaj kalıcılığı
persistence true
persistence_location /var/lib/mosquitto/
Parola dosyası ile kimlik doğrulama
# yeni parola dosyası oluştur (-c), kullanıcı ekle
mosquitto_passwd -c /etc/mosquitto/passwd sensor_user
# mevcut dosyaya kullanıcı ekle (dosyayı yeniden oluşturmaz)
mosquitto_passwd /etc/mosquitto/passwd dashboard_user
# kullanıcı sil
mosquitto_passwd -D /etc/mosquitto/passwd eski_kullanici
listener 1883 127.0.0.1
allow_anonymous false
password_file /etc/mosquitto/passwd
TLS listener
# TLS listener — port 8883
listener 8883
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
# mTLS için (client sertifikası zorunlu)
require_certificate true
use_identity_as_username true
allow_anonymous false
password_file /etc/mosquitto/passwd
Self-signed sertifika ile test
# CA key ve sertifika oluştur
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 1826 -key ca.key -out ca.crt \
-subj "/CN=MQTT CA"
# sunucu key ve CSR
openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr \
-subj "/CN=localhost"
# CA ile imzala
openssl x509 -req -days 365 -in server.csr \
-CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt
# test bağlantısı
mosquitto_sub -h localhost -p 8883 --cafile ca.crt -t "test/#"
Sistem servisi
# boot'ta başlat ve hemen başlat
systemctl enable --now mosquitto
# config değişikliği sonrası reload
systemctl reload mosquitto
# log izle
journalctl -u mosquitto -f
Üretim ortamında allow_anonymous true kullanmayın. Her client için ayrı kimlik bilgisi tanımlayın. mTLS rehberi ile X.509 sertifika tabanlı kimlik doğrulama için çapraz referans yapabilirsiniz.
09 Pratik: sensör simülatörü ve dashboard subscriber
QoS 1, retained mesaj ve LWT'yi birlikte kullanan tam bir örnek — sadece Python stdlib ve paho-mqtt.
Çalıştırma adımları
# 1. mosquitto başlat (ayrı terminal)
mosquitto -v
# 2. subscriber başlat (ayrı terminal)
python3 dashboard.py
# 3. publisher başlat (ayrı terminal)
python3 sensor_sim.py
# 4. Ctrl+C ile publisher'ı durdur — LWT "offline" yayınlanır
sensor_sim.py — tam publisher
#!/usr/bin/env python3
"""Sensör simülatörü — bağımlılık yok: paho-mqtt + stdlib"""
import json
import random
import time
import paho.mqtt.client as mqtt
BROKER = "localhost"
PORT = 1883
CLIENT_ID = "sensor_sim_001"
BASE = "home/sensors/sim_001"
STATUS = f"devices/{CLIENT_ID}/status"
INTERVAL = 5 # saniye
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code == 0:
client.publish(STATUS, "online", qos=1, retain=True)
print(f"[sim] Bağlandı → {BROKER}:{PORT}")
else:
print(f"[sim] Bağlantı hatası: {reason_code}")
client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
client_id=CLIENT_ID
)
client.on_connect = on_connect
# LWT: ani kopma → "offline" (retained QoS 1)
client.will_set(STATUS, "offline", qos=1, retain=True)
client.connect(BROKER, PORT, keepalive=30)
client.loop_start()
# broker'a bağlanmak için kısa süre ver
time.sleep(1)
seq = 0
try:
while True:
seq += 1
payload = {
"seq": seq,
"temperature": round(20.0 + random.gauss(0, 1.5), 2),
"humidity": round(55.0 + random.gauss(0, 5.0), 1),
"pressure": round(1013.25 + random.gauss(0, 2.0), 2),
"ts": int(time.time())
}
msg = json.dumps(payload)
# telemetri: QoS 1, retained (son değer her zaman mevcut)
client.publish(f"{BASE}/telemetry", msg, qos=1, retain=True)
print(f"[sim] #{seq:04d} → {BASE}/telemetry")
time.sleep(INTERVAL)
except KeyboardInterrupt:
print("[sim] Kapatılıyor...")
finally:
# graceful disconnect: LWT tetiklenmez, manuel "offline"
client.publish(STATUS, "offline", qos=1, retain=True)
time.sleep(0.5) # mesajın gönderilmesi için bekle
client.loop_stop()
client.disconnect()
print("[sim] Çıkıldı.")
dashboard.py — tam subscriber
#!/usr/bin/env python3
"""Terminal dashboard — sensör verilerini canlı göster"""
import json
import time
import paho.mqtt.client as mqtt
BROKER = "localhost"
PORT = 1883
CLIENT_ID = "dashboard_001"
TOPICS = [
("home/sensors/#", 1), # telemetri
("devices/+/status", 1), # cihaz durumu
]
stats = {"received": 0, "errors": 0}
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code == 0:
client.subscribe(TOPICS)
print(f"[dash] Bağlandı → {BROKER}:{PORT}")
print(f"[dash] Dinlenen topic'ler: {[t for t, _ in TOPICS]}")
print("-" * 60)
else:
print(f"[dash] Hata: {reason_code}")
def on_message(client, userdata, msg):
ts = time.strftime("%H:%M:%S")
stats["received"] += 1
if "status" in msg.topic:
status = msg.payload.decode()
icon = "🟢" if status == "online" else "🔴"
print(f"[{ts}] {icon} {msg.topic}: {status}")
return
try:
d = json.loads(msg.payload)
print(
f"[{ts}] #{d.get('seq', '?'):04} "
f"🌡 {d['temperature']:6.2f}°C "
f"💧 {d['humidity']:5.1f}% "
f"🔵 {d['pressure']:8.2f} hPa"
)
except (json.JSONDecodeError, KeyError) as e:
stats["errors"] += 1
print(f"[{ts}] PARSE HATASI ({e}): {msg.payload[:80]}")
def on_disconnect(client, userdata, flags, reason_code, properties):
print(f"\n[dash] Bağlantı kesildi (kod={reason_code})")
print(f"[dash] Toplam: {stats['received']} mesaj, {stats['errors']} hata")
client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
client_id=CLIENT_ID
)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.connect(BROKER, PORT, keepalive=60)
try:
client.loop_forever()
except KeyboardInterrupt:
print("\n[dash] Ctrl+C ile durduruldu.")
Örnek çıktı
[dash] Bağlandı → localhost:1883
[dash] Dinlenen topic'ler: ['home/sensors/#', 'devices/+/status']
------------------------------------------------------------
[09:14:01] 🟢 devices/sensor_sim_001/status: online
[09:14:02] #0001 🌡 21.34°C 💧 58.3% 🔵 1014.83 hPa
[09:14:07] #0002 🌡 19.87°C 💧 52.1% 🔵 1012.44 hPa
[09:14:12] #0003 🌡 22.10°C 💧 60.7% 🔵 1015.21 hPa
[09:14:17] 🔴 devices/sensor_sim_001/status: offline
Bu bölümde öğrendikleriniz
- paho-mqtt CallbackAPIVersion.VERSION2 ile modern API kullanımı
- LWT, retained mesaj ve QoS 1'i aynı uygulamada birleştirme
- on_connect içinde subscribe — yeniden bağlanmada abonelik kaybı önleme
- Graceful shutdown sırasında manuel "offline" yayınlama
- loop_start() ile non-blocking publisher, loop_forever() ile bloklu subscriber