
Commit 2576348

oommensyd and dnoliver authored
Improve Stream Density Analysis Benchmarking (#177)
* Use Intel Retail Stream Density Benchmark logic
* Add tests for multiple benchmarking cases

See https://github.com/intel-retail/performance-tools/blob/main/benchmark-scripts/stream_density.sh

Fixes ITEP-66536

Co-authored-by: Nicolas Oliver <dario.n.oliver@intel.com>
1 parent 1d8f2dc commit 2576348
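This commit replaces run()'s previous exponential-expansion-plus-binary-search strategy with the adaptive increment/decrement search used by the Intel Retail stream_density.sh script linked above: while every measured configuration meets the FPS floor, the stream count jumps ahead by a step sized to the measured headroom (with fps_floor=30 and 90 FPS per stream, for instance, the next step is int(90 / 30) = 3 streams, and a computed step of 1 is bumped to 5); after the first miss, the count backs off one stream at a time until the floor is met again. A minimal sketch of that loop, with measure_total_fps as a hypothetical stand-in for the real pipeline run:

def stream_density(measure_total_fps, fps_floor: float) -> int:
    """Sketch of the adaptive search; measure_total_fps(n) is a hypothetical probe."""
    n, step, incrementing = 1, 1, True
    best = 0
    while True:
        per_stream = measure_total_fps(n) / n
        if incrementing:
            if per_stream >= fps_floor:
                step = int(per_stream / fps_floor)  # jump by the measured headroom
                if step <= 1:
                    step = 5  # same fallback jump as the committed code
            else:
                incrementing, step = False, -1  # overshot: back off one at a time
        else:
            if per_stream >= fps_floor:
                best = n  # densest count that still meets the floor
                break
            if n <= 1:
                break  # even a single stream misses the floor
        n = max(1, n + step)
    return best

# Example with a made-up saturation curve: total FPS capped at 150.
print(stream_density(lambda n: min(40.0 * n, 150.0), fps_floor=30.0))  # -> 5

The decrementing phase stops at the first stream count that meets the floor, so the search returns the densest configuration observed on the way down.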

2 files changed: 274 additions & 77 deletions

benchmark.py: 88 additions & 77 deletions
@@ -1,13 +1,20 @@
-import time
+"""benchmark.py
+
+This module provides the Benchmark class for evaluating pipeline performance
+based on configurable parameters and stream counts.
+"""
 from typing import List, Dict, Tuple
+import math
 import logging
 from utils import run_pipeline_and_extract_metrics
-import math
 
 logging.basicConfig(level=logging.INFO)
 
 
 class Benchmark:
+
+    """Benchmarking class for pipeline evaluation."""
+
     def __init__(
         self,
         video_path: str,
@@ -16,101 +23,105 @@ def __init__(
         rate: int,
         parameters: Dict[str, str],
         constants: Dict[str, str],
-        elements: List[tuple[str, str, str]] = [],
+        elements: List[tuple[str, str, str]] = None,
     ):
         self.video_path = video_path
         self.pipeline_cls = pipeline_cls
         self.fps_floor = fps_floor
         self.rate = rate
         self.parameters = parameters
         self.constants = constants
-        self.elements = elements
+        self.elements = elements if elements is not None else []
         self.best_result = None
         self.results = []
 
         self.logger = logging.getLogger("Benchmark")
 
+    def _run_pipeline_and_extract_metrics(
+        self,
+        pipeline_cls,
+        constants: Dict[str, str],
+        parameters: Dict[str, str],
+        channels: Tuple[int, int],
+        elements: List[tuple[str, str, str]],
+    ) -> List[Dict[str, float]]:
+        """Run the pipeline and extract metrics."""
+        return run_pipeline_and_extract_metrics(
+            pipeline_cls,
+            constants=constants,
+            parameters=parameters,
+            channels=channels,
+            elements=elements,
+        )
+
     def run(self) -> Tuple[int, int, int, float]:
-        start_time = time.time()
-        streams = 1
-        last_good_config = (0, 0, 0, 0.0)
+        """Run the benchmark and return the best configuration."""
+        n_streams = 1
+        increments = 1
+        incrementing = True
+        best_config = (0, 0, 0, 0.0)
 
-        # Phase 1: Exponential Expansion
         while True:
-            if time.time() - start_time > 300:
-                self.logger.info("Time limit reached during exponential phase")
-                break
-
-            ai_streams = math.ceil(streams * (self.rate/100))
-            non_ai_streams = streams - ai_streams
-            results = run_pipeline_and_extract_metrics(
-                self.pipeline_cls,
-                constants=self.constants,
-                parameters=self.parameters,
-                channels=(non_ai_streams, ai_streams),
-                elements=self.elements,
-            )
-            result = results[0]
+            ai_streams = math.ceil(n_streams * (self.rate / 100))
+            non_ai_streams = n_streams - ai_streams
 
             try:
-                raw_fps_value = result["per_stream_fps"]
-                per_stream_fps = float(raw_fps_value)
-                if per_stream_fps >= self.fps_floor:
-                    last_good_config = (
-                        result["num_streams"],
-                        ai_streams,
-                        non_ai_streams,
-                        per_stream_fps,
-                    )
-                    streams *= 2
-                else:
-                    failed_streams = streams
-                    break
-            except (ValueError, TypeError):
-                self.logger.info(
-                    "Invalid FPS value, skipping this result:", per_stream_fps
-                )
-                failed_streams = streams
-                break
-
-        # Phase 2: Binary Search
-        low = last_good_config[0] + 1
-        high = failed_streams - 1
-        best_config = last_good_config
-
-        while low <= high:
-            if time.time() - start_time > 300:
-                self.logger.info("Time limit reached during Binary phase.")
-                break
-            mid = (low + high) // 2
-            ai_streams = math.ceil(mid * (self.rate/100))
-            non_ai_streams = mid - ai_streams
-
-            results = run_pipeline_and_extract_metrics(
-                self.pipeline_cls,
-                constants=self.constants,
-                parameters=self.parameters,
-                channels=(non_ai_streams, ai_streams),
-                elements=self.elements,
-            )
-
-            if not results:
-                self.logger.info(
-                    "No results returned from run_pipeline_and_extract_metrics"
+                results = self._run_pipeline_and_extract_metrics(
+                    self.pipeline_cls,
+                    constants=self.constants,
+                    parameters=self.parameters,
+                    channels=(non_ai_streams, ai_streams),
+                    elements=self.elements,
                 )
-                break
+            except StopIteration:
+                return (0, 0, 0, 0.0)
 
+            if not results or results[0] is None or not isinstance(results[0], dict):
+                return (0, 0, 0, 0.0)
+            if results[0].get("exit_code") != 0:
+                return (0, 0, 0, 0.0)
+
             result = results[0]
+            try:
+                total_fps = float(result["total_fps"])
+                per_stream_fps = total_fps / n_streams if n_streams > 0 else 0.0
+            except (ValueError, TypeError, ZeroDivisionError):
+                return (0, 0, 0, 0.0)
+            if total_fps == 0 or math.isnan(per_stream_fps):
+                return (0,0,0,0.0)
+
+            self.logger.info(
+                "n_streams=%d, total_fps=%f, per_stream_fps=%f, increments=%d, incrementing=%s",
+                n_streams, total_fps, per_stream_fps, increments, incrementing
+            )
 
-            per_stream_fps = float(result["per_stream_fps"])
-            if (
-                isinstance(per_stream_fps, (int, float))
-                and per_stream_fps >= self.fps_floor
-            ):
-                if result["num_streams"] > best_config[0]:
-                    best_config = (mid, ai_streams, non_ai_streams, per_stream_fps)
-                low = mid + 1
+            if incrementing:
+                if per_stream_fps >= self.fps_floor:
+                    increments = int(per_stream_fps / self.fps_floor)
+                    self.logger.info(
+                        "n_streams=%d, total_fps=%f, per_stream_fps=%f, increments=%d, incrementing=%s",
+                        n_streams, total_fps, per_stream_fps, increments, incrementing
+                    )
+                    if increments <= 1:
+                        increments = 5
+                else:
+                    incrementing = False
+                    increments = -1
             else:
-                high = mid - 1
+                if per_stream_fps >= self.fps_floor:
+                    best_config = (n_streams, ai_streams, non_ai_streams, per_stream_fps)
+                    break  # Success
+                else:
+                    if n_streams <= 1:
+                        self.logger.info("Failed to find a valid configuration.")
+                        break  # Fail
+
+            n_streams += increments
+            if n_streams <= 0:
+                n_streams = 1  # Prevent N from going below 1
 
-        return best_config
+        return (
+            best_config
+            if best_config[0] > 0
+            else (n_streams, ai_streams, non_ai_streams, per_stream_fps)
+        )
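The new _run_pipeline_and_extract_metrics wrapper is a thin seam around the module-level helper so the tests below can patch a single method. For orientation, a hypothetical invocation sketch; SmartNVRPipeline and the argument values are borrowed from the new tests, not prescribed by this commit:

from benchmark import Benchmark
from pipeline import SmartNVRPipeline

bench = Benchmark(
    video_path="test_video.mp4",  # illustrative input clip
    pipeline_cls=SmartNVRPipeline,
    fps_floor=30.0,               # minimum acceptable per-stream FPS
    rate=50,                      # percent of streams that run AI inference
    parameters={"object_detection_device": "cpu"},
    constants={"const1": "value1"},
)
n_streams, ai_streams, non_ai_streams, fps = bench.run()
print(f"Density: {n_streams} streams ({ai_streams} AI, {non_ai_streams} non-AI) at {fps:.1f} FPS/stream")

run() returns (num_streams, ai_streams, non_ai_streams, per_stream_fps); when no configuration ever meets the floor it falls back to the last one it measured.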
New test file: 186 additions & 0 deletions
import unittest
from unittest.mock import patch
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent.parent))
from benchmark import (
    Benchmark,
)

from pipeline import SmartNVRPipeline


class TestBenchmark(unittest.TestCase):
    def setUp(self):
        self.video_path = "test_video.mp4"
        self.pipeline_cls = SmartNVRPipeline
        self.fps_floor = 30.0
        self.rate = 50
        self.parameters = {"object_detection_device": "cpu"}
        self.constants = {"const1": "value1"}
        self.elements = [("element1", "type1", "name1")]
        self.benchmark = Benchmark(
            video_path=self.video_path,
            pipeline_cls=self.pipeline_cls,
            fps_floor=self.fps_floor,
            rate=self.rate,
            parameters=self.parameters,
            constants=self.constants,
            elements=self.elements,
        )
    def test_run_successful_scaling(self):
        with patch.object(Benchmark, "_run_pipeline_and_extract_metrics") as mock_run:
            mock_run.side_effect = [
                # First call with 1 stream
                [
                    {
                        "params": {},
                        "exit_code": 0,
                        "total_fps": 30,
                        "per_stream_fps": 30,
                        "num_streams": 1,
                    }
                ],
                # Second call with 2 streams
                [
                    {
                        "params": {},
                        "exit_code": 0,
                        "total_fps": 168,
                        "per_stream_fps": 28,
                        "num_streams": 6,
                    }
                ],
                # Third call with 3 streams
                [
                    {
                        "params": {},
                        "exit_code": 0,
                        "total_fps": 155,
                        "per_stream_fps": 31,
                        "num_streams": 5,
                    }
                ],
                []
            ]
            result = self.benchmark.run()
            self.assertEqual(result, (5, 3, 2, 31))

    def test_zero_total_fps(self):
        with patch.object(Benchmark, "_run_pipeline_and_extract_metrics") as mock_run:
            mock_run.side_effect = [
                # First call with 1 stream
                [
                    {
                        "params": {},
                        "exit_code": 0,
                        "total_fps": 0,
                        "per_stream_fps": 30,
                        "num_streams": 1,
                    }
                ],
                []
            ]
            result = self.benchmark.run()
            self.assertEqual(result, (0, 0, 0, 0.0))

    def test_invalid_fps(self):
        with patch.object(Benchmark, "_run_pipeline_and_extract_metrics") as mock_run:
            mock_run.side_effect = [
                # First call with 1 stream
                [
                    {
                        "params": {},
                        "exit_code": 0,
                        "total_fps": 0,
                        "per_stream_fps": "NaN",
                        "num_streams": 1,
                    }
                ],
                []
            ]
            result = self.benchmark.run()
            self.assertEqual(result, (0, 0, 0, 0.0))

    def test_decrementing_below_one(self):
        with patch.object(Benchmark, "_run_pipeline_and_extract_metrics") as mock_run:
            mock_run.side_effect = [
                # First call with 1 stream
                [
                    {
                        "params": {},
                        "exit_code": 0,
                        "total_fps": 60,
                        "per_stream_fps": 60,
                        "num_streams": 1,
                    }
                ],
                # Second call with 2 streams
                [
                    {
                        "params": {},
                        "exit_code": 0,
                        "total_fps": 10,
                        "per_stream_fps": 2,
                        "num_streams": 6,
                    }
                ],
                # Third call with 3 streams
                [
                    {
                        "params": {},
                        "exit_code": 0,
                        "total_fps": 8,
                        "per_stream_fps": 2,
                        "num_streams": 5,
                    }
                ],
                []
            ]
            result = self.benchmark.run()
            self.assertEqual(result, (0, 0, 0, 0.0))

    def test_pipeline_crash(self):
        with patch.object(Benchmark, "_run_pipeline_and_extract_metrics") as mock_run:
            mock_run.side_effect = [
                # First call with 1 stream
                [
                    {
                        "params": {},
                        "exit_code": 0,
                        "total_fps": 30,
                        "per_stream_fps": 30,
                        "num_streams": 1,
                    }
                ],
                []
            ]
            result = self.benchmark.run()
            self.assertEqual(result, (0, 0, 0, 0.0))

    def test_pipeline_returns_none(self):
        with patch.object(Benchmark, "_run_pipeline_and_extract_metrics") as mock_run:
            mock_run.side_effect = [
                [None]
            ]
            result = self.benchmark.run()
            self.assertEqual(result, (0, 0, 0, 0.0))

    def test_pipeline_low_fps(self):
        with patch.object(Benchmark, "_run_pipeline_and_extract_metrics") as mock_run:
            mock_run.side_effect = [
                [
                    {
                        "params": {},
                        "exit_code": 0,
                        "total_fps": 8,
                        "per_stream_fps": 8,
                        "num_streams": 1,
                    }
                ]
            ]
            result = self.benchmark.run()
            self.assertEqual(result, (0, 0, 0, 0.0))

if __name__ == "__main__":
    unittest.main()
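Each case drives run() through mock_run.side_effect: unittest.mock returns one queued element per call and raises StopIteration once the list is exhausted, which run() catches and converts into the (0, 0, 0, 0.0) failure tuple. That is why several cases end with an empty [] entry or simply let the queue run dry. A standalone illustration of that mock behavior:

from unittest.mock import MagicMock

mock_run = MagicMock(side_effect=[[{"exit_code": 0, "total_fps": 30}], []])
print(mock_run())  # 1st call -> [{'exit_code': 0, 'total_fps': 30}]
print(mock_run())  # 2nd call -> []
try:
    mock_run()     # 3rd call: the side_effect list is exhausted
except StopIteration:
    print("StopIteration: no more queued results")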
