diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py
index 8727e22a..e94159ed 100644
--- a/osbenchmark/benchmark.py
+++ b/osbenchmark/benchmark.py
@@ -652,6 +652,14 @@ def add_workload_source(subparser):
         help="Run a load test on your cluster, up to a certain QPS value (default: 0)",
         default=0
     )
+    test_execution_parser.add_argument(
+        "--redline-test",
+        help="Run a redline test to find the maximum load your cluster can sustain, scaling clients up to the given QPS value (defaults to 1000 if the flag is given without a value)",
+        nargs='?',
+        const=1000, # Value to use when flag is present but no value given
+        default=0, # Value to use when flag is not present
+        type=int
+    )

###############################################################################
#
@@ -939,6 +947,8 @@ def configure_test(arg_parser, args, cfg):
                 opts.csv_to_list(args.load_worker_coordinator_hosts))
     cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
     cfg.add(config.Scope.applicationOverride, "workload", "load.test.clients", int(args.load_test_qps))
+    if args.redline_test:
+        cfg.add(config.Scope.applicationOverride, "workload", "redline.test", int(args.redline_test))
     cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
     cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
     cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled)
diff --git a/osbenchmark/utils/io.py b/osbenchmark/utils/io.py
index dda18761..36bb9ed4 100644
--- a/osbenchmark/utils/io.py
+++ b/osbenchmark/utils/io.py
@@ -251,8 +251,6 @@ def ensure_symlink(source, link_name):
             os.remove(link_name)
             os.symlink(source, link_name)
             logger.info("Updated symlink: %s -> %s", link_name, source)
-        else:
-            logger.info("Symlink already correct: %s -> %s", link_name, source)
     elif os.path.isdir(link_name):
         shutil.rmtree(link_name)
         os.symlink(source, link_name)
diff --git a/osbenchmark/worker_coordinator/worker_coordinator.py b/osbenchmark/worker_coordinator/worker_coordinator.py
index 4338d050..4eba93f3 100644
--- a/osbenchmark/worker_coordinator/worker_coordinator.py
+++ b/osbenchmark/worker_coordinator/worker_coordinator.py
@@ -32,9 +32,11 @@
 import math
 import multiprocessing
 import queue
+import random
+import sys
 import threading
 from dataclasses import dataclass
-from typing import Callable
+from typing import Callable, List, Dict, Any
 import time
 from enum import Enum
@@ -128,7 +130,7 @@ class StartWorker:
     Starts a worker.
     """

-    def __init__(self, worker_id, config, workload, client_allocations):
+    def __init__(self, worker_id, config, workload, client_allocations, feedback_actor=None, error_queue=None, queue_lock=None, shared_states=None):
         """
         :param worker_id: Unique (numeric) id of the worker.
         :param config: OSB internal configuration object.
@@ -139,6 +141,10 @@ def __init__(self, worker_id, config, workload, client_allocations): self.config = config self.workload = workload self.client_allocations = client_allocations + self.feedback_actor = feedback_actor + self.error_queue = error_queue + self.queue_lock = queue_lock + self.shared_states = shared_states class Drive: @@ -198,6 +204,211 @@ def __init__(self, metrics, next_task_scheduled_in): self.metrics = metrics self.next_task_scheduled_in = next_task_scheduled_in +class ClusterErrorMessage: + """Message sent from the client when a request fails during load testing""" + def __init__(self, client_id, request_metadata): + self.client_id = client_id + self.request_metadata = request_metadata + +class SharedClientStateMessage: + """Message sent from the Worker to the FeedbackActor to share client state dictionaries""" + def __init__(self, worker_id=None, worker_clients_map=None): + self.worker_id = worker_id + self.worker_clients_map = worker_clients_map + +class EnableFeedbackScaling: + pass + +class DisableFeedbackScaling: + pass + +class FeedbackState(Enum): + """Various states for the FeedbackActor""" + NEUTRAL = "neutral" + SCALING_DOWN = "scaling_down" + SLEEP = "sleep" + SCALING_UP = "scaling_up" + DISABLED = "disabled" + +class StartFeedbackActor: + def __init__(self, error_queue=None, queue_lock=None, shared_states=None): + self.shared_states = shared_states + self.error_queue = error_queue + self.queue_lock = queue_lock + +# pylint: disable=too-many-public-methods +class FeedbackActor(actor.BenchmarkActor): + POST_SCALEDOWN_SECONDS = 30 + STARTUP_TIMEOUT = 30 + WAKEUP_INTERVAL = 1 + + def __init__(self) -> None: + super().__init__() + self.logger = logging.getLogger(__name__) + self.state = FeedbackState.DISABLED + self.shared_client_states = {} + self.workers_reported = 0 + self.total_client_count = 0 + self.total_active_client_count = 0 # must be tracked for scaling up/down + self.num_clients_to_scale_up = 5 + self.percentage_clients_to_scale_down = 0.10 + self.sleep_start_time = time.perf_counter() + self.last_error_time = time.perf_counter() - FeedbackActor.POST_SCALEDOWN_SECONDS + self.last_scaleup_time = time.perf_counter() - FeedbackActor.POST_SCALEDOWN_SECONDS + self.max_stable_clients = 0 # the value we want to return at the end of the test + # These will be passed in via StartFeedbackActor: + self.error_queue = None + self.queue_lock = None + + def receiveMsg_StartFeedbackActor(self, msg, sender) -> None: + """ + Initializes the FeedbackActor with expected worker count, client dictionaries, error queue, and queue lock. + """ + self.shared_client_states = msg.shared_states + self.total_client_count = sum(len(state) for state in self.shared_client_states.values()) + self.error_queue = msg.error_queue + self.queue_lock = msg.queue_lock + self.wakeupAfter(datetime.timedelta(seconds=FeedbackActor.WAKEUP_INTERVAL)) + + def receiveMsg_WakeupMessage(self, msg, sender) -> None: + # Check state and re-schedule wakeups. 
+ self.handle_state() + self.wakeupAfter(datetime.timedelta(seconds=FeedbackActor.WAKEUP_INTERVAL)) + + def receiveUnrecognizedMessage(self, msg, sender) -> None: + self.logger.info("Received unrecognized message: %s", msg) + + def receiveMsg_EnableFeedbackScaling(self, msg, sender): + self.logger.info("FeedbackActor: scaling enabled.") + self.state = FeedbackState.SCALING_UP + + def receiveMsg_DisableFeedbackScaling(self, msg, sender): + self.logger.info("FeedbackActor: scaling disabled.") + self.state = FeedbackState.DISABLED + self.percentage_clients_to_scale_down = 1.0 + self.scale_down() + + def receiveMsg_ActorExitRequest(self, msg, sender): + print("Redline test finished. Maximum stable client number reached: %d" % self.max_stable_clients) + self.logger.info("FeedbackActor received ActorExitRequest and will shutdown") + if hasattr(self, 'shared_client_states'): + self.shared_client_states.clear() + + def check_for_errors(self) -> List[Dict[str, Any]]: + """Poll the error queue for errors.""" + errors = [] + try: + while True: + error = self.error_queue.get_nowait() + errors.append(error) + except queue.Empty: + pass # queue is empty + return errors + + def clear_queue(self) -> None: + """Clear any remaining items from the error queue.""" + while True: + try: + self.error_queue.get_nowait() + except queue.Empty: + break + + def handle_state(self) -> None: + current_time = time.perf_counter() + errors = self.check_for_errors() + + sys.stdout.write("\x1b[s") # Save cursor position + sys.stdout.write("\x1b[1B") # Move cursor down 1 line + sys.stdout.write("\r\x1b[2K") # Clear the line + sys.stdout.write(f"[Redline] Active clients: {self.total_active_client_count}") + sys.stdout.write("\x1b[u") # Restore cursor position + sys.stdout.flush() + + if self.state == FeedbackState.DISABLED: + return + + if self.state == FeedbackState.SLEEP: + if current_time - self.sleep_start_time >= self.POST_SCALEDOWN_SECONDS: + self.logger.info("Sleep period complete, returning to NEUTRAL state") + self.clear_queue() + self.state = FeedbackState.NEUTRAL + self.sleep_start_time = current_time + elif errors: + self.logger.info("Error messages detected, scaling down...") + self.state = FeedbackState.SCALING_DOWN + with self.queue_lock: # Block producers while scaling down. + self.scale_down() + self.logger.info("Clients scaled down. Active clients: %d", self.total_active_client_count) + self.last_error_time = current_time + elif self.state == FeedbackState.NEUTRAL: + self.max_stable_clients = max(self.max_stable_clients, self.total_active_client_count) # update the max number of stable clients + if (current_time - self.last_error_time >= self.POST_SCALEDOWN_SECONDS and + current_time - self.last_scaleup_time >= self.WAKEUP_INTERVAL): + self.logger.info("No errors in the last %d seconds, scaling up", self.POST_SCALEDOWN_SECONDS) + self.state = FeedbackState.SCALING_UP + + if self.state == FeedbackState.SCALING_UP: + self.logger.info("Scaling up...") + self.scale_up() + self.logger.info("Clients scaled up. 
Active clients: %d", self.total_active_client_count) + self.state = FeedbackState.NEUTRAL + + def scale_down(self) -> None: + try: + clients_to_pause = int(self.total_active_client_count * self.percentage_clients_to_scale_down) + if clients_to_pause <= 0: + self.logger.info("No clients to pause during scale down") + return + + # Create a flattened list of (worker_id, client_id) tuples for all active clients + all_active_clients = [] + for worker_id, client_states in self.shared_client_states.items(): + for client_id, status in client_states.items(): + if status: # Only include active clients + all_active_clients.append((worker_id, client_id)) + + # If we need to pause more clients than are active, adjust the count + clients_to_pause = min(clients_to_pause, len(all_active_clients)) + + # Select clients to pause - randomly sample for better distribution + clients_to_pause_indices = random.sample(range(len(all_active_clients)), clients_to_pause) + clients_to_pause_list = [all_active_clients[i] for i in clients_to_pause_indices] + + # Pause the selected clients in a single pass + for worker_id, client_id in clients_to_pause_list: + self.shared_client_states[worker_id][client_id] = False + self.total_active_client_count -= 1 + + self.logger.info("Scaling down complete. Paused %d clients", clients_to_pause) + finally: + self.state = FeedbackState.SLEEP + self.clear_queue() + self.sleep_start_time = self.last_scaleup_time = time.perf_counter() + + def scale_up(self) -> None: + try: + clients_activated = 0 + while clients_activated < self.num_clients_to_scale_up: + inactive_clients_by_worker = {} + for worker_id, client_states in self.shared_client_states.items(): + inactive_clients = [(client_id, status) for client_id, status in client_states.items() if not status] + if inactive_clients: + inactive_clients_by_worker[worker_id] = inactive_clients + for worker_id in inactive_clients_by_worker: + if clients_activated >= self.num_clients_to_scale_up: + break + if inactive_clients_by_worker[worker_id]: + client_id, _ = inactive_clients_by_worker[worker_id][0] + self.shared_client_states[worker_id][client_id] = True + self.total_active_client_count += 1 + clients_activated += 1 + self.logger.info("Unpaused client %d on worker %d", client_id, worker_id) + if clients_activated < self.num_clients_to_scale_up: + self.logger.info("Not enough inactive clients to activate. Activated %d clients", clients_activated) + break + finally: + self.last_scaleup_time = time.perf_counter() + self.state = FeedbackState.NEUTRAL class WorkerCoordinatorActor(actor.BenchmarkActor): RESET_RELATIVE_TIME_MARKER = "reset_relative_time" @@ -218,6 +429,8 @@ def __init__(self): self.status = "init" self.post_process_timer = 0 self.cluster_details = None + self.feedback_actor = None + self.worker_shared_states = {} def receiveMsg_PoisonMessage(self, poisonmsg, sender): self.logger.error("Main worker_coordinator received a fatal indication from load generator (%s). 
Shutting down.", poisonmsg.details)
@@ -232,6 +445,11 @@ def receiveMsg_BenchmarkFailure(self, msg, sender):
     def receiveMsg_BenchmarkCancelled(self, msg, sender):
         self.logger.info("Main worker_coordinator received a notification that the benchmark has been cancelled.")
         self.coordinator.close()
+        # shut down the FeedbackActor if one is active
+        # we do this manually in the worker coordinator since it is fully responsible for the feedback actor
+        if self.feedback_actor:
+            self.logger.info("Shutting down FeedbackActor due to benchmark cancellation.")
+            self.send(self.feedback_actor, thespian.actors.ActorExitRequest())
         self.send(self.start_sender, msg)

     def receiveMsg_ActorExitRequest(self, msg, sender):
@@ -293,8 +511,18 @@ def receiveMsg_WakeupMessage(self, msg, sender):
     def create_client(self, host):
         return self.createActor(Worker, targetActorRequirements=self._requirements(host))

-    def start_worker(self, worker_coordinator, worker_id, cfg, workload, allocations):
-        self.send(worker_coordinator, StartWorker(worker_id, cfg, workload, allocations))
+    def start_worker(self, worker_coordinator, worker_id, cfg, workload, allocations, error_queue=None, queue_lock=None, shared_states=None):
+        self.send(worker_coordinator, StartWorker(worker_id, cfg, workload, allocations, self.feedback_actor, error_queue, queue_lock, shared_states))
+
+    def start_feedbackActor(self, shared_states):
+        self.send(
+            self.feedback_actor,
+            StartFeedbackActor(
+                shared_states=shared_states,
+                error_queue=self.coordinator.error_queue,
+                queue_lock=self.coordinator.queue_lock
+            )
+        )

     def drive_at(self, worker_coordinator, client_start_timestamp):
         self.send(worker_coordinator, Drive(client_start_timestamp))
@@ -532,6 +760,10 @@ def __init__(self, target, config, os_client_factory_class=client.OsClientFactory):
         self.workers = []
         # which client ids are assigned to which workers?
self.clients_per_worker = {} + self.manager = multiprocessing.Manager() + self.shared_client_dict = self.manager.dict() + self.error_queue = None + self.queue_lock = self.manager.Lock() self.progress_results_publisher = console.progress() self.progress_counter = 0 @@ -658,15 +890,17 @@ def start_benchmark(self): self.logger.info("Attaching cluster-level telemetry devices.") self.telemetry.on_benchmark_start() self.logger.info("Cluster-level telemetry devices are now attached.") - # if load testing is enabled, modify the client + throughput number for the task(s) - # target throughput + clients will then be equal to the qps passed in through --load-test - load_test_clients = self.config.opts("workload", "load.test.clients", mandatory=False) + # if redline testing or load testing is enabled, modify the client + throughput number for the task(s) + # target throughput + clients will then be equal to the qps passed in through --redline-test or --load-test + load_test_clients = self.config.opts("workload", "redline.test", mandatory=False) or self.config.opts("workload", "load.test.clients", mandatory=False) if load_test_clients: + self.target.feedback_actor = self.target.createActor(FeedbackActor) + self.error_queue = self.manager.Queue(maxsize=1000) for task in self.test_procedure.schedule: for subtask in task: subtask.clients = load_test_clients subtask.params["target-throughput"] = load_test_clients - self.logger.info("Load test mode enabled - set client count to %d", load_test_clients) + self.logger.info("Load test mode enabled - set max client count to %d", load_test_clients) allocator = Allocator(self.test_procedure.schedule) self.allocations = allocator.allocations self.number_of_steps = len(allocator.join_points) - 1 @@ -680,6 +914,8 @@ def start_benchmark(self): worker_assignments = calculate_worker_assignments(self.load_worker_coordinator_hosts, allocator.clients) worker_id = 0 + # redline testing: keep track of the total number of workers + # and report this to the feedbackActor before starting a redline test for assignment in worker_assignments: host = assignment["host"] for clients in assignment["workers"]: @@ -692,10 +928,20 @@ def start_benchmark(self): for client_id in clients: client_allocations.add(client_id, self.allocations[client_id]) self.clients_per_worker[client_id] = worker_id - self.target.start_worker(worker, worker_id, self.config, self.workload, client_allocations) + # if load testing is enabled, create a shared state dictionary for this worker + if load_test_clients: + self.shared_client_dict[worker_id] = self.manager.dict() + for client_id in clients: + self.shared_client_dict[worker_id][client_id] = False + # and send it along with the start_worker message. 
This way, the worker can pass it down to its assigned clients + self.target.start_worker(worker, worker_id, self.config, self.workload, client_allocations, + self.error_queue, self.queue_lock, shared_states=self.shared_client_dict[worker_id]) + else: + self.target.start_worker(worker, worker_id, self.config, self.workload, client_allocations) self.workers.append(worker) worker_id += 1 - + if load_test_clients: + self.target.start_feedbackActor(self.shared_client_dict) self.update_progress_message() def joinpoint_reached(self, worker_id, worker_local_timestamp, task_allocations): @@ -703,6 +949,9 @@ def joinpoint_reached(self, worker_id, worker_local_timestamp, task_allocations) self.workers_completed_current_step[worker_id] = (worker_local_timestamp, time.perf_counter()) self.logger.info("[%d/%d] workers reached join point [%d/%d].", self.currently_completed, len(self.workers), self.current_step + 1, self.number_of_steps) + # if we're in redline test mode, disable the feedback actor and pause all clients when we're at a joinpoint + if self.config.opts("workload", "redline.test", mandatory=False): + self.target.send(self.target.feedback_actor, DisableFeedbackScaling()) if self.currently_completed == len(self.workers): self.logger.info("All workers completed their tasks until join point [%d/%d].", self.current_step + 1, self.number_of_steps) # we can go on to the next step @@ -734,6 +983,9 @@ def joinpoint_reached(self, worker_id, worker_local_timestamp, task_allocations) self.target.on_benchmark_complete(m) else: self.move_to_next_task(workers_curr_step) + # re-enable the feedback actor for the next task if we're in redline testing + if self.config.opts("workload", "redline.test", mandatory=False): + self.target.send(self.target.feedback_actor, EnableFeedbackScaling()) else: self.may_complete_current_task(task_allocations) @@ -1077,6 +1329,10 @@ def __init__(self): self.start_driving = False self.wakeup_interval = Worker.WAKEUP_INTERVAL_SECONDS self.sample_queue_size = None + self.shared_states = None + self.feedback_actor = None + self.error_queue = None + self.queue_lock = None @actor.no_retry("worker") # pylint: disable=no-value-for-parameter def receiveMsg_StartWorker(self, msg, sender): @@ -1091,6 +1347,10 @@ def receiveMsg_StartWorker(self, msg, sender): self.client_allocations = msg.client_allocations self.current_task_index = 0 self.cancel.clear() + self.feedback_actor = msg.feedback_actor + self.shared_states = msg.shared_states + self.error_queue = msg.error_queue + self.queue_lock = msg.queue_lock # we need to wake up more often in test mode if self.config.opts("workload", "test.mode.enabled"): self.wakeup_interval = 0.5 @@ -1218,7 +1478,7 @@ def drive(self): self.logger.info("Worker[%d] is executing tasks at index [%d].", self.worker_id, self.current_task_index) self.sampler = Sampler(start_timestamp=time.perf_counter(), buffer_size=self.sample_queue_size) executor = AsyncIoAdapter(self.config, self.workload, task_allocations, self.sampler, - self.cancel, self.complete, self.on_error) + self.cancel, self.complete, self.on_error, self.shared_states, self.feedback_actor, self.error_queue, self.queue_lock) self.executor_future = self.pool.submit(executor) self.wakeupAfter(datetime.timedelta(seconds=self.wakeup_interval)) @@ -1487,7 +1747,7 @@ def map_task_throughput(self, current_samples): class AsyncIoAdapter: - def __init__(self, cfg, workload, task_allocations, sampler, cancel, complete, abort_on_error): + def __init__(self, cfg, workload, task_allocations, sampler, cancel, 
complete, abort_on_error, shared_states=None, feedback_actor=None, error_queue=None, queue_lock=None): self.cfg = cfg self.workload = workload self.task_allocations = task_allocations @@ -1499,6 +1759,10 @@ def __init__(self, cfg, workload, task_allocations, sampler, cancel, complete, a self.assertions_enabled = self.cfg.opts("worker_coordinator", "assertions") self.debug_event_loop = self.cfg.opts("system", "async.debug", mandatory=False, default_value=False) self.logger = logging.getLogger(__name__) + self.shared_states = shared_states + self.feedback_actor = feedback_actor + self.error_queue = error_queue + self.queue_lock = queue_lock def __call__(self, *args, **kwargs): # only possible in Python 3.7+ (has introduced get_running_loop) @@ -1554,7 +1818,7 @@ def os_clients(all_hosts, all_client_options): schedule = schedule_for(task_allocation, params_per_task[task]) async_executor = AsyncExecutor( client_id, task, schedule, opensearch, self.sampler, self.cancel, self.complete, - task.error_behavior(self.abort_on_error), self.cfg) + task.error_behavior(self.abort_on_error), self.cfg, self.shared_states, self.feedback_actor, self.error_queue, self.queue_lock) final_executor = AsyncProfiler(async_executor) if self.profiling_enabled else async_executor aws.append(final_executor()) run_start = time.perf_counter() @@ -1606,7 +1870,8 @@ async def __call__(self, *args, **kwargs): class AsyncExecutor: - def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, complete, on_error, config=None): + def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, complete, on_error, + config=None, shared_states=None, feedback_actor=None, error_queue=None, queue_lock=None): """ Executes tasks according to the schedule for a given operation. @@ -1630,8 +1895,15 @@ def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, compl self.logger = logging.getLogger(__name__) self.cfg = config self.message_producer = None # Producer will be lazily created when needed. 
+ self.shared_states = shared_states + self.feedback_actor = feedback_actor + self.error_queue = error_queue + self.queue_lock = queue_lock + self.redline_enabled = self.cfg.opts("workload", "redline.test", mandatory=False) if self.cfg else False async def __call__(self, *args, **kwargs): + has_run_any_requests = False + was_paused = False task_completes_parent = self.task.completes_parent total_start = time.perf_counter() # lazily initialize the schedule @@ -1653,6 +1925,27 @@ async def __call__(self, *args, **kwargs): if self.cancel.is_set(): self.logger.info("User cancelled execution.") break + + # redline testing: check whether this client should be running + # if redline testing is not enabled, there won't be a dictionary shared to this client, + # so we evaluate to a truthy value by default, allowing it to run normally in a regular benchmark + client_state = (self.shared_states or {}).get(self.client_id, True) + if client_state and was_paused: + print("Client %d is now unpaused" % self.client_id) + now = time.perf_counter() + total_start = now - expected_scheduled_time + was_paused = False + elif not client_state: + was_paused = True + processing_end = time.perf_counter() + total_ops = 0 + total_ops_unit = "ops" + request_meta_data = {"success": False, "skipped": True} + self.schedule_handle.after_request(processing_end, total_ops, total_ops_unit, request_meta_data) + if self.complete.is_set(): + break + continue + absolute_expected_schedule_time = total_start + expected_scheduled_time throughput_throttled = expected_scheduled_time > 0 if throughput_throttled: @@ -1681,11 +1974,21 @@ async def __call__(self, *args, **kwargs): # Execute with the appropriate context manager async with context_manager as request_context: - total_ops, total_ops_unit, request_meta_data = await execute_single(runner, self.opensearch, params, self.on_error) + total_ops, total_ops_unit, request_meta_data = await execute_single(runner, self.opensearch, params, self.on_error, self.redline_enabled) request_start = request_context.request_start request_end = request_context.request_end client_request_start = request_context.client_request_start client_request_end = request_context.client_request_end + # redline testing: send any bad requests to the FeedbackActor + if not request_meta_data["success"] and not request_meta_data.get("skipped", False): + if self.error_queue is not None: + self.logger.error("Real error detected in client %s. 
Notifying FeedbackActor...", self.client_id) + error_info = { + "client_id": self.client_id, + "task": str(self.task), + "error_details": request_meta_data + } + self.report_error(error_info) processing_end = time.perf_counter() service_time = request_end - request_start @@ -1723,10 +2026,11 @@ async def __call__(self, *args, **kwargs): else: progress = percent_completed - self.sampler.add(self.task, self.client_id, sample_type, request_meta_data, - absolute_processing_start, request_start, - latency, service_time, client_processing_time, processing_time, throughput, total_ops, total_ops_unit, - time_period, progress, request_meta_data.pop("dependent_timing", None)) + if client_state: + self.sampler.add(self.task, self.client_id, sample_type, request_meta_data, + absolute_processing_start, request_start, + latency, service_time, client_processing_time, processing_time, throughput, total_ops, total_ops_unit, + time_period, progress, request_meta_data.pop("dependent_timing", None)) if completed: self.logger.info("Task [%s] is considered completed due to external event.", self.task) @@ -1736,7 +2040,7 @@ async def __call__(self, *args, **kwargs): raise exceptions.BenchmarkError(f"Cannot run task [{self.task}]: {e}") from None finally: # Actively set it if this task completes its parent - if task_completes_parent: + if task_completes_parent or (self.redline_enabled and not has_run_any_requests): self.logger.info("Task [%s] completes parent. Client id [%s] is finished executing it and signals completion.", self.task, self.client_id) self.complete.set() @@ -1744,10 +2048,17 @@ async def __call__(self, *args, **kwargs): await self.message_producer.stop() self.message_producer = None # Reset for future calls. + def report_error(self, error_info): + if self.error_queue is not None: + try: + self.error_queue.put_nowait(error_info) + except queue.Full: + self.logger.warning("Error queue full; dropping error from client %s", self.client_id) + request_context_holder = client.RequestContextHolder() -async def execute_single(runner, opensearch, params, on_error): +async def execute_single(runner, opensearch, params, on_error, redline_enabled=False): """ Invokes the given runner once and provides the runner's return value in a uniform structure. @@ -1785,7 +2096,7 @@ async def execute_single(runner, opensearch, params, on_error): "success": False, "error-type": "transport" } - # The ES client will sometimes return string like "N/A" or "TIMEOUT" for connection errors. + # The OS client will sometimes return string like "N/A" or "TIMEOUT" for connection errors. 
if isinstance(e.status_code, int): request_meta_data["http-status"] = e.status_code # connection timeout errors don't provide a helpful description @@ -1812,7 +2123,8 @@ async def execute_single(runner, opensearch, params, on_error): description = request_meta_data.get("error-description") if description: msg += ", Description: %s" % description - console.error(msg) + if not redline_enabled: + console.error(msg) raise exceptions.BenchmarkAssertionError(msg) if 'error-description' in request_meta_data: @@ -1820,12 +2132,14 @@ async def execute_single(runner, opensearch, params, on_error): error_metadata = json.loads(request_meta_data["error-description"]) # parse error-description metadata opensearch_operation_error = parse_error(error_metadata) - console.error(opensearch_operation_error.get_error_message()) + if not redline_enabled: + console.error(opensearch_operation_error.get_error_message()) except Exception as e: # error-description is not a valid json so we just print it - console.error(request_meta_data["error-description"]) - - logging.getLogger(__name__).error(request_meta_data["error-description"]) + if not redline_enabled: + console.error(request_meta_data["error-description"]) + if not redline_enabled: + logging.getLogger(__name__).error(request_meta_data["error-description"]) return total_ops, total_ops_unit, request_meta_data diff --git a/tests/worker_coordinator/worker_coordinator_test.py b/tests/worker_coordinator/worker_coordinator_test.py index 73af71d8..d5351633 100644 --- a/tests/worker_coordinator/worker_coordinator_test.py +++ b/tests/worker_coordinator/worker_coordinator_test.py @@ -25,6 +25,7 @@ import asyncio import collections import io +import queue import threading import time import unittest.mock as mock @@ -1854,3 +1855,104 @@ async def f(x): self.assertEqual(2, return_value) duration = end - start self.assertTrue(0.9 <= duration <= 1.2, "Should sleep for roughly 1 second but took [%.2f] seconds." 
% duration) + +class FeedbackActorTests(TestCase): + @pytest.fixture(autouse=True) + def setup_actor(self): + self.monkeypatch = pytest.MonkeyPatch() + self.monkeypatch.setattr("osbenchmark.log.post_configure_actor_logging", lambda: None) + self.actor = worker_coordinator.FeedbackActor() + self.actor.error_queue = queue.Queue() + self.actor.queue_lock = mock.MagicMock() + + def test_receive_shared_client_state_sets_total_client_count(self): + self.actor.wakeupAfter = mock.MagicMock() + shared_states = { + 0: {0: False, 1: False}, + 1: {2: False, 3: False, 4: False} + } + message = worker_coordinator.StartFeedbackActor( + shared_states=shared_states, + error_queue=queue.Queue(), + queue_lock=mock.MagicMock() + ) + self.actor.receiveMsg_StartFeedbackActor(message, sender=None) + + assert self.actor.total_client_count == 5 + assert self.actor.total_active_client_count == 0 + self.actor.wakeupAfter.assert_called_once() + + def test_receive_start_feedback_actor_sets_queue_refs(self): + self.actor.wakeupAfter = mock.MagicMock() + dummy_error_queue = mock.MagicMock() + dummy_queue_lock = mock.MagicMock() + dummy_states = {0: {0: False}} + + message = worker_coordinator.StartFeedbackActor( + shared_states=dummy_states, + error_queue=dummy_error_queue, + queue_lock=dummy_queue_lock + ) + self.actor.receiveMsg_StartFeedbackActor(message, sender=None) + + assert self.actor.error_queue == dummy_error_queue + assert self.actor.queue_lock == dummy_queue_lock + assert self.actor.shared_client_states == dummy_states + self.actor.wakeupAfter.assert_called_once() + + def test_scale_up_only_activates_n_clients(self): + self.actor.shared_client_states = { + 0: {0: False, 1: False}, + 1: {2: False, 3: False} + } + self.actor.total_active_client_count = 0 + + self.actor.num_clients_to_scale_up = 2 + self.actor.scale_up() + assert self.actor.total_active_client_count == 2 + + def test_scale_down_pauses_percentage(self): + self.actor.shared_client_states = { + 0: {0: True, 1: True}, + 1: {2: True, 3: True, 4: False} + } + self.actor.total_active_client_count = 4 # 4 active clients + + self.actor.percentage_clients_to_scale_down = 0.5 + self.actor.scale_down() + + assert self.actor.total_active_client_count == 2 + + def test_handle_state_scales_up_only_when_conditions_met(self): + self.actor.state = worker_coordinator.FeedbackState.NEUTRAL + self.actor.total_active_client_count = 0 + self.actor.last_error_time = time.perf_counter() - 31 + self.actor.last_scaleup_time = time.perf_counter() - 2 + self.actor.shared_client_states = { + 0: {0: False, 1: False}, + 1: {2: False, 3: False} + } + + self.monkeypatch.setattr(self.actor, "check_for_errors", lambda: []) + + self.actor.handle_state() + + assert self.actor.state == worker_coordinator.FeedbackState.NEUTRAL + assert self.actor.total_active_client_count > 0 + + self.monkeypatch.undo() + + + def test_handle_state_enters_sleep_on_error(self): + self.actor.state = worker_coordinator.FeedbackState.NEUTRAL + self.actor.total_active_client_count = 2 + self.actor.shared_client_states = { + 0: {0: True, 1: True}, + 1: {2: True, 3: True, 4: False} + } + + self.actor.error_queue.put({"error": "foo"}) + self.actor.handle_state() + + assert self.actor.state == worker_coordinator.FeedbackState.SLEEP + assert self.actor.total_active_client_count < 4
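
The heart of the redline feedback loop in this patch is the pair of scale_down/scale_up passes that the FeedbackActor makes over the per-worker shared client-state dictionaries. The standalone sketch below (illustrative only, not part of the patch) mirrors that bookkeeping so the selection policy can be exercised outside the actor system; plain dicts and a local queue.Queue stand in for the multiprocessing.Manager structures, and all names here are local to the sketch.

    import queue
    import random

    # Per-worker client states as the coordinator shares them: worker_id -> {client_id: active?}
    shared_states = {0: {0: True, 1: True}, 1: {2: True, 3: True, 4: False}}
    error_queue = queue.Queue()

    def active_count(states):
        return sum(1 for clients in states.values() for is_active in clients.values() if is_active)

    def scale_down(states, fraction=0.10):
        """Pause a random sample of active clients, mirroring FeedbackActor.scale_down."""
        active = [(w, c) for w, clients in states.items() for c, on in clients.items() if on]
        to_pause = min(int(len(active) * fraction), len(active))
        for w, c in random.sample(active, to_pause):
            states[w][c] = False
        return to_pause

    def scale_up(states, n=5):
        """Unpause up to n clients, at most one per worker per pass, mirroring FeedbackActor.scale_up."""
        activated = 0
        while activated < n:
            progressed = False
            for w, clients in states.items():
                if activated >= n:
                    break
                inactive = [c for c, on in clients.items() if not on]
                if inactive:
                    clients[inactive[0]] = True
                    activated += 1
                    progressed = True
            if not progressed:  # no inactive clients left anywhere
                break
        return activated

    # Simulate one error arriving, one scale-down pass, then a scale-up pass.
    error_queue.put({"client_id": 3, "error_details": {"success": False}})
    if not error_queue.empty():
        scale_down(shared_states, fraction=0.5)
    print("active after scale down:", active_count(shared_states))
    scale_up(shared_states, n=2)
    print("active after scale up:", active_count(shared_states))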