00 GStreamer + AI architecture — overview
There is more than one way to integrate AI tasks such as video analytics, object detection, and image classification into a real-time pipeline. GStreamer's modular architecture lets you do the integration incrementally or as a fully embedded part of the pipeline.
There are three main approaches to adding AI to a GStreamer pipeline:
Approach 1 — appsink/appsrc bridge:
v4l2src → decode → videoconvert → appsink
↓ (Python)
TFLite/ONNX
↓
display ← videoconvert ← appsrc ← overlay
Approach 2 — NNStreamer:
v4l2src → videoconvert → tensor_converter
→ tensor_filter (TFLite)
→ tensor_decoder → sink
Approach 3 — DeepStream:
nvurisrcbin → nvstreammux → nvinfer (TensorRT)
→ nvtracker → nvdsosd → nveglglessink
Which approach to choose depends on your hardware, latency requirements, and model complexity. This guide covers all three and provides practical code examples for each.
01 Frame capture with appsink and TFLite inference
The appsink element is the standard way to hand frames from a GStreamer pipeline over to your application code. It can run in callback or pull mode; on embedded systems, pull mode is usually preferred.
Dependencies
sudo apt install python3-gi gir1.2-gstreamer-1.0 \
gir1.2-gst-plugins-base-1.0
pip3 install tflite-runtime numpy
appsink in Python — pull mode
import gi, numpy as np, tflite_runtime.interpreter as tflite
gi.require_version('Gst', '1.0')
from gi.repository import Gst
Gst.init(None)
PIPELINE_DESC = (
"v4l2src device=/dev/video0 ! "
"video/x-raw,width=640,height=480,framerate=30/1 ! "
"videoconvert ! video/x-raw,format=RGB ! "
"videoscale ! video/x-raw,width=224,height=224 ! "
"appsink name=sink emit-signals=false max-buffers=1 drop=true"
)
pipeline = Gst.parse_launch(PIPELINE_DESC)
sink = pipeline.get_by_name("sink")
interpreter = tflite.Interpreter(model_path="mobilenet_v2.tflite")
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
pipeline.set_state(Gst.State.PLAYING)
try:
while True:
sample = sink.emit("pull-sample")
if sample is None:
break
buf = sample.get_buffer()
success, map_info = buf.map(Gst.MapFlags.READ)
if not success:
continue
        frame = np.frombuffer(map_info.data, dtype=np.uint8)
        frame = frame.reshape((224, 224, 3))
        # astype() copies the pixels, so the buffer can be unmapped safely below
        inp = np.expand_dims(frame.astype(np.float32) / 255.0, axis=0)
        buf.unmap(map_info)
interpreter.set_tensor(input_details[0]['index'], inp)
interpreter.invoke()
output = interpreter.get_tensor(output_details[0]['index'])
top_class = int(np.argmax(output[0]))
confidence = float(output[0][top_class])
print(f"Class: {top_class} Confidence: {confidence:.3f}")
finally:
pipeline.set_state(Gst.State.NULL)
The max-buffers=1 drop=true combination keeps the frame queue from filling up when your inference model is slower than the pipeline: the most recent frame is always the one being processed.
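For completeness, the same capture can also run in callback mode. A minimal sketch, reusing PIPELINE_DESC and the interpreter setup above, with emit-signals switched on:
from gi.repository import GLib
def on_new_sample(sink):
    sample = sink.emit("pull-sample")
    if sample is None:
        return Gst.FlowReturn.EOS
    buf = sample.get_buffer()
    ok, m = buf.map(Gst.MapFlags.READ)
    if ok:
        # hand m.data to the interpreter here, exactly as in the pull-mode loop
        buf.unmap(m)
    return Gst.FlowReturn.OK
sink.set_property("emit-signals", True)
sink.connect("new-sample", on_new_sample)
pipeline.set_state(Gst.State.PLAYING)
GLib.MainLoop().run()  # callbacks arrive on a GStreamer streaming thread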
appsink with the C API — callback mode
#include <gst/gst.h>
#include <gst/app/gstappsink.h>
static GstFlowReturn new_sample_cb(GstAppSink *sink, gpointer user_data)
{
GstSample *sample = gst_app_sink_pull_sample(sink);
GstBuffer *buffer = gst_sample_get_buffer(sample);
GstMapInfo map;
if (gst_buffer_map(buffer, &map, GST_MAP_READ)) {
/* map.data → RGB frame (224x224x3) */
/* run_inference(map.data, map.size); */
gst_buffer_unmap(buffer, &map);
}
gst_sample_unref(sample);
return GST_FLOW_OK;
}
int main(int argc, char **argv)
{
gst_init(&argc, &argv);
GstElement *pipeline = gst_parse_launch(
"v4l2src ! videoconvert ! "
"video/x-raw,format=RGB,width=224,height=224 ! "
"appsink name=sink emit-signals=true sync=false", NULL);
GstAppSink *sink = GST_APP_SINK(
gst_bin_get_by_name(GST_BIN(pipeline), "sink"));
GstAppSinkCallbacks cbs = { .new_sample = new_sample_cb };
gst_app_sink_set_callbacks(sink, &cbs, NULL, NULL);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
GstBus *bus = gst_element_get_bus(pipeline);
gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE,
GST_MESSAGE_ERROR | GST_MESSAGE_EOS);
gst_element_set_state(pipeline, GST_STATE_NULL);
gst_object_unref(pipeline);
return 0;
}
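To compile the sketch above, a command along these lines should work (appsink_infer.c is a placeholder file name):
gcc appsink_infer.c -o appsink_infer \
    $(pkg-config --cflags --libs gstreamer-1.0 gstreamer-app-1.0)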
02 Injecting overlay frames back into the pipeline with appsrc
appsrc is used to draw inference results (bounding boxes, labels, overlays) onto the original video stream and push it back out to the display. This "bridge" architecture is built from two pipelines.
import gi, numpy as np, cv2
gi.require_version('Gst', '1.0')
from gi.repository import Gst
Gst.init(None)
W, H, FPS = 640, 480, 30
src_pipe = Gst.parse_launch(
f"v4l2src device=/dev/video0 ! "
f"video/x-raw,width={W},height={H},framerate={FPS}/1 ! "
"videoconvert ! video/x-raw,format=BGR ! "
"appsink name=src_sink max-buffers=1 drop=true"
)
dst_pipe = Gst.parse_launch(
f"appsrc name=dst_src format=time "
f"caps=video/x-raw,format=BGR,width={W},height={H},"
f"framerate={FPS}/1 ! "
"videoconvert ! autovideosink sync=false"
)
src_sink = src_pipe.get_by_name("src_sink")
dst_src = dst_pipe.get_by_name("dst_src")
src_pipe.set_state(Gst.State.PLAYING)
dst_pipe.set_state(Gst.State.PLAYING)
pts = 0
frame_dur = Gst.SECOND // FPS
while True:
sample = src_sink.emit("pull-sample")
if sample is None:
break
buf = sample.get_buffer()
ok, m = buf.map(Gst.MapFlags.READ)
if not ok:
continue
frame = np.frombuffer(m.data, dtype=np.uint8).reshape(H, W, 3).copy()
buf.unmap(m)
    # Inference result — example bounding box
cv2.rectangle(frame, (50, 50), (200, 200), (0, 255, 0), 2)
cv2.putText(frame, "person 0.92", (52, 45),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
out_buf = Gst.Buffer.new_wrapped(frame.tobytes())
out_buf.pts = pts
out_buf.duration = frame_dur
pts += frame_dur
dst_src.emit("push-buffer", out_buf)
src_pipe.set_state(Gst.State.NULL)
dst_pipe.set_state(Gst.State.NULL)
The limitation of this approach is a memory copy between Python and GStreamer on every frame. For 1080p at 60 fps, prefer NNStreamer or DeepStream.
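To get a feel for what each copy costs, here is a small self-contained sketch; the numbers are platform-dependent, and the bridge pays this price at least twice per frame (once out of the pipeline, once back in):
import time
import numpy as np
# One 1080p BGR frame is ~6.2 MB
frame = np.zeros((1080, 1920, 3), dtype=np.uint8)
N = 100
t0 = time.perf_counter()
for _ in range(N):
    _ = frame.copy()
dt = (time.perf_counter() - t0) / N
print(f"copy: {dt * 1e3:.2f} ms/frame ≈ {frame.nbytes / dt / 1e9:.1f} GB/s")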
03 NNStreamer — ML pipeline as GStreamer
NNStreamer is an open-source set of GStreamer plugins developed by Samsung Research. It exposes the TFLite, ONNX Runtime, PyTorch, Caffe, SNAP, and ARMNN frameworks directly as GStreamer elements.
Installation
sudo add-apt-repository ppa:nnstreamer/ppa
sudo apt update
sudo apt install nnstreamer nnstreamer-tensorflow2-lite \
nnstreamer-onnxruntime python3-nnstreamer
# Verify the elements are available
gst-inspect-1.0 tensor_converter
gst-inspect-1.0 tensor_filter
gst-inspect-1.0 tensor_sink
NNStreamer core elements
- tensor_converter — converts raw media (e.g. video/x-raw) into tensor streams
- tensor_filter — runs a model on the chosen framework and accelerator
- tensor_decoder — turns tensor output back into media (labels, bounding boxes)
- tensor_sink — hands raw tensor results to application code via signals
MobileNet classification with gst-launch
gst-launch-1.0 \
v4l2src device=/dev/video0 ! \
video/x-raw,width=640,height=480,framerate=30/1 ! \
videoscale ! video/x-raw,width=224,height=224 ! \
videoconvert ! video/x-raw,format=RGB ! \
tensor_converter ! \
tensor_filter framework=tensorflow2-lite \
model=/opt/models/mobilenet_v2_1.0_224_quant.tflite ! \
tensor_sink name=res signal-rate=10
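Reading the result out of tensor_sink from Python: a minimal sketch assuming the quantized MobileNet above, whose output is a single uint8 tensor of class scores:
import gi
import numpy as np
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
Gst.init(None)
pipeline = Gst.parse_launch(
    "v4l2src device=/dev/video0 ! "
    "video/x-raw,width=640,height=480,framerate=30/1 ! "
    "videoscale ! video/x-raw,width=224,height=224 ! "
    "videoconvert ! video/x-raw,format=RGB ! "
    "tensor_converter ! "
    "tensor_filter framework=tensorflow2-lite "
    "model=/opt/models/mobilenet_v2_1.0_224_quant.tflite ! "
    "tensor_sink name=res emit-signal=true"
)
res = pipeline.get_by_name("res")
def on_new_data(sink, buf):
    ok, m = buf.map(Gst.MapFlags.READ)
    if ok:
        scores = np.frombuffer(m.data, dtype=np.uint8)
        top = int(np.argmax(scores))
        print(f"top-1 class index: {top} score: {scores[top]}")
        buf.unmap(m)
res.connect("new-data", on_new_data)
pipeline.set_state(Gst.State.PLAYING)
GLib.MainLoop().run()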
04 NNStreamer tensor_filter and loading custom models
The tensor_filter element lets you choose which framework the model runs on and which hardware accelerator it uses. Raspberry Pi 4, Coral USB Accelerator, and NPU integrations are all configured through this element.
YOLO object detection pipeline
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import numpy as np
Gst.init(None)
PIPELINE = (
"v4l2src device=/dev/video0 ! "
"video/x-raw,width=640,height=480,framerate=15/1 ! "
"tee name=t "
"t. ! queue ! videoconvert ! autovideosink sync=false "
"t. ! queue ! "
" videoscale ! video/x-raw,width=416,height=416 ! "
" videoconvert ! video/x-raw,format=RGB ! "
" tensor_converter ! "
" tensor_filter "
" framework=tensorflow2-lite "
" model=/opt/models/yolov5s_416.tflite "
" accelerator=true:cpu "
" custom=NumThreads:4 ! "
" tensor_sink name=result emit-signal=true"
)
pipeline = Gst.parse_launch(PIPELINE)
result_sink = pipeline.get_by_name("result")
def on_new_data(sink, buf):
ok, m = buf.map(Gst.MapFlags.READ)
if ok:
        data = np.frombuffer(m.data, dtype=np.float32)
        # YOLOv5 layout: (1, N, 85) = 4 box coords + objectness + 80 class scores
        detections = data.reshape(1, -1, 85)[0]
        mask = detections[:, 4] > 0.5  # objectness threshold
        boxes = detections[mask]
for box in boxes[:5]:
cx, cy, w, h = box[:4]
conf = box[4]
cls = int(np.argmax(box[5:]))
print(f"cls={cls} conf={conf:.2f} cx={cx:.1f} cy={cy:.1f}")
buf.unmap(m)
result_sink.connect("new-data", on_new_data)
pipeline.set_state(Gst.State.PLAYING)
loop = GLib.MainLoop()
try:
loop.run()
finally:
pipeline.set_state(Gst.State.NULL)
Acceleration with the Coral Edge TPU
echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" \
| sudo tee /etc/apt/sources.list.d/coral-edgetpu.list
sudo apt update && sudo apt install libedgetpu1-std
# Using the Edge TPU from NNStreamer
gst-launch-1.0 \
filesrc location=test.jpg ! jpegdec ! \
videoscale ! video/x-raw,width=224,height=224 ! \
videoconvert ! video/x-raw,format=RGB ! \
tensor_converter ! \
tensor_filter framework=tensorflow2-lite \
model=mobilenet_v2_edgetpu.tflite \
accelerator=true:edgetpu ! \
tensor_sink
05 NVIDIA DeepStream — the nvinfer element
The NVIDIA DeepStream SDK is a GStreamer-based framework optimized for real-time video analytics on Jetson Nano/Xavier/Orin and x86 GPUs. It provides INT8/FP16 inference through TensorRT.
DeepStream installation (Jetson, JetPack 6.x)
sudo apt install deepstream-7.0
deepstream-app --version
# deepstream-app version 7.0.0
# Python bindings (pyds); newer releases ship these as a pip wheel
# from the deepstream_python_apps repository
cd /opt/nvidia/deepstream/deepstream/lib
sudo python3 setup.py install
nvinfer configuration file
[property]
gpu-id=0
net-scale-factor=0.0039215697906911373
model-engine-file=yolov5s_b1_gpu0_int8.engine
labelfile-path=labels.txt
batch-size=1
network-mode=1 # 0=FP32, 1=INT8, 2=FP16
num-detected-classes=80
interval=0
gie-unique-id=1
input-dims=3;640;640;0 # C;H;W;format(0=NCHW)
[class-attrs-all]
threshold=0.5
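The oddly precise net-scale-factor is just 1/255: nvinfer preprocesses each pixel as y = net-scale-factor * (x - mean), so with no mean offsets this config maps 0–255 input into the 0–1 range. A quick sanity check:
import numpy as np
net_scale_factor = 0.0039215697906911373  # float32 representation of 1/255
x = np.array([0, 128, 255], dtype=np.float32)
print(net_scale_factor * (x - 0.0))  # ≈ [0.0, 0.502, 1.0]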
Minimal DeepStream pipeline
import sys, gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import pyds
Gst.init(None)
pipeline = Gst.Pipeline()
def make(factory, name):
el = Gst.ElementFactory.make(factory, name)
if not el:
        sys.exit(f"Failed to create element: {factory}")
pipeline.add(el)
return el
src = make("nvurisrcbin", "src")
mux = make("nvstreammux", "mux")
infer = make("nvinfer", "infer")
tracker = make("nvtracker", "tracker")
osd = make("nvdsosd", "osd")
converter = make("nvegltransform", "converter")
sink = make("nveglglessink", "sink")
src.set_property("uri", "file:///opt/test/sample_720p.mp4")
mux.set_property("width", 1280)
mux.set_property("height", 720)
mux.set_property("batch-size", 1)
mux.set_property("batched-push-timeout", 4000000)
infer.set_property("config-file-path", "config_infer_primary.txt")
src.connect("pad-added", lambda s, pad:
pad.link(mux.get_request_pad("sink_0")))
Gst.Element.link_many(mux, infer, tracker, osd, converter, sink)
pipeline.set_state(Gst.State.PLAYING)
GLib.MainLoop().run()
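The last line runs a main loop with no error handling. A minimal bus watch, as a sketch you can substitute for that line, makes failures visible and exits cleanly:
loop = GLib.MainLoop()
def on_message(bus, msg):
    # Stop the loop on fatal errors or end-of-stream
    if msg.type == Gst.MessageType.ERROR:
        err, dbg = msg.parse_error()
        print(f"ERROR: {err.message}")
        loop.quit()
    elif msg.type == Gst.MessageType.EOS:
        loop.quit()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", on_message)
loop.run()
pipeline.set_state(Gst.State.NULL)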
06 DeepStream metadata and overlays with nvdsosd
DeepStream writes inference results into GStreamer buffer metadata (NvDsMeta). Downstream elements read this metadata to draw bounding boxes and collect statistics.
Reading metadata with a pad probe
import pyds
from gi.repository import Gst
def osd_sink_pad_buffer_probe(pad, info, u_data):
gst_buffer = info.get_buffer()
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
frame_list = batch_meta.frame_meta_list
while frame_list is not None:
frame_meta = pyds.NvDsFrameMeta.cast(frame_list.data)
obj_list = frame_meta.obj_meta_list
while obj_list is not None:
obj_meta = pyds.NvDsObjectMeta.cast(obj_list.data)
class_id = obj_meta.class_id
obj_label = obj_meta.obj_label
confidence = obj_meta.confidence
track_id = obj_meta.object_id
rect = obj_meta.rect_params
print(f"Frame={frame_meta.frame_num} "
f"ID={track_id} cls={obj_label}({class_id}) "
f"conf={confidence:.2f} "
f"bbox=[{rect.left:.0f},{rect.top:.0f},"
f"{rect.width:.0f},{rect.height:.0f}]")
            # Customize the overlay color (green)
obj_meta.rect_params.border_color.set(0.0, 1.0, 0.0, 1.0)
obj_meta.rect_params.border_width = 3
obj_list = obj_list.next
frame_list = frame_list.next
return Gst.PadProbeReturn.OK
osd = pipeline.get_by_name("osd")
osd_pad = osd.get_static_pad("sink")
osd_pad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)
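From the same probe you can also draw your own text through display metadata: a sketch following the deepstream_python_apps pattern, to be placed inside the per-frame while loop:
# Per frame_meta: attach a custom text label (sketch)
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
display_meta.num_labels = 1
txt = display_meta.text_params[0]
txt.display_text = f"objects: {frame_meta.num_obj_meta}"
txt.x_offset, txt.y_offset = 10, 12
txt.font_params.font_name = "Serif"
txt.font_params.font_size = 12
txt.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
txt.set_bg_clr = 1
txt.text_bg_clr.set(0.0, 0.0, 0.0, 0.8)
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)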
Multi-stream configuration
[application]
enable-perf-measurement=1
perf-measurement-interval-sec=2
[tiled-display]
enable=1
rows=2
columns=2
width=1280
height=720
[source0]
enable=1
type=4
uri=rtsp://cam1.local/stream
[source1]
enable=1
type=4
uri=rtsp://cam2.local/stream
[primary-gie]
enable=1
batch-size=4
model-engine-file=yolov5s_b4_int8.engine
config-file=config_infer_primary.txt
deepstream-app -c ds_4stream.txt
07 Writing a custom GStreamer inference element
When the stock elements don't meet your needs, you can write a custom GStreamer transform element directly in C. The element receives video/x-raw buffers, runs inference on them, and returns them with the overlay applied.
GstVideoFilter-based element skeleton
#include <gst/gst.h>
#include <gst/video/gstvideofilter.h>
#define GST_TYPE_INFERENCE_FILTER (gst_inference_filter_get_type())
G_DECLARE_FINAL_TYPE(GstInferenceFilter, gst_inference_filter,
GST, INFERENCE_FILTER, GstVideoFilter)
struct _GstInferenceFilter {
GstVideoFilter base;
gchar *model_path;
gpointer interpreter; /* TFLite Interpreter* */
};
static GstStaticPadTemplate sink_tmpl = GST_STATIC_PAD_TEMPLATE("sink",
GST_PAD_SINK, GST_PAD_ALWAYS,
GST_STATIC_CAPS("video/x-raw,format=RGB,width=[1,4096],height=[1,4096]"));
static GstStaticPadTemplate src_tmpl = GST_STATIC_PAD_TEMPLATE("src",
GST_PAD_SRC, GST_PAD_ALWAYS,
GST_STATIC_CAPS("video/x-raw,format=RGB,width=[1,4096],height=[1,4096]"));
/* In-place transform: operate directly on the buffer */
static GstFlowReturn
gst_inference_filter_transform_ip(GstVideoFilter *filter, GstVideoFrame *frame)
{
    GstInferenceFilter *self = GST_INFERENCE_FILTER(filter);
    guint8 *data = GST_VIDEO_FRAME_PLANE_DATA(frame, 0);
    gint width = GST_VIDEO_FRAME_WIDTH(frame);
    gint height = GST_VIDEO_FRAME_HEIGHT(frame);
    /* run_tflite_inference(self->interpreter, data, width, height); */
    /* draw_bounding_boxes(data, width, height, results); */
    (void)data; (void)width; (void)height; (void)self;
    return GST_FLOW_OK;
}
G_DEFINE_TYPE(GstInferenceFilter, gst_inference_filter, GST_TYPE_VIDEO_FILTER)
/* Register pad templates and hook up the in-place transform vfunc */
static void gst_inference_filter_class_init(GstInferenceFilterClass *klass)
{
    GstElementClass *element_class = GST_ELEMENT_CLASS(klass);
    GstVideoFilterClass *vfilter_class = GST_VIDEO_FILTER_CLASS(klass);
    gst_element_class_add_static_pad_template(element_class, &sink_tmpl);
    gst_element_class_add_static_pad_template(element_class, &src_tmpl);
    gst_element_class_set_static_metadata(element_class,
        "Inference Filter", "Filter/Video",
        "Runs ML inference on raw video frames", "dev@example.com");
    vfilter_class->transform_frame_ip = gst_inference_filter_transform_ip;
}
static void gst_inference_filter_init(GstInferenceFilter *self)
{
    self->model_path = NULL;
    self->interpreter = NULL;
}
static gboolean plugin_init(GstPlugin *plugin)
{
return gst_element_register(plugin, "inferencefilt",
GST_RANK_NONE,
GST_TYPE_INFERENCE_FILTER);
}
GST_PLUGIN_DEFINE(GST_VERSION_MAJOR, GST_VERSION_MINOR,
inferencefilt, "Custom Inference Filter",
plugin_init, "1.0", "LGPL", "custom", "https://example.com")
Building (Meson)
project('gst-inference-plugin', 'c')
gst_dep = dependency('gstreamer-1.0')
gstvideo_dep = dependency('gstreamer-video-1.0')
shared_library('gstinferencefilt',
'gst_inference_filter.c',
dependencies: [gst_dep, gstvideo_dep],
install: true,
install_dir: get_option('libdir') / 'gstreamer-1.0'
)
meson setup build && ninja -C build && sudo ninja -C build install
gst-inspect-1.0 inferencefilt
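A quick smoke test from Python, assuming the plugin landed in a directory on GST_PLUGIN_PATH:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
Gst.init(None)
# Push 60 RGB test frames through the new element and discard them
pipeline = Gst.parse_launch(
    "videotestsrc num-buffers=60 ! videoconvert ! "
    "video/x-raw,format=RGB,width=640,height=480 ! "
    "inferencefilt ! fakesink sync=false")
pipeline.set_state(Gst.State.PLAYING)
bus = pipeline.get_bus()
msg = bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE,
                             Gst.MessageType.ERROR | Gst.MessageType.EOS)
print("OK" if msg.type == Gst.MessageType.EOS else "FAILED")
pipeline.set_state(Gst.State.NULL)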
08 Zero-copy GPU pipeline — CUDA + DMA-BUF
With high-resolution video streams, CPU-GPU memory copies become a serious bottleneck. Zero-copy pipelines move frames from the camera hardware straight into GPU memory using DMA-BUF or NVIDIA's NVMM.
NVMM-based zero-copy on Jetson
# nvv4l2camerasrc captures directly into GPU memory
gst-launch-1.0 \
nvv4l2camerasrc device=/dev/video0 ! \
"video/x-raw(memory:NVMM),width=1920,height=1080,framerate=30/1,format=NV12" ! \
m.sink_0 nvstreammux name=m batch-size=1 width=1920 height=1080 ! \
nvinfer config-file-path=config_infer_primary.txt ! \
nvvideoconvert ! \
"video/x-raw(memory:NVMM),format=RGBA" ! \
nvdsosd ! \
nvegltransform ! \
nveglglessink
The memory:NVMM caps feature indicates that the buffer lives in GPU DRAM; NVIDIA elements process it directly without copying to the CPU. Note that nvinfer requires batched input from nvstreammux, which is why the mux sits between camera and inference even for a single stream.
V4L2 + DMA-BUF on an x86 GPU
# io-mode=dmabuf: camera buffers stay in kernel space
gst-launch-1.0 \
v4l2src io-mode=dmabuf device=/dev/video0 ! \
"video/x-raw,format=NV12,width=1920,height=1080" ! \
glupload ! glcolorconvert ! \
"video/x-raw(memory:GLMemory),format=RGBA" ! \
gleffects effect=0 ! \
gldownload ! \
appsink name=sink
Memory-copy comparison — 1080p@30fps
Method                     Latency (ms)   CPU %   Bandwidth (GB/s)
──────────────────────────────────────────────────────────────────
CPU frame copy             8–15           35      3.2
appsink/appsrc bridge      12–25          45      6.4
DMA-BUF zero-copy          1–3            5       0.0
NVMM (Jetson)              0.5–2          3       0.0
09 Performance benchmarking and optimization tips
Before moving a GStreamer AI pipeline into production, benchmark and profile it systematically. Pinning down whether the bottleneck is in decode, inference, or postprocessing is the starting point of any optimization.
Measuring FPS
gst-launch-1.0 \
v4l2src device=/dev/video0 ! \
videoconvert ! video/x-raw,format=RGB,width=224,height=224 ! \
tensor_converter ! \
tensor_filter framework=tensorflow2-lite \
model=mobilenet_v2_quant.tflite ! \
fpsdisplaysink video-sink=fakesink sync=false text-overlay=false
Profiling with GST_DEBUG
export GST_DEBUG="tensor_filter:5"
export GST_DEBUG_FILE=/tmp/gst_debug.log
# Run the pipeline, then analyze:
grep "invoke" /tmp/gst_debug.log | awk '{print $7}' | sort -n | tail -20
Optimization checklist
- Quantize to INT8: the benchmark below shows roughly 2.6x over FP32 at the same thread count
- Match the thread count to physical cores (custom=NumThreads:4 in tensor_filter)
- Drop stale frames: max-buffers=1 drop=true on appsink, leaky queues on tee branches (see the sketch below)
- Disable clock sync on display and debug sinks (sync=false)
- Offload to an accelerator where available (Coral Edge TPU, TensorRT)
- Keep frames in GPU or DMA-BUF memory end to end (section 08)
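The leaky-queue item looks like this in practice: a sketch of the inference branch from section 04, where queue leaky=downstream discards the oldest queued frame instead of stalling the tee:
# Inference branch of a tee: hold at most one buffer, drop the oldest when full
INFER_BRANCH = (
    "t. ! queue leaky=downstream max-size-buffers=1 ! "
    "videoscale ! video/x-raw,width=416,height=416 ! "
    "videoconvert ! video/x-raw,format=RGB ! "
    "tensor_converter ! "
    "tensor_filter framework=tensorflow2-lite "
    "model=/opt/models/yolov5s_416.tflite ! "
    "tensor_sink name=result"
)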
Benchmark results — Raspberry Pi 5 (Cortex-A76)
Model format   Runs on           FPS   Latency   CPU %
────────────────────────────────────────────────────────
FP32 TFLite    CPU (1 thread)    8     125 ms    95
FP32 TFLite    CPU (4 threads)   22    45 ms     85
INT8 TFLite    CPU (4 threads)   58    17 ms     70
INT8 TFLite    Coral USB         120   8 ms      12
ONNX Runtime   CPU (4 threads)   35    28 ms     80