Skip to main content
Version: 2.0.0

2. Metadata 가져오는 Pipeline 만들기

NuFiStreamer를 활용하여 Metadata를 가져오는 방법에 대해서 살펴봅니다. nufi element가 model에 따라 어떻게 처리하는지 살펴봅니다.
Gstreamer에 대한 이해가 필요하다면 Gstreamer docs를 참고해주세요.

Metadata 설명

  • Metadata는 nufi element에 의해서 미디어 데이터를 처리하고 그 결과를 의미하고 buffer에 쓰여집니다.
  • Metadata의 data type은 NufiElement이고 이를 buffer에 json 형태로 쓰여집니다.
    • NufiElement는 nufi.core.postprocess 하위에 존재합니다.
    • NufiElement를 이용하는 방법에 대해서는 Custom Model 만들기에서 자세히 설명하겠습니다.
    • NufiResult에 대한 세부정보는 NufiResult 문서를 참고하세요.
  • Metadata를 pipeline buffer에서 가져오기 위해선 Gstreamer appsink element와 NuFiStreamer gst_appsink_meta_get util 함수를 이용합니다.
    • gst_appsink_meta_get 함수는 nufi.utils.metadata 하위에 존재합니다.
  • Metadata에 대한 세부정보는 Metadata 문서를 참고하세요.

Pipeline 설명

  • Pipeline은 특정 기능을 수행하는 element들의 집합이고 비디오, 오디오 등과 같은 미디어 데이터가 순차적으로 흐르며 처리됩니다.
  • 예제 Pipeline은 GStreamer의 video를 control 하기 위한 elements와 NuFiStreamer의 이미지 병렬 처리를 위한 element를 이용합니다.
    • GStreamer elements: filesrc, decodebin, videoconvert, queue, , x264enc, mp4mux, appsink
    • NuFiStreamer elements: nufi
  • 예제 Pipeline은 영상 데이터를 받아서 이를 추론하여 객체를 탐지한 box, label, confidence 데이터를 출력하고 추론된 영상을 저장합니다.

Element 설명

  • 사용되는 nufi element들의 기능을 설명하겠습니다.
    • nufi: model property를 이용하여 영상의 각 이미지 프레임들을 병렬적으로 처리합니다.
      • nufi = Gst.ElementFactory.make("nufi", "yolo") 해당 라인을 통해서 nufi element를 생성합니다.
      • nufi.set_property("model", models) 해당 라인을 통해서 이미지 프레임을 처리할 model 객체를 삽입합니다.
        • 예제 코드에선 NuFiStreamer에서 기본적으로 제공하는 Yolo 객체를 이용하여 이미지 상에서 객체를 탐지합니다.
      • model property를 models = [Yolo() for _ in range(5)]와 같이 여러 처리 객체로 설정한다면 nufi element 내부적으로 Multi threading을 이용해 영상의 프레임들을 각 처리 객체로 나눠서 처리하여 빠른 이미지 처리가 가능합니다.
      • nufi에 대한 세부정보는 nufi element 문서를 참고하세요.
  • 추가적으로 appsink element를 사용하는 이유를 설명하겠습니다.
    • appsink element가 일반적으로 pipeline의 출력을 애플리케이션 코드에서 직접 다루고자 할 때 사용되는 element입니다.
    • appsink.set_property("emit-signals", True) 해당 라인은 appsink element가 signal을 발생시키도록 하기 위해서 해당 property를 True로 설정하는데 이 신호들중 new-sample 신호가 포함됩니다.
    • appsink.connect("new-sample", metadata_print_cb) 해당 라인은 appsink로 새로운 샘플이 도착할 때마다 콜백 함수를 실행해서 실시간으로 metadata를 콜백 함수에서 처리합니다.

Model 설명

  • 사용되는 model의 기능을 설명하겠습니다.
    • Yolo: pipeline에 입력된 데이터의 각 프레임을 추론하여 객체를 탐지하는 model입니다.
      • Yolo에 대한 세부정보는 Yolo 문서를 참고하세요.

사용 예시

  • appsink를 이용해서 buffer에 있는 메타데이터를 application에서 출력해봅니다.
import logging as log
import os
import sys

from gi.repository import GLib, Gst, GstApp

import nufi
from nufi.models.yolo import Yolo
from nufi.utils.metadata import gst_appsink_meta_get

nufi.init()


def bus_call(_, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
log.info("End-of-stream")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
log.warning("Warning: %s: %s", err, debug)
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log.error("Error: %s: %s", err, debug)
loop.quit()
return True


def run_sdstreamer(video_path, result_file_path):
# 1. Elements 생성
filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
queue = Gst.ElementFactory.make("queue", "queue")

nufi = Gst.ElementFactory.make("nufi", "yolo")
draw = Gst.ElementFactory.make("draw", "draw")

videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")

tee = Gst.ElementFactory.make("tee", "tee")
queue_file = Gst.ElementFactory.make("queue", "queue_file")
queue_meta = Gst.ElementFactory.make("queue", "queue_meta")
filesink = Gst.ElementFactory.make("filesink", "filesink")
metadatasink = Gst.ElementFactory.make("appsink", "metadatasink")

elements = [
filesrc,
decodebin,
videoconvert1,
queue,
nufi,
draw,
videoconvert2,
x264enc,
mp4mux,
tee,
queue_file,
queue_meta,
filesink,
metadatasink,
]
if not all(elements):
log.error("Unable to create Elements")
sys.exit(-1)

# 2. Elements properties 설정
filesrc.set_property("location", video_path)
videoconvert1.set_property("n-threads", 16)
videoconvert2.set_property("n-threads", 16)
filesink.set_property("location", result_file_path)
# - appsink signal 발생 여부 설정
metadatasink.set_property("emit-signals", True)


# - nufi element models 설정
models = [
Yolo() for _ in range(5)
] # multi threading 개수 높이려면 iter count 증가시키기
nufi.set_property("model", models)

# 3. 파이프라인 생성
pipeline = Gst.Pipeline()
for element in elements:
pipeline.add(element)

filesrc.link(decodebin)
Gst.Element.link_many(
videoconvert1, queue, nufi, draw, videoconvert2, x264enc, mp4mux, tee
)

tee.link(queue_file)
queue_file.link(filesink)
tee.link(queue_meta)
queue_meta.link(metadatasink)

loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)

def on_demuxer_pad_added(_, pad):
if not pad.has_current_caps():
log.error("Pad %s has no caps, can't link", pad.get_name())
sys.exit(-1)

caps = pad.get_current_caps()
struct = caps.get_structure(0)
if struct.get_name().startswith("video"):
pad.link(videoconvert1.get_static_pad("sink"))

decodebin.connect("pad-added", on_demuxer_pad_added)

def metadata_print_cb(metadatasink: GstApp.AppSink):
# appsink의 buffer에서 metadata를 추출
metadata = gst_appsink_meta_get(metadatasink)
log.debug(metadata) # log 대신 다른 처리도 가능.
return Gst.FlowReturn.OK

# - callback 함수를 이용하여 실시간으로 appsink로 도착하는 metadata handling
metadatasink.connect("new-sample", metadata_print_cb)

# 4. 파이프라인 시작
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
log.error("Unable to set the pipeline to the playing state.")
sys.exit(-1)
try:
loop.run()
except Exception as e:
log.error("App finished : %s", e)
loop.quit()

pipeline.set_state(Gst.State.NULL)
return result_file_path


if __name__ == "__main__":
video_path = "./assets/input/video.mp4"
result_file_path = "./assets/result/object-detection.mp4"
print(run_sdstreamer(video_path, result_file_path))

결과

Metadata 출력 log
metadata-log
실시간 Metadata 출력 영상
metadata-realtime