diff --git a/docs/user-guide/dev_guide/optimizer.md b/docs/user-guide/dev_guide/optimizer.md index 16362f4ca..d3f03dc62 100644 --- a/docs/user-guide/dev_guide/optimizer.md +++ b/docs/user-guide/dev_guide/optimizer.md @@ -46,10 +46,18 @@ Arguments: Options: --search-duration SEARCH_DURATION How long should the optimizer search for better pipelines. --sample-duration SAMPLE_DURATION How long should every pipeline be sampled for performance. + --detection-threshold THRESHOLD Minimum threshold of detections that tested pipelines are + not allowed to fall below in order to count as valid alternatives. --multistream-fps-limit LIMIT Minimum fps limit which streams are not allowed to cross when optimizing for a multi-stream scenario. --enable-cross-stream-batching Enable cross stream batching for inference elements in fps mode. + --allowed-devices ALLOWED_DEVICES List of allowed devices (CPU, GPU, NPU) to be used by the optimizer. + If not specified, all detected devices will be used. + The tool does not support discrete GPU selection. + e.g. --allowed-devices CPU NPU, --allowed-devices GPU --log-level LEVEL Configure the logging detail level. + -v, --verbose Print information about every candidate pipeline investigated during + the optimization process. ``` **`search-duration`** default: `300` seconds \ Increasing the **search duration** will increase the chances of discovering more performant pipelines. @@ -64,9 +72,15 @@ but the final result is liable to support less streams overall. **`enable-cross-stream-batching`** \ -Levy the inference instance feature of DL Streamer to batch work across multiple streams in fps mode. +Leverage the inference instance feature of DL Streamer to batch work across multiple streams in fps mode. +**`allowed-devices`** \ +Allows you to limit the set of devices that will be considered during the optimization process. + **`log-level`** default: `INFO` \ Available **log levels** are: CRITICAL, FATAL, ERROR, WARN, INFO, DEBUG. +**`verbose`** \ +Prints extra information about the candidate pipelines that were examined during the optimization process.
+ >**Note**\ >Search duration and sample duration both affect the amount of pipelines that will be explored during the search. \ >The total amount should be approximately `search_duration / sample_duration` pipelines. @@ -99,14 +113,14 @@ In this case the optimizer started with a pipeline that ran at ~45fps, and found ## Using the optimizer as a library -The easiest way of importing the optimizer into your scripts is to include it in your `PYTHONPATH` environment variable: +The easiest way of importing the optimizer into your scripts is to include it in your `PYTHONPATH` environment variable: \ ```export PYTHONPATH=/opt/intel/dlstreamer/scripts/optimizer``` Targets which are exported in order to facilitate usage inside of scripts: ### `preprocess_pipeline(pipeline) -> processed_pipeline` -- `pipeline` - A string containing a valid DL Streamer pipeline. -- `processed_pipeline` - A string containing the pipeline with all relevant substitutions. +- `pipeline: string` - A string containing a valid DL Streamer pipeline. +- `processed_pipeline: string` - A string containing the pipeline with all relevant substitutions. Perform quick search and replace for known combinations of elements with more performant alternatives. @@ -117,59 +131,111 @@ Initialized without any arguments optimizer = DLSOptimizer() ``` #### Methods -**`set_search_duration(duration)`** -- `duration` - The duration of searching for optimized pipelines in seconds, default `300`. +**`get_baseline_pipeline() -> pipeline, fps, streams`** +- `pipeline: string` - The baseline pipeline from which optimization started. +- `fps: float` - Fps measured for the baseline pipeline. +- `streams: int` - Number of streams in the baseline pipeline. -Configures the search duration used in optimization sessions. +Returns information about the original pipeline used in the optimization process. Returned values are meaningless until at least one optimization operation is performed. 
``` optimizer = DLSOptimizer() -optimizer.set_search_duration(600) +for (_, _) in optimizer.iter_optimize_for_fps(pipeline): + pass +pipeline, fps, streams = optimizer.get_baseline_pipeline() ``` +--- +**`get_optimal_pipeline() -> pipeline, fps, streams`** +- `pipeline: string` - The best pipeline found during optimization. +- `fps: float` - Fps measured for the optimal pipeline. +- `streams: int` - Number of streams in the optimal pipeline. +Returns information about the best pipeline found during the optimization process. Returned values are meaningless until at least one optimization operation is performed. +``` +optimizer = DLSOptimizer() +for (_, _, _) in optimizer.iter_optimize_for_streams(pipeline): + pass +best_pipeline, best_fps, best_streams = optimizer.get_optimal_pipeline() +``` +--- **`set_sample_duration(duration)`** -- `duration` - The duration of sampling each candidate pipeline in seconds, default `10`. +- `duration: int` - The duration of sampling each candidate pipeline in seconds, default `10`. Configures the sample duration used in optimization sessions. ``` optimizer = DLSOptimizer() optimizer.set_sample_duration(15) ``` +--- +**`set_detections_error_threshold(threshold)`** +- `threshold: float` - The threshold of counted detections, between `0.0` and `1.0`, default `0.95`. +Minimum threshold of detections that tested pipelines must not fall below in order to count as valid alternatives. +``` +optimizer = DLSOptimizer() +optimizer.set_detections_error_threshold(0.8) +``` +--- **`enable_cross_stream_batching(enable)`** -- `enable` - Enable the cross stream batching feature, default `False`. +- `enable: bool` - Enable the cross stream batching feature, default `False`. -Levy the inference instance feature of DL Streamer to batch work across multiple streams when optimizing for fps. +Leverage the inference instance feature of DL Streamer to batch work across multiple streams when optimizing for fps.
``` optimizer = DLSOptimizer() optimizer.enable_cross_stream_batching(True) ``` - +--- -**`set_mutlistream_fps_limit(limit)`** +**`set_multistream_fps_limit(limit)`** -- `limit` - The minimum fps limit allowed for individual streams when optimizing for amount of streams, default `30`. +- `limit: int` - The minimum fps limit allowed for individual streams when optimizing for number of streams, default `30`. Configures the minimum fps limit that streams are not allowed to fall below when optimizing for a multi-stream scenario. ``` optimizer = DLSOptimizer() optimizer.set_multistream_fps_limit(45) ``` +--- +**`set_allowed_devices(devices)`** +- `devices: list[string]` - A list of device identifiers. -**`optimize_for_fps(pipeline) -> optimized_pipeline, fps`** -- `pipeline` - A string containing a valid DL Streamer pipeline. -- `optimized_pipeline` - A string containing the best performing pipeline that has been found during the search. -- `fps` - The measured fps of the best perfmorming pipeline. +Limits the set of devices which will be considered during the optimization process. +``` +optimizer = DLSOptimizer() +optimizer.set_allowed_devices(["CPU", "GPU"]) +``` +--- +**`optimize_for_fps(pipeline, search_duration) -> optimized_pipeline, fps`** +- `pipeline: string` - A string containing a valid DL Streamer pipeline. +- `search_duration: int` - The duration of searching for better pipelines, default `300`. +- `optimized_pipeline: string` - A string containing the best performing pipeline that has been found during the search. +- `fps: float` - The measured fps of the best performing pipeline. -Runs a series of optimization steps on the pipeline searching for version with better performance measured by fps. +Runs a series of optimization steps on the pipeline searching for a version with better performance measured by fps. ``` pipeline = "urisourcebin buffer-size=4096 uri=https://videos.pexels.com/video-files/1192116/1192116-sd_640_360_30fps.mp4 ! decodebin ! 
gvadetect model=/home/optimizer/models/public/yolo11s/INT8/yolo11s.xml ! queue ! gvawatermark ! fakesink" optimizer = DLSOptimizer() optimizer.optimize_for_fps(pipeline) ``` +--- +**`iter_optimize_for_fps(pipeline) -> candidate_pipeline, fps`** +- `pipeline: string` - A string containing a valid DL Streamer pipeline. +- `candidate_pipeline: string` - A string containing a candidate pipeline that has been tested. +- `fps: float` - The measured fps of the candidate pipeline. -**`optimize_for_streams(pipeline) -> optimized_pipeline, fps, streams`** -- `pipeline` - A string containing a valid DL Streamer pipeline. -- `optimized_pipeline` - A string containing the best performing pipeline that has been found during the search. -- `fps` - The measured fps of the best perfmorming pipeline. -- `streams` - The number of streams capable of running above the fps limit with the optimized pipeline. +Runs a series of optimization steps on the pipeline searching for a version with better performance measured by fps. Yields every candidate pipeline that has been tested. +``` +pipeline = "urisourcebin buffer-size=4096 uri=https://videos.pexels.com/video-files/1192116/1192116-sd_640_360_30fps.mp4 ! decodebin ! gvadetect model=/home/optimizer/models/public/yolo11s/INT8/yolo11s.xml ! queue ! gvawatermark ! fakesink" +optimizer = DLSOptimizer() +for (pipeline, fps) in optimizer.iter_optimize_for_fps(pipeline): + print(f"Tested: {pipeline} @ {fps}") +best_pipeline, best_fps, _ = optimizer.get_optimal_pipeline() +print(f"Optimal pipeline: {best_pipeline} @ {best_fps}") +``` +--- +**`optimize_for_streams(pipeline, search_duration) -> optimized_pipeline, fps, streams`** +- `pipeline: string` - A string containing a valid DL Streamer pipeline. +- `search_duration: int` - The duration of searching for better pipelines, default `300`. +- `optimized_pipeline: string` - A string containing the best performing pipeline that has been found during the search. 
+- `fps: float` - The measured fps of the best performing pipeline. +- `streams: int` - The number of streams capable of running above the fps limit with the optimized pipeline. Searching for a version of the input pipeline which can support the highest number of concurrent streams. ``` @@ -177,9 +243,22 @@ pipeline = "urisourcebin buffer-size=4096 uri=https://videos.pexels.com/video-fi optimizer = DLSOptimizer() optimizer.optimize_for_streams(pipeline) ``` +--- +**`iter_optimize_for_streams(pipeline) -> candidate_pipeline, fps, streams`** +- `pipeline: string` - A string containing a valid DL Streamer pipeline. +- `candidate_pipeline: string` - A string containing a candidate pipeline that has been tested. +- `fps: float` - The measured fps of the candidate pipeline. +- `streams: int` - The number of streams capable of running above the fps limit with the candidate pipeline. -Runs a series of optimization steps on the pipeline searching for a better performing versions. - +Searching for a version of the input pipeline which can support the highest number of concurrent streams. Yields every candidate pipeline that has been tested. +``` +pipeline = "urisourcebin buffer-size=4096 uri=https://videos.pexels.com/video-files/1192116/1192116-sd_640_360_30fps.mp4 ! decodebin ! gvadetect model=/home/optimizer/models/public/yolo11s/INT8/yolo11s.xml ! queue ! gvawatermark ! fakesink" +optimizer = DLSOptimizer() +for (pipeline, fps, streams) in optimizer.iter_optimize_for_streams(pipeline): + print(f"Tested: {pipeline} @ {streams} & {fps}") +best_pipeline, best_fps, best_streams = optimizer.get_optimal_pipeline() +print(f"Optimal pipeline: {best_pipeline} @ {best_streams} & {best_fps}") +``` --- **Example:** @@ -190,9 +269,8 @@ from optimizer import get_optimized_pipeline pipeline = "urisourcebin buffer-size=4096 uri=https://videos.pexels.com/video-files/1192116/1192116-sd_640_360_30fps.mp4 ! decodebin ! 
gvadetect model=/home/optimizer/models/public/yolo11s/INT8/yolo11s.xml ! queue ! gvawatermark ! fakesink" optimizer = DLSOptimizer() -optimizer.set_search_duration(600) optimizer.set_sample_duration(15) -optimized_pipeline, fps = optimizer.optimize_for_fps(pipeline) +optimized_pipeline, fps = optimizer.optimize_for_fps(pipeline, search_duration = 600) print("Best discovered pipeline: " + optimized_pipeline) -print("Measured fps: " + fps) +print("Measured fps: " + str(fps)) ``` diff --git a/scripts/optimizer/__main__.py b/scripts/optimizer/__main__.py index 6d8ffe119..9c8b4d007 100644 --- a/scripts/optimizer/__main__.py +++ b/scripts/optimizer/__main__.py @@ -8,9 +8,44 @@ import logging import textwrap import sys +import time from optimizer import DLSOptimizer # pylint: disable=no-name-in-module +def _display_result(pipeline, fps): + logger.info("============================== CANDIDATE =============================") + logger.info("Sampled pipeline: %s", str(pipeline)) + logger.info("") + logger.info("Recorded fps: %.2f", fps) + logger.info("======================================================================") + +def _display_summary_fps(best_pipeline, best_fps, initial_pipeline, initial_fps): + logger.info("=============================== SUMMARY ==============================") + if best_fps > initial_fps: + logger.info("Optimized pipeline found with %.2f fps improvement over the original pipeline.", best_fps - initial_fps) + logger.info("Original pipeline FPS: %.2f", initial_fps) + logger.info("Optimized pipeline: %s", str(best_pipeline)) + logger.info("Optimized pipeline FPS: %.2f", best_fps) + else: + logger.info("No optimized pipeline found that outperforms the original pipeline.") + logger.info("Original pipeline: %s", str(initial_pipeline)) + logger.info("Original pipeline FPS: %.2f", initial_fps) + logger.info("======================================================================") + +def _display_summary_streams(best_pipeline, best_fps, streams): + full_pipeline = [] + for _ in range(0, 
streams): + full_pipeline.append(best_pipeline) + full_pipeline = " ".join(full_pipeline) + + logger.info("=============================== SUMMARY ==============================") + logger.info("Optimized pipeline: %s", str(best_pipeline)) + logger.info("Number of streams pipeline can support: %d", streams) + logger.info("Optimized pipeline FPS at max streams: %.2f", best_fps) + logger.info("") + logger.info("Full pipeline: %s", full_pipeline) + logger.info("======================================================================") + parser = argparse.ArgumentParser( prog="DLStreamer Pipeline Optimization Tool", formatter_class=argparse.RawTextHelpFormatter, @@ -34,6 +69,8 @@ ''')) parser.add_argument("PIPELINE", nargs="+", help="Pipeline to be analyzed") +parser.add_argument("-v", "--verbose", action="store_true", + help="Print more information about the optimization progress") parser.add_argument("--search-duration", default=300, type=float, help="Duration in seconds of time which should be spent searching for optimized pipelines (default: %(default)s)") parser.add_argument("--sample-duration", default=10, type=float, @@ -58,7 +95,6 @@ try: optimizer = DLSOptimizer() - optimizer.set_search_duration(args.search_duration) optimizer.set_sample_duration(args.sample_duration) optimizer.set_detections_error_threshold(args.detection_threshold) optimizer.set_multistream_fps_limit(args.multistream_fps_limit) @@ -77,18 +113,35 @@ try: match args.mode: case "fps": - best_pipeline, best_fps = optimizer.optimize_for_fps(pipeline) - case "streams": - best_pipeline, best_fps, streams = optimizer.optimize_for_streams(pipeline) - - full_pipeline = [] - for _ in range(0, streams): - full_pipeline.append(best_pipeline) + start_time = time.time() + for (pipeline, fps) in optimizer.iter_optimize_for_fps(pipeline): + if args.verbose: + _display_result(pipeline, fps) + + cur_time = time.time() + if cur_time - start_time > args.search_duration: + break + + base_pipeline, base_fps, _ = 
optimizer.get_baseline_pipeline() + best_pipeline, best_fps, _ = optimizer.get_optimal_pipeline() + _display_summary_fps(best_pipeline, best_fps, base_pipeline, base_fps) - full_pipeline = " ".join(full_pipeline) - - logger.info("Optimized found pipeline for multi-streams: %s", full_pipeline) - logger.info("with fps: %.2f", best_fps) - logger.info("max achieved streams: %d", streams) -except Exception as e: # pylint: disable=broad-exception-caught - logger.error("Failed to optimize pipeline: %s", e) + case "streams": + start_time = time.time() + for (pipeline, fps, streams) in optimizer.iter_optimize_for_streams(pipeline): + full_pipeline = [] + for _ in range(0, streams): + full_pipeline.append(pipeline) + full_pipeline = " ".join(full_pipeline) + + if args.verbose: + _display_result(full_pipeline, fps) + + cur_time = time.time() + if cur_time - start_time > args.search_duration: + break + + best_pipeline, best_fps, streams = optimizer.get_optimal_pipeline() + _display_summary_streams(best_pipeline, best_fps, streams) +except RuntimeError as e: + logger.error("Failed to optimize pipeline: %s", e) \ No newline at end of file diff --git a/scripts/optimizer/optimizer.py b/scripts/optimizer/optimizer.py index 13445b64f..84e8ad998 100644 --- a/scripts/optimizer/optimizer.py +++ b/scripts/optimizer/optimizer.py @@ -8,6 +8,7 @@ import itertools import os import re +import warnings from preprocess import preprocess_pipeline from processors.inference import DeviceGenerator, BatchGenerator, NireqGenerator, add_instance_ids @@ -16,13 +17,16 @@ gi.require_version("Gst", "1.0") from gi.repository import Gst # pylint: disable=no-name-in-module +SINGLE_STREAM = 1 +DEFAULT_SEARCH_DURATION = 300 + ####################################### Init ###################################################### Gst.init() logger = logging.getLogger(__name__) -logger.info("GStreamer initialized successfully") +logger.debug("GStreamer initialized 
successfully") gst_version = Gst.version() -logger.info("GStreamer version: %d.%d.%d", +logger.debug("GStreamer version: %d.%d.%d", gst_version.major, gst_version.minor, gst_version.micro) @@ -31,24 +35,35 @@ class DLSOptimizer: def __init__(self): - self._search_duration = 300 + # configuration + self._start_time = time.time() self._sample_duration = 10 self._multistream_fps_limit = 30 self._enable_cross_stream_batching = False self._detections_error_threshold = 0.95 + + # internal fields + self._initial_detections = 0 + self._initial_fps = 0 + self._initial_pipeline = [] + self._optimal_pipeline = [] + self._optimal_fps = 0 + self._optimal_streams = SINGLE_STREAM self._generators = { "device": DeviceGenerator(), "batch": BatchGenerator(), "nireq": NireqGenerator() } - + def get_baseline_pipeline(self): + return "!".join(self._initial_pipeline), self._initial_fps, SINGLE_STREAM + + def get_optimal_pipeline(self): + return "!".join(self._optimal_pipeline), self._optimal_fps, self._optimal_streams + def enable_cross_stream_batching(self, enable): # pylint: disable=missing-function-docstring self._enable_cross_stream_batching = enable - def set_search_duration(self, duration): - self._search_duration = duration - def set_sample_duration(self, duration): self._sample_duration = duration @@ -61,6 +76,15 @@ def set_allowed_devices(self, devices): def set_detections_error_threshold(self, threshold): self._detections_error_threshold = threshold + # deprecated + def set_search_duration(self, duration): + warnings.warn( + "Function set_search_duration has been deprecated. " + "Please pass search duration when calling optimize_for_fps or optimize_for_streams instead.", + DeprecationWarning, + stacklevel=2 + ) + ################################### Main Logic ################################################ # Steps of pipeline optimization: @@ -71,126 +95,135 @@ def set_detections_error_threshold(self, threshold): # 5. Iterate over the suggestions from every generator # 6. 
Any time a better pipeline is found, save it and its performance information. # 7. Return the best discovered pipeline. - def optimize_for_fps(self, pipeline): + def optimize_for_fps(self, pipeline, search_duration = DEFAULT_SEARCH_DURATION): + start_time = time.time() + for (_, _) in self.iter_optimize_for_fps(pipeline): + cur_time = time.time() + if cur_time - start_time > search_duration: + break + + pipeline, fps, _ = self.get_optimal_pipeline() + return pipeline, fps + + def iter_optimize_for_fps(self, pipeline): # Test for tee element presence if re.search("[^a-zA-Z]tee[^a-zA-Z]", pipeline): raise RuntimeError("Pipelines containing the tee element are currently not supported!") - initial_pipeline = pipeline pipeline = pipeline.split("!") - # Measure the performance of the original pipeline - try: - logger.info("Measuring performance of the original pipeline...") - initial_fps, initial_detections = sample_pipeline([pipeline], self._sample_duration) - except Exception as e: - logger.error("Pipeline failed to start, unable to measure fps: %s", e) - raise RuntimeError("Provided pipeline is not valid") from e - - logger.info("FPS: %.2f", initial_fps) - - # Replace elements with known better alternatives. - try: - preproc_pipeline = " ! ".join(pipeline) - preproc_pipeline = preprocess_pipeline(preproc_pipeline) - preproc_pipeline = preproc_pipeline.split(" ! 
") - - if preproc_pipeline != pipeline: - logger.info("Measuring performance of the original pipeline after pre-processing optimizations...") - sample_pipeline([preproc_pipeline], self._sample_duration) - - pipeline = preproc_pipeline - except Exception: - logger.error("Pipeline pre-processing failed, using original pipeline instead") + # Run pre-optimization steps + self._establish_baseline(pipeline) + pipeline = self._run_preprocessing(pipeline) if self._enable_cross_stream_batching: pipeline = add_instance_ids(pipeline) - logger.info("Starting optimization process for FPS improvements...") - start_time = time.time() - (best_pipeline, best_fps) = self._optimize_pipeline(pipeline, initial_fps, initial_detections, start_time, 1) + # Perform optimization + logger.debug("Starting optimization process for FPS improvements...") + self._optimal_pipeline = pipeline.copy() + self._optimal_fps = self._initial_fps + for (pipeline, fps) in self._optimize_pipeline(pipeline, self._initial_fps, self._initial_detections, 1): + if fps > self._optimal_fps: + self._optimal_fps = fps + self._optimal_pipeline = pipeline - # Reconstruct the pipeline as a single string and return it. - _reconstructed_pipeline = "!".join(best_pipeline) + yield "!".join(pipeline), fps - # Display summary of the optimization process, showing the improvement over the original pipeline if any. 
- self._display_summary(_reconstructed_pipeline, best_fps, initial_pipeline, initial_fps) + def optimize_for_streams(self, pipeline, search_duration = DEFAULT_SEARCH_DURATION): + start_time = time.time() + for (_, _, _) in self.iter_optimize_for_streams(pipeline): + cur_time = time.time() + if cur_time - start_time > search_duration: + break - return _reconstructed_pipeline, best_fps + pipeline, fps, streams = self.get_optimal_pipeline() + return pipeline, fps, streams - def optimize_for_streams(self, initial_pipeline): + def iter_optimize_for_streams(self, initial_pipeline): # Test for tee element presence if re.search("[^a-zA-Z]tee[^a-zA-Z]", initial_pipeline): raise RuntimeError("Pipelines containing the tee element are currently not supported!") initial_pipeline = initial_pipeline.split("!") + # Run pre-optimization steps + self._establish_baseline(initial_pipeline) + initial_pipeline = self._run_preprocessing(initial_pipeline) + + initial_pipeline = add_instance_ids(initial_pipeline) + + # Perform optimization + self._optimal_pipeline = initial_pipeline.copy() + self._optimal_fps = self._initial_fps + for streams in range(1, 128): + for (pipeline, fps) in self._optimize_pipeline(initial_pipeline, self._initial_fps, self._initial_detections, streams): + if fps > self._multistream_fps_limit and (fps > self._optimal_fps or streams > self._optimal_streams): + self._optimal_fps = fps + self._optimal_pipeline = pipeline + self._optimal_streams = streams + + yield "!".join(pipeline), fps, streams + + + def _establish_baseline(self, pipeline): # Measure the performance of the original pipeline try: - logger.info("Measuring performance of the original pipeline...") - initial_fps, initial_detections = sample_pipeline([initial_pipeline], 
self._sample_duration) + logger.debug("Measuring performance of the original pipeline...") + self._initial_pipeline = pipeline.copy() + self._initial_fps, self._initial_detections = sample_pipeline([pipeline], self._sample_duration) + self._optimal_pipeline = [] + self._optimal_fps = 0 + self._optimal_streams = SINGLE_STREAM except Exception as e: logger.error("Pipeline failed to start, unable to measure fps: %s", e) raise RuntimeError("Provided pipeline is not valid") from e - logger.info("FPS: %.2f", initial_fps) + logger.debug("FPS: %.2f", self._initial_fps) + def _run_preprocessing(self, pipeline): # Replace elements with known better alternatives. try: - preproc_pipeline = " ! ".join(initial_pipeline) + preproc_pipeline = " ! ".join(pipeline) preproc_pipeline = preprocess_pipeline(preproc_pipeline) preproc_pipeline = preproc_pipeline.split(" ! ") - sample_pipeline([preproc_pipeline], self._sample_duration) - initial_pipeline = preproc_pipeline + if preproc_pipeline != pipeline: + logger.info("Measuring performance of the original pipeline after pre-processing optimizations...") + sample_pipeline([preproc_pipeline], self._sample_duration) + + return preproc_pipeline + except Exception: logger.error("Pipeline pre-processing failed, using original pipeline instead") + + return pipeline - initial_pipeline = add_instance_ids(initial_pipeline) - - start_time = time.time() + def _optimize_pipeline(self, initial_pipeline, initial_fps, initial_detections, streams): best_pipeline = initial_pipeline - best_fps = 0 - best_streams = 0 - for streams in range(1, 65): - cur_time = time.time() - if cur_time - start_time > self._search_duration: - break - - pipeline, fps = self._optimize_pipeline(initial_pipeline, initial_fps, initial_detections, start_time, streams) - if fps > self._multistream_fps_limit: - best_fps = fps - best_pipeline = pipeline - best_streams = streams - else: - break - - return "!".join(best_pipeline), best_fps, best_streams - - def 
_optimize_pipeline(self, starting_pipeline, starting_fps, starting_detections, start_time, streams): - best_pipeline = starting_pipeline - best_fps = starting_fps + best_fps = initial_fps for generator in self._generators.values(): generator.init_pipeline(best_pipeline) for pipeline in generator: - cur_time = time.time() - if cur_time - start_time > self._search_duration: - break - - pipelines = [] - for _ in range(0, streams): - pipelines.append(pipeline) - try: + pipelines = [] + for _ in range(0, streams): + pipelines.append(pipeline) + fps, detections = sample_pipeline(pipelines, self._sample_duration) - if starting_detections == 0: + if initial_detections == 0: # skip only if we still have zero detections if detections == 0: logger.debug("Pipeline reporting detections under error margin, skipping") continue - elif detections / starting_detections < self._detections_error_threshold: + elif detections / initial_detections < self._detections_error_threshold: logger.debug("Pipeline reporting detections under error margin, skipping") continue @@ -198,25 +231,11 @@ def _optimize_pipeline(self, starting_pipeline, starting_fps, starting_detection best_fps = fps best_pipeline = pipeline + yield pipeline, fps + except Exception as e: logger.debug("Pipeline failed to start: %s", e) - return best_pipeline, best_fps - - def _display_summary(self, best_pipeline, best_fps, initial_pipeline, initial_fps): - logger.info("=============================== SUMMARY ==============================") - if best_fps > initial_fps: - logger.info("Optimized pipeline found with %.2f fps improvement over the original pipeline.", best_fps - initial_fps) - logger.info("Original pipeline FPS: %.2f", initial_fps) - logger.info("Optimized pipeline: %s", str(best_pipeline)) - logger.info("Optimized pipeline FPS: %.2f", best_fps) - else: - logger.info("No optimized pipeline found that outperforms the original pipeline.") - logger.info("Original pipeline: %s", str(initial_pipeline)) - 
logger.info("Original pipeline FPS: %.2f", initial_fps) - logger.info("======================================================================") - - ##################################### Pipeline Running ############################################ def sample_pipeline(pipelines, sample_duration): @@ -241,7 +260,7 @@ def sample_pipeline(pipelines, sample_duration): pipeline = Gst.parse_launch(pipeline) - logger.info("Sampling for %s seconds...", str(sample_duration)) + logger.debug("Sampling for %s seconds...", str(sample_duration)) fps_counter = next(filter(lambda element: "gvafpscounter" in element.name, reversed(pipeline.children))) # pylint: disable=line-too-long bus = pipeline.get_bus() diff --git a/tests/unit_tests/tests_gstgva/test_pipeline_optimizer.py b/tests/unit_tests/tests_gstgva/test_pipeline_optimizer.py index c39aa6bea..4add9afad 100644 --- a/tests/unit_tests/tests_gstgva/test_pipeline_optimizer.py +++ b/tests/unit_tests/tests_gstgva/test_pipeline_optimizer.py @@ -14,8 +14,7 @@ def test_optimizer_works(self): model_path = get_model_path("yolo11s") pipeline = f"urisourcebin buffer-size=4096 uri=https://videos.pexels.com/video-files/1192116/1192116-sd_640_360_30fps.mp4 ! decodebin ! gvadetect model={model_path} ! queue ! gvawatermark ! vah264enc ! h264parse ! mp4mux ! fakesink" optimizer = DLSOptimizer() - optimizer.set_search_duration(100) - optimized_pipeline, fps = optimizer.optimize_for_fps(pipeline) + optimized_pipeline, fps = optimizer.optimize_for_fps(pipeline, 100) self.assertIsNotNone(optimized_pipeline, "Optimizer did not return optimized pipeline") self.assertIsNotNone(fps, "Optimizer did not return FPS value")
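Reviewer note on the refactor above: `set_search_duration` is replaced by a `search_duration` argument, and `optimize_for_fps` now simply drains `iter_optimize_for_fps` until the time budget expires while tracking the best candidate. A minimal, self-contained sketch of that wrapper pattern follows; the stub generator and all names below are illustrative stand-ins, not part of DL Streamer:

```python
import time

DEFAULT_SEARCH_DURATION = 300

def iter_candidates():
    # Stand-in for DLSOptimizer.iter_optimize_for_fps: yields (pipeline, fps) pairs
    # as each candidate is sampled.
    for i, fps in enumerate([45.0, 52.5, 48.0, 61.2]):
        yield f"candidate-{i}", fps

def optimize_for_fps(search_duration=DEFAULT_SEARCH_DURATION):
    # Drain the candidate generator until the time budget is exhausted,
    # remembering the best candidate seen so far.
    best_pipeline, best_fps = None, 0.0
    start_time = time.time()
    for pipeline, fps in iter_candidates():
        if fps > best_fps:
            best_pipeline, best_fps = pipeline, fps
        if time.time() - start_time > search_duration:
            break
    return best_pipeline, best_fps
```

Note that the budget check runs after each sample, so a slow candidate can overrun the budget by up to one sample duration; the real implementation in the diff has the same property.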