This repository was archived by the owner on Sep 13, 2024. It is now read-only.
Question: Is this repo right for my use case? #110

@cmcguirk17

Describe the bug

I'm curious whether this is the right tool for my project. Here is my use case:

Overview
I have a Python application that uses PyGObject and the NVIDIA DeepStream 6.2 Python bindings. I currently have a working pipeline that makes N connections to RTSP sources and outputs the decoded frames as numpy arrays using an appsink 'new-sample' callback. I then take these numpy arrays and perform post-processing, AI model inference, etc.

Problem
The issue I am running into is that MyPipeline.gst_loop.run() blocks, and I have no good way to shut the pipeline down properly. The only approach that has worked for me is registering a Python signal handler for SIGINT or SIGTERM and then running a graceful stop method that sends the pipeline an EOS event. However, that no longer works if I run the pipeline in a Process() as described below.
To do my other tasks, I put the pipeline into a Python mp.Process() and use an mp.Queue() to access the numpy arrays output by appsink, which lets me run an inference model and perform other work. However, when I stop my application, I seem unable to shut the pipeline down properly with an EOS event, because it is running in its own process.

Using the async GLib integration that this repo provides, would I be able to implement some event/message handling system to 'gracefully' shut down the pipeline? I am pretty new to PyGObject, so perhaps there is a more standard way to run and shut down pipelines?
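
One possible shape for such an event/message system, sketched under assumptions rather than taken from this repo: share an mp.Event with the child process and poll it from the GLib main loop with a periodic timeout callback. The names make_stop_poller and stop_event are hypothetical, and the sketch assumes the existing bus handler calls loop.quit() once it sees the EOS message. The pipeline and the EOS factory are passed in as parameters, so the logic does not itself import GStreamer.

```python
import multiprocessing as mp


def make_stop_poller(stop_event, pipeline, make_eos_event):
    """Build a callback for a periodic GLib timeout (hypothetical helper).

    stop_event:     an mp.Event shared with the parent process.
    pipeline:       the Gst.Pipeline (anything with a send_event() method).
    make_eos_event: zero-argument factory, e.g. Gst.Event.new_eos.
    """
    def poll():
        if stop_event.is_set():
            # Inject EOS. The bus handler is assumed to react to the
            # resulting EOS message by calling loop.quit().
            pipeline.send_event(make_eos_event())
            return False  # a falsy return removes the timeout source
        return True       # a truthy return keeps the timeout running
    return poll


# Assumed wiring inside start_system(), before self.gst_loop.run():
#   GLib.timeout_add(100, make_stop_poller(self.stop_event,
#                                          self.gst_pipeline,
#                                          Gst.Event.new_eos))
# and in the parent process, once processing is finished:
#   ds_pipeline.stop_event.set()
```

Here self.stop_event would be an mp.Event() created in __init__ next to buffer_queue, so that both processes see the same flag. The 100 ms interval is an arbitrary choice.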

Steps to reproduce

Relevant code snippets

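import multiprocessing as mp
import signal
import time
from typing import List

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
import pyds


class DSSystem: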
    perf_data = None
    t0 = time.time_ns()  # for startup time


    def __init__(self):
        self.MyEnvironmentVar = DSEnvironmentVar()
        self.SysConfigPath = ''
        self.UtilFolder = None  # Unused- variable in SysConfig file

        self.ds_config: dict = {}

        self.decode_mode: DecoderEnum = DecoderEnum.HARDWARE

        self.cameras: List[CameraSource] = []
        self.camera_config_paths = []
        self.num_cameras = 0

        self.buffer_queue = mp.Queue()  # .get(obj), .put(obj)

        self.gst_proc = None
        self.gst_loop: GLib.MainLoop = None
        self.gst_bus: Gst.Bus = None
        self.gst_pipeline: Gst.Pipeline = None

        self.performance_data: PERF_DATA = None  # For NV perf metrics
        # self.save_ds_graph = 0
        self.run_time = None


    def start_system(self):

        # ... creates the Gst pipeline (elided) ...

        appsink = Gst.ElementFactory.make('appsink', 'batch_sink')
        appsink.set_property("emit-signals", True)
        sink_handler_id = appsink.connect("new-sample", on_new_sample, self.buffer_queue)

        # PIPELINE ADDING AND LINKING
        print("Adding elements to Pipeline \n")
        self.gst_pipeline.add(appsink)
        nvstrmux.link(appsink)

        # PREPARE EVENT/BUS LOOP AND MESSAGING
        self.gst_loop = GLib.MainLoop()
        self.gst_bus = self.gst_pipeline.get_bus()

        # add_signal_watch() makes the bus emit the "message" signal from the GLib main loop
        self.gst_bus.add_signal_watch()
        handler_id = self.gst_bus.connect("message", cb_gstloop_bus.bus_msg_handler, self.gst_loop)
        # connect returns a handler ID for the callback

        # PIPELINE PLAYING AND NULL CLEANUP
        print("Starting pipeline \n")
        self.gst_pipeline.set_state(Gst.State.PLAYING)

        signal.signal(signal.SIGINT, self.kill_signal_rec)
        signal.signal(signal.SIGTERM, self.kill_signal_rec)

        self.gst_loop.run()
        print('Loop.run() exited')
        pyds.unset_callback_funcs()
        self.gracefully_stop_system()


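def main():
    ds_pipeline = DSSystem()
    proc = mp.Process(target=ds_pipeline.start_system)
    proc.start()

    stop = False  # set to True somewhere in the elided processing below
    while not stop:
        # Blocks until the appsink callback has queued a batch
        br_batch = ds_pipeline.buffer_queue.get()
        for ix, br_frame in enumerate(br_batch.frame_meta_list):
            ...
            if stop:
                break

    # How to properly shutdown the gst_loop.run() from here?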
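
For the parent side of that question, a minimal sketch, assuming the child process watches a shared stop flag and sends EOS itself (that part is not in the code above). The helper names drain and stop_pipeline_process are hypothetical. The queue is drained while waiting because the Python multiprocessing documentation warns that a process which has put items on a queue may not exit until those items have been consumed.

```python
import queue
import time


def drain(q):
    """Discard whatever is currently buffered in the queue; return the count."""
    dropped = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return dropped
        dropped += 1


def stop_pipeline_process(proc, stop_event, q, timeout=5.0):
    """Ask the child to stop, wait for it, and force-kill only as a fallback.

    Returns True if the child exited on its own within the timeout.
    """
    stop_event.set()  # the child is assumed to poll this and send EOS
    deadline = time.monotonic() + timeout
    while proc.is_alive() and time.monotonic() < deadline:
        drain(q)        # keep the queue empty so the child can flush and exit
        proc.join(0.1)  # wait briefly, then check again
    if proc.is_alive():
        proc.terminate()  # last resort: no EOS, no clean pipeline teardown
        proc.join()
        return False
    return True
```

In main() this would be called after the processing loop, for example stop_pipeline_process(proc, ds_pipeline.stop_event, ds_pipeline.buffer_queue), where stop_event is an assumed mp.Event attribute on DSSystem.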

Expected behavior

I want to be able to run other processing tasks while decoding frames from the RTSP sources. Then, when my processing is complete, I want to shut down all parts of the code (inference engines and pipeline) properly, for example by sending an EOS event to the pipeline. With my current implementation, putting gst_loop.run() in a separate Process() means I lose access to the bus for sending messages.
Again, I'm new to GLib and async programming, so I was curious whether this repo could help with my use case.

Environment

  • Operating System: Ubuntu 20.04
  • Python version: 3.9
  • Software versions:
    • PyDs: 6.2
    • Gst: 1.16.3
    • GLib: 2.64.2
