Commit 24460ef5 authored by das-qa's avatar das-qa
Browse files

Add python demo: deepstream-test1, deepstream-test2, deepstream-test3 and deepstream-test4

parent 77e9fc8c
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import gi
import sys
gi.require_version('Gst', '1.0')
from gi.repository import Gst
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
sys.stdout.write("End-of-stream\n")
loop.quit()
elif t==Gst.MessageType.WARNING:
err, debug = message.parse_warning()
sys.stderr.write("Warning: %s: %s\n" % (err, debug))
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
sys.stderr.write("Error: %s: %s\n" % (err, debug))
loop.quit()
return True
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
import platform
from threading import Lock
from cuda.bindings import runtime
from cuda.bindings import driver
guard_platform_info = Lock()
class PlatformInfo:
def __init__(self):
self.is_wsl_system = False
self.wsl_verified = False
self.is_integrated_gpu_system = False
self.is_integrated_gpu_verified = False
self.is_aarch64_platform = False
self.is_aarch64_verified = False
def is_wsl(self):
with guard_platform_info:
# Check if its already verified as WSL system or not.
if not self.wsl_verified:
try:
# Open /proc/version file
with open("/proc/version", "r") as version_file:
# Read the content
version_info = version_file.readline()
version_info = version_info.lower()
self.wsl_verified = True
# Check if "microsoft" is present in the version information
if "microsoft" in version_info:
self.is_wsl_system = True
except Exception as e:
print(f"ERROR: Opening /proc/version failed: {e}")
return self.is_wsl_system
def is_integrated_gpu(self):
#Using cuda apis to identify whether integrated/discreet
#This is required to distinguish Tegra and ARM_SBSA devices
with guard_platform_info:
#Cuda initialize
if not self.is_integrated_gpu_verified:
cuda_init_result, = driver.cuInit(0)
if cuda_init_result == driver.CUresult.CUDA_SUCCESS:
#Get cuda devices count
device_count_result, num_devices = driver.cuDeviceGetCount()
if device_count_result == driver.CUresult.CUDA_SUCCESS:
#If atleast one device is found, we can use the property from
#the first device
if num_devices >= 1:
#Get properties from first device
property_result, properties = runtime.cudaGetDeviceProperties(0)
if property_result == runtime.cudaError_t.cudaSuccess:
print("Is it Integrated GPU? :", properties.integrated)
self.is_integrated_gpu_system = properties.integrated
self.is_integrated_gpu_verified = True
else:
print("ERROR: Getting cuda device property failed: {}".format(property_result))
else:
print("ERROR: No cuda devices found to check whether iGPU/dGPU")
else:
print("ERROR: Getting cuda device count failed: {}".format(device_count_result))
else:
print("ERROR: Cuda init failed: {}".format(cuda_init_result))
return self.is_integrated_gpu_system
def is_platform_aarch64(self):
#Check if platform is aarch64 using uname
if not self.is_aarch64_verified:
if platform.uname()[4] == 'aarch64':
self.is_aarch64_platform = True
self.is_aarch64_verified = True
return self.is_aarch64_platform
sys.path.append('/opt/nvidia/deepstream/deepstream/lib')
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import ctypes
import sys
sys.path.append('/opt/nvidia/deepstream/deepstream/lib')
def long_to_uint64(l):
value = ctypes.c_uint64(l & 0xffffffffffffffff).value
return value
......@@ -22,9 +22,8 @@ sys.path.append('../')
import os
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from gi.repository import Gst
# from common.platform_info import PlatformInfo
# from common.bus_call import bus_call
import pyds
......@@ -34,19 +33,34 @@ PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_BATCH_TIMEOUT_USEC = 33000
def osd_sink_pad_buffer_probe(pad,info,u_data):
frame_number=0
num_rects=0
def osd_sink_pad_buffer_probe(pad, info, u_data):
"""nvdsosd sink BUFFER probe.
注意:不要在这里改写 obj_meta 的几何/rect_params 等(例如 border_color)。
deepstream_test_2_fileout 的 probe 只做只读统计 + display_meta,因此稳定;
原 test1 在 probe 里对 obj_meta.rect_params.border_color 就地修改,易与 nvdsosd /
元数据生命周期冲突,在 EOS/NULL 时表现为 coredump。若需改框颜色,改用 osd 配置或
在 pgie 之后、进入 nvdsosd 之前的专用环节处理。
"""
# IMPORTANT:
# 不要在 probe 回调里依赖 u_data/state(不要把 Python 对象作为 u_data 传入),
# 否则在 EOS/teardown 阶段 probe callback closure 销毁时可能触发 PyGObject/GI
# 相关的释放崩溃。这里完全忽略 u_data。
frame_number = 0
num_rects = 0
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return
return Gst.PadProbeReturn.OK
# Retrieve batch metadata from the gst_buffer
# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
# C address of gst_buffer as input, which is obtained with hash(gst_buffer)
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
if not batch_meta:
return Gst.PadProbeReturn.OK
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
......@@ -76,8 +90,8 @@ def osd_sink_pad_buffer_probe(pad,info,u_data):
except StopIteration:
break
obj_counter[obj_meta.class_id] += 1
obj_meta.rect_params.border_color.set(0.0, 0.0, 1.0, 0.8) #0.8 is alpha (opacity)
try:
# 勿在此处修改 obj_meta.rect_params(与 deepstream_test_2_fileout 的只读 probe 对齐)
try:
l_obj=l_obj.next
except StopIteration:
break
......@@ -109,15 +123,82 @@ def osd_sink_pad_buffer_probe(pad,info,u_data):
py_nvosd_text_params.set_bg_clr = 1
# set(red, green, blue, alpha); set to Black
py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
# Using pyds.get_string() to get display_text as string
print(pyds.get_string(py_nvosd_text_params.display_text))
# 避免在 streaming 线程里每帧 print(test2 的 probe 也不打印,降低 GIL/stdio 风险)
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
try:
l_frame=l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
return Gst.PadProbeReturn.OK
class _PipelineRunState:
"""用于总线轮询时的 pipeline 与运行状态(与 deepstream_test_1_simple 一致的模式)"""
def __init__(self):
self.pipeline = None
self.bus = None
self.is_running = False
self.osd_sink_pad = None
self.osd_probe_id = None
self._cleaned = False
def cleanup(self):
# 参考 deepstream_test_4 的 gdb 回溯:
# coredump 落在 gst_pad_remove_probe() / PyGObject closure free 阶段。
# 因此此处不显式调用 remove_probe,让 GStreamer 在 set_state(NULL)
# 的 teardown 流程中统一清理 probe/回调,避免双重释放或竞态。
if self._cleaned:
return
self._cleaned = True
self.is_running = False
self.osd_probe_id = None
self.osd_sink_pad = None
if self.bus is not None:
try:
self.bus.set_flushing(True)
except Exception:
pass
self.bus = None
if self.pipeline:
print("Stopping pipeline...")
self.pipeline.set_state(Gst.State.NULL)
self.pipeline.get_state(5 * Gst.SECOND)
print("Pipeline stopped")
self.pipeline = None
def handle_message(msg, state):
"""处理总线消息(与 deepstream_test_1_simple.py 中的 handle_message 一致)"""
msg_type = msg.type
if msg_type == Gst.MessageType.EOS:
print("\nEnd of stream")
# 注意:不要在这里直接 cleanup。
# EOS/ERROR 的 bus message 仍在当前栈帧中被释放时,若同时触发 pipeline teardown,
# 可能导致 nvdsosd pad dispose / PyGObject closure free 与 GstMessage free 竞态,进而 coredump。
state.is_running = False
elif msg_type == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
print(f"\nGStreamer Error: {err}")
if debug:
print(f"Debug info: {debug}")
state.is_running = False
elif msg_type == Gst.MessageType.WARNING:
err, debug = msg.parse_warning()
print(f"\nGStreamer Warning: {err}")
if debug:
print(f"Debug info: {debug}")
elif msg_type == Gst.MessageType.STATE_CHANGED:
if state.pipeline and msg.src == state.pipeline:
old_state, new_state, pending_state = msg.parse_state_changed()
print(f"Pipeline state: {old_state.value_nick} -> {new_state.value_nick}")
def main(args):
......@@ -153,7 +234,7 @@ def main(args):
# Use nvdec_h264 for hardware accelerated decode on GPU
print("Creating Decoder \n")
decoder = Gst.ElementFactory.make("sginfer", "nvv4l2-decoder")
decoder = Gst.ElementFactory.make("mach264dec", "mach264-decoder")
if not decoder:
sys.stderr.write(" Unable to create Nvv4l2 Decoder \n")
......@@ -205,7 +286,9 @@ def main(args):
streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
streammux.set_property('batch-size', 1)
pgie.set_property('config-file-path', "dstest1_pgie_config.txt")
pgie.set_property('config-file-path', "dstest1_pgie_nvinferserver_config.txt")
# 与 deepstream_test_2_fileout 一致:用元素属性画框,勿在 probe 里改写 rect_params
nvosd.set_property("display-bbox", 1)
print("Adding elements to Pipeline \n")
pipeline.add(source)
......@@ -218,8 +301,8 @@ def main(args):
pipeline.add(sink)
# we link the elements together
# file-source -> h264-parser -> nvh264-decoder ->
# nvinfer -> nvvidconv -> nvosd -> video-renderer
# file-source -> h264-parser -> mach264-decoder ->
# nvinferserver -> nvosd -> fakesink
print("Linking elements in the Pipeline \n")
source.link(h264parser)
h264parser.link(decoder)
......@@ -230,18 +313,14 @@ def main(args):
srcpad = decoder.get_static_pad("src")
if not srcpad:
sys.stderr.write(" Unable to get source pad of decoder \n")
srcpad.link(sinkpad)
if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
sys.stderr.write("Failed to link decoder to streammux\n")
sys.exit(1)
streammux.link(pgie)
pgie.link(nvosd)
# nvvidconv.link(nvosd)
nvosd.link(sink)
# create an event loop and feed gstreamer bus mesages to it
loop = GLib.MainLoop()
# bus = pipeline.get_bus()
# bus.add_signal_watch()
# bus.connect ("message", bus_call, loop)
# Lets add probe to get informed of the meta data generated, we add probe to
# the sink pad of the osd element, since by that time, the buffer would have
# had got all the metadata.
......@@ -249,17 +328,50 @@ def main(args):
if not osdsinkpad:
sys.stderr.write(" Unable to get sink pad of nvosd \n")
osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)
state = _PipelineRunState()
state.pipeline = pipeline
state.is_running = True
state.osd_sink_pad = osdsinkpad
if osdsinkpad:
state.osd_probe_id = osdsinkpad.add_probe(
Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0
)
# start play back and listen to events
# 使用 timed_pop_filtered + handle_message(与 deepstream_test_1_simple.py 一致)
print("Starting pipeline \n")
pipeline.set_state(Gst.State.PLAYING)
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
sys.stderr.write("Failed to set pipeline to PLAYING\n")
state.cleanup()
sys.exit(1)
bus = pipeline.get_bus()
state.bus = bus
print("Pipeline is running...\n")
try:
loop.run()
except:
pass
# cleanup
pipeline.set_state(Gst.State.NULL)
while state.is_running:
try:
msg = bus.timed_pop_filtered(
100 * Gst.MSECOND,
Gst.MessageType.EOS
| Gst.MessageType.ERROR
| Gst.MessageType.STATE_CHANGED
| Gst.MessageType.WARNING,
)
if msg:
handle_message(msg, state)
# 解除对 msg 的引用,尽快触发 GstMessage/GObject free,
# 避免 cleanup 与 message free 竞态。
msg = None
if not state.is_running:
break
except KeyboardInterrupt:
print("\nPipeline interrupted by user")
state.is_running = False
break
finally:
state.cleanup()
if __name__ == '__main__':
sys.exit(main(sys.argv))
......
#!/usr/bin/env python3
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
DeepStream test1 的简化版本:
- 插件与 pipeline 结构来自 deepstream_test_1.py(filesrc -> h264parse -> mach264dec -> nvstreammux -> nvinferserver -> nvdsosd)
- 非插件相关代码(类封装、轮询式 bus、main 参数与文件检查)借鉴 sample_test.py
"""
import sys
import os
sys.path.append('../')
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
# DeepStream 插件相关常量(与 deepstream_test_1.py 一致)
MUXER_BATCH_TIMEOUT_USEC = 33000
PGIE_CONFIG_FILE = "dstest1_pgie_nvinferserver_config.txt"
OUTPUT_LOCATION = "/workspace/shared_docker/video/deepstream_python_app1_output.mkv"
class DeepStreamTest1Pipeline:
def __init__(self):
self.pipeline = None
self.is_running = False
def create_pipeline(self, video_file):
"""创建并配置 DeepStream pipeline(插件与 deepstream_test_1.py 一致)"""
try:
Gst.init(None)
self.pipeline = Gst.Pipeline.new("deepstream-test1-pipeline")
if not self.pipeline:
print("Failed to create pipeline")
return False
# 1. filesrc
source = Gst.ElementFactory.make("filesrc", "file-source")
if not source:
print("Failed to create filesrc")
return False
# 2. h264parse
h264parser = Gst.ElementFactory.make("h264parse", "h264-parser")
if not h264parser:
print("Failed to create h264parse")
return False
# 3. mach264dec(硬件解码,与 deepstream_test_1.py 一致)
decoder = Gst.ElementFactory.make("mach264dec", "mach264-decoder")
if not decoder:
print("Failed to create mach264dec")
return False
# 4. nvstreammux
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
print("Failed to create nvstreammux")
return False
# 5. nvinferserver
pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
if not pgie:
print("Failed to create nvinferserver")
return False
# 6. nvdsosd
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
if not nvosd:
print("Failed to create nvdsosd")
return False
# 7. encoder: maccudah264enc
encoder = Gst.ElementFactory.make("maccudah264enc", "h264-encoder")
if not encoder:
print("Failed to create maccudah264enc")
return False
# 8. h264parse (for muxing)
h264parse_out = Gst.ElementFactory.make("h264parse", "h264-parser-out")
if not h264parse_out:
print("Failed to create h264parse (out)")
return False
# 9. matroskamux
mux = Gst.ElementFactory.make("matroskamux", "mkv-mux")
if not mux:
print("Failed to create matroskamux")
return False
# 10. filesink
sink = Gst.ElementFactory.make("filesink", "file-sink")
if not sink:
print("Failed to create filesink")
return False
# 设置 source 文件路径
source.set_property("location", video_file)
# streammux 属性(与 deepstream_test_1.py 一致)
if os.environ.get("USE_NEW_NVSTREAMMUX") != "yes":
streammux.set_property("width", 1920)
streammux.set_property("height", 1080)
streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
streammux.set_property("batch-size", 1)
# pgie 配置文件
pgie.set_property("config-file-path", PGIE_CONFIG_FILE)
# nvosd 属性
nvosd.set_property("display-bbox", 1)
# encoder 属性
encoder.set_property("bitrate", 4000)
# filesink 属性
sink.set_property("location", OUTPUT_LOCATION)
# 添加元素
self.pipeline.add(source)
self.pipeline.add(h264parser)
self.pipeline.add(decoder)
self.pipeline.add(streammux)
self.pipeline.add(pgie)
self.pipeline.add(nvosd)
self.pipeline.add(encoder)
self.pipeline.add(h264parse_out)
self.pipeline.add(mux)
self.pipeline.add(sink)
# 链接:source -> h264parse -> decoder
if not source.link(h264parser):
print("Failed to link source to h264parse")
return False
if not h264parser.link(decoder):
print("Failed to link h264parse to decoder")
return False
# decoder -> streammux (request pad sink_0)
sinkpad = streammux.request_pad_simple("sink_0")
if not sinkpad:
print("Failed to get sink pad of streammux")
return False
srcpad = decoder.get_static_pad("src")
if not srcpad:
print("Failed to get source pad of decoder")
return False
if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
print("Failed to link decoder to streammux")
return False
# streammux -> pgie -> nvosd -> encoder -> h264parse -> matroskamux -> filesink
if not streammux.link(pgie):
print("Failed to link streammux to pgie")
return False
if not pgie.link(nvosd):
print("Failed to link pgie to nvosd")
return False
if not nvosd.link(encoder):
print("Failed to link nvosd to encoder")
return False
if not encoder.link(h264parse_out):
print("Failed to link encoder to h264parse")
return False
if not h264parse_out.link(mux):
print("Failed to link h264parse to matroskamux")
return False
if not mux.link(sink):
print("Failed to link matroskamux to filesink")
return False
print("Pipeline created successfully")
return True
except Exception as e:
print(f"Error creating pipeline: {e}")
return False
def cleanup(self):
"""清理资源(与 sample_test.py 一致)"""
if not self.is_running:
return
self.is_running = False
if self.pipeline:
print("Stopping pipeline...")
self.pipeline.set_state(Gst.State.NULL)
print("Pipeline stopped")
def handle_message(self, msg):
"""处理总线消息(与 sample_test.py 一致)"""
msg_type = msg.type
if msg_type == Gst.MessageType.EOS:
print("\nEnd of stream")
self.cleanup()
elif msg_type == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
print(f"\nGStreamer Error: {err}")
if debug:
print(f"Debug info: {debug}")
self.cleanup()
elif msg_type == Gst.MessageType.WARNING:
err, debug = msg.parse_warning()
print(f"\nGStreamer Warning: {err}")
if debug:
print(f"Debug info: {debug}")
elif msg_type == Gst.MessageType.STATE_CHANGED:
if msg.src == self.pipeline:
old_state, new_state, pending_state = msg.parse_state_changed()
print(f"Pipeline state: {old_state.value_nick} -> {new_state.value_nick}")
def run(self, video_file):
"""运行 pipeline(轮询式 bus,与 sample_test.py 一致)"""
if not self.create_pipeline(video_file):
return 1
try:
self.is_running = True
print(f"Playing file: {video_file}")
ret = self.pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("Failed to start pipeline")
self.cleanup()
return 1
print("Pipeline is running...")
print("Press Ctrl+C to stop\n")
bus = self.pipeline.get_bus()
while self.is_running:
try:
msg = bus.timed_pop_filtered(
100 * Gst.MSECOND,
Gst.MessageType.EOS | Gst.MessageType.ERROR
| Gst.MessageType.STATE_CHANGED | Gst.MessageType.WARNING
)
if msg:
self.handle_message(msg)
except KeyboardInterrupt:
print("\nPipeline interrupted by user")
self.cleanup()
break
return 0
except Exception as e:
print(f"Error running pipeline: {e}")
self.cleanup()
return 1
def main():
"""入口(参数与文件检查与 sample_test.py 一致)"""
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <video_file>")
print("Example: python3 deepstream_test_1_simple.py test.h264")
return 1
video_file = sys.argv[1]
if not os.path.exists(video_file):
print(f"Error: File '{video_file}' does not exist")
return 1
if not os.access(video_file, os.R_OK):
print(f"Error: File '{video_file}' is not readable")
return 1
print(f"Starting DeepStream test1 pipeline for file: {video_file}")
app = DeepStreamTest1Pipeline()
return app.run(video_file)
if __name__ == "__main__":
sys.exit(main())
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2018-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
################################################################################
infer_config {
unique_id: 1
gpu_ids: [0]
max_batch_size: 30
backend {
inputs: [ {
name: "input_1:0"
}]
outputs: [
{name: "output_cov/Sigmoid:0"},
{name: "output_bbox/BiasAdd:0"}
]
triton {
model_name: "Primary_Detector"
version: -1
model_repo {
root: "../../../../sugon_samples/triton_model_repo"
strict_model_config: true
backend_dir: "/opt/deepstream/third_party/backends"
}
}
}
preprocess {
network_format: MEDIA_FORMAT_NONE
tensor_order: TENSOR_ORDER_LINEAR
tensor_name: "input_1:0"
maintain_aspect_ratio: 0
frame_scaling_hw: FRAME_SCALING_HW_DEFAULT
frame_scaling_filter: 1
normalize {
scale_factor: 0.00392156862745098
channel_offsets: [0, 0, 0]
}
}
postprocess {
labelfile_path: "../../../../sugon_samples/labels/Primary_Detector/labels.txt"
detection {
num_detected_classes: 4
per_class_params {
key: 0
value { pre_threshold: 0.4 }
}
nms {
confidence_threshold:0.2
topk:20
iou_threshold:0.5
}
}
}
extra {
copy_input_to_host_buffers: false
output_buffer_pool_size: 2
}
}
input_control {
process_mode: PROCESS_MODE_FULL_FRAME
operate_on_gie_id: -1
interval: 0
}
#!/usr/bin/env python3
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Python reimplementation of `sugon_apps/sample_apps/deepstream-test2/deepstream_test2_app.c`.
Pipeline (H264 elementary stream):
filesrc -> h264parse -> mach264dec -> nvstreammux -> (pgie) -> nvtracker ->
(sgie1) -> (sgie2) -> nvdsosd -> maccudah264enc -> h264parse -> qtmux -> filesink
Notes:
- Inference elements are `nvinferserver` (matching the C app default).
- Tracker properties are loaded from `dstest2_tracker_config.txt` (same parsing logic as C app).
"""
from __future__ import annotations
import configparser
import os
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
sys.path.append("../")
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst
import pyds
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_PERSON = 2
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 40000
PGIE_CONFIG_FILE = "dstest2_pgie_nvinferserver_config.txt"
SGIE1_CONFIG_FILE = "dstest2_sgie1_nvinferserver_config.txt"
SGIE2_CONFIG_FILE = "dstest2_sgie2_nvinferserver_config.txt"
TRACKER_CONFIG_FILE = "dstest2_tracker_config.txt"
@dataclass(frozen=True)
class AppConfig:
input_h264_path: str
output_path: str
def osd_sink_pad_buffer_probe(pad: Gst.Pad, info: Gst.PadProbeInfo, u_data) -> Gst.PadProbeReturn:
gst_buffer = info.get_buffer()
if not gst_buffer:
return Gst.PadProbeReturn.OK
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
if not batch_meta:
return Gst.PadProbeReturn.OK
vehicle_count = 0
person_count = 0
num_rects = 0
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
except StopIteration:
break
l_obj = frame_meta.obj_meta_list
while l_obj is not None:
try:
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
except StopIteration:
break
if obj_meta.class_id == PGIE_CLASS_ID_VEHICLE:
vehicle_count += 1
num_rects += 1
elif obj_meta.class_id == PGIE_CLASS_ID_PERSON:
person_count += 1
num_rects += 1
try:
l_obj = l_obj.next
except StopIteration:
break
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
display_meta.num_labels = 1
text_params = display_meta.text_params[0]
text_params.display_text = f"Person = {person_count} Vehicle = {vehicle_count}"
text_params.x_offset = 10
text_params.y_offset = 12
text_params.font_params.font_name = "Serif"
text_params.font_params.font_size = 10
text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
text_params.set_bg_clr = 1
text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def _resolve_path_relative_to_config_file(config_file_path: str, file_path: str) -> str:
if os.path.isabs(file_path):
return file_path
return str((Path(config_file_path).resolve().parent / file_path).resolve())
def set_tracker_properties_from_file(tracker: Gst.Element, tracker_config_path: str) -> None:
config = configparser.ConfigParser()
files_read = config.read(tracker_config_path)
if not files_read:
raise RuntimeError(f"Failed to load tracker config file: {tracker_config_path}")
if "tracker" not in config:
raise RuntimeError(f"Missing [tracker] group in {tracker_config_path}")
tracker_section = config["tracker"]
if "tracker-width" in tracker_section:
tracker.set_property("tracker-width", int(tracker_section["tracker-width"]))
if "tracker-height" in tracker_section:
tracker.set_property("tracker-height", int(tracker_section["tracker-height"]))
if "gpu-id" in tracker_section:
tracker.set_property("gpu_id", int(tracker_section["gpu-id"]))
if "ll-lib-file" in tracker_section:
tracker.set_property(
"ll-lib-file",
_resolve_path_relative_to_config_file(tracker_config_path, tracker_section["ll-lib-file"]),
)
if "ll-config-file" in tracker_section:
tracker.set_property(
"ll-config-file",
_resolve_path_relative_to_config_file(tracker_config_path, tracker_section["ll-config-file"]),
)
def create_pipeline(app_config: AppConfig) -> Gst.Pipeline:
Gst.init(None)
pipeline = Gst.Pipeline.new("dstest2-pipeline")
if not pipeline:
raise RuntimeError("Failed to create pipeline")
source = Gst.ElementFactory.make("filesrc", "file-source")
h264parser = Gst.ElementFactory.make("h264parse", "h264-parser")
decoder = Gst.ElementFactory.make("mach264dec", "mach264-decoder")
streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
pgie = Gst.ElementFactory.make("nvinferserver", "primary-nvinference-engine")
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
sgie1 = Gst.ElementFactory.make("nvinferserver", "secondary1-nvinference-engine")
sgie2 = Gst.ElementFactory.make("nvinferserver", "secondary2-nvinference-engine")
nvosd = Gst.ElementFactory.make("nvdsosd", "nv-onscreendisplay")
encoder = Gst.ElementFactory.make("maccudah264enc", "mac-cudah264enc")
h264parser1 = Gst.ElementFactory.make("h264parse", "h264-parser1")
mux = Gst.ElementFactory.make("qtmux", "qt-mux")
sink = Gst.ElementFactory.make("filesink", "file-sink")
required_elements = [
("filesrc", source),
("h264parse", h264parser),
("mach264dec", decoder),
("nvstreammux", streammux),
("nvinferserver(pgie)", pgie),
("nvtracker", tracker),
("nvinferserver(sgie1)", sgie1),
("nvinferserver(sgie2)", sgie2),
("nvdsosd", nvosd),
("maccudah264enc", encoder),
("h264parse(1)", h264parser1),
("qtmux", mux),
("filesink", sink),
]
missing = [name for name, element in required_elements if not element]
if missing:
raise RuntimeError(f"Failed to create elements: {', '.join(missing)}")
source.set_property("location", app_config.input_h264_path)
streammux.set_property("batch-size", 1)
streammux.set_property("gpu-id", 3)
streammux.set_property("width", MUXER_OUTPUT_WIDTH)
streammux.set_property("height", MUXER_OUTPUT_HEIGHT)
streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
pgie.set_property("config-file-path", PGIE_CONFIG_FILE)
sgie1.set_property("config-file-path", SGIE1_CONFIG_FILE)
sgie2.set_property("config-file-path", SGIE2_CONFIG_FILE)
set_tracker_properties_from_file(tracker, TRACKER_CONFIG_FILE)
nvosd.set_property("display-bbox", 1)
sink.set_property("location", app_config.output_path)
for element in [
source,
h264parser,
decoder,
streammux,
pgie,
tracker,
sgie1,
sgie2,
nvosd,
encoder,
h264parser1,
mux,
sink,
]:
pipeline.add(element)
if not source.link(h264parser):
raise RuntimeError("Failed to link filesrc -> h264parse")
if not h264parser.link(decoder):
raise RuntimeError("Failed to link h264parse -> decoder")
sinkpad = streammux.request_pad_simple("sink_0")
if not sinkpad:
raise RuntimeError("Streammux request sink pad failed")
srcpad = decoder.get_static_pad("src")
if not srcpad:
raise RuntimeError("Decoder request src pad failed")
if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
raise RuntimeError("Failed to link decoder -> streammux(sink_0)")
if not streammux.link(pgie):
raise RuntimeError("Failed to link streammux -> pgie")
if not pgie.link(tracker):
raise RuntimeError("Failed to link pgie -> nvtracker")
if not tracker.link(sgie1):
raise RuntimeError("Failed to link nvtracker -> sgie1")
if not sgie1.link(sgie2):
raise RuntimeError("Failed to link sgie1 -> sgie2")
if not sgie2.link(nvosd):
raise RuntimeError("Failed to link sgie2 -> nvdsosd")
if not nvosd.link(encoder):
raise RuntimeError("Failed to link nvdsosd -> encoder")
if not encoder.link(h264parser1):
raise RuntimeError("Failed to link encoder -> h264parse1")
if not h264parser1.link(mux):
raise RuntimeError("Failed to link h264parse1 -> qtmux")
if not mux.link(sink):
raise RuntimeError("Failed to link qtmux -> filesink")
osd_sink_pad = nvosd.get_static_pad("sink")
if not osd_sink_pad:
raise RuntimeError("Unable to get sink pad of nvdsosd")
osd_sink_pad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)
return pipeline
class DeepStreamTest2FileOutApp:
def __init__(self, app_config: AppConfig):
self.app_config = app_config
self.pipeline: Optional[Gst.Pipeline] = None
self.is_running = False
def cleanup(self) -> None:
if not self.is_running:
return
self.is_running = False
if self.pipeline:
try:
self.pipeline.set_state(Gst.State.NULL)
finally:
self.pipeline = None
def handle_message(self, msg: Gst.Message) -> None:
msg_type = msg.type
if msg_type == Gst.MessageType.EOS:
sys.stdout.write("End-of-stream\n")
self.cleanup()
elif msg_type == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
sys.stderr.write(f"Error: {err}: {debug}\n")
self.cleanup()
elif msg_type == Gst.MessageType.WARNING:
err, debug = msg.parse_warning()
sys.stderr.write(f"Warning: {err}: {debug}\n")
elif msg_type == Gst.MessageType.STATE_CHANGED:
if self.pipeline and msg.src == self.pipeline:
old_state, new_state, pending_state = msg.parse_state_changed()
sys.stdout.write(f"Pipeline state: {old_state.value_nick} -> {new_state.value_nick}\n")
def run(self) -> int:
try:
self.pipeline = create_pipeline(self.app_config)
except Exception as e:
sys.stderr.write(f"Failed to create pipeline: {e}\n")
return 1
self.is_running = True
print(f"Using file: {self.app_config.input_h264_path}")
print("Running...")
ret = self.pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
sys.stderr.write("Failed to start pipeline\n")
self.cleanup()
return 1
bus = self.pipeline.get_bus()
while self.is_running:
try:
msg = bus.timed_pop_filtered(
100 * Gst.MSECOND,
Gst.MessageType.EOS
| Gst.MessageType.ERROR
| Gst.MessageType.WARNING
| Gst.MessageType.STATE_CHANGED,
)
if msg:
self.handle_message(msg)
except KeyboardInterrupt:
break
self.cleanup()
return 0
def parse_args(argv: list[str]) -> Optional[AppConfig]:
if len(argv) != 3:
sys.stderr.write(f"Usage: {argv[0]} <H264 filename> <output_stream>\\n")
return None
input_path = argv[1]
output_path = argv[2]
if not os.path.exists(input_path):
sys.stderr.write(f"Error: input file does not exist: {input_path}\\n")
return None
if not os.access(input_path, os.R_OK):
sys.stderr.write(f"Error: input file is not readable: {input_path}\\n")
return None
output_parent = os.path.dirname(output_path) or "."
if not os.path.exists(output_parent):
sys.stderr.write(f"Error: output directory does not exist: {output_parent}\\n")
return None
return AppConfig(input_h264_path=input_path, output_path=output_path)
def main(argv: list[str]) -> int:
app_config = parse_args(argv)
if not app_config:
return 1
return DeepStreamTest2FileOutApp(app_config).run()
if __name__ == "__main__":
raise SystemExit(main(sys.argv))
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2018-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
################################################################################
infer_config {
unique_id: 1
gpu_ids: [3]
max_batch_size: 30
backend {
inputs: [ {
name: "input_1:0"
}]
outputs: [
{name: "output_cov/Sigmoid:0"},
{name: "output_bbox/BiasAdd:0"}
]
triton {
model_name: "Primary_Detector"
version: -1
model_repo {
root: "../../../../sugon_samples/triton_model_repo"
strict_model_config: true
backend_dir: "/opt/deepstream/third_party/backends"
}
}
}
preprocess {
network_format: MEDIA_FORMAT_NONE
tensor_order: TENSOR_ORDER_LINEAR
tensor_name: "input_1:0"
maintain_aspect_ratio: 0
frame_scaling_hw: FRAME_SCALING_HW_DEFAULT
frame_scaling_filter: 1
normalize {
scale_factor: 0.00392156862745098
channel_offsets: [0, 0, 0]
}
}
postprocess {
labelfile_path: "../../../../sugon_samples/labels/Primary_Detector/labels.txt"
detection {
num_detected_classes: 4
per_class_params {
key: 0
value { pre_threshold: 0.4 }
}
nms {
confidence_threshold:0.2
topk:20
iou_threshold:0.5
}
}
}
extra {
copy_input_to_host_buffers: false
output_buffer_pool_size: 2
}
}
input_control {
process_mode: PROCESS_MODE_FULL_FRAME
operate_on_gie_id: -1
interval: 0
}
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2018-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
################################################################################
infer_config {
unique_id: 2
gpu_ids: [3]
max_batch_size: 16
backend {
triton {
model_name: "Secondary_VehicleMake"
version: -1
model_repo {
root: "../../../../sugon_samples/triton_model_repo"
strict_model_config: true
backend_dir: "/opt/deepstream/third_party/backends"
}
}
}
preprocess {
network_format: IMAGE_FORMAT_BGR
tensor_order: TENSOR_ORDER_LINEAR
maintain_aspect_ratio: 0
frame_scaling_hw: FRAME_SCALING_HW_DEFAULT
frame_scaling_filter: 1
normalize {
scale_factor: 1
}
}
postprocess {
labelfile_path: "../../../../sugon_samples/labels/Secondary_VehicleMake/labels.txt"
classification {
threshold: 0.51
}
}
}
input_control {
process_mode: PROCESS_MODE_CLIP_OBJECTS
operate_on_gie_id: 1
operate_on_class_ids: [0]
secondary_reinfer_interval: 90
async_mode: true
object_control {
bbox_filter {
min_width: 64
min_height: 64
}
}
}
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2018-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
################################################################################
infer_config {
unique_id: 3
gpu_ids: [3]
max_batch_size: 16
backend {
triton {
model_name: "Secondary_VehicleTypes"
version: -1
model_repo {
root: "../../../../sugon_samples/triton_model_repo"
strict_model_config: true
backend_dir: "/opt/deepstream/third_party/backends"
}
}
}
preprocess {
network_format: IMAGE_FORMAT_BGR
tensor_order: TENSOR_ORDER_LINEAR
maintain_aspect_ratio: 0
frame_scaling_hw: FRAME_SCALING_HW_DEFAULT
frame_scaling_filter: 1
normalize {
scale_factor: 1
}
}
postprocess {
labelfile_path: "../../../../sugon_samples/labels/Secondary_VehicleTypes/labels.txt"
classification {
threshold: 0.51
}
}
}
input_control {
process_mode: PROCESS_MODE_CLIP_OBJECTS
operate_on_gie_id: 1
operate_on_class_ids: [0]
secondary_reinfer_interval: 90
async_mode: true
object_control {
bbox_filter {
min_width: 64
min_height: 64
}
}
}
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2018-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
################################################################################
# Mandatory properties for the tracker:
# tracker-width, tracker-height: needs to be multiple of 32 for NvDCF and NvDeepSORT
# gpu-id
# ll-lib-file: path to low-level tracker lib
# ll-config-file: required to set different tracker types
#
[tracker]
tracker-width=960
tracker-height=544
gpu-id=3
ll-lib-file= /opt/deepstream/lib/libnvds_nvmultiobjecttracker.so
# ll-config-file required to set different tracker types
# ll-config-file=../../../../samples/configs/deepstream-app/config_tracker_IOU.yml
ll-config-file=../../../../sugon_samples/configs/deepstream-app/config_tracker_NvSORT.yml
# ll-config-file=../../../../samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml
# ll-config-file=../../../../samples/configs/deepstream-app/config_tracker_NvDCF_accuracy.yml
# ll-config-file=../../../../samples/configs/deepstream-app/config_tracker_NvDeepSORT.yml
#!/usr/bin/env python3
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2018-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Python port of sugon_apps/sample_apps/deepstream-test3/deepstream_multi_decoder_test.c
#
# Pipeline:
# For each of 2 streams: filesrc -> h264parse -> hyh264dec -> nvstreammux.sink_N
# nvstreammux -> nvinferserver -> fakesink
#
# 总线采用 timed_pop_filtered + handle_message(与 deepstream_test_1_simple 一致),
# 避免 GLib.MainLoop + bus_call 在 EOS 后退出时易发的 coredump。
################################################################################
import os
import sys
sys.path.append("../")
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst
# 与 C 版 multi_decoder_test_nvinferserver_config.txt 一致
CONFIG_FILE = "multi_decoder_test_nvinferserver_config.txt"
NUM_STREAMS = 2
# C 源码使用 hyh264dec;若环境无此插件,可设置环境变量覆盖,例如:
# export DEEPSTREAM_DECODER=mach264dec
DEFAULT_DECODER = os.environ.get("DEEPSTREAM_DECODER", "mach264dec")
class _PipelineRunState:
"""轮询总线时的 pipeline / bus 状态。"""
def __init__(self):
self.pipeline = None
self.bus = None
self.is_running = False
self._cleaned = False
def cleanup(self):
if self._cleaned:
return
self.is_running = False
if self.bus is not None:
try:
self.bus.set_flushing(True)
except Exception:
pass
self.bus = None
if self.pipeline:
print("Stopping pipeline...")
self.pipeline.set_state(Gst.State.NULL)
ret, _cur, _pending = self.pipeline.get_state(5 * Gst.SECOND)
if ret == Gst.StateChangeReturn.FAILURE:
sys.stderr.write("Warning: pipeline failed to reach NULL state\n")
print("Pipeline stopped")
self.pipeline = None
self._cleaned = True
def handle_message(msg: Gst.Message, state: _PipelineRunState) -> None:
"""处理总线消息(与 deepstream_test_1_simple.py 中的 handle_message 一致)"""
msg_type = msg.type
if msg_type == Gst.MessageType.EOS:
print("\nEnd of stream")
state.cleanup()
elif msg_type == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
print(f"\nGStreamer Error: {err}")
if debug:
print(f"Debug info: {debug}")
state.cleanup()
elif msg_type == Gst.MessageType.WARNING:
err, debug = msg.parse_warning()
print(f"\nGStreamer Warning: {err}")
if debug:
print(f"Debug info: {debug}")
elif msg_type == Gst.MessageType.STATE_CHANGED:
if state.pipeline and msg.src == state.pipeline:
old_state, new_state, pending_state = msg.parse_state_changed()
print(f"Pipeline state: {old_state.value_nick} -> {new_state.value_nick}")
def main(argv: list[str]) -> int:
if len(argv) != 3:
sys.stderr.write(f"Usage: {argv[0]} <video1.h264> <video2.h264>\n")
return 1
Gst.init(None)
pipeline = Gst.Pipeline.new("multi-decode-inference-pipeline")
if not pipeline:
sys.stderr.write("Failed to create pipeline\n")
return 1
streammux = Gst.ElementFactory.make("nvstreammux", "streammux")
if not streammux:
sys.stderr.write("Failed to create nvstreammux\n")
return 1
streammux.set_property("batch-size", NUM_STREAMS)
pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
if not pgie:
sys.stderr.write("Failed to create nvinferserver\n")
return 1
pgie.set_property("config-file-path", CONFIG_FILE)
sink = Gst.ElementFactory.make("fakesink", "fakesink")
if not sink:
sys.stderr.write("Failed to create fakesink\n")
return 1
pipeline.add(streammux)
pipeline.add(pgie)
pipeline.add(sink)
if not streammux.link(pgie):
sys.stderr.write("Failed to link streammux -> pgie\n")
return 1
if not pgie.link(sink):
sys.stderr.write("Failed to link pgie -> fakesink\n")
return 1
for i in range(NUM_STREAMS):
src_name = f"file-source-{i}"
parser_name = f"h264-parser-{i}"
decoder_name = f"hy-decoder-{i}"
src = Gst.ElementFactory.make("filesrc", src_name)
h264parser = Gst.ElementFactory.make("h264parse", parser_name)
decoder = Gst.ElementFactory.make("mach264dec", decoder_name)
if not src or not h264parser or not decoder:
sys.stderr.write(
f"Failed to create decode chain for stream {i} "
f"(decoder={DEFAULT_DECODER})\n"
)
return 1
src.set_property("location", argv[i + 1])
pipeline.add(src)
pipeline.add(h264parser)
pipeline.add(decoder)
if not src.link(h264parser):
sys.stderr.write(f"Failed to link filesrc -> h264parse (stream {i})\n")
return 1
if not h264parser.link(decoder):
sys.stderr.write(f"Failed to link h264parse -> decoder (stream {i})\n")
return 1
sinkpad = streammux.request_pad_simple(f"sink_{i}")
if not sinkpad:
sys.stderr.write(f"Failed to request streammux sink_{i}\n")
return 1
srcpad = decoder.get_static_pad("src")
if not srcpad:
sys.stderr.write(f"Failed to get decoder src pad (stream {i})\n")
return 1
if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
sys.stderr.write(f"Failed to link decoder -> streammux (stream {i})\n")
return 1
state = _PipelineRunState()
state.pipeline = pipeline
state.is_running = True
print("Running pipeline...")
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
sys.stderr.write("Failed to set pipeline to PLAYING\n")
state.cleanup()
return 1
bus = pipeline.get_bus()
state.bus = bus
try:
while state.is_running:
try:
msg = bus.timed_pop_filtered(
100 * Gst.MSECOND,
Gst.MessageType.EOS
| Gst.MessageType.ERROR
| Gst.MessageType.STATE_CHANGED
| Gst.MessageType.WARNING,
)
if msg:
handle_message(msg, state)
except KeyboardInterrupt:
print("\nPipeline interrupted by user")
state.cleanup()
break
except Exception:
if not state._cleaned:
state.cleanup()
raise
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv))
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2018-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: LicenseRef-NvidiaProprietary
#
# NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
# property and proprietary rights in and to this material, related
# documentation and any modifications thereto. Any use, reproduction,
# disclosure or distribution of this material and related documentation
# without an express license agreement from NVIDIA CORPORATION or
# its affiliates is strictly prohibited.
################################################################################
infer_config {
unique_id: 1
gpu_ids: [0]
max_batch_size: 30
backend {
inputs: [ {
name: "input_1:0"
}]
outputs: [
{name: "output_cov/Sigmoid:0"},
{name: "output_bbox/BiasAdd:0"}
]
triton {
model_name: "Primary_Detector"
version: 1
model_repo {
root: "../../../../sugon_samples/triton_model_repo"
strict_model_config: true
backend_dir: "/opt/deepstream/third_party/backends"
#log_level: 3
}
}
}
preprocess {
network_format: MEDIA_FORMAT_NONE
tensor_order: TENSOR_ORDER_LINEAR
tensor_name: "input_1:0"
maintain_aspect_ratio: 0
frame_scaling_hw: FRAME_SCALING_HW_DEFAULT
frame_scaling_filter: 1
normalize {
scale_factor: 0.00392156862745098
channel_offsets: [0, 0, 0]
}
}
postprocess {
labelfile_path: "../../../../sugon_samples/labels/Primary_Detector/labels.txt"
detection {
num_detected_classes: 4
per_class_params {
key: 0
value { pre_threshold: 0.4 }
}
nms {
confidence_threshold:0.2
topk:20
iou_threshold:0.5
}
}
}
extra {
copy_input_to_host_buffers: false
output_buffer_pool_size: 2
}
}
input_control {
process_mode: PROCESS_MODE_FULL_FRAME
operate_on_gie_id: -1
interval: 0
}
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2018-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
[message-broker]
hostname = localhost
port = 5672
username = guest
password = guest
exchange = amq.topic
topic = topicname
amqp-framesize = 131072
#amqp-heartbeat = 0
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2018-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
[message-broker]
#connection_str = HostName=<my-hub>.azure-devices.net;DeviceId=<device_id>;SharedAccessKey=<my-policy-key>
#shared_access_key = <my-policy-key>
#custom_msg_properties = <key>=<value>;
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
[message-broker]
partition-key = sensor.id
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
[message-broker]
username = user
password = password
client-id = uniqueID
#enable-tls = 1
#tls-cafile =
#tls-capath =
#tls-certfile =
#tls-keyfile =
#share-connection = 1
#loop-timeout = 2000
#keep-alive = 60
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment