Skip to main content
Version: 2.0.0

Warboy Fall Detection

개요

  • nufi.models.BaseModel을 상속합니다.
  • Warboy Yolo Posenet 모델을 이용하는 쓰러짐 탐지 추론 객체입니다.
  • nufi element를 이용하여 파이프라인에서 추론 시 사용합니다.

Methods

  • __init__(self, device: str = "npu11pe0", draw_bboxes=True)
    • 생성자 함수
    • Input
      • device: 이용할 warboy device 이름
        • furiosactl list 명령어를 통해 device 종류를 확인할 수 있고 동일한 device를 중복해서 할당할 수 없습니다.
        • furiosa list를 참고하세요.
      • draw_bboxes: 이미지 상에 box 데이터 출력 여부 (default = True)
  • process(self, image: numpy.ndarray, pipeline_id: int = None)
    • 이미지 프레임을 처리하는 함수
    • Input
      • image: 추론 타겟 이미지
      • pipeline_id: 해당 모델을 이용하는 pipeline의 id (optional). 모델을 여러 파이프라인에서 공유할경우 각 파이프라인을 구분하기위해 사용됩니다.
    • Output
      • List[NufiResult]
        • is_draw: 이미지 상에 NufiResult 출력 여부 (default = True)
        • drawables: 이미지 상에 실제로 그려질 객체 list (default = [])
        • data: 이미지 상에 그려지지 않는 이미지 처리 결과 메타데이터 (default = None)
          • dictionary 형태로 입력.
          • label: object label 문자열 항상 "person"
          • conf: 신뢰도(confidence) 실수값
          • bboxes: 박스좌표("xyxy") list
          • is_fall: object의 쓰러짐 여부
          • keypoints: 사람의 포즈에 관한 keypoint 데이터(list)를 포함하는 list
            • keypoints 내부 정보 나열
            • [pose.nose.x, pose.nose.y]
            • [pose.left_eye.x, pose.left_eye.y]
            • [pose.right_eye.x, pose.right_eye.y]
            • [pose.left_ear.x, pose.left_ear.y]
            • [pose.right_ear.x, pose.right_ear.y]
            • [pose.left_shoulder.x, pose.left_shoulder.y]
            • [pose.right_shoulder.x, pose.right_shoulder.y]
            • [pose.left_elbow.x, pose.left_elbow.y]
            • [pose.right_elbow.x, pose.right_elbow.y]
            • [pose.left_wrist.x, pose.left_wrist.y]
            • [pose.right_wrist.x, pose.right_wrist.y]
            • [pose.left_hip.x, pose.left_hip.y]
            • [pose.right_hip.x, pose.right_hip.y]
            • [pose.left_knee.x, pose.left_knee.y]
            • [pose.right_knee.x, pose.right_knee.y]
            • [pose.left_ankle.x, pose.left_ankle.y]
            • [pose.right_ankle.x, pose.right_ankle.y]
  • clean(self, pipeline_id: int)
    • 파이프라인과 연관된 리소스를 정리하는 함수
    • Input
      • pipeline_id: 해당 모델을 이용하는 element가 속한 pipeline id
    • Output
      • None

사용 예시

  • nufi element를 이용하여 model property에 삽입하여 파이프라인에서 추론 시 사용합니다.
import logging as log
import os
import sys

from gi.repository import GLib, Gst

import nufi
from nufi.models import WarboyFallDetection

nufi.init()


def bus_call(_, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
log.info("End-of-stream")
loop.quit()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
log.warning("Warning: %s: %s", err, debug)
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
log.error("Error: %s: %s", err, debug)
loop.quit()
return True


def run_sdstreamer(video_path, result_file_path):
# 1. Elements 생성
filesrc = Gst.ElementFactory.make("filesrc", "filesrc")
decodebin = Gst.ElementFactory.make("decodebin", "decodebin")

videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
queue = Gst.ElementFactory.make("queue", "queue")

# - nufi element 생성
nufi = Gst.ElementFactory.make("nufi", "warboy-fall-detection")
draw = Gst.ElementFactory.make("draw", "draw")
qos = Gst.ElementFactory.make("qos", "qos")

videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
x264enc = Gst.ElementFactory.make("x264enc", "x264enc")
mp4mux = Gst.ElementFactory.make("mp4mux", "mp4mux")
sink = Gst.ElementFactory.make("filesink", "filesink")

elements = [
filesrc,
decodebin,
videoconvert1,
queue,
nufi,
draw,
qos,
videoconvert2,
x264enc,
mp4mux,
sink,
]

if not all(elements):
log.error("Unable to create Elements")
sys.exit(-1)


# 2. Elements properties 설정
filesrc.set_property("location", video_path)
videoconvert1.set_property("n-threads", 16)
videoconvert2.set_property("n-threads", 16)
sink.set_property("location", result_file_path)

# - nufi element models 설정
npu11pe0 = WarboyFallDetection(device="npu11pe0") # WarboyFallDetection 모델 이용
# 동일한 디바이스를 여러번 할당하지 않기 위해서 같은 인스턴스를 이용하여 list 생성
models = [npu11pe0 for _ in range(1)] # multi processing 이용 시 range iteration 횟수 변경
nufi.set_property("model", models)


# 3. 파이프라인 생성
pipeline = Gst.Pipeline()
pipeline.add(*elements)
for i in range(len(elements) - 1):
if elements[i].name != "decodebin":
if not Gst.Element.link(elements[i], elements[i + 1]):
# if not elements[i].link(elements[i + 1]):
log.error(
"Elements %s and %s could not be linked.",
elements[i].get_name(),
elements[i + 1].get_name(),
)
sys.exit(-1)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)

def on_demuxer_pad_added(_, pad):
if not pad.has_current_caps():
log.error("Pad %s has no caps, can't link", pad.get_name())
sys.exit(-1)

caps = pad.get_current_caps()
struct = caps.get_structure(0)
if struct.get_name().startswith("video"):
pad.link(videoconvert1.get_static_pad("sink"))

decodebin.connect("pad-added", on_demuxer_pad_added)


# 4. 파이프라인 시작
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
log.error("Unable to set the pipeline to the playing state.")
sys.exit(-1)
try:
loop.run()
except Exception as e:
log.error("App finished : %s", e)
loop.quit()

pipeline.set_state(Gst.State.NULL)
return result_file_path

if __name__ == "__main__":
video_path = ""
result_file_path = ""
print(run_sdstreamer(video_path, result_file_path))