Skip to content

Commit e6ca40b

Browse files
committed
fix
1 parent 0a9681c commit e6ca40b

2 files changed

Lines changed: 9 additions & 10 deletions

File tree

src/backend/server/rpc_connection_handler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ def node_update(self, message):
6363
current_requests=node.current_requests,
6464
layer_latency_ms=node.layer_latency_ms,
6565
new_rtt_to_nodes=node.rtt_to_nodes,
66+
is_active=node.is_active,
6667
)
6768
return {}
6869
except Exception as e:

src/scheduling/scheduler.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def __init__(
8888
self._pending_joins: "queue.Queue[Node]" = queue.Queue()
8989
self._pending_leaves: "queue.Queue[str]" = queue.Queue()
9090
self._pending_node_updates: (
91-
"queue.Queue[Tuple[str, Optional[int], Optional[float], Optional[Dict[str, float]]]]"
91+
"queue.Queue[Tuple[str, Optional[int], Optional[float], Optional[Dict[str, float]], Optional[bool]]]"
9292
) = queue.Queue()
9393

9494
# Concurrency controls
@@ -192,6 +192,7 @@ def update_node_info(
192192
current_requests: Optional[int] = None,
193193
layer_latency_ms: Optional[float] = None,
194194
new_rtt_to_nodes: Optional[Dict[str, float]] = None,
195+
is_active: Optional[bool] = None,
195196
) -> None:
196197
"""Update the info of a node."""
197198
if current_requests is not None:
@@ -200,6 +201,8 @@ def update_node_info(
200201
node.set_layer_latency_ms(layer_latency_ms)
201202
if new_rtt_to_nodes is not None:
202203
node.rtt_to_nodes.update(new_rtt_to_nodes)
204+
if is_active is not None:
205+
node.is_active = is_active
203206
node.last_heartbeat = time.time()
204207
# logger.debug(
205208
# "Node updated: %s (requests=%s, latency_ms=%s, rtt_updates=%s)",
@@ -228,19 +231,13 @@ def enqueue_node_update(
228231
current_requests: Optional[int] = None,
229232
layer_latency_ms: Optional[float] = None,
230233
new_rtt_to_nodes: Optional[Dict[str, float]] = None,
234+
is_active: Optional[bool] = None,
231235
) -> None:
232236
"""Enqueue a node update event."""
233237
self._pending_node_updates.put(
234-
(node_id, current_requests, layer_latency_ms, new_rtt_to_nodes)
238+
(node_id, current_requests, layer_latency_ms, new_rtt_to_nodes, is_active)
235239
)
236240
self._wake_event.set()
237-
# logger.debug(
238-
# "Enqueued node update: %s (requests=%s, latency_ms=%s, rtt_updates=%s)",
239-
# node_id,
240-
# current_requests,
241-
# layer_latency_ms,
242-
# 0 if new_rtt_to_nodes is None else len(new_rtt_to_nodes),
243-
# )
244241

245242
def checking_node_heartbeat(self) -> None:
246243
"""Check the heartbeat of all nodes."""
@@ -457,14 +454,15 @@ def _process_node_updates(self) -> None:
457454
"""Apply pending node stats updates from the queue."""
458455
while True:
459456
try:
460-
node_id, cur, lat, rtts = self._pending_node_updates.get_nowait()
457+
node_id, cur, lat, rtts, is_active = self._pending_node_updates.get_nowait()
461458
except queue.Empty:
462459
break
463460
self.update_node_info(
464461
self.node_id_to_node[node_id],
465462
current_requests=cur,
466463
layer_latency_ms=lat,
467464
new_rtt_to_nodes=rtts,
465+
is_active=is_active,
468466
)
469467

470468
def _process_joins(self) -> None:

0 commit comments

Comments
 (0)