Building a Custom GStreamer Plugin for NVIDIA DeepStream

NVIDIA DeepStream provides a production-ready pipeline for multi-stream video analytics: hardware-accelerated decoding, tracking, on-screen display, and message brokering, all wired through GStreamer. For standard detection models exported to TensorRT, nvinfer handles everything.

However, the common case has limits. Vision-language models, custom post-processing, rotated bounding boxes, or the need to hot-swap models at runtime: these are places where nvinfer’s assumptions break down. Sometimes you have a mature PyTorch inference stack your team has carefully tuned, and you want DeepStream to call that rather than reimplementing it in a config file.

For YOLO-family models specifically, DeepStream-Yolo by Marcos Luciano has already done excellent work implementing custom post-processing in C++. If C++ is on the table, start there. This article takes a different angle: achieving the same result entirely in Python, using a custom GStreamer plugin with pyservicemaker, without sacrificing throughput.

The key insight that makes this possible: downstream elements like nvtracker, nvdsosd, and nvmsgconv don’t care which element produced detection metadata. Write to DeepStream’s metadata structure correctly and the rest of the ecosystem works as if nvinfer was never in the picture.

DeepStream Metadata

Every buffer flowing through a DeepStream pipeline carries more than pixel data. From the moment frames pass through nvstreammux, each GstBuffer has an NvDsBatchMeta structure attached to it. The hierarchy is straightforward and can be found in the official documentation.

NvDsBatchMeta
├── NvDsUserMeta                        (batch-level custom metadata)
└── NvDsFrameMeta                       (one per source stream)
    ├── NvDsUserMeta                    (frame-level custom metadata)
    └── NvDsObjectMeta                  (one per detected object)
        ├── NvDsClassifierMeta
        └── NvDsUserMeta                (object-level custom metadata)

NvDsBatchMeta describes the whole batch. Each NvDsFrameMeta corresponds to one source stream and carries frame-level information like the source ID and frame number. Each NvDsObjectMeta represents a single detection, meaning that when our plugin writes detections, we’ll write an NvDsObjectMeta for each one.

The critical thing to understand is that none of this is owned by nvinfer. It’s a shared data contract. Any GStreamer element in the pipeline can read from it, write to it, or both:

  • nvtracker reads object bounding boxes and writes tracking IDs.
  • nvdsosd reads boxes and labels to draw overlays.
  • nvmsgconv reads the whole structure to produce message payloads.

Our custom plugin will simply write detections into this structure the same way nvinfer would and everything downstream picks them up without modification. One important constraint worth understanding before we write any code: NvDsObjectMeta instances cannot be constructed directly from Python. Attempting to instantiate the class raises a No constructor defined! error at runtime.

The reason is architectural. DeepStream manages its metadata objects through memory pools, pre-allocated blocks that get recycled across frames to avoid the overhead of repeated heap allocation and deallocation in a high-throughput pipeline. These pools are owned by NvDsBatchMeta and live on the C side of the boundary. The Python bindings expose access to those pools, but deliberately don’t expose a Python-side constructor, because creating an NvDsObjectMeta outside the pool would bypass the lifecycle management that keeps DeepStream’s memory usage predictable. The correct way to get one is to ask the batch for it: batch_meta.acquire_object_meta(), which hands you a pre-allocated instance from the pool. When the frame is done, DeepStream returns it to the pool automatically.
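
To make the pooling idea concrete, here is a toy model of an acquire/release pool in plain Python. It is only an illustration of the pattern: the class names, fields, and pool size are hypothetical, and DeepStream’s real pools are implemented on the C side with their own lifecycle rules.

```python
class ToyObjectMeta:
    """Toy stand-in for a pooled metadata record (not the real NvDsObjectMeta)."""

    def __init__(self):
        self.reset()

    def reset(self):
        # Clear the fields so a recycled record never leaks stale data.
        self.class_id = -1
        self.confidence = 0.0


class ToyMetaPool:
    """Pre-allocates records once and recycles them instead of constructing new ones."""

    def __init__(self, size):
        self._free = [ToyObjectMeta() for _ in range(size)]

    @property
    def available(self):
        return len(self._free)

    def acquire(self):
        if not self._free:
            raise RuntimeError("pool exhausted")
        return self._free.pop()

    def release(self, obj):
        obj.reset()
        self._free.append(obj)


pool = ToyMetaPool(size=2)

first = pool.acquire()
first.class_id = 3          # fill in a detection
pool.release(first)         # "frame is done": the record goes back to the pool

second = pool.acquire()     # the same record is handed out again, already reset
reused = second is first
```

Constructing a `ToyObjectMeta` directly would work in this toy, but it would sit outside the pool’s bookkeeping, which is the situation the missing Python constructor is designed to prevent.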

The Python Bridge: pyservicemaker

To interact with DeepStream’s metadata from Python, we’ll use pyservicemaker, NVIDIA’s current, supported Python SDK for DeepStream. The official documentation covers the basics of pipelines and flows, but stops short of showing how to write and attach metadata from a custom inference element. That’s the gap this article fills.

The key abstraction is BatchMetadataOperator. Subclassing it and implementing handle_metadata(batch_meta) gives you access to the full NvDsBatchMeta for every buffer flowing through the pipeline. From there, you iterate over the frames with batch_meta.frame_items and attach detection objects to each one.
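
The traversal itself can be sketched without a GPU. The function below walks a batch the way a handle_metadata implementation might, counting objects per class for each frame. It is a minimal sketch under assumptions: it relies only on duck typing, the attribute names object_items and class_id are assumed to match what the bindings expose, and the SimpleNamespace objects are hypothetical stand-ins so the example runs without DeepStream installed.

```python
from collections import Counter
from types import SimpleNamespace


def count_objects_per_frame(batch_meta):
    """Return {batch_id: Counter of class ids} for every frame in the batch.

    In a real pipeline this body would live inside handle_metadata(batch_meta)
    of a BatchMetadataOperator subclass.
    """
    counts = {}
    for frame_meta in batch_meta.frame_items:
        counts[frame_meta.batch_id] = Counter(
            obj.class_id for obj in frame_meta.object_items
        )
    return counts


# Hypothetical stand-ins that mimic the shape of the metadata hierarchy.
fake_batch = SimpleNamespace(
    frame_items=[
        SimpleNamespace(
            batch_id=0,
            object_items=[
                SimpleNamespace(class_id=0),
                SimpleNamespace(class_id=0),
                SimpleNamespace(class_id=2),
            ],
        ),
        SimpleNamespace(batch_id=1, object_items=[]),
    ]
)

counts = count_objects_per_frame(fake_batch)
```

Keeping the logic in a plain function like this also makes it easy to unit-test before wiring it into a pipeline.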

pyservicemaker also provides a Buffer wrapper around Gst.Buffer that exposes batch_meta directly and, importantly, an extract(batch_id) method that returns a DLPack handle to each frame’s GPU memory. That’s what makes zero-copy inference possible as we can hand the frame straight to TensorRT without ever leaving the GPU.

Rather than using BatchMetadataOperator standalone via a probe, we’ll fold the same pattern directly into our custom plugin’s do_transform_ip method, which gives us control over the element’s lifecycle, properties, and caps negotiation alongside the metadata access. But first, we need to build that plugin.

A Discoverable Python GStreamer Plugin

GStreamer discovers plugins at runtime by scanning directories listed in GST_PLUGIN_PATH. For Python plugins specifically, it looks inside a python/ subdirectory within each of those paths. That means your plugin is just a .py file dropped in the right place: no compilation, no CMake, no shared library. The tradeoff is that the registration pattern is strict, and getting it wrong produces silent failures that are genuinely hard to debug.

$GST_PLUGIN_PATH/
└── python/
    └── gstexampleplugin.py   # your plugin

Set GST_PLUGIN_PATH to point at the parent directory and GStreamer will find python/gstexampleplugin.py automatically on the next pipeline run.
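
Because a misplaced file fails silently, it can help to check the layout before involving GStreamer at all. The helper below is a hypothetical convenience, not part of GStreamer: it only mirrors the directory convention described above by listing the .py files that sit in a python/ subdirectory of each GST_PLUGIN_PATH entry.

```python
import os
import tempfile
from pathlib import Path


def find_python_plugins(plugin_path):
    """List .py files located in <entry>/python/ for each entry of a search path."""
    found = []
    for entry in plugin_path.split(os.pathsep):
        if not entry:
            continue
        python_dir = Path(entry) / "python"
        if python_dir.is_dir():
            found.extend(sorted(python_dir.glob("*.py")))
    return found


# Demonstration on a throwaway directory tree.
with tempfile.TemporaryDirectory() as root:
    (Path(root) / "python").mkdir()
    (Path(root) / "python" / "gstexampleplugin.py").write_text("# plugin body\n")
    (Path(root) / "misplaced.py").write_text("# one level too high\n")

    names = [p.name for p in find_python_plugins(root)]
```

In practice you would call find_python_plugins(os.environ.get("GST_PLUGIN_PATH", "")) and confirm that your plugin file shows up in the result.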

The Plugin Skeleton

Here’s the minimal skeleton for a passthrough inference element: it receives batched video buffers, runs inference, attaches metadata, and passes the buffer downstream unmodified.

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import Gst, GstBase, GObject

import torch
from pyservicemaker import Buffer

GST_PLUGIN_NAME = "gstexampleplugin"

Gst.init(None)

class GstExamplePlugin(GstBase.BaseTransform):

    __gstmetadata__ = (
        'GstExamplePlugin',                     # name
        'Filter/Effect/Video',                  # classification
        'Custom inference element',             # description
        'Your Name'                             # author
    )

    src_format = Gst.Caps.from_string(
        "video/x-raw(memory:NVMM), format=RGB, "
        "width=(int)[ 1, 2147483647 ], height=(int)[ 1, 2147483647 ], "
        "framerate=(fraction)[ 0/1, 2147483647/1 ]"
    )
    sink_format = Gst.Caps.from_string(
        "video/x-raw(memory:NVMM), format=RGB, "
        "width=(int)[ 1, 2147483647 ], height=(int)[ 1, 2147483647 ], "
        "framerate=(fraction)[ 0/1, 2147483647/1 ]"
    )

    src_pad_template = Gst.PadTemplate.new(
        "src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, src_format
    )
    sink_pad_template = Gst.PadTemplate.new(
        "sink", Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, sink_format
    )
    __gsttemplates__ = (src_pad_template, sink_pad_template)

    __gproperties__ = {
        'model-engine': (
            str,
            'TensorRT engine path',
            'Path to the .engine file',
            '',
            GObject.ParamFlags.READWRITE
        ),
        'confidence-threshold': (
            float,
            'Confidence threshold',
            'Minimum confidence to attach a detection',
            0.0, 1.0, 0.5,
            GObject.ParamFlags.READWRITE
        ),
    }

    def __init__(self):
        super().__init__()
        self.model_engine = ''
        self.confidence_threshold = 0.5
        self.engine = None

    def do_get_property(self, prop):
        if prop.name == 'model-engine':
            return self.model_engine
        elif prop.name == 'confidence-threshold':
            return self.confidence_threshold

    def do_set_property(self, prop, value):
        if prop.name == 'model-engine':
            self.model_engine = value
        elif prop.name == 'confidence-threshold':
            self.confidence_threshold = value

    def do_start(self):
        # Load your TensorRT engine here. load_engine is a placeholder that you
        # must implement for your own inference stack; it is not a library call.
        self.engine = load_engine(self.model_engine)
        return True

    def do_transform_ip(self, gst_buffer: Gst.Buffer) -> Gst.FlowReturn:
        """In-place transform: attach metadata, pass buffer unchanged."""
        buffer = Buffer(gst_buffer)
        batch_meta = buffer.batch_meta

        frames = []
        for frame_meta in batch_meta.frame_items:
            t = torch.utils.dlpack.from_dlpack(buffer.extract(frame_meta.batch_id))
            frames.append(t)
        batch = torch.stack(frames, dim=0)

        # Run your model inference
        results = self.engine(batch)
        
        # Iterate over the results for each frame and attach them as object_meta
        # for detection/segmentation outputs; other outputs can go in user_meta.
        # The following is pseudocode: it assumes results holds one list of
        # detections per frame, in the same order as batch_meta.frame_items.
        for frame_meta, detections in zip(batch_meta.frame_items, results):
            for det in detections:
                obj = batch_meta.acquire_object_meta()
                # Fill the obj with each detection
                ...
                frame_meta.append(obj)
 
        return Gst.FlowReturn.OK


# --- Registration ---
GObject.type_register(GstExamplePlugin)
__gstelementfactory__ = (GST_PLUGIN_NAME, Gst.Rank.NONE, GstExamplePlugin)

A few things worth noting about this skeleton:

GstBase.BaseTransform is the right base class for an in-place filter, one that receives a buffer, modifies it (by attaching metadata) and passes it downstream. We override do_transform_ip rather than do_transform because we’re not allocating a new output buffer.

__gstmetadata__ and __gsttemplates__ are not optional. GStreamer won’t register the element without them. The caps string video/x-raw(memory:NVMM) tells GStreamer this element works with NVIDIA memory, which is essential for staying on-GPU in a DeepStream pipeline.

__gproperties__ exposes model-engine and confidence-threshold as first-class GStreamer properties, which means you can set them from a gst-launch command line or from Python pipeline code without touching the source.

The last two lines are required for registration: GObject.type_register tells the GObject type system about the class and __gstelementfactory__ tells GStreamer what element name to expose and which class to instantiate.

Verifying the plugin. Once the file is in place and GStreamer’s registry cache has been cleared (by default it lives under ~/.cache/gstreamer-1.0/), verify registration with:

GST_PLUGIN_PATH=/path/to/your/plugins gst-inspect-1.0 gstexampleplugin

You should see the element metadata, pad templates, and both properties listed. If you see them, GStreamer knows about your plugin and you’re ready to drop it into a pipeline.

Example of End-to-End Inference with Ultralytics

With the plugin skeleton in place, it’s time to fill in the inference logic. The full working code is available as a GitHub Gist. Once the plugin is discoverable, you can inspect it as before or launch a pipeline. Note that the pipeline below uses the element name and property from that code (gstyoloplugin and model-path), not those of the skeleton above. Here is a simple example that only runs inference and reports the FPS:

gst-launch-1.0 -v \
  nvstreammux name=m width=1280 height=720 batch-size=1 \
    batched-push-timeout=33000 ! \
  nvvideoconvert nvbuf-memory-type=0 ! \
  'video/x-raw(memory:NVMM), format=RGB' ! \
  gstyoloplugin model-path=/path/to/yolo26s.engine ! \
  fpsdisplaysink text-overlay=false silent=false sync=false \
    video-sink=fakesink \
  uridecodebin uri=file:///path/to/video.mp4 ! m.sink_0

Inspecting the Code

Compatibility Issue

If you read the code, you may have noticed that we override the tuple builtin, but only inside the ultralytics.nn.backends.tensorrt module, since that is where the problem occurs. There is a known compatibility edge case between the TensorRT Python bindings and PyGObject, the GStreamer Python wrapper framework, that crashes the pipeline with the infamous message “Segmentation fault (core dumped).” The following snippet works around it while preserving the intended behaviour:

import ultralytics.nn.backends.tensorrt as trt_backend

_original_tuple = tuple

def safe_tuple(obj):
    if "tensorrt" in type(obj).__module__ and type(obj).__name__ == "Dims":
        return _original_tuple(obj[i] for i in range(len(obj)))
    return _original_tuple(obj)

trt_backend.tuple = safe_tuple

This replaces the tuple reference inside the Ultralytics backend’s namespace at runtime with a version that falls back to index-based access for Dims objects, leaving everything else untouched. It’s not elegant, but it’s surgical, and it needs to happen at import time, before any model is instantiated.
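
The reason the patch stays local is Python’s name lookup order: a function resolves tuple in its own module’s globals before falling back to the builtins. The self-contained sketch below demonstrates that mechanism with hypothetical stand-ins: fake_backend plays the role of the third-party module, and the Dims class simulates a type whose iteration is unsafe but whose indexing works. It does not reproduce the actual TensorRT crash.

```python
import types

# Hypothetical stand-in for a third-party module that calls tuple() internally.
fake_backend = types.ModuleType("fake_backend")
exec("def shape_of(dims):\n    return tuple(dims)\n", fake_backend.__dict__)


class Dims:
    """Simulated type: indexing is fine, iteration is not."""

    def __init__(self, *values):
        self._values = list(values)

    def __len__(self):
        return len(self._values)

    def __getitem__(self, index):
        return self._values[index]

    def __iter__(self):
        raise RuntimeError("iteration is unsafe for this type")


_original_tuple = tuple


def safe_tuple(obj):
    # Fall back to index-based access for the problematic type only.
    if type(obj).__name__ == "Dims":
        return _original_tuple(obj[i] for i in range(len(obj)))
    return _original_tuple(obj)


dims = Dims(1, 3, 640, 640)

try:
    fake_backend.shape_of(dims)
    failed_before_patch = False
except RuntimeError:
    failed_before_patch = True

# Shadow the builtin inside this one module's namespace only.
fake_backend.tuple = safe_tuple

shape_after_patch = fake_backend.shape_of(dims)
```

Code outside fake_backend still sees the unmodified builtin, which is why the override does not leak into the rest of the process.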

The Inference Loop

The inference loop itself is quite straightforward:

  1. Extract the frames from the buffers.
  2. Preprocess and run inference.
  3. Attach the results to the object metadata of each frame if the downstream elements of the pipeline are DeepStream plugins.

The snippet below performs the zero-copy frame extraction using DLPack:

frames = []
for frame_meta in batch_meta.frame_items:
    t = torch.utils.dlpack.from_dlpack(buffer.extract(frame_meta.batch_id))
    frames.append(t)
batch = torch.stack(frames, dim=0)

Preprocessing the Input

When given a torch.Tensor, YOLO models expect a fixed input shape (N, 3, 640, 640) according to the documentation. However, frames coming off nvstreammux will be whatever resolution your source is. The approach used here is letterboxing: scale the frame to fit within the target dimensions while preserving aspect ratio, then pad the remaining space. The key insight here is that we can do this entirely on the GPU, across the whole batch at once, without ever touching CPU memory.
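
The geometry behind letterboxing, and the inverse mapping needed to bring detections back into source coordinates, can be sketched in plain Python. This is an illustration of the arithmetic only, not the plugin’s code: the function names are hypothetical, and centered (symmetric) padding is an assumption. In the plugin, the same scale and padding values would be applied to GPU tensors, for example with torch.nn.functional.interpolate and torch.nn.functional.pad.

```python
def letterbox_params(src_w, src_h, dst_w=640, dst_h=640):
    """Return (scale, new_w, new_h, pad_x, pad_y) for an aspect-preserving fit."""
    # The limiting dimension decides the scale so the whole frame fits.
    scale = min(dst_w / src_w, dst_h / src_h)
    new_w, new_h = round(src_w * scale), round(src_h * scale)
    # Assumption: padding is split evenly on both sides (centered image).
    pad_x = (dst_w - new_w) / 2
    pad_y = (dst_h - new_h) / 2
    return scale, new_w, new_h, pad_x, pad_y


def to_source_coords(box, scale, pad_x, pad_y):
    """Map an (x1, y1, x2, y2) box from letterboxed space back to the source frame."""
    x1, y1, x2, y2 = box
    return (
        (x1 - pad_x) / scale,
        (y1 - pad_y) / scale,
        (x2 - pad_x) / scale,
        (y2 - pad_y) / scale,
    )


# A 1280x720 frame fitted into a 640x640 model input.
scale, new_w, new_h, pad_x, pad_y = letterbox_params(1280, 720)

# A box predicted in the 640x640 letterboxed space, mapped back to 1280x720.
box_in_source = to_source_coords((100, 200, 300, 400), scale, pad_x, pad_y)
```

For the 1280x720 case the frame is scaled by 0.5 to 640x360 and padded by 140 pixels above and below, so the inverse mapping subtracts the padding first and then divides by the scale.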

With frame extraction, letterboxing, inference and coordinate inversion all happening on the GPU within a single do_transform_ip call, the plugin behaves exactly like nvinfer from the perspective of every downstream element but with the full flexibility of a Python inference stack underneath.

From here, the rest of the DeepStream pipeline takes over: nvtracker assigns IDs, nvdsosd draws overlays and nvmsgconv serializes payloads.

Practical Takeaways and Next Steps

If you’ve followed along this far, you have a working pattern for replacing nvinfer with your own Python inference element, and more importantly, you understand why each piece is the way it is.

The pattern generalizes. Everything described here (the plugin skeleton, the batched preprocessing and the metadata attachment) is model-agnostic. Swapping Ultralytics YOLO for Roboflow’s rfdetr is straightforward, and the GStreamer and pyservicemaker scaffolding stays identical. The same is true for more exotic architectures: NVIDIA’s own deepstream_reference_apps repository includes a working example of integrating a Vision-Language Model via vLLM using exactly this plugin approach, which is worth studying if you’re pushing beyond detection into video understanding.

The full plugin code is available as a GitHub Gist. If you build something on top of it, whether a different model, a multi-stream setup or a VLM integration, I’d be curious to hear how it goes. Happy coding!


