introduce new FeedbackActor #793

Open · wants to merge 17 commits into main

Changes from 1 commit
156 changes: 156 additions & 0 deletions osbenchmark/worker_coordinator/worker_coordinator.py
@@ -198,6 +198,162 @@ def __init__(self, metrics, next_task_scheduled_in):
        self.metrics = metrics
        self.next_task_scheduled_in = next_task_scheduled_in


class ClusterErrorMessage:
    """Message sent from the client when a request fails during load testing"""
    def __init__(self, client_id, request_metadata):
        self.client_id = client_id
        self.request_metadata = request_metadata


class FeedbackState(Enum):
    """Various states for the FeedbackActor"""
    NORMAL = "normal"
    SCALING_DOWN = "scaling_down"
    SLEEP = "sleep"
    SCALING_UP = "scaling_up"

class FeedbackActor(actor.BenchmarkActor):
    POST_SCALEDOWN_SECONDS = 30
    WAKEUP_INTERVAL = 1

    def __init__(self):
        super().__init__()
        self.logger = logging.getLogger(__name__)
        self.state = FeedbackState.SCALING_UP
        self.messageQueue = queue.Queue()
        self.shared_client_states = {}
        self.total_active_client_count = 0
        self.sleep_start_time = None
        self.last_error_time = None
        self.last_scaleup_time = None

    def handle_state(self):
        current_time = time.time()

        if self.state == FeedbackState.SLEEP:
            # Check if we've slept for long enough to return to a normal state
            if current_time - self.sleep_start_time >= self.POST_SCALEDOWN_SECONDS:
                self.logger.info("Feedback Actor's sleep period complete, returning to NORMAL state")
                self.state = FeedbackState.NORMAL
                self.sleep_start_time = None
            return

        if self.messageQueue.qsize() > 0:
            self.logger.info("Feedback Actor has received an error message, scaling down...")
            self.state = FeedbackState.SCALING_DOWN
            self.scale_down()
            self.logger.info("Clients scaled down. Number of active clients: %d", self.total_active_client_count)

        if self.state == FeedbackState.NORMAL:
            # Check if we've waited long enough since the last scaledown
            if current_time - self.last_error_time >= self.POST_SCALEDOWN_SECONDS:
                if (self.last_scaleup_time is None or
                        current_time - self.last_scaleup_time >= self.WAKEUP_INTERVAL):
                    self.logger.info("No errors in the last %d seconds, scaling up", self.POST_SCALEDOWN_SECONDS)
                    self.state = FeedbackState.SCALING_UP
                    self.scale_up()
                    self.last_scaleup_time = current_time
            else:
                self.logger.info("Cluster has errored too recently, waiting before scaling up")

    def receiveMsg_ClusterErrorMessage(self, msg, sender):
        self.last_error_time = time.time()
        self.logger.info("Feedback actor has received an error message.")
        if self.state == FeedbackState.SCALING_DOWN:
            self.logger.info("Already scaling down, ignoring error")
            return
        elif self.state == FeedbackState.SLEEP:
            self.logger.info("In sleep mode, ignoring error")
            return
        elif self.state == FeedbackState.NORMAL:
            # Add error message to queue
            try:
                self.messageQueue.put(msg)
            except Exception as e:
                self.logger.error("Error adding message to queue: %s", e)

    def scale_down(self, scale_down_percentage=0.10):
        self.scaledown_timer = 0
        try:
            # calculate target number of clients to pause
            clients_to_pause = int(self.total_active_client_count * scale_down_percentage)
            clients_paused = 0

            # Get active clients (True status) for each worker
            active_clients_by_worker = {}
            for worker_id, client_states in self.shared_client_states.items():
                active_clients = [(client_id, status) for client_id, status in client_states.items() if status]
                if active_clients:
                    active_clients_by_worker[worker_id] = active_clients

            # round-robin through workers until we've paused enough clients
            while clients_paused < clients_to_pause and active_clients_by_worker:
                for worker_id in list(active_clients_by_worker.keys()):
                    if clients_paused >= clients_to_pause:
                        break
                    # if this worker has no more active clients, drop it from the temporary dict
                    if not active_clients_by_worker[worker_id]:
                        del active_clients_by_worker[worker_id]
                        continue

                    # take one client from this worker
                    client_id, _ = active_clients_by_worker[worker_id].pop(0)
                    self.shared_client_states[worker_id][client_id] = False
                    clients_paused += 1
                    self.total_active_client_count -= 1
            self.logger.info("Scaling down complete. Paused %d clients", clients_paused)

        finally:
            # enter sleep state to block any new messages
            self.state = FeedbackState.SLEEP
            # clear the message queue
            while not self.messageQueue.empty():
                self.messageQueue.get()
            self.sleep_start_time = time.time()
            self.last_scaleup_time = None

    def scale_up(self):
Collaborator
Bringing up 1 client at a time will be time consuming, especially when spinning up 1000s of clients.
How about we bring up clients in steps of n or multiples of n, and also keep a check on self.state: if it is set to scaling down whenever an error message is received, we can just skip the logic and do nothing? WDYT?

Member Author
@OVI3D0 OVI3D0 Mar 26, 2025
You're right, 1 client/sec is time consuming. For now I changed it to 5/sec, but I can increase it to 10 or more if you think it should start faster.

I think the scale_up function should just take a number of clients as an argument and attempt to activate that many clients round-robin style until it hits the target. For now it's just linear, but the logic for changing the number of clients in multiples or steps can be added in a separate function. What do you think of this:

We check the state every second and choose whether to scale up or down depending on the message queue and a couple of other factors (whether we errored recently, or scaled up too recently). For future scale-up methods, we can keep another class attribute n that we manipulate in a separate function if we want to scale up in different ways, e.g. exponentially or percentage based, and then call scale_up to activate that many clients. Does that make sense?
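
A rough sketch of that shape, not the code in this commit (the clients_to_activate parameter, the early state check, and the round-robin loop are illustrative, assuming the FeedbackActor fields defined above):

    def scale_up(self, clients_to_activate=5):
        """Activate up to clients_to_activate paused clients, round-robin across workers."""
        if self.state == FeedbackState.SCALING_DOWN:
            # an error arrived in the meantime; skip this scale-up entirely
            return
        activated = 0
        try:
            # collect paused clients (False status) per worker
            inactive_by_worker = {}
            for worker_id, client_states in self.shared_client_states.items():
                inactive = [client_id for client_id, status in client_states.items() if not status]
                if inactive:
                    inactive_by_worker[worker_id] = inactive
            # round-robin: unpause one client per worker until the target is met
            while activated < clients_to_activate and inactive_by_worker:
                for worker_id in list(inactive_by_worker.keys()):
                    if activated >= clients_to_activate:
                        break
                    if not inactive_by_worker[worker_id]:
                        del inactive_by_worker[worker_id]
                        continue
                    client_id = inactive_by_worker[worker_id].pop(0)
                    self.shared_client_states[worker_id][client_id] = True
                    self.total_active_client_count += 1
                    activated += 1
        finally:
            self.logger.info("Scale-up complete. Activated %d clients", activated)
            self.state = FeedbackState.NORMAL

That keeps the per-wakeup pacing idea while leaving room for a separate function to adjust clients_to_activate (linear, multiples, exponential) between wakeups.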

        try:
            # Get inactive clients (False status) for each worker
            inactive_clients_by_worker = {}
            for worker_id, client_states in self.shared_client_states.items():
                inactive_clients = [(client_id, status) for client_id, status in client_states.items() if not status]
                if inactive_clients:
                    inactive_clients_by_worker[worker_id] = inactive_clients

            # Find first inactive client and activate it
            client_activated = False
            for worker_id in inactive_clients_by_worker:
                if inactive_clients_by_worker[worker_id]:
                    # Take one inactive client from this worker
                    client_id, _ = inactive_clients_by_worker[worker_id][0]
                    self.shared_client_states[worker_id][client_id] = True
                    client_activated = True
                    self.total_active_client_count += 1
                    self.logger.info("Unpaused client %d on worker %d", client_id, worker_id)
                    break

            if not client_activated:
                self.logger.info("No inactive clients found to activate")
        finally:
            self.state = FeedbackState.NORMAL

    def receiveMsg_dict(self, msg, sender):
        # Workers periodically share their client-state dict; store it and re-evaluate state
        try:
            self.shared_client_states[msg['worker_id']] = msg['data']
            self.handle_state()
        except Exception as e:
            self.logger.error("Error processing client states: %s", e)

    def receiveMsg_StartFeedbackActor(self, msg, sender):
        self.wakeupAfter(datetime.timedelta(seconds=FeedbackActor.WAKEUP_INTERVAL))

    def receiveMsg_WakeupMessage(self, msg, sender):
        # Upon waking up, check state
        self.handle_state()
        self.wakeupAfter(datetime.timedelta(seconds=FeedbackActor.WAKEUP_INTERVAL))

    def receiveUnrecognizedMessage(self, msg, sender):
        self.logger.warning("Received unrecognized message: %s", msg)

class WorkerCoordinatorActor(actor.BenchmarkActor):
    RESET_RELATIVE_TIME_MARKER = "reset_relative_time"