Version: 2.0.0

4. Building a Pipeline with a Custom Model

In this tutorial we build a pipeline that uses the Custom Model created in Building a Custom Model.
If you need background on GStreamer, refer to the GStreamer docs.

Pipeline Overview

  • A pipeline is a collection of elements, each performing a specific function, through which media data such as video and audio flows and is processed in sequence.
  • The example pipeline combines GStreamer elements that handle the video with NuFiStreamer elements for parallel image processing and output (a condensed sketch follows this list).
    • GStreamer elements: filesrc, decodebin, videoconvert, queue, capsfilter, x264enc, mp4mux, filesink (the metadata examples additionally use tee and appsink)
    • NuFiStreamer elements: nufi (used in the examples together with the draw and qos elements)
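
A condensed sketch of how these elements are assembled is shown below. This is a sketch only: it assumes the NuFiStreamer elements are available after nufi.init(), and the full, runnable versions appear in the examples later in this page.

# Sketch only: the condensed element chain used by the examples below.
import nufi
from gi.repository import Gst

nufi.init()  # initialize NuFiStreamer (called before creating nufi elements in the examples)

names = ["filesrc", "decodebin", "videoconvert", "queue", "nufi", "draw",
         "videoconvert", "x264enc", "mp4mux", "filesink"]
elements = [Gst.ElementFactory.make(name, None) for name in names]

pipeline = Gst.Pipeline()
for element in elements:
    pipeline.add(element)

elements[0].link(elements[1])         # filesrc -> decodebin
Gst.Element.link_many(*elements[2:])  # videoconvert -> ... -> filesink
# decodebin -> videoconvert is linked at runtime via the "pad-added" signal,
# and the nufi element's "model" property is set to a list of Custom Models.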

Element Overview

  • This section describes the nufi elements used in the examples.
    • nufi: processes each image frame of the video in parallel using the models set on its model property (a minimal interface sketch follows this list).
      • It runs the Custom Model implemented in Building a Custom Model.
      • For details, see the nufi element documentation.
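
A minimal sketch of the interface such a model provides, inferred from the examples below (MyModel is a placeholder name; the exact BaseModel signature may differ):

# Sketch of the model interface assumed by the examples below
# (not the authoritative BaseModel definition).
from typing import List, Optional

from numpy import ndarray

from nufi.core.postprocess import NufiResult
from nufi.models import BaseModel


class MyModel(BaseModel):
    def process(
        self, image: ndarray, pipeline_id: Optional[int] = None
    ) -> List[NufiResult]:
        # Called for each frame; return one NufiResult per output, optionally
        # carrying Drawables and a data payload.
        return [NufiResult(drawables=[])]

    def clean(self, pipeline_id: Optional[int] = None):
        # Called when the pipeline shuts down.
        return super().clean(pipeline_id)

The examples pass a list of model instances to the model property; increasing the length of that list increases the number of worker threads the nufi element uses.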

Usage Examples

Drawable MidRectanglePrinter Model Example

  • The MidRectanglePrinter model draws a rectangle at the center of the video.
import logging as log
import sys
from typing import List, Optional

import cv2
import numpy as np
from gi.repository import GLib, Gst
from numpy import ndarray

import nufi
from nufi.core.postprocess import Box, Drawable, NufiResult
from nufi.models import BaseModel

nufi.init()


def bus_call(_, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        log.info("End-of-stream")
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        log.warning("Warning: %s: %s", err, debug)
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        log.error("Error: %s: %s", err, debug)
        loop.quit()
    return True


class MidRectanglePrinter(BaseModel):
    def __init__(self, id):
        self.id = id
        print(f"Initialize {self.id} rectangle printer")

    def process(
        self, image: ndarray, pipeline_id: Optional[int] = None
    ) -> List[NufiResult]:
        box_points = self.calculate_center_box(image)
        box = Box(  # Use Box, a Drawable implementation provided by NuFiStreamer.
            "rectangle",
            box_points,
            (255, 0, 0),
            is_draw=True,
        )
        return [NufiResult(drawables=[box])]

    def clean(self, pipeline_id: Optional[int] = None):
        return super().clean(pipeline_id)

    def calculate_center_box(self, image: ndarray) -> List[int]:
        height, width = image.shape[:2]

        box_width = width // 4
        box_height = height // 4

        x1 = (width - box_width) // 2
        y1 = (height - box_height) // 2

        x2 = x1 + box_width
        y2 = y1 + box_height

        return [x1, y1, x2, y2]


def get_custom_models() -> List[BaseModel]:
    return [MidRectanglePrinter(str(i)) for i in range(1, 6)]


def run_sdstreamer(
    video_path: str, processed_result_path: str, models: List[BaseModel]
):
    log.info("Creating Source")
    filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
    decodebin = Gst.ElementFactory.make("decodebin", "decodebin")

    videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
    queue = Gst.ElementFactory.make("queue", "queue")

    nufi = Gst.ElementFactory.make("nufi", "yolo")
    draw = Gst.ElementFactory.make("draw", "draw")
    qos = Gst.ElementFactory.make("qos", "qos")

    videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
    capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter")
    caps = Gst.Caps.from_string("video/x-raw, format=I420")
    x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
    mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")
    filesink = Gst.ElementFactory.make("filesink", "filesink")

    elements = [
        filesrc,
        decodebin,
        videoconvert1,
        queue,
        nufi,
        draw,
        qos,
        videoconvert2,
        capsfilter,
        x264enc,
        mp4mux,
        filesink,
    ]

    if not all(elements):
        log.error("Unable to create Elements")
        sys.exit(-1)

    filesrc.set_property("location", video_path)
    videoconvert1.set_property("n-threads", 16)
    videoconvert2.set_property("n-threads", 16)
    filesink.set_property("location", processed_result_path)
    capsfilter.set_property("caps", caps)
    nufi.set_property("model", models)

    log.info("Creating Pipeline")
    pipeline = Gst.Pipeline()
    pipeline.add(*elements)
    # Link the elements in order. decodebin is skipped here because its source
    # pad only appears at runtime and is linked in the pad-added callback below.
    for i in range(len(elements) - 1):
        if elements[i].name != "decodebin":
            if not Gst.Element.link(elements[i], elements[i + 1]):
                log.error(
                    "Elements %s and %s could not be linked.",
                    elements[i].get_name(),
                    elements[i + 1].get_name(),
                )
                sys.exit(-1)

    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    def on_demuxer_pad_added(_, pad):
        if not pad.has_current_caps():
            log.error("Pad %s has no caps, can't link", pad.get_name())
            sys.exit(-1)

        caps = pad.get_current_caps()
        struct = caps.get_structure(0)
        if struct.get_name().startswith("video"):
            pad.link(videoconvert1.get_static_pad("sink"))

    decodebin.connect("pad-added", on_demuxer_pad_added)

    log.info("Starting pipeline")
    ret = pipeline.set_state(Gst.State.PLAYING)
    if ret == Gst.StateChangeReturn.FAILURE:
        log.error("Unable to set the pipeline to the playing state.")
        sys.exit(-1)
    try:
        loop.run()
    except Exception as e:
        log.error("App finished : %s", e)
        loop.quit()

    # cleanup
    pipeline.set_state(Gst.State.NULL)
    return processed_result_path


def main():
    video_path = "./assets/thiland.mp4"
    processed_file_path = "./assets/custom_test.mp4"
    models = get_custom_models()
    print(run_sdstreamer(video_path, processed_file_path, models))


if __name__ == "__main__":
    main()

Result Video

Before processing: custom-before
After processing: rectangle-result

Drawable MidTrianglePrinter Model Example

  • The MidTrianglePrinter model draws a triangle at the center of the video, using a custom Drawable implementation (Triangle).
import logging as log
import sys
from typing import List, Optional

import cv2
import numpy as np
from gi.repository import GLib, Gst
from numpy import ndarray

import nufi
from nufi.core.postprocess import Box, Drawable, NufiResult
from nufi.models import BaseModel

nufi.init()


def bus_call(_, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        log.info("End-of-stream")
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        log.warning("Warning: %s: %s", err, debug)
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        log.error("Error: %s: %s", err, debug)
        loop.quit()
    return True


class Triangle(Drawable):
    def __init__(
        self,
        xyxyxy: List[tuple],
        color=(0, 0, 0),
        thickness=2,
        is_draw=True,
    ):
        self.xyxyxy = xyxyxy
        self.color = color
        self.thickness = thickness
        self.is_draw = is_draw

    def draw(self, image: ndarray):
        if self.is_draw:
            triangle_pts = np.array(self.xyxyxy, np.int32)
            cv2.polylines(
                image,
                [triangle_pts],
                isClosed=True,
                color=self.color,
                thickness=self.thickness,
            )


class MidTrianglePrinter(BaseModel):
    def __init__(self, id):
        self.id = id
        print(f"Initialize {self.id} triangle printer")

    def process(
        self, image: ndarray, pipeline_id: Optional[int] = None
    ) -> List[NufiResult]:
        points = self.calculate_points(image)
        triangle = Triangle(
            xyxyxy=points,
            color=(255, 0, 0),
            thickness=3,
            is_draw=True,
        )
        return [NufiResult(drawables=[triangle])]

    def clean(self, pipeline_id: Optional[int] = None):
        return super().clean(pipeline_id)

    def calculate_points(self, image: ndarray) -> List[tuple]:
        height, width = image.shape[:2]

        tri_width = width // 3
        tri_height = height // 3

        x1 = (width - tri_width) // 2
        y1 = height - tri_height

        x2 = x1 + tri_width
        y2 = y1

        x3 = (x1 + x2) // 2
        y3 = y2 - tri_height

        return [(x1, y1), (x2, y2), (x3, y3)]


def get_custom_models() -> List[BaseModel]:
    return [MidTrianglePrinter(str(i)) for i in range(1, 6)]


def run_sdstreamer(
    video_path: str, processed_result_path: str, models: List[BaseModel]
):
    log.info("Creating Source")
    filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
    decodebin = Gst.ElementFactory.make("decodebin", "decodebin")

    videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
    queue = Gst.ElementFactory.make("queue", "queue")

    nufi = Gst.ElementFactory.make("nufi", "yolo")
    draw = Gst.ElementFactory.make("draw", "draw")
    qos = Gst.ElementFactory.make("qos", "qos")

    videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
    capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter")
    caps = Gst.Caps.from_string("video/x-raw, format=I420")
    x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
    mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")
    filesink = Gst.ElementFactory.make("filesink", "filesink")

    elements = [
        filesrc,
        decodebin,
        videoconvert1,
        queue,
        nufi,
        draw,
        qos,
        videoconvert2,
        capsfilter,
        x264enc,
        mp4mux,
        filesink,
    ]

    if not all(elements):
        log.error("Unable to create Elements")
        sys.exit(-1)

    filesrc.set_property("location", video_path)
    videoconvert1.set_property("n-threads", 16)
    videoconvert2.set_property("n-threads", 16)
    filesink.set_property("location", processed_result_path)
    capsfilter.set_property("caps", caps)
    nufi.set_property("model", models)

    log.info("Creating Pipeline")
    pipeline = Gst.Pipeline()
    pipeline.add(*elements)
    # Link the elements in order. decodebin is skipped here because its source
    # pad only appears at runtime and is linked in the pad-added callback below.
    for i in range(len(elements) - 1):
        if elements[i].name != "decodebin":
            if not Gst.Element.link(elements[i], elements[i + 1]):
                log.error(
                    "Elements %s and %s could not be linked.",
                    elements[i].get_name(),
                    elements[i + 1].get_name(),
                )
                sys.exit(-1)

    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    def on_demuxer_pad_added(_, pad):
        if not pad.has_current_caps():
            log.error("Pad %s has no caps, can't link", pad.get_name())
            sys.exit(-1)

        caps = pad.get_current_caps()
        struct = caps.get_structure(0)
        if struct.get_name().startswith("video"):
            pad.link(videoconvert1.get_static_pad("sink"))

    decodebin.connect("pad-added", on_demuxer_pad_added)

    log.info("Starting pipeline")
    ret = pipeline.set_state(Gst.State.PLAYING)
    if ret == Gst.StateChangeReturn.FAILURE:
        log.error("Unable to set the pipeline to the playing state.")
        sys.exit(-1)
    try:
        loop.run()
    except Exception as e:
        log.error("App finished : %s", e)
        loop.quit()

    # cleanup
    pipeline.set_state(Gst.State.NULL)
    return processed_result_path


def main():
    video_path = "./assets/thiland.mp4"
    processed_file_path = "./assets/custom_test.mp4"
    models = get_custom_models()
    print(run_sdstreamer(video_path, processed_file_path, models))


if __name__ == "__main__":
    main()

Result Video

Before processing: custom-before
After processing: triangle-result

NufiResult data Property Examples
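
NufiResult's data property attaches per-frame metadata to a result; downstream, an appsink reads it back with gst_appsink_meta_get. The three examples below attach, in turn, a SerializableBase subclass, a plain dictionary, and a primitive value. A minimal sketch of the three forms (ImageFrameInfo is defined in the first example; the width, height, and string values here are illustrative only):

# Sketch: the three data payload forms used in the examples below.
result_from_class = NufiResult(data=ImageFrameInfo(width=1920, height=1080), drawables=[])
result_from_dict = NufiResult(data={"width": 1920, "height": 1080}, drawables=[])
result_from_primitive = NufiResult(data="model-1", drawables=[])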

Class

import logging as log
import sys
from typing import List, Optional

import numpy as np
from gi.repository import GLib, Gst, GstApp

import nufi
from nufi.core.postprocess import Box, NufiResult, SerializableBase
from nufi.models import BaseModel
from nufi.utils.metadata import gst_appsink_meta_get

nufi.init()


def bus_call(_, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        log.info("End-of-stream")
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        log.warning("Warning: %s: %s", err, debug)
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        log.error("Error: %s: %s", err, debug)
        loop.quit()
    return True


class ImageFrameInfo(SerializableBase):
    def __init__(self, width, height):
        self.width = width
        self.height = height

    def __repr__(self) -> str:
        return f"ImageFrameInfo(width={self.width}, height={self.height})"


class MidRectanglePrinter(BaseModel):
    def __init__(self, id: int):
        self.id = id
        print(f"Initialize {self.id} rectangle printer")

    def process(
        self, image: np.ndarray, pipeline_id: Optional[int] = None
    ) -> List[NufiResult]:
        box_points = self._calculate_center_box(image)
        height, width = image.shape[:2]
        data = ImageFrameInfo(width=width, height=height)  # Attach a SerializableBase subclass to NufiResult.
        box = Box(
            "rectangle",
            box_points,
            (255, 0, 0),
            is_draw=True,
        )

        return [NufiResult(data=data, drawables=[box])]

    def clean(self, pipeline_id: Optional[int] = None):
        return super().clean(pipeline_id)

    def _calculate_center_box(self, image: np.ndarray) -> List[int]:
        height, width = image.shape[:2]

        box_width = width // 4
        box_height = height // 4

        x1 = (width - box_width) // 2
        y1 = (height - box_height) // 2

        x2 = x1 + box_width
        y2 = y1 + box_height

        return [x1, y1, x2, y2]


def run_sdstreamer(video_path, result_file_path):
    # 1. Create the elements
    filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
    decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
    videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
    queue = Gst.ElementFactory.make("queue", "queue")

    nufi = Gst.ElementFactory.make("nufi", "yolo")
    draw = Gst.ElementFactory.make("draw", "draw")

    videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
    x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
    mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")

    tee = Gst.ElementFactory.make("tee", "tee")
    queue_file = Gst.ElementFactory.make("queue", "queue_file")
    queue_meta = Gst.ElementFactory.make("queue", "queue_meta")
    filesink = Gst.ElementFactory.make("filesink", "filesink")
    metadatasink = Gst.ElementFactory.make("appsink", "metadatasink")

    elements = [
        filesrc,
        decodebin,
        videoconvert1,
        queue,
        nufi,
        draw,
        videoconvert2,
        x264enc,
        mp4mux,
        tee,
        queue_file,
        queue_meta,
        filesink,
        metadatasink,
    ]
    if not all(elements):
        log.error("Unable to create Elements")
        sys.exit(-1)

    # 2. Set the element properties
    filesrc.set_property("location", video_path)
    videoconvert1.set_property("n-threads", 16)
    videoconvert2.set_property("n-threads", 16)
    filesink.set_property("location", result_file_path)
    metadatasink.set_property("emit-signals", True)

    # - Set the models on the nufi element
    models = [
        MidRectanglePrinter(i) for i in range(5)
    ]  # Increase the iteration count to raise the number of worker threads.
    nufi.set_property("model", models)

    # 3. Build the pipeline
    pipeline = Gst.Pipeline()
    for element in elements:
        pipeline.add(element)

    filesrc.link(decodebin)
    Gst.Element.link_many(
        videoconvert1, queue, nufi, draw, videoconvert2, x264enc, mp4mux, tee
    )

    tee.link(queue_file)
    queue_file.link(filesink)
    tee.link(queue_meta)
    queue_meta.link(metadatasink)

    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    def on_demuxer_pad_added(_, pad):
        if not pad.has_current_caps():
            log.error("Pad %s has no caps, can't link", pad.get_name())
            sys.exit(-1)

        caps = pad.get_current_caps()
        struct = caps.get_structure(0)
        if struct.get_name().startswith("video"):
            pad.link(videoconvert1.get_static_pad("sink"))

    decodebin.connect("pad-added", on_demuxer_pad_added)

    def metadata_print_cb(metadatasink: GstApp.AppSink):
        metadata = gst_appsink_meta_get(metadatasink)
        log.debug(metadata)
        return Gst.FlowReturn.OK

    metadatasink.connect("new-sample", metadata_print_cb)

    # 4. Start the pipeline
    ret = pipeline.set_state(Gst.State.PLAYING)
    if ret == Gst.StateChangeReturn.FAILURE:
        log.error("Unable to set the pipeline to the playing state.")
        sys.exit(-1)
    try:
        loop.run()
    except Exception as e:
        log.error("App finished : %s", e)
        loop.quit()

    pipeline.set_state(Gst.State.NULL)
    return result_file_path


if __name__ == "__main__":
    video_path = "./assets/thiland.mp4"
    result_file_path = "./assets/meta-data-property-class.mp4"
    print(run_sdstreamer(video_path, result_file_path))

Result

Metadata output log: metadata-log
Real-time metadata output video: metadata-realtime

Dictionary

import logging as log
import sys
from typing import List, Optional

import numpy as np
from gi.repository import GLib, Gst, GstApp

import nufi
from nufi.core.postprocess import Box, NufiResult, SerializableBase
from nufi.models import BaseModel
from nufi.utils.metadata import gst_appsink_meta_get

nufi.init()


def bus_call(_, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        log.info("End-of-stream")
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        log.warning("Warning: %s: %s", err, debug)
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        log.error("Error: %s: %s", err, debug)
        loop.quit()
    return True


class MidRectanglePrinter(BaseModel):
    def __init__(self, id):
        self.id = id
        print(f"Initialize {self.id} rectangle printer")

    def process(
        self, image: np.ndarray, pipeline_id: Optional[int] = None
    ) -> List[NufiResult]:
        box_points = self._calculate_center_box(image)
        height, width = image.shape[:2]
        data = {"width": width, "height": height}  # Attach a dictionary to NufiResult.
        box = Box(
            "rectangle",
            box_points,
            (255, 0, 0),
            is_draw=True,
        )

        return [NufiResult(data=data, drawables=[box])]

    def clean(self, pipeline_id: Optional[int] = None):
        return super().clean(pipeline_id)

    def _calculate_center_box(self, image: np.ndarray) -> List[int]:
        height, width = image.shape[:2]

        box_width = width // 4
        box_height = height // 4

        x1 = (width - box_width) // 2
        y1 = (height - box_height) // 2

        x2 = x1 + box_width
        y2 = y1 + box_height

        return [x1, y1, x2, y2]


def run_sdstreamer(video_path, result_file_path):
    # 1. Create the elements
    filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
    decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
    videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
    queue = Gst.ElementFactory.make("queue", "queue")

    nufi = Gst.ElementFactory.make("nufi", "yolo")
    draw = Gst.ElementFactory.make("draw", "draw")

    videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
    x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
    mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")

    tee = Gst.ElementFactory.make("tee", "tee")
    queue_file = Gst.ElementFactory.make("queue", "queue_file")
    queue_meta = Gst.ElementFactory.make("queue", "queue_meta")
    filesink = Gst.ElementFactory.make("filesink", "filesink")
    metadatasink = Gst.ElementFactory.make("appsink", "metadatasink")

    elements = [
        filesrc,
        decodebin,
        videoconvert1,
        queue,
        nufi,
        draw,
        videoconvert2,
        x264enc,
        mp4mux,
        tee,
        queue_file,
        queue_meta,
        filesink,
        metadatasink,
    ]
    if not all(elements):
        log.error("Unable to create Elements")
        sys.exit(-1)

    # 2. Set the element properties
    filesrc.set_property("location", video_path)
    videoconvert1.set_property("n-threads", 16)
    videoconvert2.set_property("n-threads", 16)
    filesink.set_property("location", result_file_path)
    metadatasink.set_property("emit-signals", True)

    # - Set the models on the nufi element
    models = [
        MidRectanglePrinter(i) for i in range(5)
    ]  # Increase the iteration count to raise the number of worker threads.
    nufi.set_property("model", models)

    # 3. Build the pipeline
    pipeline = Gst.Pipeline()
    for element in elements:
        pipeline.add(element)

    filesrc.link(decodebin)
    Gst.Element.link_many(
        videoconvert1, queue, nufi, draw, videoconvert2, x264enc, mp4mux, tee
    )

    tee.link(queue_file)
    queue_file.link(filesink)
    tee.link(queue_meta)
    queue_meta.link(metadatasink)

    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    def on_demuxer_pad_added(_, pad):
        if not pad.has_current_caps():
            log.error("Pad %s has no caps, can't link", pad.get_name())
            sys.exit(-1)

        caps = pad.get_current_caps()
        struct = caps.get_structure(0)
        if struct.get_name().startswith("video"):
            pad.link(videoconvert1.get_static_pad("sink"))

    decodebin.connect("pad-added", on_demuxer_pad_added)

    def metadata_print_cb(metadatasink: GstApp.AppSink):
        metadata = gst_appsink_meta_get(metadatasink)
        log.debug(metadata)
        return Gst.FlowReturn.OK

    metadatasink.connect("new-sample", metadata_print_cb)

    # 4. Start the pipeline
    ret = pipeline.set_state(Gst.State.PLAYING)
    if ret == Gst.StateChangeReturn.FAILURE:
        log.error("Unable to set the pipeline to the playing state.")
        sys.exit(-1)
    try:
        loop.run()
    except Exception as e:
        log.error("App finished : %s", e)
        loop.quit()

    pipeline.set_state(Gst.State.NULL)
    return result_file_path


if __name__ == "__main__":
    video_path = "./assets/thiland.mp4"
    result_file_path = "./assets/meta-data-property-class.mp4"
    print(run_sdstreamer(video_path, result_file_path))

Result

Metadata output log: metadata-log
Real-time metadata output video: metadata-realtime

Primitive Type

import logging as log
import sys
from typing import List, Optional

import numpy as np
from gi.repository import GLib, Gst, GstApp

import nufi
from nufi.core.postprocess import Box, NufiResult, SerializableBase
from nufi.models import BaseModel
from nufi.utils.metadata import gst_appsink_meta_get

nufi.init()


def bus_call(_, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        log.info("End-of-stream")
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        log.warning("Warning: %s: %s", err, debug)
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        log.error("Error: %s: %s", err, debug)
        loop.quit()
    return True


class MidRectanglePrinter(BaseModel):
    def __init__(self, id):
        self.id = id
        print(f"Initialize {self.id} rectangle printer")

    def process(
        self, image: np.ndarray, pipeline_id: Optional[int] = None
    ) -> List[NufiResult]:
        box_points = self._calculate_center_box(image)
        data = self.id  # Attach a primitive value to NufiResult.
        box = Box(
            "rectangle",
            box_points,
            (255, 0, 0),
            is_draw=True,
        )

        return [NufiResult(data=data, drawables=[box])]

    def clean(self, pipeline_id: Optional[int] = None):
        return super().clean(pipeline_id)

    def _calculate_center_box(self, image: np.ndarray) -> List[int]:
        height, width = image.shape[:2]

        box_width = width // 4
        box_height = height // 4

        x1 = (width - box_width) // 2
        y1 = (height - box_height) // 2

        x2 = x1 + box_width
        y2 = y1 + box_height

        return [x1, y1, x2, y2]


def run_sdstreamer(video_path, result_file_path):
    # 1. Create the elements
    filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
    decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
    videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
    queue = Gst.ElementFactory.make("queue", "queue")

    nufi = Gst.ElementFactory.make("nufi", "yolo")
    draw = Gst.ElementFactory.make("draw", "draw")

    videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
    x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
    mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")

    tee = Gst.ElementFactory.make("tee", "tee")
    queue_file = Gst.ElementFactory.make("queue", "queue_file")
    queue_meta = Gst.ElementFactory.make("queue", "queue_meta")
    filesink = Gst.ElementFactory.make("filesink", "filesink")
    metadatasink = Gst.ElementFactory.make("appsink", "metadatasink")

    elements = [
        filesrc,
        decodebin,
        videoconvert1,
        queue,
        nufi,
        draw,
        videoconvert2,
        x264enc,
        mp4mux,
        tee,
        queue_file,
        queue_meta,
        filesink,
        metadatasink,
    ]
    if not all(elements):
        log.error("Unable to create Elements")
        sys.exit(-1)

    # 2. Set the element properties
    filesrc.set_property("location", video_path)
    videoconvert1.set_property("n-threads", 16)
    videoconvert2.set_property("n-threads", 16)
    filesink.set_property("location", result_file_path)
    metadatasink.set_property("emit-signals", True)

    # - Set the models on the nufi element
    models = [
        MidRectanglePrinter(i) for i in range(5)
    ]  # Increase the iteration count to raise the number of worker threads.
    nufi.set_property("model", models)

    # 3. Build the pipeline
    pipeline = Gst.Pipeline()
    for element in elements:
        pipeline.add(element)

    filesrc.link(decodebin)
    Gst.Element.link_many(
        videoconvert1, queue, nufi, draw, videoconvert2, x264enc, mp4mux, tee
    )

    tee.link(queue_file)
    queue_file.link(filesink)
    tee.link(queue_meta)
    queue_meta.link(metadatasink)

    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    def on_demuxer_pad_added(_, pad):
        if not pad.has_current_caps():
            log.error("Pad %s has no caps, can't link", pad.get_name())
            sys.exit(-1)

        caps = pad.get_current_caps()
        struct = caps.get_structure(0)
        if struct.get_name().startswith("video"):
            pad.link(videoconvert1.get_static_pad("sink"))

    decodebin.connect("pad-added", on_demuxer_pad_added)

    def metadata_print_cb(metadatasink: GstApp.AppSink):
        metadata = gst_appsink_meta_get(metadatasink)
        log.debug(metadata)
        return Gst.FlowReturn.OK

    metadatasink.connect("new-sample", metadata_print_cb)

    # 4. Start the pipeline
    ret = pipeline.set_state(Gst.State.PLAYING)
    if ret == Gst.StateChangeReturn.FAILURE:
        log.error("Unable to set the pipeline to the playing state.")
        sys.exit(-1)
    try:
        loop.run()
    except Exception as e:
        log.error("App finished : %s", e)
        loop.quit()

    pipeline.set_state(Gst.State.NULL)
    return result_file_path


if __name__ == "__main__":
    video_path = "./assets/thiland.mp4"
    result_file_path = "./assets/meta-data-property-class.mp4"
    print(run_sdstreamer(video_path, result_file_path))

Result

Metadata output log: metadata-log
Real-time metadata output video: metadata-realtime