Commit 4e6d27e

alextercete, idan-arm
authored and committed
remote/coordinator: run reservation scheduling under lock
Reservation scheduling mutates shared coordinator state (`reservations`, `places`, and the derived `place.reservation` view), but the periodic scheduler path was performing these updates without holding the coordinator lock. This can lead to races between the poll-based scheduler and concurrent RPC handlers (e.g. acquire/release/reservation operations), potentially resulting in inconsistent state or incorrect reservation transitions.

We now run reservation scheduling under the coordinator lock by acquiring the lock in the poll path before calling the scheduler. This ensures that all mutations of coordinator state are serialized consistently with the rest of the coordinator operations.

Renamed `schedule_reservations` to `_schedule_reservations` to reflect that it is now an internal helper that expects the caller to hold the coordinator lock. All usages in the coordinator were updated. This method is not part of a public API, so renaming it is not expected to impact external users.

Testing
=======
- Ran existing unit tests covering reservation and acquire/release flows.
- Exercised reservation creation/cancel and place acquire/release scenarios manually.
- No deadlocks were observed in these runs.
- This patch does not add a dedicated stress test, and the current tests do not prove the absence of deadlocks.

Performance
===========
- Measured `_schedule_reservations` runtime under load. With 100 places and 200 reservations created in parallel, observed runtime was ~0.0013 s, with a worst case of ~0.0036 s.
- RPC paths were already invoking reservation scheduling while holding the coordinator lock; the primary change is that the poll-based scheduler path is now also serialized under the same lock.

This patch focuses on fixing the scheduler locking model and does not change coordinator scheduling behavior.

Signed-off-by: Alex Tercete <alex.tercete@arm.com>
Reviewed-by: Alex Tercete <alex.tercete@arm.com> # gatekeeper
Co-authored-by: Idan Saadon <idan.saadon@arm.com>
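The race described above can be illustrated with a minimal, self-contained sketch. It is not labgrid code: `ToyCoordinator`, `acquire`, `poll_unlocked` and `poll_locked` are hypothetical names, and the `await asyncio.sleep(0)` stands in for any await a handler might perform while holding the lock. The assumption is that a handler updates primary state, awaits, and only then refreshes the derived view.

```python
import asyncio


class ToyCoordinator:
    """Hypothetical stand-in for the coordinator; not labgrid's class."""

    def __init__(self):
        self.lock = asyncio.Lock()
        self.acquired = None   # primary state
        self.view = None       # derived copy, similar in spirit to place.reservation
        self.observed = []     # consistency results seen by the scheduler

    async def acquire(self, user):
        # RPC-style handler: holds the lock across an await.
        async with self.lock:
            self.acquired = user
            await asyncio.sleep(0)  # stand-in for awaiting some other operation
            self.view = user        # state is consistent again

    def _schedule(self):
        # Synchronous scheduler body: record whether it saw consistent state.
        self.observed.append(self.acquired == self.view)

    async def poll_unlocked(self):
        # Old behaviour: scheduler runs without taking the lock.
        self._schedule()

    async def poll_locked(self):
        # New behaviour: scheduler waits for the handler to finish.
        async with self.lock:
            self._schedule()


async def run(poll_name):
    coord = ToyCoordinator()
    handler = asyncio.create_task(coord.acquire("alice"))
    await asyncio.sleep(0)             # let the handler reach its await
    await getattr(coord, poll_name)()  # run the chosen poll variant
    await handler
    return coord.observed


unlocked = asyncio.run(run("poll_unlocked"))
locked = asyncio.run(run("poll_locked"))
print(unlocked, locked)  # [False] [True]
```

In the unlocked variant the scheduler runs while the handler is suspended mid-update and sees the half-written state; in the locked variant it only runs after the handler has released the lock.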
1 parent a5adaa5 commit 4e6d27e

1 file changed

Lines changed: 9 additions & 7 deletions

labgrid/remote/coordinator.py

@@ -241,8 +241,9 @@ async def _poll_step_sync_resources(self):

     async def _poll_step_schedule(self):
         # update reservations
-        with warn_if_slow("schedule reservations"):
-            self.schedule_reservations()
+        async with self.lock:
+            with warn_if_slow("schedule reservations"):
+                self._schedule_reservations()

     async def poll(self, step_func):
         while not self.loop.is_closed():
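One detail of this hunk is the nesting order: the timing context manager sits inside `async with self.lock`, so time spent waiting for the lock is not attributed to the scheduler. The sketch below illustrates the difference under stated assumptions. `timed` is a hypothetical stand-in that only records elapsed time; it is not labgrid's `warn_if_slow`, whose implementation is not shown in this commit.

```python
import asyncio
import time
from contextlib import contextmanager


@contextmanager
def timed(results, label):
    """Hypothetical stand-in for a slow-path timer: record elapsed seconds."""
    start = time.monotonic()
    try:
        yield
    finally:
        results[label] = time.monotonic() - start


async def main():
    lock = asyncio.Lock()
    results = {}

    async def busy_handler():
        # Simulates an RPC handler holding the lock for 50 ms.
        async with lock:
            await asyncio.sleep(0.05)

    first = asyncio.create_task(busy_handler())
    await asyncio.sleep(0)  # let the handler take the lock first

    # Timer outside the lock: the wait for the lock is included.
    with timed(results, "outside"):
        async with lock:
            pass

    second = asyncio.create_task(busy_handler())
    await asyncio.sleep(0)

    # Timer inside the lock (the nesting used in the hunk above):
    # only the guarded work is measured.
    async with lock:
        with timed(results, "inside"):
            pass

    await asyncio.gather(first, second)
    return results


results = asyncio.run(main())
print(results)
```

With this nesting, a slow-scheduler warning would point at the scheduling work itself rather than at lock contention from other handlers.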
@@ -884,7 +885,7 @@ async def AcquirePlace(self, request, context):
         place.touch()
         self._publish_place(place)
         self.save_later()
-        self.schedule_reservations()
+        self._schedule_reservations()
         print(f"{place.name}: place acquired by {place.acquired}")
         return labgrid_coordinator_pb2.AcquirePlaceResponse()

@@ -911,7 +912,7 @@ async def ReleasePlace(self, request, context):
         place.touch()
         self._publish_place(place)
         self.save_later()
-        self.schedule_reservations()
+        self._schedule_reservations()
         print(f"{place.name}: place released")
         return labgrid_coordinator_pb2.ReleasePlaceResponse()

@@ -951,11 +952,12 @@ async def GetPlaces(self, unused_request, unused_context):
         except Exception:
             logging.exception("error during get places")

-    def schedule_reservations(self):
+    def _schedule_reservations(self):
         # The primary information is stored in the reservations and the places
         # only have a copy for convenience.

         # expire reservations
+        assert self.lock.locked()
         for res in list(self.reservations.values()):
             if res.state is ReservationState.acquired:
                 # acquired reservations do not expire
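The new `assert self.lock.locked()` guard can be sketched in isolation as follows. The `locked` decorator here is an assumed shape for the `@locked` decorator visible in the surrounding context, not a copy of labgrid's implementation, and `ToyCoordinator`, `_schedule` and `handler` are hypothetical names.

```python
import asyncio
from functools import wraps


def locked(func):
    """Assumed shape of a @locked decorator: run the handler under self.lock."""
    @wraps(func)
    async def wrapper(self, *args, **kwargs):
        async with self.lock:
            return await func(self, *args, **kwargs)
    return wrapper


class ToyCoordinator:
    """Hypothetical stand-in for the coordinator; not labgrid's class."""

    def __init__(self):
        self.lock = asyncio.Lock()
        self.runs = 0

    def _schedule(self):
        # Same style of guard as in the hunk above: fail fast if the lock is free.
        assert self.lock.locked()
        self.runs += 1

    @locked
    async def handler(self):
        self._schedule()


async def main():
    coord = ToyCoordinator()
    await coord.handler()      # lock held by the decorator: guard passes
    try:
        coord._schedule()      # lock not held: guard fires
    except AssertionError:
        caught = True
    else:
        caught = False
    return coord.runs, caught


runs, caught = asyncio.run(main())
print(runs, caught)  # 1 True
```

Two limits of this guard are worth keeping in mind: `asyncio.Lock.locked()` reports whether any task holds the lock, not whether the current caller does, and `assert` statements are skipped when Python runs with `-O`. The check catches a forgotten lock during development, but it is not proof of correct ownership.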
@@ -1077,7 +1079,7 @@ async def CreateReservation(self, request: labgrid_coordinator_pb2.CreateReserva
         owner = self.clients[peer].name
         res = Reservation(owner=owner, prio=request.prio, filters=fltrs)
         self.reservations[res.token] = res
-        self.schedule_reservations()
+        self._schedule_reservations()
         return labgrid_coordinator_pb2.CreateReservationResponse(reservation=res.as_pb2())

     @locked
@@ -1088,7 +1090,7 @@ async def CancelReservation(self, request: labgrid_coordinator_pb2.CancelReserva
         if token not in self.reservations:
             await context.abort(grpc.StatusCode.FAILED_PRECONDITION, f"Reservation {token} does not exist")
         del self.reservations[token]
-        self.schedule_reservations()
+        self._schedule_reservations()
         return labgrid_coordinator_pb2.CancelReservationResponse()

     @locked
