import logging as log
import os
import sys
from gi.repository import GLib, Gst, GstApp
import nufi
from nufi.models.yolo import Yolo
from nufi.utils.metadata import gst_appsink_meta_get
nufi.init()
def bus_call(_, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
log.info("End-of-stream")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
log.warning("Warning: %s: %s", err, debug)
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log.error("Error: %s: %s", err, debug)
loop.quit()
return True
def run_sdstreamer(video_path, result_file_path):
filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
queue = Gst.ElementFactory.make("queue", "queue")
nufi = Gst.ElementFactory.make("nufi", "yolo")
draw = Gst.ElementFactory.make("draw", "draw")
videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")
tee = Gst.ElementFactory.make("tee", "tee")
queue_file = Gst.ElementFactory.make("queue", "queue_file")
queue_meta = Gst.ElementFactory.make("queue", "queue_meta")
filesink = Gst.ElementFactory.make("filesink", "filesink")
metadatasink = Gst.ElementFactory.make("appsink", "metadatasink")
elements = [
filesrc,
decodebin,
videoconvert1,
queue,
nufi,
draw,
videoconvert2,
x264enc,
mp4mux,
tee,
queue_file,
queue_meta,
filesink,
metadatasink,
]
if not all(elements):
log.error("Unable to create Elements")
sys.exit(-1)
filesrc.set_property("location", video_path)
videoconvert1.set_property("n-threads", 16)
videoconvert2.set_property("n-threads", 16)
filesink.set_property("location", result_file_path)
metadatasink.set_property("emit-signals", True)
models = [
Yolo() for _ in range(5)
]
nufi.set_property("model", models)
pipeline = Gst.Pipeline()
for element in elements:
pipeline.add(element)
filesrc.link(decodebin)
Gst.Element.link_many(
videoconvert1, queue, nufi, draw, videoconvert2, x264enc, mp4mux, tee
)
tee.link(queue_file)
queue_file.link(filesink)
tee.link(queue_meta)
queue_meta.link(metadatasink)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
def on_demuxer_pad_added(_, pad):
if not pad.has_current_caps():
log.error("Pad %s has no caps, can't link", pad.get_name())
sys.exit(-1)
caps = pad.get_current_caps()
struct = caps.get_structure(0)
if struct.get_name().startswith("video"):
pad.link(videoconvert1.get_static_pad("sink"))
decodebin.connect("pad-added", on_demuxer_pad_added)
def metadata_print_cb(metadatasink: GstApp.AppSink):
metadata = gst_appsink_meta_get(metadatasink)
log.debug(metadata)
return Gst.FlowReturn.OK
metadatasink.connect("new-sample", metadata_print_cb)
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
log.error("Unable to set the pipeline to the playing state.")
sys.exit(-1)
try:
loop.run()
except Exception as e:
log.error("App finished : %s", e)
loop.quit()
pipeline.set_state(Gst.State.NULL)
return result_file_path
if __name__ == "__main__":
video_path = "./assets/input/video.mp4"
result_file_path = "./assets/result/object-detection.mp4"
print(run_sdstreamer(video_path, result_file_path))