Skip to main content
Version: 2.0.0

1. Multi Threading 이용하는 Pipeline 만들기

NuFiStreamer를 활용하여 간단한 추론 파이프라인을 만들고 실행해봅니다.
nufi element가 model에 따라 어떻게 처리하는지 살펴봅니다.
Gstreamer에 대한 이해가 필요하다면 Gstreamer docs를 참고해주세요.

Pipeline 설명

  • Pipeline은 특정 기능을 수행하는 element들의 집합이고 비디오, 오디오 등과 같은 미디어 데이터가 순차적으로 흐르며 처리됩니다.
  • 예제 Pipeline은 GStreamer의 video를 control 하기 위한 elements와 NuFiStreamer의 이미지 병렬 처리 및 출력을 위한 elements를 이용합니다.
    • GStreamer elements: filesrc, decodebin, videoconvert, queue, x264enc, mp4mux, filesink
    • NuFiStreamer elements: nufi, draw
  • 예제 Pipeline은 영상 데이터를 받아서 이를 추론하여 객체를 탐지하고 box 및 label을 출력한 결과 영상을 반환합니다.

Element 설명

  • 사용되는 nufi element들의 기능을 설명하겠습니다.
    • nufi: model property를 이용하여 영상의 각 이미지 프레임들을 병렬적으로 처리합니다.
      • nufi = Gst.ElementFactory.make("nufi", "yolo") 해당 라인을 통해서 nufi element를 생성합니다.
      • nufi.set_property("model", models) 해당 라인을 통해서 이미지 프레임을 처리할 model 객체를 삽입합니다.
        • 예제 코드에선 NuFiStreamer에서 기본적으로 제공하는 Yolo 객체를 이용하여 이미지 상에서 객체를 탐지합니다.
      • model property를 models = [Yolo() for _ in range(5)]와 같이 여러 처리 객체로 설정한다면 nufi element 내부적으로 Multi threading을 이용해 영상의 프레임들을 각 처리 객체로 나눠서 처리하여 빠른 이미지 처리가 가능합니다.
      • nufi에 대한 세부정보는 nufi element 문서를 참고하세요.

Model 설명

  • 사용되는 model의 기능을 설명하겠습니다.
    • Yolo: pipeline에 입력된 데이터의 각 프레임을 추론하여 객체를 탐지하는 model입니다.
      • Yolo에 대한 세부정보는 Yolo 문서를 참고하세요.

사용 예시

import logging as log
import os
import sys

from gi.repository import GLib, Gst

import nufi
from nufi.models.yolo import Yolo

nufi.init()


def bus_call(_, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
log.info("End-of-stream")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
log.warning("Warning: %s: %s", err, debug)
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log.error("Error: %s: %s", err, debug)
loop.quit()
return True


def run_sdstreamer(video_path, result_file_path):
# 1. Elements 생성
filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
queue = Gst.ElementFactory.make("queue", "queue")

nufi = Gst.ElementFactory.make("nufi", "yolo")

videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")
metadatasink = Gst.ElementFactory.make("appsink", "metadatasink")

elements = [
filesrc,
decodebin,
videoconvert1,
queue,
nufi,
videoconvert2,
x264enc,
mp4mux,
sink,
]

if not all(elements):
log.error("Unable to create Elements")
sys.exit(-1)


# 2. Elements properties 설정
filesrc.set_property("location", video_path)
videoconvert1.set_property("n-threads", 16)
videoconvert2.set_property("n-threads", 16)
sink.set_property("location", result_file_path)

# - nufi element models 설정
models = [Yolo() for _ in range(5)] # multi threading 개수 높이려면 iter count 증가시키기
nufi.set_property("model", models)


# 3. 파이프라인 생성
pipeline = Gst.Pipeline()
pipeline.add(*elements)
for i in range(len(elements) - 1):
if elements[i].name != "decodebin":
if not Gst.Element.link(elements[i], elements[i + 1]):
# if not elements[i].link(elements[i + 1]):
log.error(
"Elements %s and %s could not be linked.",
elements[i].get_name(),
elements[i + 1].get_name(),
)
sys.exit(-1)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)

def on_demuxer_pad_added(_, pad):
if not pad.has_current_caps():
log.error("Pad %s has no caps, can't link", pad.get_name())
sys.exit(-1)

caps = pad.get_current_caps()
struct = caps.get_structure(0)
if struct.get_name().startswith("video"):
pad.link(videoconvert1.get_static_pad("sink"))

decodebin.connect("pad-added", on_demuxer_pad_added)


# 4. 파이프라인 시작
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
log.error("Unable to set the pipeline to the playing state.")
sys.exit(-1)
try:
loop.run()
except Exception as e:
log.error("App finished : %s", e)
loop.quit()

pipeline.set_state(Gst.State.NULL)
return result_file_path


if __name__ == "__main__":
video_path = "./assets/input/video.mp4"
result_file_path = "./assets/result/object-detection.mp4"
print(run_sdstreamer(video_path, result_file_path))

결과

추론 전 영상추론 후 영상
yolo-beforeyolo-result