Commit 4d39376

[DLStreamer] DLS Optimizer Script (open-edge-platform#1026)
Authored by pbartosik and tbujewsk
Co-authored-by: Bujewski, Tomasz <[email protected]>
1 parent c83f982 commit 4d39376

3 files changed: +368 -0 lines
Lines changed: 43 additions & 0 deletions

# Overview

DLSOptimizer is a tool that helps users discover more optimal versions of the pipelines they run on DL Streamer. It explores modifications to your pipeline that are known to increase performance and measures their effect. As a result, you can expect to receive a pipeline that is better suited to your setup.

# Limitations

Currently the DLSOptimizer focuses mainly on DL Streamer's elements, specifically the `gvadetect` and `gvaclassify` elements. The produced pipeline could still have potential for further optimization by transforming other elements.

Multi-stream pipelines are also currently not supported.

# Usage

```
python3 optimizer.py [OPT] -- PIPELINE

Options:
  --search-duration SEARCH_DURATION  How long the optimizer should search for better pipelines
  --sample-duration SAMPLE_DURATION  How long every pipeline should be sampled for performance
```

Increasing the search duration increases the chances of discovering more performant pipelines. Increasing the sample duration improves the stability of the search, but fewer pipelines will potentially be explored.
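For instance, to search for ten minutes while sampling each candidate pipeline for only five seconds (the values and the elided `uri=...`/`model=...` arguments are illustrative, not from the original README):

```
python3 optimizer.py --search-duration 600 --sample-duration 5 -- urisourcebin uri=... ! decodebin ! gvadetect model=... ! fakesink
```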

# Example

```
python3 optimizer.py -- urisourcebin buffer-size=4096 uri=https://videos.pexels.com/video-files/1192116/1192116-sd_640_360_30fps.mp4 ! decodebin ! gvadetect model=/home/optimizer/models/public/yolo11s/INT8/yolo11s.xml ! queue ! gvawatermark ! vah264enc ! h264parse ! mp4mux ! fakesink
[__main__] [    INFO] - GStreamer initialized successfully
[__main__] [    INFO] - GStreamer version: 1.26.6
[__main__] [    INFO] - Detected GPU Device
[__main__] [    INFO] - No NPU Device detected
[__main__] [    INFO] - Sampling for 10 seconds...
FpsCounter(last 1.00sec): total=46.87 fps, number-streams=1, per-stream=46.87 fps
FpsCounter(average 1.00sec): total=46.87 fps, number-streams=1, per-stream=46.87 fps
FpsCounter(last 1.01sec): total=43.70 fps, number-streams=1, per-stream=43.70 fps
FpsCounter(average 2.01sec): total=45.28 fps, number-streams=1, per-stream=45.28 fps

...

FpsCounter(last 1.09sec): total=73.45 fps, number-streams=1, per-stream=73.45 fps
FpsCounter(average 8.70sec): total=73.65 fps, number-streams=1, per-stream=73.65 fps
[__main__] [    INFO] - Best found pipeline: urisourcebin buffer-size=4096 uri=https://videos.pexels.com/video-files/1192116/1192116-sd_640_360_30fps.mp4 !decodebin3!gvadetect model=/home/optimizer/models/public/yolo11s/INT8/yolo11s.xml device=GPU pre-process-backend=va-surface-sharing batch-size=2 nireq=2 ! queue ! gvawatermark ! vah264enc ! h264parse ! mp4mux ! fakesink with fps: 81.99
```

In this case the optimizer started with a pipeline that ran at ~45 fps and found one that ran at ~82 fps. The specific improvements were:

- replacing the `decodebin` element with `decodebin3`
- configuring the `gvadetect` element to use the GPU for processing
- setting the `batch-size` parameter to 2
- setting the `nireq` parameter to 2
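The search strategy behind these improvements can be sketched in a few lines of plain Python: each pipeline element gets a list of candidate variants, and the optimizer walks the cartesian product of those lists, keeping the fastest combination. The sketch below is a minimal illustration with a stubbed benchmark function in place of a real GStreamer run; the element names are taken from the example above, but the scoring is invented:

```python
import itertools

# One list of candidate variants per pipeline element; a real run would
# also vary device, batch-size and nireq across many more values.
suggestions = [
    ["decodebin", "decodebin3"],
    ["gvadetect device=CPU", "gvadetect device=GPU batch-size=2 nireq=2"],
    ["fakesink"],
]

def fake_fps(pipeline):
    # Stand-in for sampling a real pipeline; favours decodebin3 and the
    # GPU variant, loosely mirroring the numbers in the example run.
    fps = 45.0
    if "decodebin3" in pipeline:
        fps += 10
    if "device=GPU" in pipeline:
        fps += 27
    return fps

# Walk every combination of variants and keep the best-scoring pipeline.
best_fps, best_pipeline = 0.0, None
for combination in itertools.product(*suggestions):
    pipeline = " ! ".join(combination)
    fps = fake_fps(pipeline)
    if fps > best_fps:
        best_fps, best_pipeline = fps, pipeline

print(best_pipeline)
# → decodebin3 ! gvadetect device=GPU batch-size=2 nireq=2 ! fakesink
```

Note that the product grows multiplicatively with the number of variant lists, which is why the real tool bounds the walk with `--search-duration`.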
Lines changed: 324 additions & 0 deletions

# ==============================================================================
# Copyright (C) 2025-2025 Intel Corporation
#
# SPDX-License-Identifier: MIT
# ==============================================================================

import argparse
import time
import logging
import itertools
import os
import subprocess

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

####################################### Init ######################################################

Gst.init()
logging.basicConfig(level=logging.INFO, format="[%(name)s] [%(levelname)8s] - %(message)s")
logger = logging.getLogger(__name__)
logger.info("GStreamer initialized successfully")
gst_version = Gst.version()
logger.info("GStreamer version: %d.%d.%d",
            gst_version.major,
            gst_version.minor,
            gst_version.micro)

####################################### Utils #####################################################

def parse_element_parameters(element):
    parameters = element.strip().split(" ")
    del parameters[0]
    parsed_parameters = {}
    for parameter in parameters:
        # split only on the first "=" so values that contain "=" survive
        parts = parameter.split("=", 1)
        parsed_parameters[parts[0]] = parts[1]

    return parsed_parameters

def assemble_parameters(parameters):
    result = ""
    for parameter, value in parameters.items():
        result = result + parameter + "=" + value + " "

    return result

def log_parameters_of_interest(pipeline):
    for element in pipeline:
        if "gvadetect" in element:
            parameters = parse_element_parameters(element)
            logger.info("Found Gvadetect, device: %s, batch size: %s, nireqs: %s",
                        parameters.get("device", "not set"),
                        parameters.get("batch-size", "not set"),
                        parameters.get("nireq", "not set"))

        if "gvaclassify" in element:
            parameters = parse_element_parameters(element)
            logger.info("Found Gvaclassify, device: %s, batch size: %s, nireqs: %s",
                        parameters.get("device", "not set"),
                        parameters.get("batch-size", "not set"),
                        parameters.get("nireq", "not set"))

###################################### System Scanning ############################################

def scan_system():
    context = {"GPU": False,
               "NPU": False}

    # check for presence of GPU
    try:
        gpu_query = subprocess.run(["dpkg", "-l", "intel-opencl-icd"],
                                   stderr=subprocess.DEVNULL,
                                   stdout=subprocess.DEVNULL,
                                   check=False)
        gpu_dir = os.listdir("/dev/dri")
        for file in gpu_dir:
            if "render" in file and gpu_query.returncode == 0:
                context["GPU"] = True

    # can happen on missing directory, signifies no GPU support
    except Exception:  # pylint: disable=broad-exception-caught
        pass

    if context["GPU"]:
        logger.info("Detected GPU Device")
    else:
        logger.info("No GPU Device detected")

    # check for presence of NPU
    try:
        npu_query = subprocess.run(["dpkg", "-l", "intel-driver-compiler-npu"],
                                   stderr=subprocess.DEVNULL,
                                   stdout=subprocess.DEVNULL,
                                   check=False)
        npu_dir = os.listdir("/dev/accel/")
        for file in npu_dir:
            if "accel" in file and npu_query.returncode == 0:
                context["NPU"] = True

    # can happen on missing directory, signifies no NPU support
    except Exception:  # pylint: disable=broad-exception-caught
        pass

    if context["NPU"]:
        logger.info("Detected NPU Device")
    else:
        logger.info("No NPU Device detected")

    return context

##################################### Pipeline Running ############################################

def explore_pipelines(suggestions, base_fps, search_duration, sample_duration):
    best_pipeline = []
    start_time = time.time()
    best_fps = base_fps
    for combination in itertools.product(*suggestions):
        combination = list(combination)
        log_parameters_of_interest(combination)

        try:
            fps = sample_pipeline(combination, sample_duration)

            if fps > best_fps:
                best_fps = fps
                best_pipeline = combination

        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.debug("Pipeline failed to start: %s", e)

        cur_time = time.time()
        if cur_time - start_time > search_duration:
            break

    return best_pipeline, best_fps

def sample_pipeline(pipeline, sample_duration):
    pipeline = pipeline.copy()

    # check if there is an fps counter after the last inference element
    for i, element in enumerate(reversed(pipeline)):
        # exit early if one is found before other elements
        if "gvafpscounter" in element:
            break

        # add one if no counter was found before inference elements
        if "gvadetect" in element or "gvaclassify" in element:
            pipeline.insert(len(pipeline) - i, "gvafpscounter")
            # one counter after the last inference element is enough
            break

    pipeline = "!".join(pipeline)
    logger.debug("Testing: %s", pipeline)

    pipeline = Gst.parse_launch(pipeline)

    logger.info("Sampling for %s seconds...", str(sample_duration))
    fps_counter = next(filter(lambda element: "gvafpscounter" in element.name, reversed(pipeline.children)))  # pylint: disable=line-too-long

    bus = pipeline.get_bus()

    pipeline.set_state(Gst.State.PLAYING)
    terminate = False
    start_time = time.time()
    while not terminate:
        time.sleep(1)

        # Incorrect pipelines sometimes get stuck in Ready state instead of failing.
        # Terminate in those cases.
        _, state, _ = pipeline.get_state(Gst.CLOCK_TIME_NONE)
        if state == Gst.State.READY:
            del pipeline
            raise RuntimeError("Pipeline not healthy, terminating early")

        cur_time = time.time()
        if cur_time - start_time > sample_duration:
            terminate = True

    pipeline.set_state(Gst.State.NULL)

    # Process any messages from the bus
    message = bus.pop()
    while message is not None:
        if message.type == Gst.MessageType.ERROR:
            error, _ = message.parse_error()
            logger.error("Pipeline error: %s", error.message)
        elif message.type == Gst.MessageType.WARNING:
            warning, _ = message.parse_warning()
            logger.warning("Pipeline warning: %s", warning.message)
        elif message.type == Gst.MessageType.STATE_CHANGED:
            old, new, _ = message.parse_state_changed()
            logger.debug("State changed: %s -> %s", old, new)
        else:
            logger.debug("Other message: %s", str(message))
        message = bus.pop()

    del pipeline
    fps = fps_counter.get_property("avg-fps")
    logger.debug("Sampled fps: %.2f", fps)
    return fps

######################################## Preprocess ###############################################

def preprocess_pipeline(pipeline):
    # Swap substrings in place so any element parameters are preserved.
    for i, element in enumerate(pipeline):
        if "decodebin" in element and "decodebin3" not in element:
            pipeline[i] = pipeline[i].replace("decodebin", "decodebin3")

        if "vaapipostproc" in element:
            pipeline[i] = pipeline[i].replace("vaapipostproc", "vapostproc")

        if "vaapi-surface-sharing" in element:
            pipeline[i] = pipeline[i].replace("vaapi-surface-sharing", "va-surface-sharing")

#################################### Gvadetect & Gvaclassify ######################################

def add_gvadetect_suggestions(suggestions, context):
    add_classification_suggestions("gvadetect", suggestions, context)

def add_gvaclassify_suggestions(suggestions, context):
    add_classification_suggestions("gvaclassify", suggestions, context)

def add_classification_suggestions(element, suggestions, context):
    if context["GPU"]:
        add_parameter_suggestions(element, "GPU", "va-surface-sharing", suggestions)

    if context["NPU"]:
        add_parameter_suggestions(element, "NPU", "va", suggestions)

    add_parameter_suggestions(element, "CPU", "opencv", suggestions)


def add_parameter_suggestions(element, device, backend, suggestions):
    batches = [1, 2, 4, 8, 16, 32]
    nireqs = range(1, 9)
    for suggestion in suggestions:
        if element in suggestion[0]:
            parameters = parse_element_parameters(suggestion[0])

            for batch in batches:
                for nireq in nireqs:
                    parameters["device"] = device
                    parameters["pre-process-backend"] = backend
                    parameters["batch-size"] = str(batch)
                    parameters["nireq"] = str(nireq)
                    suggestion.append(f"{element} {assemble_parameters(parameters)}")

####################################### Main Logic ################################################

# Steps of pipeline optimization:
# 1. Measure the baseline pipeline's performance.
# 2. Pre-process the pipeline to cover cases where we're certain of the best alternative.
# 3. Run the pipeline through generators that provide suggestions for element alternatives.
# 4. Create a cartesian product of the suggestions
#    and start running the combinations to measure performance.
# 5. Any time a better pipeline is found, save it and its performance information.
# 6. Return the best discovered pipeline.
def get_optimized_pipeline(pipeline, search_duration=300, sample_duration=10):
    context = scan_system()

    pipeline = " ".join(pipeline).split("!")

    # Measure the performance of the original pipeline
    try:
        fps = sample_pipeline(pipeline, sample_duration)
    except Exception as e:
        logger.error("Pipeline failed to start, unable to measure fps: %s", e)
        raise RuntimeError("Provided pipeline is not valid") from e

    logger.info("FPS: %.2f", fps)

    # Replace any elements that we're sure have best-in-class alternatives.
    preprocess_pipeline(pipeline)

    # Prepare the suggestions structure
    # Suggestions structure:
    # [
    #     ["element1 param1=value1", "element1 param1=value2", ...other variants],
    #     ["element2 param1=value1", "element2 param1=value2", ...other variants],
    #     ["element3 param1=value1", "element3 param1=value2", ...other variants],
    #     ...other pipeline elements
    # ]
    suggestions = []
    for element in pipeline:
        suggestions.append([element])

    # Collect suggestions for pipeline improvements
    add_gvadetect_suggestions(suggestions, context)
    add_gvaclassify_suggestions(suggestions, context)

    # Explore the suggestions and try to discover pipelines with better performance
    best_pipeline, best_fps = explore_pipelines(suggestions, fps, search_duration, sample_duration)

    # Fall back in case no better pipeline was found.
    if not best_pipeline:
        best_pipeline = pipeline
        best_fps = fps

    # Reconstruct the pipeline as a single string and return it.
    return "!".join(best_pipeline), best_fps

def main():
    parser = argparse.ArgumentParser(
        prog="DLStreamer Pipeline Optimization Tool",
        description="Use this tool to try and find versions of your pipeline that will run with increased performance."  # pylint: disable=line-too-long
    )
    # type=int so the durations compare correctly against time deltas
    parser.add_argument("--search-duration", type=int, default=300,
                        help="Duration of time which should be spent searching for optimized pipelines (default: %(default)ss)")  # pylint: disable=line-too-long
    parser.add_argument("--sample-duration", type=int, default=10,
                        help="Duration of sampling individual pipelines. Longer duration should offer more stable results (default: %(default)ss)")  # pylint: disable=line-too-long
    parser.add_argument("pipeline", nargs="+",
                        help="Pipeline to be analyzed")
    args = parser.parse_args()

    try:
        best_pipeline, best_fps = get_optimized_pipeline(args.pipeline,
                                                         args.search_duration,
                                                         args.sample_duration)
        logger.info("\nBest found pipeline: %s \nwith fps: %.2f", best_pipeline, best_fps)
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.error("Failed to optimize pipeline: %s", e)

if __name__ == "__main__":
    main()
Lines changed: 1 addition & 0 deletions

PyGObject==3.50.0
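The single pinned dependency can be installed with pip; note that PyGObject typically also requires the system GObject introspection and GStreamer development packages, which pip does not provide (a general PyGObject caveat, not something this commit states):

```
pip install -r requirements.txt
```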
