Skip to main content
Version: 2.0.0

foldersrc

개요

파이프라인에 이미지를 삽입하기 위한 source element입니다.
기존에 비디오를 이용하여 이미지들을 파이프라인에 삽입한 것과 같이 특정 폴더 하위에 이미지들을 전부 읽어와서 파이프라인에 삽입하는 element입니다.
반드시 foldersrc는 모든 element들중 가장 먼저 파이프라인에 위치합니다.

상세 설명

  • 이미지명은 "%Y%m%d_%H%M%S_%f.jpeg"와 같은 형태여야 합니다.
    • %Y: 4자리 연도 (예: 2024)
    • %m: 2자리 월 (01-12)
    • %d: 2자리 일 (01-31)
    • %H: 24시간 형식의 2자리 시간 (00-23)
    • %M: 2자리 분 (00-59)
    • %S: 2자리 초 (00-59)
    • %f: 마이크로초 (000000-999999)
  • 이미지 파일 확장자는 .jpeg, .jpg여야 합니다.

Properties

  • location: 이미지들이 존재하는 디렉토리 path를 삽입합니다.

사용 예시

  • foldersrc element를 이용한 간단한 추론 파이프라인 예시 코드입니다.
    • folder에 존재하는 이미지 파일들을 프레임 단위로 추론하여 객체 탐지 결과 영상을 저장하는 코드입니다.
  • foldersrc element는 모든 element 중 가장 먼저 파이프라인에 위치합니다.
  • nufi.models 하위에 존재하는 inference class 객체를 생성하여 models property에 삽입합니다.
  • run_sdstreamer(video_path, result_file_path) 함수를 이용하여 추론 파이프라인을 생성하고 이를 실행합니다.
    • video_path: 추론에 이용할 타겟 영상 파일 path를 의미합니다.
    • result_file_path: 추론된 결과 영상 파일이 저장될 path를 의미합니다.
  • NuFiStreamer elements를 제외한 다른 elements에 대한 정보가 궁금하다면 Gstreamer docs를 참고해주세요.
import logging as log
import os
import sys

from gi.repository import GLib, Gst

import nufi
from nufi.models.yolo import Yolo

nufi.init()


def bus_call(_, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
log.info("End-of-stream")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
log.warning("Warning: %s: %s", err, debug)
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log.error("Error: %s: %s", err, debug)
loop.quit()
return True


def run_sdstreamer(imgs_folder_path, result_file_path):
# 1. Elements 생성
# - foldersrc element 생성
foldersrc = Gst.ElementFactory.make("foldersrc", "foldersrc")
decodebin = Gst.ElementFactory.make("decodebin", "decodebin")

videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
queue = Gst.ElementFactory.make("queue", "queue")

videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")
sink = Gst.ElementFactory.make("filesink", "filesink")

elements = [
filesrc,
decodebin,
videoconvert1,
queue,
videoconvert2,
x264enc,
mp4mux,
sink,
]

if not all(elements):
log.error("Unable to create Elements")
sys.exit(-1)


# 2. Elements properties 설정
# - foldersrc element location 설정 - 이미지 폴더 path 입력
foldersrc.set_property("location", imgs_folder_path)

videoconvert1.set_property("n-threads", 16)
videoconvert2.set_property("n-threads", 16)
sink.set_property("location", result_file_path)


# 3. 파이프라인 생성
pipeline = Gst.Pipeline()
pipeline.add(*elements)
for i in range(len(elements) - 1):
if elements[i].name != "decodebin":
if not Gst.Element.link(elements[i], elements[i + 1]):
# if not elements[i].link(elements[i + 1]):
log.error(
"Elements %s and %s could not be linked.",
elements[i].get_name(),
elements[i + 1].get_name(),
)
sys.exit(-1)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)

def on_demuxer_pad_added(_, pad):
if not pad.has_current_caps():
log.error("Pad %s has no caps, can't link", pad.get_name())
sys.exit(-1)

caps = pad.get_current_caps()
struct = caps.get_structure(0)
if struct.get_name().startswith("video"):
pad.link(videoconvert1.get_static_pad("sink"))

decodebin.connect("pad-added", on_demuxer_pad_added)


# 4. 파이프라인 시작
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
log.error("Unable to set the pipeline to the playing state.")
sys.exit(-1)
try:
loop.run()
except Exception as e:
log.error("App finished : %s", e)
loop.quit()

pipeline.set_state(Gst.State.NULL)
return result_file_path
처리 전 디렉토리 이미지 리스트처리 후 영상
thiland-resultthiland-result