Skip to content

Commit a7402eb

Browse files
added ramp down feature (#1012)
Signed-off-by: Rishabh Singh <sngri@amazon.com> (cherry picked from commit 92982c5) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent b946424 commit a7402eb

File tree

5 files changed

+366
-10
lines changed

5 files changed

+366
-10
lines changed

osbenchmark/worker_coordinator/worker_coordinator.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2813,11 +2813,17 @@ def schedule_for(task_allocation, parameter_source):
28132813

28142814
if requires_time_period_schedule(task, runner_for_op, params_for_op):
28152815
warmup_time_period = task.warmup_time_period if task.warmup_time_period else 0
2816+
ramp_down_time_period = task.ramp_down_time_period if task.ramp_down_time_period else 0
28162817
if client_index == 0:
28172818
logger.info("Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] "
28182819
"seconds and a time period of [%s] seconds.", task.schedule, task.name,
28192820
str(warmup_time_period), str(task.time_period))
2820-
loop_control = TimePeriodBased(warmup_time_period, task.time_period)
2821+
loop_control = TimePeriodBased(warmup_time_period, task.time_period, ramp_down_time_period,
2822+
client_index, task.clients)
2823+
# Log individual client duration if ramp-down is enabled
2824+
if ramp_down_time_period > 0 and client_index == 0:
2825+
logger.info("Ramp-down enabled: clients will stop in reverse order over [%s] seconds",
2826+
str(ramp_down_time_period))
28212827
else:
28222828
warmup_iterations = task.warmup_iterations if task.warmup_iterations else 0
28232829
if task.iterations:
@@ -2927,13 +2933,32 @@ async def __call__(self):
29272933

29282934

29292935
class TimePeriodBased:
2930-
def __init__(self, warmup_time_period, time_period):
2936+
def __init__(self, warmup_time_period, time_period, ramp_down_time_period=None,
2937+
client_index=None, total_clients=None):
29312938
self._warmup_time_period = warmup_time_period
29322939
self._time_period = time_period
2940+
self._ramp_down_time_period = ramp_down_time_period or 0
2941+
self._client_index = client_index
2942+
self._total_clients = total_clients
2943+
self.logger = logging.getLogger(__name__)
2944+
29332945
if warmup_time_period is not None and time_period is not None:
2934-
self._duration = self._warmup_time_period + self._time_period
2946+
self._base_duration = self._warmup_time_period + self._time_period
2947+
2948+
# Calculate how early this client should stop during ramp-down
2949+
# Clients stop in REVERSE order: Client 0 stops first, Client (N-1) stops last
2950+
if self._ramp_down_time_period > 0 and client_index is not None and total_clients is not None:
2951+
reverse_index = (total_clients - 1) - client_index
2952+
client_early_stop = self._ramp_down_time_period * (reverse_index / total_clients)
2953+
self._duration = self._base_duration - client_early_stop
2954+
self.logger.info("Client [%d/%d] will run for %.2f seconds (base: %.2f, early stop: %.2f due to ramp-down)",
2955+
client_index, total_clients, self._duration, self._base_duration, client_early_stop)
2956+
else:
2957+
self._duration = self._base_duration
29352958
else:
29362959
self._duration = None
2960+
self._base_duration = None
2961+
29372962
self._start = None
29382963
self._now = None
29392964

osbenchmark/workload/loader.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1809,14 +1809,16 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name):
18091809
default_warmup_time_period = self._r(ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False)
18101810
default_time_period = self._r(ops_spec, "time-period", error_ctx="parallel", mandatory=False)
18111811
default_ramp_up_time_period = self._r(ops_spec, "ramp-up-time-period", error_ctx="parallel", mandatory=False)
1812+
default_ramp_down_time_period = self._r(ops_spec, "ramp-down-time-period", error_ctx="parallel", mandatory=False)
18121813
clients = self._r(ops_spec, "clients", error_ctx="parallel", mandatory=False)
18131814
completed_by = self._r(ops_spec, "completed-by", error_ctx="parallel", mandatory=False)
18141815

18151816
# now descent to each operation
18161817
tasks = []
18171818
for task in self._r(ops_spec, "tasks", error_ctx="parallel"):
18181819
tasks.append(self.parse_task(task, ops, test_procedure_name, default_warmup_iterations, default_iterations,
1819-
default_warmup_time_period, default_time_period, default_ramp_up_time_period, completed_by))
1820+
default_warmup_time_period, default_time_period, default_ramp_up_time_period,
1821+
default_ramp_down_time_period, completed_by))
18201822

18211823
for task in tasks:
18221824
if task.ramp_up_time_period != default_ramp_up_time_period:
@@ -1826,6 +1828,13 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name):
18261828
else:
18271829
self._error(f"task '{task.name}' specifies a different ramp-up-time-period than its enclosing "
18281830
f"'parallel' element in test-procedure '{test_procedure_name}'.")
1831+
if task.ramp_down_time_period != default_ramp_down_time_period:
1832+
if default_ramp_down_time_period is None:
1833+
self._error(f"task '{task.name}' in 'parallel' element of test-procedure '{test_procedure_name}' specifies "
1834+
f"a ramp-down-time-period but it is only allowed on the 'parallel' element.")
1835+
else:
1836+
self._error(f"task '{task.name}' specifies a different ramp-down-time-period than its enclosing "
1837+
f"'parallel' element in test-procedure '{test_procedure_name}'.")
18291838
if completed_by:
18301839
completion_task = None
18311840
for task in tasks:
@@ -1841,7 +1850,7 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name):
18411850
return workload.Parallel(tasks, clients)
18421851

18431852
def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterations=None, default_iterations=None,
1844-
default_warmup_time_period=None, default_time_period=None, default_ramp_up_time_period=None,
1853+
default_warmup_time_period=None, default_time_period=None, default_ramp_up_time_period=None, default_ramp_down_time_period=None,
18451854
completed_by_name=None):
18461855

18471856
op_spec = task_spec["operation"]
@@ -1867,6 +1876,8 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati
18671876
default_value=default_time_period),
18681877
ramp_up_time_period=self._r(task_spec, "ramp-up-time-period", error_ctx=op.name,
18691878
mandatory=False, default_value=default_ramp_up_time_period),
1879+
ramp_down_time_period=self._r(task_spec, "ramp-down-time-period", error_ctx=op.name,
1880+
mandatory=False, default_value=default_ramp_down_time_period),
18701881
clients=self._r(task_spec, "clients", error_ctx=op.name, mandatory=False, default_value=1),
18711882
completes_parent=(task_name == completed_by_name),
18721883
schedule=schedule,
@@ -1894,6 +1905,19 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati
18941905
self._error(f"The warmup-time-period of operation '{op.name}' in test_procedure '{test_procedure_name}' is "
18951906
f"{task.warmup_time_period} seconds but must be greater than or equal to the "
18961907
f"ramp-up-time-period of {task.ramp_up_time_period} seconds.")
1908+
if task.ramp_down_time_period is not None:
1909+
if task.time_period is None:
1910+
self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-down time period of "
1911+
f"{task.ramp_down_time_period} seconds but no time-period.")
1912+
elif task.time_period < task.ramp_down_time_period:
1913+
self._error(f"The time-period of operation '{op.name}' in test_procedure '{test_procedure_name}' is "
1914+
f"{task.time_period} seconds but must be greater than or equal to the "
1915+
f"ramp-down-time-period of {task.ramp_down_time_period} seconds.")
1916+
1917+
if (task.warmup_iterations is not None or task.iterations is not None) and task.ramp_down_time_period is not None:
1918+
self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-down time period of "
1919+
f"{task.ramp_down_time_period} seconds as well as {task.warmup_iterations} warmup iterations and "
1920+
f"{task.iterations} iterations but mixing time periods and iterations is not allowed.")
18971921

18981922
return task
18991923

osbenchmark/workload/workload.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,8 @@ class Task:
925925
IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST = ["non-fatal"]
926926

927927
def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations=None, iterations=None,
928-
warmup_time_period=None, time_period=None, ramp_up_time_period=None, clients=1, completes_parent=False,
928+
warmup_time_period=None, time_period=None, ramp_up_time_period=None, ramp_down_time_period=None,
929+
clients=1, completes_parent=False,
929930
schedule=None, params=None):
930931
self.name = name
931932
self.operation = operation
@@ -941,6 +942,7 @@ def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations
941942
self.warmup_time_period = warmup_time_period
942943
self.time_period = time_period
943944
self.ramp_up_time_period = ramp_up_time_period
945+
self.ramp_down_time_period = ramp_down_time_period
944946
self.clients = clients
945947
self.completes_parent = completes_parent
946948
self.schedule = schedule
@@ -1022,16 +1024,17 @@ def __hash__(self):
10221024
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
10231025
return hash(self.name) ^ hash(self.operation) ^ hash(self.warmup_iterations) ^ hash(self.iterations) ^ \
10241026
hash(self.warmup_time_period) ^ hash(self.time_period) ^ hash(self.ramp_up_time_period) ^ \
1025-
hash(self.clients) ^ hash(self.schedule) ^ hash(self.completes_parent)
1027+
hash(self.ramp_down_time_period) ^ hash(self.clients) ^ hash(self.schedule) ^ hash(self.completes_parent)
10261028

10271029
def __eq__(self, other):
10281030
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
10291031
return isinstance(other, type(self)) and (self.name, self.operation, self.warmup_iterations, self.iterations,
10301032
self.warmup_time_period, self.time_period, self.ramp_up_time_period,
1031-
self.clients, self.schedule,self.completes_parent) == (other.name,
1032-
other.operation, other.warmup_iterations,
1033+
self.ramp_down_time_period, self.clients, self.schedule,
1034+
self.completes_parent) == (other.name, other.operation, other.warmup_iterations,
10331035
other.iterations, other.warmup_time_period, other.time_period,
1034-
self.ramp_up_time_period, other.clients, other.schedule,
1036+
other.ramp_up_time_period, other.ramp_down_time_period,
1037+
other.clients, other.schedule,
10351038
other.completes_parent)
10361039

10371040
def __iter__(self):

tests/worker_coordinator/worker_coordinator_test.py

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2311,3 +2311,166 @@ def test_check_cpu_usage_drops_error_when_queue_full(self):
23112311
# Should not raise
23122312
self.actor._check_cpu_usage() # pylint: disable=protected-access
23132313
assert full_queue.qsize() == 1 # Still full; error dropped
2314+
2315+
class TimePeriodBasedTests(TestCase):
2316+
# pylint: disable=protected-access
2317+
def test_time_period_based_without_ramp_down(self):
2318+
# Test basic time-period based schedule without ramp-down
2319+
loop_control = worker_coordinator.TimePeriodBased(
2320+
warmup_time_period=10,
2321+
time_period=100,
2322+
ramp_down_time_period=None,
2323+
client_index=0,
2324+
total_clients=4
2325+
)
2326+
2327+
# Verify duration calculation
2328+
self.assertEqual(110, loop_control._duration)
2329+
self.assertEqual(110, loop_control._base_duration)
2330+
self.assertEqual(10, loop_control._warmup_time_period)
2331+
self.assertEqual(100, loop_control._time_period)
2332+
self.assertFalse(loop_control.infinite)
2333+
2334+
def test_time_period_based_with_ramp_down_client_0(self):
2335+
# Test ramp-down for client 0 (first client, stops earliest)
2336+
loop_control = worker_coordinator.TimePeriodBased(
2337+
warmup_time_period=10,
2338+
time_period=100,
2339+
ramp_down_time_period=20,
2340+
client_index=0,
2341+
total_clients=4
2342+
)
2343+
2344+
# Client 0: reverse_index = 3, early_stop = 20 * (3/4) = 15
2345+
# duration = 110 - 15 = 95
2346+
self.assertEqual(110, loop_control._base_duration)
2347+
self.assertEqual(95, loop_control._duration)
2348+
self.assertEqual(20, loop_control._ramp_down_time_period)
2349+
2350+
def test_time_period_based_with_ramp_down_client_3(self):
2351+
# Test ramp-down for client 3 (last client, runs full duration)
2352+
loop_control = worker_coordinator.TimePeriodBased(
2353+
warmup_time_period=10,
2354+
time_period=100,
2355+
ramp_down_time_period=20,
2356+
client_index=3,
2357+
total_clients=4
2358+
)
2359+
2360+
# Client 3: reverse_index = 0, early_stop = 20 * (0/4) = 0
2361+
# duration = 110 - 0 = 110
2362+
self.assertEqual(110, loop_control._base_duration)
2363+
self.assertEqual(110, loop_control._duration)
2364+
2365+
def test_time_period_based_with_ramp_down_all_clients(self):
2366+
# Test that clients stop in reverse order with correct spacing
2367+
warmup = 10
2368+
time_period = 100
2369+
ramp_down = 20
2370+
total_clients = 4
2371+
2372+
durations = []
2373+
for client_index in range(total_clients):
2374+
loop_control = worker_coordinator.TimePeriodBased(
2375+
warmup_time_period=warmup,
2376+
time_period=time_period,
2377+
ramp_down_time_period=ramp_down,
2378+
client_index=client_index,
2379+
total_clients=total_clients
2380+
)
2381+
durations.append(loop_control._duration)
2382+
2383+
# Expected: [95, 100, 105, 110] - clients stop 5s apart
2384+
self.assertEqual([95, 100, 105, 110], durations)
2385+
2386+
# Verify spacing is correct
2387+
for i in range(1, len(durations)):
2388+
spacing = durations[i] - durations[i-1]
2389+
expected_spacing = ramp_down / total_clients
2390+
self.assertAlmostEqual(expected_spacing, spacing, places=2)
2391+
2392+
class TaskRampDownTests(TestCase):
2393+
def test_task_with_ramp_down_time_period(self):
2394+
# Test that Task accepts ramp_down_time_period parameter
2395+
op = workload.Operation("test-op", workload.OperationType.Bulk.to_hyphenated_string(), {})
2396+
task = workload.Task(
2397+
name="test-task",
2398+
operation=op,
2399+
warmup_time_period=10,
2400+
time_period=100,
2401+
ramp_up_time_period=20,
2402+
ramp_down_time_period=30,
2403+
clients=4
2404+
)
2405+
2406+
self.assertEqual(10, task.warmup_time_period)
2407+
self.assertEqual(100, task.time_period)
2408+
self.assertEqual(20, task.ramp_up_time_period)
2409+
self.assertEqual(30, task.ramp_down_time_period)
2410+
self.assertEqual(4, task.clients)
2411+
2412+
def test_task_without_ramp_down_defaults_to_none(self):
2413+
# Test that ramp_down_time_period defaults to None
2414+
op = workload.Operation("test-op", workload.OperationType.Bulk.to_hyphenated_string(), {})
2415+
task = workload.Task(
2416+
name="test-task",
2417+
operation=op,
2418+
time_period=100,
2419+
clients=4
2420+
)
2421+
2422+
self.assertIsNone(task.ramp_down_time_period)
2423+
2424+
def test_task_equality_with_ramp_down(self):
2425+
# Test that tasks with different ramp_down_time_period are not equal
2426+
op = workload.Operation("test-op", workload.OperationType.Bulk.to_hyphenated_string(), {})
2427+
2428+
task1 = workload.Task(
2429+
name="test-task",
2430+
operation=op,
2431+
time_period=100,
2432+
ramp_down_time_period=20,
2433+
clients=4
2434+
)
2435+
2436+
task2 = workload.Task(
2437+
name="test-task",
2438+
operation=op,
2439+
time_period=100,
2440+
ramp_down_time_period=30,
2441+
clients=4
2442+
)
2443+
2444+
task3 = workload.Task(
2445+
name="test-task",
2446+
operation=op,
2447+
time_period=100,
2448+
ramp_down_time_period=20,
2449+
clients=4
2450+
)
2451+
2452+
self.assertNotEqual(task1, task2)
2453+
self.assertEqual(task1, task3)
2454+
2455+
def test_task_hash_includes_ramp_down(self):
2456+
# Test that hash includes ramp_down_time_period
2457+
op = workload.Operation("test-op", workload.OperationType.Bulk.to_hyphenated_string(), {})
2458+
2459+
task1 = workload.Task(
2460+
name="test-task",
2461+
operation=op,
2462+
time_period=100,
2463+
ramp_down_time_period=20,
2464+
clients=4
2465+
)
2466+
2467+
task2 = workload.Task(
2468+
name="test-task",
2469+
operation=op,
2470+
time_period=100,
2471+
ramp_down_time_period=30,
2472+
clients=4
2473+
)
2474+
2475+
# Different ramp_down should produce different hashes
2476+
self.assertNotEqual(hash(task1), hash(task2))

0 commit comments

Comments
 (0)