Introduce Redline Testing to OSB #793
Conversation
Left some minor comments but overall LGTM
83a605c to 9749d60 (compare)
    except Exception as e:
        self.logger.error("Error processing client states: %s", e)

def receiveMsg_StartFeedbackActor(self, msg, sender):
This message should only be called once all the workers have updated the client mapping.
We need some kind of validation to confirm that all workers have updated the client mappings, send an ack back to the calling actor, and then that actor can start the feedback actor.
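One possible shape for that validation, as a sketch (the ClientMappingUpdated message and the bookkeeping attributes here are hypothetical, not part of this PR):

from dataclasses import dataclass

@dataclass
class ClientMappingUpdated:  # hypothetical ack sent by each worker, not defined in this PR
    worker_id: int

class CoordinatorSketch:
    def __init__(self, expected_worker_count, feedback_actor, send):
        self.expected_worker_count = expected_worker_count
        self.feedback_actor = feedback_actor
        self.send = send  # stand-in for the actor's send()
        self.acked_workers = set()

    def receiveMsg_ClientMappingUpdated(self, msg, sender):
        # Record each worker's ack exactly once.
        self.acked_workers.add(msg.worker_id)
        if len(self.acked_workers) == self.expected_worker_count:
            # Only after every worker has confirmed do we start the feedback actor.
            self.send(self.feedback_actor, "StartFeedbackActor")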
Good point, noting this - I'll add this in a follow-up revision
    except Exception as e:
        self.logger.error("Error processing client states: %s", e)

def receiveMsg_StartFeedbackActor(self, msg, sender):
This actor should also call handle_state, not receiveMsg_SharedClientStateMessage.
This message will start the wake-up loop, which in turn will begin calling handle_state.
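For reference, a minimal wake-up loop in Thespian looks roughly like this (a sketch; handle_state here is a placeholder, not the PR's implementation):

from datetime import timedelta
from thespian.actors import ActorTypeDispatcher

class StartFeedbackActor:  # placeholder message type for the sketch
    pass

class FeedbackActorSketch(ActorTypeDispatcher):
    WAKEUP_INTERVAL = timedelta(seconds=1)

    def receiveMsg_StartFeedbackActor(self, msg, sender):
        # Arm the loop; Thespian delivers a WakeupMessage after the interval.
        self.wakeupAfter(self.WAKEUP_INTERVAL)

    def receiveMsg_WakeupMessage(self, msg, sender):
        self.handle_state()  # placeholder for the real scaling logic
        self.wakeupAfter(self.WAKEUP_INTERVAL)  # re-arm for the next tick

    def handle_state(self):
        pass  # decide whether to scale up, scale down, or stay neutral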
    self.messageQueue.clear()
    self.sleep_start_time = time.perf_counter()

def scale_up(self):
Bringing up 1 client at a time will be time consuming, especially when spinning up 1000s of clients.
How about we bring up clients in steps of n or multiples of n, and also keep a check on self.state? If it is set to scale down (which happens whenever an error message is received), we can just skip the scale-up logic and do nothing. WDYT?
You're right, 1 client/sec is time consuming. For now I changed it to 5/sec, but I can increase it to 10 or more if you think it should be faster to start.
I think the scale_up function should just take a number of clients as an argument and attempt to activate that many clients round-robin style until it hits the target goal. For now it's just linear, but the logic of changing the number of clients in multiples or steps can be added in a separate function. What do you think of this:
We check the state every second and choose whether to scale up or down depending on the message queue and a couple of other factors (whether we errored recently, or scaled up too recently). For future scale-up methods, we can keep another class attribute n that we manipulate in a separate function if we want to scale up in different ways, e.g. exponentially or percentage-based, and then call scale_up to activate that many clients. Does that make sense?
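A sketch of that shape (attribute names follow the snippets quoted in this review; the round-robin interleaving itself is illustrative, not the PR's code):

from itertools import zip_longest

def scale_up(self, clients_to_activate: int) -> None:
    """Activate up to clients_to_activate paused clients, spread round-robin across workers."""
    # Collect paused clients per worker, then interleave so no single worker
    # absorbs all of the newly activated clients.
    per_worker = [
        [(worker_id, client_id) for client_id, active in states.items() if not active]
        for worker_id, states in self.shared_client_states.items()
    ]
    sentinel = object()
    round_robin = (
        pair
        for group in zip_longest(*per_worker, fillvalue=sentinel)
        for pair in group
        if pair is not sentinel
    )
    for activated, (worker_id, client_id) in enumerate(round_robin):
        if activated >= clients_to_activate:
            break
        self.shared_client_states[worker_id][client_id] = True
        self.total_active_client_count += 1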
During the development of this PR, we discovered a bug where the OSB benchmark would not complete due to Workers not reaching their designated joinpoints. This was caused by how clients handled the 'paused' state: instead of progressing through their schedules, they would enter a loop where they slept. As a result, many clients remained stuck, which prevented the test from completing. To fix this, we updated the logic so clients now continue executing as normal, but without sending requests when they are meant to be paused. This change allows the FeedbackActor to control the number of active clients sending requests to a cluster and throttle load generation without interrupting the overall flow of the benchmark.
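As an illustration of that behavior (a sketch; the function name and the exact state lookup are assumptions, and the real change lives in the executor loop):

async def maybe_execute(runner, opensearch, params, shared_states, client_id):
    # The client keeps advancing its schedule either way, so it still reaches
    # joinpoints, but it only issues the request while it is marked active.
    if shared_states.get(client_id, True):
        return await runner(opensearch, params)
    # Paused: skip the request but let the schedule (and joinpoints) progress.
    return None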
@OVI3D0 It would be great if you could add some details around how clients report errors to the feedback actor using a blocking queue, and how it differs from the method you first implemented.
We also introduced shared multiprocessing queues and locks. We decided to go with a shared Queue between the FeedbackActor and individual clients because Thespian actors handle messaging in a single-threaded, synchronized fashion unless using an actor troupe (see docs). Because of the nature of load testing and the strong chance of hundreds or even thousands of clients failing simultaneously, the original approach would often cause significant lag due to the overwhelming volume of messages being sent synchronously. With shared multiprocessing queues and locks, individual clients can now simply enqueue failed-request metadata to the shared queue without sending any messages, and the FeedbackActor can freely snoop through this queue at regular intervals. This has proven to be far more scalable than the previous implementation in tests with thousands of active clients.
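Roughly, the flow described above, sketched with stdlib multiprocessing primitives (the metadata fields are illustrative; the lock mirrors the queue_lock passed around in this PR):

import multiprocessing
import queue

error_queue = multiprocessing.Queue()
queue_lock = multiprocessing.Lock()

def report_error(client_id, request_metadata):
    # Client side: enqueue and move on; no actor message is sent.
    with queue_lock:
        error_queue.put_nowait({"client_id": client_id, **request_metadata})

def drain_errors(max_items=10_000):
    # FeedbackActor side: called periodically from the wake-up loop.
    drained = []
    with queue_lock:
        for _ in range(max_items):
            try:
                drained.append(error_queue.get_nowait())
            except queue.Empty:
                break
    return drained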
osbenchmark/benchmark.py
@@ -939,6 +947,7 @@ def configure_test(arg_parser, args, cfg):
        opts.csv_to_list(args.load_worker_coordinator_hosts))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
cfg.add(config.Scope.applicationOverride, "workload", "load.test.clients", int(args.load_test_qps))
cfg.add(config.Scope.applicationOverride, "workload", "redline.test", int(args.redline_test))
This config should only be added when the appropriate flag is passed.
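For instance (a sketch, assuming args.redline_test is falsy when the flag is not passed):

# Only register the redline.test setting when the flag was actually supplied.
if args.redline_test:
    cfg.add(config.Scope.applicationOverride, "workload", "redline.test", int(args.redline_test))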
@@ -251,8 +251,6 @@ def ensure_symlink(source, link_name):
        os.remove(link_name)
        os.symlink(source, link_name)
        logger.info("Updated symlink: %s -> %s", link_name, source)
    else:
        logger.info("Symlink already correct: %s -> %s", link_name, source)
why remove this?
Very spammy (will log 20+ times per benchmark) & does not provide much value to the user IMO
self.logger.info("Allocating worker [%d] on [%s] with [%d] clients.", worker_id, host, len(clients))
worker = self.target.create_client(host)

client_allocations = ClientAllocations()
for client_id in clients:
    client_allocations.add(client_id, self.allocations[client_id])
    self.clients_per_worker[client_id] = worker_id
self.target.start_worker(worker, worker_id, self.config, self.workload, client_allocations)
self.target.start_worker(worker, worker_id, self.config, self.workload, client_allocations, self.error_queue, self.queue_lock)
I personally don't want to modify any logic with respect to the existing functionality of OSB. It would be better to add a check here: if load testing is enabled, send the call with the additional parameters; otherwise keep the original.
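For example, keeping the original call path intact unless redline testing was requested (a sketch; the config lookup mirrors the one quoted from drive() later in this review):

if self.config.opts("workload", "redline.test", mandatory=False):
    self.target.start_worker(worker, worker_id, self.config, self.workload,
                             client_allocations, self.error_queue, self.queue_lock)
else:
    # Unchanged call for regular benchmarks.
    self.target.start_worker(worker, worker_id, self.config, self.workload, client_allocations)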
self.total_client_count = 0
self.total_active_client_count = 0  # must be tracked for scaling up/down
self.sleep_start_time = time.perf_counter()
self.last_error_time = time.perf_counter() - FeedbackActor.POST_SCALEDOWN_SECONDS
Not able to wrap my head around this and the initialization below. Why are we setting the timestamp in the past instead of initializing it to 0?
The FeedbackActor only scales up if there have been no errors in the past 30 seconds, so by initializing the last error time to '30 seconds in the past' we can begin scaling up immediately.
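Concretely, the guard being described might look like this (a sketch; the 30-second value comes from the comment above):

import time

POST_SCALEDOWN_SECONDS = 30  # matches the 30-second error window described above

class ErrorWindowSketch:
    def __init__(self):
        # Back-dating the timestamp makes can_scale_up() true immediately at startup.
        self.last_error_time = time.perf_counter() - POST_SCALEDOWN_SECONDS

    def can_scale_up(self):
        return time.perf_counter() - self.last_error_time >= POST_SCALEDOWN_SECONDS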
self.scale_down()
self.logger.info("Clients scaled down. Active clients: %d", self.total_active_client_count)
self.last_error_time = current_time
Nit: remove the extra lines between if-elif blocks.
    self.logger.info("Clients scaled up. Active clients: %d", self.total_active_client_count)
    self.state = FeedbackState.NEUTRAL

def scale_down(self, scale_down_percentage=0.10) -> None:
What do you think about rewriting this as below:
def scale_down(self, scale_down_percentage=0.10) -> None:
    try:
        clients_to_pause = int(self.total_active_client_count * scale_down_percentage)
        if clients_to_pause <= 0:
            self.logger.info("No clients to pause during scale down")
            return
        # Create a flattened list of (worker_id, client_id) tuples for all active clients
        all_active_clients = []
        for worker_id, client_states in self.shared_client_states.items():
            for client_id, status in client_states.items():
                if status:  # Only include active clients
                    all_active_clients.append((worker_id, client_id))
        # If we need to pause more clients than are active, adjust the count
        clients_to_pause = min(clients_to_pause, len(all_active_clients))
        # Select clients to pause - randomly sample for better distribution
        import random
        clients_to_pause_indices = random.sample(range(len(all_active_clients)), clients_to_pause)
        clients_to_pause_list = [all_active_clients[i] for i in clients_to_pause_indices]
        # Pause the selected clients in a single pass
        for worker_id, client_id in clients_to_pause_list:
            self.shared_client_states[worker_id][client_id] = False
            self.total_active_client_count -= 1
        self.logger.info("Scaling down complete. Paused %d clients", clients_to_pause)
    finally:
        self.state = FeedbackState.SLEEP
        self.clear_queue()
        self.sleep_start_time = self.last_scaleup_time = time.perf_counter()
Complexity is reduced from O(n^2) to O(n), and on second thought, randomization provides better load distribution. You can ignore the randomization point if you don't agree and the current approach seems to be working as expected.
Yeah, this looks good thank you. Will add this
@@ -1215,10 +1495,12 @@ def drive(self):
    self.logger.info("Worker[%d] skips tasks at index [%d] because it has been asked to complete all "
                     "tasks until next join point.", self.worker_id, self.current_task_index)
else:
    if self.config.opts("workload", "redline.test", mandatory=False):
I'm not too sure about this; why does each worker have to send this signal?
Ideally all the coordination between Worker actors and the Feedback Actor should happen via the WorkerCoordinatorActor. Can you look at any other ways to handle the joinpoint condition?
Same goes for disabling.
I'm sure there must be an overall status being tracked at the WCA level to know when all the workers have synchronized at a joinpoint; we can use that to signal enabling or disabling the Feedback actor.
Yes, we can track the overall status, but the issue comes when some workers have reached the joinpoint and are waiting for others to catch up. I've come across workers waiting 30+ seconds at a joinpoint during testing, and it will result in inaccurate results since some clients are 'unpaused' but are actually just waiting at a joinpoint.
Didn't get the last part. My point is that the WCA is already tracking whether all the actors have reached the joinpoint here. Can we send a message from here to the Feedback actor to stop?
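A sketch of that idea (the PauseFeedbackActor message and the bookkeeping names are hypothetical; the actual joinpoint tracking is the WCA code linked above):

class PauseFeedbackActor:  # hypothetical message, not defined in this PR
    pass

def on_worker_at_joinpoint(self, worker_id):
    # Inside the WorkerCoordinatorActor's existing joinpoint bookkeeping.
    self.workers_at_joinpoint.add(worker_id)
    if len(self.workers_at_joinpoint) == self.total_worker_count:
        # All workers are synchronized: tell the FeedbackActor to stop scaling so
        # clients idling at the joinpoint are not counted as active load.
        self.send(self.feedback_actor, PauseFeedbackActor())
        self.workers_at_joinpoint.clear()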
Left some minor comments but LGTM. This will be really useful @OVI3D0
@@ -1091,6 +1342,10 @@ def receiveMsg_StartWorker(self, msg, sender):
self.client_allocations = msg.client_allocations
self.current_task_index = 0
self.cancel.clear()
self.feedback_actor = msg.feedback_actor
self.shared_states = msg.shared_states
self.error_queue = msg.error_queue
This needs to be declared in the __init__ method.
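i.e. something like this in the worker's __init__ (a sketch; the existing __init__ sets other attributes not shown here):

def __init__(self):
    super().__init__()
    # Declare up front so the attributes always exist, even before StartWorker arrives.
    self.feedback_actor = None
    self.shared_states = None
    self.error_queue = None
    self.queue_lock = None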
self.feedback_actor = msg.feedback_actor
self.shared_states = msg.shared_states
self.error_queue = msg.error_queue
self.queue_lock = msg.queue_lock
same here.
Looks good. Can you please address the nits and also add some test run data?
Here is a comparison of two regular benchmark runs, on versions of OSB without (baseline) and with (contender) my changes applied. Both were run against a cluster with 3 manager nodes and 3 data nodes, along with 1 client node running OS 2.15.0:
Description
Redline testing enables OSB to determine the maximum request throughput a cluster can handle under increasing load, which can help with capacity planning and detecting regressions in OpenSearch clusters.
This PR introduces redline testing to OSB, and enables automatic scaling of clients until the cluster begins to error. It implements:
- a FeedbackActor to throttle active clients
- supporting changes to the WorkerCoordinatorActor, Worker, and AsyncExecutor
Redline Testing Overview
If the --redline-test flag is passed with a timed test procedure, OSB will then:
- scale clients automatically (configurable via --redline-test=<int>)
- spin up a FeedbackActor, which controls which clients should run
- share state (via multiprocessing) to enable real-time coordination

FeedbackActor Shared State Format
The client state dict managed by the FeedbackActor looks like the sketch below. Clients only send requests when their status is True.
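A minimal illustration of that shape, inferred from the scale_down() snippet quoted earlier in this review (worker and client IDs are illustrative):

shared_client_states = {
    0: {0: True, 1: True, 2: False},  # worker 0: clients 0 and 1 active, client 2 paused
    1: {3: True, 4: False, 5: True},  # worker 1
}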
At the end of the test, OSB will print out the maximum number of clients the cluster was able to reach without error. This value is updated every time the FeedbackActor is in a NEUTRAL state, meaning no recent errors.
Based on the RFC introduced recently.
Issues Resolved
#790 #791 #792
Testing
make it + make test
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.