4. Custom Model 이용하는 Pipeline 만들기
Custom Model 만들기에서 만든 Custom Model을 이용하여 Pipeline을 만들어봅니다.
Gstreamer에 대한 이해가 필요하다면 Gstreamer docs를 참고해주세요.
Pipeline 설명
- Pipeline은 특정 기능을 수행하는 element들의 집합이고 비디오, 오디오 등과 같은 미디어 데이터가 순차적으로 흐르며 처리됩니다.
- 예제 Pipeline은 GStreamer의 video를 control 하기 위한 elements와 NuFiStreamer의 이미지 병렬 처리 및 출력을 위한 elements를 이용합니다.
- GStreamer elements: filesrc, decodebin, videoconvert, queue, x264enc, mp4mux, appsink
- NuFiStreamer elements: nufi
Element 설명
- 사용되는 nufi element들의 기능을 설명하겠습니다.
- nufi: model property를 이용하여 영상의 각 이미지 프레임들을 병렬적으로 처리합니다.
- nufi element에서 구현한 Custom Model을 이용합니다.
- nufi에 대한 세부정보는 nufi element 문서를 참고하세요.
- nufi: model property를 이용하여 영상의 각 이미지 프레임들을 병렬적으로 처리합니다.
사용 예시
Drawable MidRectanglePrinter Model 예제
- MidRectanglePriter 모델은 영상의 중앙에 사각형을 그리는 모델입니다.
import logging as log
import sys
from typing import List, Optional
import cv2
import numpy as np
from gi.repository import GLib, Gst
from numpy import ndarray
import nufi
from nufi.core.postprocess import Box, Drawable, NufiResult
from nufi.models import BaseModel
nufi.init()
def bus_call(_, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
log.info("End-of-stream")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
log.warning("Warning: %s: %s", err, debug)
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log.error("Error: %s: %s", err, debug)
loop.quit()
return True
class MidRectanglePrinter(BaseModel):
def __init__(self, id):
self.id = id
print(f"Initialize {self.id} rectangle printer")
def process(
self, image: ndarray, pipeline_id: Optional[int] = None
) -> List[NufiResult]:
box_points = self.calculate_center_box(image)
box = Box( # NuFiStreamer에서 제공하는 Drawable 구현체인 Box 이용.
"rectangle",
box_points,
(255, 0, 0),
is_draw=True,
)
return [NufiResult(drawables=[box])]
def clean(self, pipeline_id: Optional[int] = None):
return super().clean(pipeline_id)
def calculate_center_box(self, image: ndarray) -> List[int]:
height, width = image.shape[:2]
box_width = width // 4
box_height = height // 4
x1 = (width - box_width) // 2
y1 = (height - box_height) // 2
x2 = x1 + box_width
y2 = y1 + box_height
return [x1, y1, x2, y2]
def get_custom_models() -> List[BaseModel]:
return [MidRectanglePrinter(str(i)) for i in range(1, 6)]
def run_sdstreamer(
video_path: str, processed_result_path: str, models: List[BaseModel]
):
log.info("Creating Source")
filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
queue = Gst.ElementFactory.make("queue", "queue")
nufi = Gst.ElementFactory.make("nufi", "yolo")
draw = Gst.ElementFactory.make("draw", "draw")
qos = Gst.ElementFactory.make("qos", "qos")
videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter")
caps = Gst.Caps.from_string("video/x-raw, format=I420")
x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")
filesink = Gst.ElementFactory.make("filesink", "filesink")
elements = [
filesrc,
decodebin,
videoconvert1,
queue,
nufi,
draw,
qos,
videoconvert2,
capsfilter,
x264enc,
mp4mux,
filesink,
]
if not all(elements):
log.error("Unable to create Elements")
sys.exit(-1)
filesrc.set_property("location", video_path)
videoconvert1.set_property("n-threads", 16)
videoconvert2.set_property("n-threads", 16)
filesink.set_property("location", processed_result_path)
capsfilter.set_property("caps", caps)
nufi.set_property("model", models)
log.info("Creating Pipeline")
pipeline = Gst.Pipeline()
pipeline.add(*elements)
for i in range(len(elements) - 1):
if elements[i].name != "decodebin":
if not Gst.Element.link(elements[i], elements[i + 1]):
# if not elements[i].link(elements[i + 1]):
log.error(
"Elements %s and %s could not be linked.",
elements[i].get_name(),
elements[i + 1].get_name(),
)
sys.exit(-1)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
def on_demuxer_pad_added(_, pad):
if not pad.has_current_caps():
log.error("Pad %s has no caps, can't link", pad.get_name())
sys.exit(-1)
caps = pad.get_current_caps()
struct = caps.get_structure(0)
if struct.get_name().startswith("video"):
pad.link(videoconvert1.get_static_pad("sink"))
decodebin.connect("pad-added", on_demuxer_pad_added)
log.info("Starting pipeline")
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
log.error("Unable to set the pipeline to the playing state.")
sys.exit(-1)
try:
loop.run()
except Exception as e:
log.error("App finished : %s", e)
loop.quit()
# cleanup
pipeline.set_state(Gst.State.NULL)
return processed_result_path
def main():
video_path = "./assets/thiland.mp4"
processed_file_path = "./assets/custom_test.mp4"
models = get_custom_models()
print(run_sdstreamer(video_path, processed_file_path, models))
if __name__ == "__main__":
main()
결과 영상
처리 전 영상 | 처리 후 영상 |
---|---|
Drawable MidTrianglePrinter Model 예제
- MidRectanglePriter 모델은 영상의 중앙에 삼각형을 그리는 모델입니다.
import logging as log
import sys
from typing import List, Optional
import cv2
import numpy as np
from gi.repository import GLib, Gst
from numpy import ndarray
import nufi
from nufi.core.postprocess import Box, Drawable, NufiResult
from nufi.models import BaseModel
nufi.init()
def bus_call(_, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
log.info("End-of-stream")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
log.warning("Warning: %s: %s", err, debug)
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log.error("Error: %s: %s", err, debug)
loop.quit()
return True
class Triangle(Drawable):
def __init__(
self,
xyxyxy: List[tuple],
color=(0, 0, 0),
thickness=2,
is_draw=True,
):
self.xyxyxy = xyxyxy
self.color = color
self.thickness = thickness
self.is_draw = is_draw
def draw(self, image: ndarray):
if self.is_draw:
triangle_pts = np.array(self.xyxyxy, np.int32)
cv2.polylines(
image,
[triangle_pts],
isClosed=True,
color=self.color,
thickness=self.thickness,
)
class MidTrianglePrinter(BaseModel):
def __init__(self, id):
self.id = id
print(f"Initialize {self.id} rectangle printer")
def process(
self, image: ndarray, pipeline_id: Optional[int] = None
) -> List[NufiResult]:
points = self.calculate_points(image)
triangle = Triangle(
xyxyxy=points,
color=(255, 0, 0),
thickness=3,
is_draw=True,
)
return [NufiResult(drawables=[triangle])]
def clean(self, pipeline_id: Optional[int] = None):
return super().clean(pipeline_id)
def calculate_points(self, image: ndarray) -> List[int]:
height, width = image.shape[:2]
tri_width = width // 3
tri_height = height // 3
x1 = (width - tri_width) // 2
y1 = height - tri_height
x2 = x1 + tri_width
y2 = y1
x3 = (x1 + x2) // 2
y3 = y2 - tri_height
return [(x1, y1), (x2, y2), (x3, y3)]
def get_custom_models() -> List[BaseModel]:
return [MidRectanglePrinter(str(i)) for i in range(1, 6)]
def run_sdstreamer(
video_path: str, processed_result_path: str, models: List[BaseModel]
):
log.info("Creating Source")
filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
queue = Gst.ElementFactory.make("queue", "queue")
nufi = Gst.ElementFactory.make("nufi", "yolo")
draw = Gst.ElementFactory.make("draw", "draw")
qos = Gst.ElementFactory.make("qos", "qos")
videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter")
caps = Gst.Caps.from_string("video/x-raw, format=I420")
x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")
filesink = Gst.ElementFactory.make("filesink", "filesink")
elements = [
filesrc,
decodebin,
videoconvert1,
queue,
nufi,
draw,
qos,
videoconvert2,
capsfilter,
x264enc,
mp4mux,
filesink,
]
if not all(elements):
log.error("Unable to create Elements")
sys.exit(-1)
filesrc.set_property("location", video_path)
videoconvert1.set_property("n-threads", 16)
videoconvert2.set_property("n-threads", 16)
filesink.set_property("location", processed_result_path)
capsfilter.set_property("caps", caps)
nufi.set_property("model", models)
log.info("Creating Pipeline")
pipeline = Gst.Pipeline()
pipeline.add(*elements)
for i in range(len(elements) - 1):
if elements[i].name != "decodebin":
if not Gst.Element.link(elements[i], elements[i + 1]):
# if not elements[i].link(elements[i + 1]):
log.error(
"Elements %s and %s could not be linked.",
elements[i].get_name(),
elements[i + 1].get_name(),
)
sys.exit(-1)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
def on_demuxer_pad_added(_, pad):
if not pad.has_current_caps():
log.error("Pad %s has no caps, can't link", pad.get_name())
sys.exit(-1)
caps = pad.get_current_caps()
struct = caps.get_structure(0)
if struct.get_name().startswith("video"):
pad.link(videoconvert1.get_static_pad("sink"))
decodebin.connect("pad-added", on_demuxer_pad_added)
log.info("Starting pipeline")
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
log.error("Unable to set the pipeline to the playing state.")
sys.exit(-1)
try:
loop.run()
except Exception as e:
log.error("App finished : %s", e)
loop.quit()
# cleanup
pipeline.set_state(Gst.State.NULL)
return processed_result_path
def main():
video_path = "./assets/thiland.mp4"
processed_file_path = "./assets/custom_test.mp4"
models = get_custom_models()
print(run_sdstreamer(video_path, processed_file_path, models))
if __name__ == "__main__":
main()
결과 영상
처리 전 영상 | 처리 후 영상 |
---|---|
NufiResult data property 예제
Class
import logging as log
import sys
from typing import List, Optional
import numpy as np
from gi.repository import GLib, Gst, GstApp
import nufi
from nufi.core.postprocess import Box, NufiResult, SerializableBase
from nufi.models import BaseModel
from nufi.utils.metadata import gst_appsink_meta_get
nufi.init()
def bus_call(_, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
log.info("End-of-stream")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
log.warning("Warning: %s: %s", err, debug)
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log.error("Error: %s: %s", err, debug)
loop.quit()
return True
class ImageFrameInfo(SerializableBase):
def __init__(self, width, height):
self.width = width
self.height = height
def __repr__(self) -> str:
return f"ImageFrameInfo(width={self.width}, height={self.height})"
class MidRectanglePrinter(BaseModel):
def __init__(self, id: int):
self.id = id
print(f"Initialize {self.id} rectangle printer")
def process(
self, image: np.ndarray, pipeline_id: Optional[int] = None
) -> List[NufiResult]:
box_points = self._calculate_center_box(image)
height, width = image.shape[:2]
data = ImageFrameInfo(width=width, height=height) # SerializableBase를 상속하는 클래스를 NufiResult에 입력
box = Box(
"rectangle",
box_points,
(255, 0, 0),
is_draw=True,
)
return [NufiResult(data=data, drawables=[box])]
def clean(self, pipeline_id: Optional[int] = None):
return super().clean(pipeline_id)
def _calculate_center_box(self, image: np.ndarray) -> List[int]:
height, width = image.shape[:2]
box_width = width // 4
box_height = height // 4
x1 = (width - box_width) // 2
y1 = (height - box_height) // 2
x2 = x1 + box_width
y2 = y1 + box_height
return [x1, y1, x2, y2]
def run_sdstreamer(video_path, result_file_path):
# 1. Elements 생성
filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
queue = Gst.ElementFactory.make("queue", "queue")
nufi = Gst.ElementFactory.make("nufi", "yolo")
draw = Gst.ElementFactory.make("draw", "draw")
videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")
tee = Gst.ElementFactory.make("tee", "tee")
queue_file = Gst.ElementFactory.make("queue", "queue_file")
queue_meta = Gst.ElementFactory.make("queue", "queue_meta")
filesink = Gst.ElementFactory.make("filesink", "filesink")
metadatasink = Gst.ElementFactory.make("appsink", "metadatasink")
elements = [
filesrc,
decodebin,
videoconvert1,
queue,
nufi,
draw,
videoconvert2,
x264enc,
mp4mux,
tee,
queue_file,
queue_meta,
filesink,
metadatasink,
]
if not all(elements):
log.error("Unable to create Elements")
sys.exit(-1)
# 2. Elements properties 설정
filesrc.set_property("location", video_path)
videoconvert1.set_property("n-threads", 16)
videoconvert2.set_property("n-threads", 16)
filesink.set_property("location", result_file_path)
metadatasink.set_property("emit-signals", True)
# - nufi element models 설정
models = [
MidRectanglePrinter(i) for i in range(5)
] # multi threading 개수 높이려면 iter count 증가시키기
nufi.set_property("model", models)
# 3. 파이프라인 생성
pipeline = Gst.Pipeline()
for element in elements:
pipeline.add(element)
filesrc.link(decodebin)
Gst.Element.link_many(
videoconvert1, queue, nufi, draw, videoconvert2, x264enc, mp4mux, tee
)
tee.link(queue_file)
queue_file.link(filesink)
tee.link(queue_meta)
queue_meta.link(metadatasink)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
def on_demuxer_pad_added(_, pad):
if not pad.has_current_caps():
log.error("Pad %s has no caps, can't link", pad.get_name())
sys.exit(-1)
caps = pad.get_current_caps()
struct = caps.get_structure(0)
if struct.get_name().startswith("video"):
pad.link(videoconvert1.get_static_pad("sink"))
decodebin.connect("pad-added", on_demuxer_pad_added)
def metadata_print_cb(metadatasink: GstApp.AppSink):
metadata = gst_appsink_meta_get(metadatasink)
log.debug(metadata)
return Gst.FlowReturn.OK
metadatasink.connect("new-sample", metadata_print_cb)
# 4. 파이프라인 시작
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
log.error("Unable to set the pipeline to the playing state.")
sys.exit(-1)
try:
loop.run()
except Exception as e:
log.error("App finished : %s", e)
loop.quit()
pipeline.set_state(Gst.State.NULL)
return result_file_path
if __name__ == "__main__":
video_path = "./assets/thiland.mp4"
result_file_path = "./assets/meta-data-property-class.mp4"
print(run_sdstreamer(video_path, result_file_path))
결과
Metadata 출력 log |
---|
실시간 Metadata 출력 영상 |
---|
Dictionary
import logging as log
import sys
from typing import List, Optional
import numpy as np
from gi.repository import GLib, Gst, GstApp
import nufi
from nufi.core.postprocess import Box, NufiResult, SerializableBase
from nufi.models import BaseModel
from nufi.utils.metadata import gst_appsink_meta_get
nufi.init()
def bus_call(_, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
log.info("End-of-stream")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
log.warning("Warning: %s: %s", err, debug)
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log.error("Error: %s: %s", err, debug)
loop.quit()
return True
class MidRectanglePrinter(BaseModel):
def __init__(self, id):
self.id = id
print(f"Initialize {self.id} rectangle printer")
def process(
self, image: np.ndarray, pipeline_id: Optional[int] = None
) -> List[NufiResult]:
box_points = self._calculate_center_box(image)
height, width = image.shape[:2]
data = {"width": width, "height": height} # dictionary NufiResult에 입력
box = Box(
"rectangle",
box_points,
(255, 0, 0),
is_draw=True,
)
return [NufiResult(data=data, drawables=[box])]
def clean(self, pipeline_id: Optional[int] = None):
return super().clean(pipeline_id)
def _calculate_center_box(self, image: np.ndarray) -> List[int]:
height, width = image.shape[:2]
box_width = width // 4
box_height = height // 4
x1 = (width - box_width) // 2
y1 = (height - box_height) // 2
x2 = x1 + box_width
y2 = y1 + box_height
return [x1, y1, x2, y2]
def run_sdstreamer(video_path, result_file_path):
# 1. Elements 생성
filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
queue = Gst.ElementFactory.make("queue", "queue")
nufi = Gst.ElementFactory.make("nufi", "yolo")
draw = Gst.ElementFactory.make("draw", "draw")
videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")
tee = Gst.ElementFactory.make("tee", "tee")
queue_file = Gst.ElementFactory.make("queue", "queue_file")
queue_meta = Gst.ElementFactory.make("queue", "queue_meta")
filesink = Gst.ElementFactory.make("filesink", "filesink")
metadatasink = Gst.ElementFactory.make("appsink", "metadatasink")
elements = [
filesrc,
decodebin,
videoconvert1,
queue,
nufi,
draw,
videoconvert2,
x264enc,
mp4mux,
tee,
queue_file,
queue_meta,
filesink,
metadatasink,
]
if not all(elements):
log.error("Unable to create Elements")
sys.exit(-1)
# 2. Elements properties 설정
filesrc.set_property("location", video_path)
videoconvert1.set_property("n-threads", 16)
videoconvert2.set_property("n-threads", 16)
filesink.set_property("location", result_file_path)
metadatasink.set_property("emit-signals", True)
# - nufi element models 설정
models = [
MidRectanglePrinter(i) for i in range(5)
] # multi threading 개수 높이려면 iter count 증가시키기
nufi.set_property("model", models)
# 3. 파이프라인 생성
pipeline = Gst.Pipeline()
for element in elements:
pipeline.add(element)
filesrc.link(decodebin)
Gst.Element.link_many(
videoconvert1, queue, nufi, draw, videoconvert2, x264enc, mp4mux, tee
)
tee.link(queue_file)
queue_file.link(filesink)
tee.link(queue_meta)
queue_meta.link(metadatasink)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
def on_demuxer_pad_added(_, pad):
if not pad.has_current_caps():
log.error("Pad %s has no caps, can't link", pad.get_name())
sys.exit(-1)
caps = pad.get_current_caps()
struct = caps.get_structure(0)
if struct.get_name().startswith("video"):
pad.link(videoconvert1.get_static_pad("sink"))
decodebin.connect("pad-added", on_demuxer_pad_added)
def metadata_print_cb(metadatasink: GstApp.AppSink):
metadata = gst_appsink_meta_get(metadatasink)
log.debug(metadata)
return Gst.FlowReturn.OK
metadatasink.connect("new-sample", metadata_print_cb)
# 4. 파이프라인 시작
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
log.error("Unable to set the pipeline to the playing state.")
sys.exit(-1)
try:
loop.run()
except Exception as e:
log.error("App finished : %s", e)
loop.quit()
pipeline.set_state(Gst.State.NULL)
return result_file_path
if __name__ == "__main__":
video_path = "./assets/thiland.mp4"
result_file_path = "./assets/meta-data-property-class.mp4"
print(run_sdstreamer(video_path, result_file_path))
결과
Metadata 출력 log |
---|
실시간 Metadata 출력 영상 |
---|
Primitive Type
import logging as log
import sys
from typing import List, Optional
import numpy as np
from gi.repository import GLib, Gst, GstApp
import nufi
from nufi.core.postprocess import Box, NufiResult, SerializableBase
from nufi.models import BaseModel
from nufi.utils.metadata import gst_appsink_meta_get
nufi.init()
def bus_call(_, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
log.info("End-of-stream")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
log.warning("Warning: %s: %s", err, debug)
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log.error("Error: %s: %s", err, debug)
loop.quit()
return True
class MidRectanglePrinter(BaseModel):
def __init__(self, id):
self.id = id
print(f"Initialize {self.id} rectangle printer")
def process(
self, image: np.ndarray, pipeline_id: Optional[int] = None
) -> List[NufiResult]:
box_points = self._calculate_center_box(image)
data = self.id # primitive type NufiResult에 입력
box = Box(
"rectangle",
box_points,
(255, 0, 0),
is_draw=True,
)
return [NufiResult(data=data, drawables=[box])]
def clean(self, pipeline_id: Optional[int] = None):
return super().clean(pipeline_id)
def _calculate_center_box(self, image: np.ndarray) -> List[int]:
height, width = image.shape[:2]
box_width = width // 4
box_height = height // 4
x1 = (width - box_width) // 2
y1 = (height - box_height) // 2
x2 = x1 + box_width
y2 = y1 + box_height
return [x1, y1, x2, y2]
def run_sdstreamer(video_path, result_file_path):
# 1. Elements 생성
filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
queue = Gst.ElementFactory.make("queue", "queue")
nufi = Gst.ElementFactory.make("nufi", "yolo")
draw = Gst.ElementFactory.make("draw", "draw")
videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")
tee = Gst.ElementFactory.make("tee", "tee")
queue_file = Gst.ElementFactory.make("queue", "queue_file")
queue_meta = Gst.ElementFactory.make("queue", "queue_meta")
filesink = Gst.ElementFactory.make("filesink", "filesink")
metadatasink = Gst.ElementFactory.make("appsink", "metadatasink")
elements = [
filesrc,
decodebin,
videoconvert1,
queue,
nufi,
draw,
videoconvert2,
x264enc,
mp4mux,
tee,
queue_file,
queue_meta,
filesink,
metadatasink,
]
if not all(elements):
log.error("Unable to create Elements")
sys.exit(-1)
# 2. Elements properties 설정
filesrc.set_property("location", video_path)
videoconvert1.set_property("n-threads", 16)
videoconvert2.set_property("n-threads", 16)
filesink.set_property("location", result_file_path)
metadatasink.set_property("emit-signals", True)
# - nufi element models 설정
models = [
MidRectanglePrinter(i) for i in range(5)
] # multi threading 개수 높이려면 iter count 증가시키기
nufi.set_property("model", models)
# 3. 파이프라인 생성
pipeline = Gst.Pipeline()
for element in elements:
pipeline.add(element)
filesrc.link(decodebin)
Gst.Element.link_many(
videoconvert1, queue, nufi, draw, videoconvert2, x264enc, mp4mux, tee
)
tee.link(queue_file)
queue_file.link(filesink)
tee.link(queue_meta)
queue_meta.link(metadatasink)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
def on_demuxer_pad_added(_, pad):
if not pad.has_current_caps():
log.error("Pad %s has no caps, can't link", pad.get_name())
sys.exit(-1)
caps = pad.get_current_caps()
struct = caps.get_structure(0)
if struct.get_name().startswith("video"):
pad.link(videoconvert1.get_static_pad("sink"))
decodebin.connect("pad-added", on_demuxer_pad_added)
def metadata_print_cb(metadatasink: GstApp.AppSink):
metadata = gst_appsink_meta_get(metadatasink)
log.debug(metadata)
return Gst.FlowReturn.OK
metadatasink.connect("new-sample", metadata_print_cb)
# 4. 파이프라인 시작
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
log.error("Unable to set the pipeline to the playing state.")
sys.exit(-1)
try:
loop.run()
except Exception as e:
log.error("App finished : %s", e)
loop.quit()
pipeline.set_state(Gst.State.NULL)
return result_file_path
if __name__ == "__main__":
video_path = "./assets/thiland.mp4"
result_file_path = "./assets/meta-data-property-class.mp4"
print(run_sdstreamer(video_path, result_file_path))
결과
Metadata 출력 log |
---|
실시간 Metadata 출력 영상 |
---|