Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ indent-after-paren=4
indent-string=' '

# Maximum number of characters on a single line.
max-line-length=140
max-line-length=180

# Maximum number of lines in a module.
max-module-lines=1000
Expand Down
15 changes: 15 additions & 0 deletions osbenchmark/resources/workload-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
"type": "integer",
"minimum": 1
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
Expand Down Expand Up @@ -75,6 +80,11 @@
"minimum": 1,
"description": "Defines the number of times to run the operation."
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
Expand Down Expand Up @@ -146,6 +156,11 @@
"minimum": 1,
"description": "Defines the number of times to run the operation."
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
Expand Down
80 changes: 58 additions & 22 deletions osbenchmark/worker_coordinator/worker_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,7 @@ def os_clients(all_hosts, all_client_options):
#
# Now we need to ensure that we start partitioning parameters correctly in both cases. And that means we
# need to start from (client) index 0 in both cases instead of 0 for indexA and 4 for indexB.
schedule = schedule_for(task, task_allocation.client_index_in_task, params_per_task[task])
schedule = schedule_for(task_allocation, params_per_task[task])
async_executor = AsyncExecutor(
client_id, task, schedule, opensearch, self.sampler, self.cancel, self.complete,
task.error_behavior(self.abort_on_error), self.cfg)
Expand Down Expand Up @@ -1607,6 +1607,15 @@ async def __call__(self, *args, **kwargs):
# lazily initialize the schedule
self.logger.debug("Initializing schedule for client id [%s].", self.client_id)
schedule = self.schedule_handle()
self.schedule_handle.start()
rampup_wait_time = self.schedule_handle.ramp_up_wait_time
if rampup_wait_time:
self.logger.info("client id [%s] waiting [%.2f]s for ramp-up.", self.client_id, rampup_wait_time)
await asyncio.sleep(rampup_wait_time)

if rampup_wait_time:
console.println(f" Client id {self.client_id} is running now.")

self.logger.debug("Entering main loop for client id [%s].", self.client_id)
# noinspection PyBroadException
try:
Expand Down Expand Up @@ -1806,18 +1815,28 @@ def __repr__(self, *args, **kwargs):


class TaskAllocation:
def __init__(self, task, client_index_in_task):
def __init__(self, task, client_index_in_task, global_client_index, total_clients):
"""
:param task: The current task which is always a leaf task.
:param client_index_in_task: The task-specific index for the allocated client.
:param global_client_index: The globally unique index for the allocated client across
all concurrently executed tasks.
:param total_clients: The total number of clients executing tasks concurrently.
"""
self.task = task
self.client_index_in_task = client_index_in_task
self.global_client_index = global_client_index
self.total_clients = total_clients

def __hash__(self):
return hash(self.task) ^ hash(self.client_index_in_task)
return hash(self.task) ^ hash(self.global_client_index)

def __eq__(self, other):
return isinstance(other, type(self)) and self.task == other.task and self.client_index_in_task == other.client_index_in_task
return isinstance(other, type(self)) and self.task == other.task and self.global_client_index == other.global_client_index

def __repr__(self, *args, **kwargs):
return "TaskAllocation [%d/%d] for %s" % (self.client_index_in_task, self.task.clients, self.task)
return f"TaskAllocation [{self.client_index_in_task}/{self.task.clients}] for {self.task} " \
f"and [{self.global_client_index}/{self.total_clients}] in total"


class Allocator:
Expand Down Expand Up @@ -1858,12 +1877,16 @@ def allocations(self):
clients_executing_completing_task = []
for sub_task in task:
for client_index in range(start_client_index, start_client_index + sub_task.clients):
# this is the actual client that will execute the task. It may differ from the logical one in case we over-commit (i.e.
# more tasks than actually available clients)
physical_client_index = client_index % max_clients
if sub_task.completes_parent:
clients_executing_completing_task.append(physical_client_index)
allocations[physical_client_index].append(TaskAllocation(sub_task, client_index - start_client_index))
ta = TaskAllocation(task = sub_task,
client_index_in_task = client_index - start_client_index,
global_client_index=client_index,
# if task represents a parallel structure this is the total number of clients
# executing sub-tasks concurrently.
total_clients=task.clients)
allocations[physical_client_index].append(ta)
start_client_index += sub_task.clients

# uneven distribution between tasks and clients, e.g. there are 5 (parallel) tasks but only 2 clients. Then, one of them
Expand Down Expand Up @@ -1941,7 +1964,7 @@ def clients(self):

# Runs a concrete schedule on one worker client
# Needs to determine the runners and concrete iterations per client.
def schedule_for(task, client_index, parameter_source):
def schedule_for(task_allocation, parameter_source):
"""
Calculates a client's schedule for a given task.

Expand All @@ -1951,15 +1974,17 @@ def schedule_for(task, client_index, parameter_source):
:return: A generator for the operations the given client needs to perform for this task.
"""
logger = logging.getLogger(__name__)
task = task_allocation.task
op = task.operation
num_clients = task.clients
sched = scheduler.scheduler_for(task)

client_index = task_allocation.client_index_in_task
# guard all logging statements with the client index and only emit them for the first client. This information is
# repetitive and may cause issues in thespian with many clients (an excessive number of actor messages is sent).
if client_index == 0:
logger.info("Choosing [%s] for [%s].", sched, task)
runner_for_op = runner.runner_for(op.type)
params_for_op = parameter_source.partition(client_index, num_clients)
params_for_op = parameter_source.partition(client_index, task.clients)
if hasattr(sched, "parameter_source"):
if client_index == 0:
logger.debug("Setting parameter source [%s] for scheduler [%s]", params_for_op, sched)
Expand Down Expand Up @@ -1992,7 +2017,7 @@ def schedule_for(task, client_index, parameter_source):
else:
logger.info("%s schedule will determine when the schedule for [%s] terminates.", str(loop_control), task.name)

return ScheduleHandle(task.name, sched, loop_control, runner_for_op, params_for_op)
return ScheduleHandle(task_allocation, sched, loop_control, runner_for_op, params_for_op)


def requires_time_period_schedule(task, task_runner, params):
Expand All @@ -2009,27 +2034,40 @@ def requires_time_period_schedule(task, task_runner, params):


class ScheduleHandle:
def __init__(self, task_name, sched, task_progress_control, runner, params):
def __init__(self, task_allocation, sched, task_progress_control, runner, params):
"""
Creates a generator that will yield individual task invocations for the provided schedule.

:param task_name: The name of the task for which the schedule is generated.
:param task_allocation: The task allocation for which the schedule is generated.
:param sched: The scheduler for this task.
:param task_progress_control: Controls how and how often this generator will loop.
:param runner: The runner for a given operation.
:param params: The parameter source for a given operation.
:return: A generator for the corresponding parameters.
"""
self.task_name = task_name
self.task_allocation = task_allocation
self.sched = sched
self.task_progress_control = task_progress_control
self.runner = runner
self.params = params
# TODO: Can we offload the parameter source execution to a different thread / process? Is this too heavy-weight?
#from concurrent.futures import ThreadPoolExecutor
#import asyncio
#self.io_pool_exc = ThreadPoolExecutor(max_workers=1)
#self.loop = asyncio.get_event_loop()
# from concurrent.futures import ThreadPoolExecutor
# import asyncio
# self.io_pool_exc = ThreadPoolExecutor(max_workers=1)
# self.loop = asyncio.get_event_loop()
@property
def ramp_up_wait_time(self):
"""
:return: the number of seconds to wait until this client should start so load can gradually ramp-up.
"""
ramp_up_time_period = self.task_allocation.task.ramp_up_time_period
if ramp_up_time_period:
return ramp_up_time_period * (self.task_allocation.global_client_index / self.task_allocation.total_clients)
else:
return 0

def start(self):
self.task_progress_control.start()

def before_request(self, now):
self.sched.before_request(now)
Expand All @@ -2041,20 +2079,18 @@ async def __call__(self):
next_scheduled = 0
if self.task_progress_control.infinite:
param_source_knows_progress = hasattr(self.params, "percent_completed")
self.task_progress_control.start()
while True:
try:
next_scheduled = self.sched.next(next_scheduled)
# does not contribute at all to completion. Hence, we cannot define completion.
percent_completed = self.params.percent_completed if param_source_knows_progress else None
#current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params)
# current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params)
yield (next_scheduled, self.task_progress_control.sample_type, percent_completed, self.runner,
self.params.params())
self.task_progress_control.next()
except StopIteration:
return
else:
self.task_progress_control.start()
while not self.task_progress_control.completed:
try:
next_scheduled = self.sched.next(next_scheduled)
Expand Down
35 changes: 31 additions & 4 deletions osbenchmark/workload/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1765,14 +1765,24 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name):
default_iterations = self._r(ops_spec, "iterations", error_ctx="parallel", mandatory=False)
default_warmup_time_period = self._r(ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False)
default_time_period = self._r(ops_spec, "time-period", error_ctx="parallel", mandatory=False)
default_ramp_up_time_period = self._r(ops_spec, "ramp-up-time-period", error_ctx="parallel", mandatory=False)
clients = self._r(ops_spec, "clients", error_ctx="parallel", mandatory=False)
completed_by = self._r(ops_spec, "completed-by", error_ctx="parallel", mandatory=False)

# now descent to each operation
tasks = []
for task in self._r(ops_spec, "tasks", error_ctx="parallel"):
tasks.append(self.parse_task(task, ops, test_procedure_name, default_warmup_iterations, default_iterations,
default_warmup_time_period, default_time_period, completed_by))
default_warmup_time_period, default_time_period, default_ramp_up_time_period, completed_by))

for task in tasks:
if task.ramp_up_time_period != default_ramp_up_time_period:
if default_ramp_up_time_period is None:
self._error(f"task '{task.name}' in 'parallel' element of test-procedure '{test_procedure_name}' specifies "
f"a ramp-up-time-period but it is only allowed on the 'parallel' element.")
else:
self._error(f"task '{task.name}' specifies a different ramp-up-time-period than its enclosing "
f"'parallel' element in test-procedure '{test_procedure_name}'.")
if completed_by:
completion_task = None
for task in tasks:
Expand All @@ -1788,7 +1798,8 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name):
return workload.Parallel(tasks, clients)

def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterations=None, default_iterations=None,
default_warmup_time_period=None, default_time_period=None, completed_by_name=None):
default_warmup_time_period=None, default_time_period=None, default_ramp_up_time_period=None,
completed_by_name=None):

op_spec = task_spec["operation"]
if isinstance(op_spec, str) and op_spec in ops:
Expand All @@ -1811,6 +1822,8 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati
default_value=default_warmup_time_period),
time_period=self._r(task_spec, "time-period", error_ctx=op.name, mandatory=False,
default_value=default_time_period),
ramp_up_time_period=self._r(task_spec, "ramp-up-time-period", error_ctx=op.name,
mandatory=False, default_value=default_ramp_up_time_period),
clients=self._r(task_spec, "clients", error_ctx=op.name, mandatory=False, default_value=1),
completes_parent=(task_name == completed_by_name),
schedule=schedule,
Expand All @@ -1819,11 +1832,25 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati
if task.warmup_iterations is not None and task.time_period is not None:
self._error(
"Operation '%s' in test_procedure '%s' defines '%d' warmup iterations and a time period of '%d' seconds. Please do not "
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_iterations, task.time_period))
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_iterations, task.time_period))
elif task.warmup_time_period is not None and task.iterations is not None:
self._error(
"Operation '%s' in test_procedure '%s' defines a warmup time period of '%d' seconds and '%d' iterations. Please do not "
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_time_period, task.iterations))
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_time_period, task.iterations))

if (task.warmup_iterations is not None or task.iterations is not None) and task.ramp_up_time_period is not None:
self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-up time period of "
f"{task.ramp_up_time_period} seconds as well as {task.warmup_iterations} warmup iterations and "
f"{task.iterations} iterations but mixing time periods and iterations is not allowed.")

if task.ramp_up_time_period is not None:
if task.warmup_time_period is None:
self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-up time period of "
f"{task.ramp_up_time_period} seconds but no warmup-time-period.")
elif task.warmup_time_period < task.ramp_up_time_period:
self._error(f"The warmup-time-period of operation '{op.name}' in test_procedure '{test_procedure_name}' is "
f"{task.warmup_time_period} seconds but must be greater than or equal to the "
f"ramp-up-time-period of {task.ramp_up_time_period} seconds.")

return task

Expand Down
Loading
Loading