Add load-test-qps parameter for dynamic client ramp up #739

Merged
merged 3 commits into from Feb 25, 2025

Changes from all commits

6 changes: 6 additions & 0 deletions osbenchmark/benchmark.py
@@ -647,6 +647,11 @@ def add_workload_source(subparser):
action="store_true",
help="Stop executing tests if an error occurs in one of the test iterations (default: false).",
)
test_execution_parser.add_argument(
"--load-test-qps",
help="Run a load test on your cluster, up to a certain QPS value (default: 0)",
default=0
)

###############################################################################
#
@@ -920,6 +925,7 @@ def configure_test(arg_parser, args, cfg):
"load_worker_coordinator_hosts",
opts.csv_to_list(args.load_worker_coordinator_hosts))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
cfg.add(config.Scope.applicationOverride, "workload", "load.test.clients", int(args.load_test_qps))
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled)
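Note that the argument is declared without type=int, so a value supplied on the command line (for example, opensearch-benchmark execute-test --workload=geonames --load-test-qps=100, where the workload name is only illustrative) arrives as a string, while the omitted default stays the integer 0; configure_test therefore coerces it with int(args.load_test_qps). A minimal standalone sketch of that parsing behavior (not OSB code):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--load-test-qps",
    help="Run a load test on your cluster, up to a certain QPS value (default: 0)",
    default=0
)

# CLI-supplied values parse as strings; the default remains the integer 0.
args = parser.parse_args(["--load-test-qps", "100"])
qps = int(args.load_test_qps)   # "100" -> 100; a default of 0 stays falsy
print(qps)                      # 100
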
12 changes: 10 additions & 2 deletions osbenchmark/worker_coordinator/worker_coordinator.py
@@ -657,7 +657,15 @@ def start_benchmark(self):
self.logger.info("Attaching cluster-level telemetry devices.")
self.telemetry.on_benchmark_start()
self.logger.info("Cluster-level telemetry devices are now attached.")

# if load testing is enabled, modify the client + throughput number for the task(s)
# target throughput + clients will then be equal to the QPS passed in through --load-test-qps
load_test_clients = self.config.opts("workload", "load.test.clients", mandatory=False)
if load_test_clients:
for task in self.test_procedure.schedule:
for subtask in task:
subtask.clients = load_test_clients
subtask.params["target-throughput"] = load_test_clients
self.logger.info("Load test mode enabled - set client count to %d", load_test_clients)
allocator = Allocator(self.test_procedure.schedule)
self.allocations = allocator.allocations
self.number_of_steps = len(allocator.join_points) - 1
@@ -1634,7 +1642,7 @@ async def __call__(self, *args, **kwargs):
await asyncio.sleep(rampup_wait_time)

if rampup_wait_time:
console.println(f" Client id {self.client_id} is running now.")
self.logger.info("Client id [%s] is running now.", self.client_id)

self.logger.debug("Entering main loop for client id [%s].", self.client_id)
# noinspection PyBroadException
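To make the effect of the override concrete, here is a simplified, self-contained illustration; the Subtask class and schedule shape below are stand-ins, not the real workload model. When load.test.clients is non-zero, every subtask's client count and target-throughput parameter are set to the requested QPS:

from dataclasses import dataclass, field

@dataclass
class Subtask:
    name: str
    clients: int
    params: dict = field(default_factory=dict)

# A schedule is treated here as a list of tasks, each iterable over its subtasks.
schedule = [[Subtask("index-append", clients=8), Subtask("term-query", clients=2)]]
load_test_clients = 100  # value carried in from --load-test-qps

if load_test_clients:
    for task in schedule:
        for subtask in task:
            subtask.clients = load_test_clients
            subtask.params["target-throughput"] = load_test_clients

# Every subtask now runs with 100 clients and a target throughput of 100 ops/s.
print(schedule[0][0].clients, schedule[0][0].params["target-throughput"])  # 100 100
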
18 changes: 18 additions & 0 deletions tests/worker_coordinator/worker_coordinator_test.py
@@ -250,6 +250,24 @@ def test_client_reaches_join_point_which_completes_parent(self):
self.assertEqual(1, target.on_task_finished.call_count)
self.assertEqual(4, target.drive_at.call_count)

@run_async
async def test_load_test_clients_override(self):
self.cfg.add(config.Scope.applicationOverride, "workload", "load.test.clients", 100)

task = self.workload.find_test_procedure_or_default("default").schedule[0]
original_clients = task.clients

d = worker_coordinator.WorkerCoordinator(self.create_test_worker_coordinator_target(), self.cfg,
os_client_factory_class=WorkerCoordinatorTests.StaticClientFactory)

d.prepare_benchmark(t=self.workload)
d.start_benchmark()

# verify the task is modified with the load test client count
self.assertEqual(original_clients, 4)
self.assertEqual(task.clients, 100)
self.assertEqual(task.params["target-throughput"], 100)


def op(name, operation_type):
return workload.Operation(name, operation_type, param_source="worker-coordinator-test-param-source")