-
Notifications
You must be signed in to change notification settings - Fork 192
Expand file tree
/
Copy pathdls_onvif_sample.py
More file actions
152 lines (123 loc) · 5.42 KB
/
dls_onvif_sample.py
File metadata and controls
152 lines (123 loc) · 5.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
'''
This module discovers ONVIF cameras on the network and launches GStreamer DL Streamer
pipelines for each discovered camera profile. It manages multiple streaming processes
concurrently and handles their output in separate threads.
'''
# ==============================================================================
# Copyright (C) 2026 Intel Corporation
#
# SPDX-License-Identifier: MIT
# ==============================================================================
import argparse
import shlex
import subprocess
import threading
from typing import List
from onvif import ONVIFCamera
import dls_onvif_discovery_utils as dls_discovery
def run_single_streamer(gst_command: List[str]) -> subprocess.Popen:
"""Runs a single DL Streamer in non-blocking mode
Args:
gst_command: List of command arguments (avoids shell injection)
Returns:
subprocess.Popen object or None on failure
"""
def read_output(pipe, prefix, camera_id):
"""Read output from pipe in a separate thread"""
try:
for line in iter(pipe.readline, ''):
if line:
print(f"[{camera_id}] {prefix}: {line.strip()}")
except Exception as e: # pylint: disable=broad-exception-caught
print(f"[{camera_id}] Error reading {prefix}: {e}")
finally:
pipe.close()
try:
process = subprocess.Popen( # pylint: disable=consider-using-with
gst_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1
)
camera_id = f"PID:{process.pid}"
print(f"DL Streamer started ({camera_id})")
# Start threads to read stdout and stderr
threading.Thread(
target=read_output,
args=(process.stdout, "OUT", camera_id),
daemon=True
).start()
threading.Thread(
target=read_output,
args=(process.stderr, "ERR", camera_id),
daemon=True
).start()
return process
except Exception as e: # pylint: disable=broad-exception-caught
print(f"Error starting DL Streamer: {e}")
return None
def prepare_commandline(camera_rtsp_url: str, pipeline_elements: str) -> List[str]:
"""Prepare GStreamer command line from RTSP URL and pipeline elements.
Args:
camera_rtsp_url: The RTSP stream URL
pipeline_elements: GStreamer pipeline elements as a string
Returns:
List of command arguments (safer than shell string)
"""
if not camera_rtsp_url or not pipeline_elements:
raise ValueError("URL and pipeline elements cannot be empty!")
# Build command as list to avoid shell injection
prepared_command = ["gst-launch-1.0", "rtspsrc", f"location={camera_rtsp_url}"]
# Safely parse pipeline elements
prepared_command.extend(shlex.split(pipeline_elements))
return prepared_command
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='ONVIF Camera Discovery and DL Streamer Pipeline Launcher')
parser.add_argument('--verbose', type=bool, default=False,
help='If False then no verbose output, if True then verbose output')
parser.add_argument('--user', type=str, help='ONVIF camera username')
parser.add_argument('--password', type=str, help='ONVIF camera password')
args = parser.parse_args()
# List to store all running processes
running_processes = []
# Start discovery (send broadcast):
cameras = dls_discovery.discover_onvif_cameras()
for camera in cameras:
# Get ONVIF camera capabilities:
camera_obj = ONVIFCamera(camera['hostname'], camera['port'], args.user, args.password)
# Get DL Streamer command line from config.json
command_json = dls_discovery.get_commandline_by_key("config.json", camera['hostname'])
if not command_json:
print(f"No command line found for {camera['hostname']}, skipping...")
continue
# Get camera profiles and start DL Streamer for each profile
profiles = dls_discovery.camera_profiles(camera_obj, args.verbose)
for i, profile in enumerate(profiles, 1):
rtsp_url = profile.rtsp_url
if not rtsp_url:
print(f"No RTSP URL found for profile {profile.name}, skipping...")
continue
commandline_executed = prepare_commandline(rtsp_url, command_json)
print(f"Executing command line for {camera['hostname']}: {commandline_executed}")
running_process = run_single_streamer(commandline_executed)
if running_process:
running_processes.append(running_process)
# Keep script running and wait for all processes
if running_processes:
print("Press Ctrl+C to stop all processes and exit.\n")
try:
# Wait for all processes to complete
for active_process in running_processes:
active_process.wait()
except KeyboardInterrupt:
print("\n\nStopping all processes...")
for process_problem in running_processes:
if process_problem.poll() is None: # If process is still running
process_problem.terminate()
print(f"Terminated process PID: {process_problem.pid}")
print("All processes stopped.")
else:
print("No processes started.")