Skip to content

Commit 1169700

Browse files
alexterceteidan-arm
andcommitted
remote/coordinator: run reservation scheduling under lock
Reservation scheduling mutates shared coordinator state (`reservations`, `places`, and the derived `place.reservation` view), but the periodic scheduler path was performing these updates without holding the coordinator lock. This can lead to races between the poll-based scheduler and concurrent RPC handlers (e.g. acquire/release/reservation operations), potentially resulting in inconsistent state or incorrect reservation transitions. We now run reservation scheduling under the coordinator lock by making it async and awaiting it in the poll path. This ensures that all mutations of coordinator state are serialized consistently with the rest of the coordinator operations. Renamed `schedule_reservations` to `_schedule_reservations` to reflect that it is now an internal async helper. All usages in the coordinator were updated. This method is not part of a public API, so renaming it is not expected to impact external users. Testing ======= - Ran existing unit tests covering reservation and acquire/release flows. - Exercised reservation creation/cancel and place acquire/release scenarios manually. - No deadlocks were observed in these runs. - This patch does not add a dedicated stress test, and the current tests do not prove the absence of deadlocks. Performance =========== - Measured `_schedule_reservations` runtime under load. With 100 places and 200 reservations created in parallel, observed runtime was ~0.0013s, with worst case ~0.0036 s. - RPC paths were already invoking reservation scheduling while holding the coordinator lock; the primary change is that the poll-based scheduler path is now also serialized under the same lock. This patch focuses on fixing the scheduler locking model and does not change coordinator scheduling behavior. Signed-off-by: Alex Tercete <alex.tercete@arm.com> Reviewed-by: Alex Tercete <alex.tercete@arm.com> # gatekeeper Co-authored-by: Idan Saadon <idan.saadon@arm.com>
1 parent c027b2d commit 1169700

1 file changed

Lines changed: 9 additions & 7 deletions

File tree

labgrid/remote/coordinator.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,9 @@ async def _poll_step_sync_resources(self):
240240

241241
async def _poll_step_schedule(self):
242242
# update reservations
243-
with warn_if_slow("schedule reservations"):
244-
self.schedule_reservations()
243+
async with self.lock:
244+
with warn_if_slow("schedule reservations"):
245+
await self._schedule_reservations()
245246

246247
async def poll(self, step_func):
247248
while not self.loop.is_closed():
@@ -883,7 +884,7 @@ async def AcquirePlace(self, request, context):
883884
place.touch()
884885
self._publish_place(place)
885886
self.save_later()
886-
self.schedule_reservations()
887+
await self._schedule_reservations()
887888
print(f"{place.name}: place acquired by {place.acquired}")
888889
return labgrid_coordinator_pb2.AcquirePlaceResponse()
889890

@@ -910,7 +911,7 @@ async def ReleasePlace(self, request, context):
910911
place.touch()
911912
self._publish_place(place)
912913
self.save_later()
913-
self.schedule_reservations()
914+
await self._schedule_reservations()
914915
print(f"{place.name}: place released")
915916
return labgrid_coordinator_pb2.ReleasePlaceResponse()
916917

@@ -950,11 +951,12 @@ async def GetPlaces(self, unused_request, unused_context):
950951
except Exception:
951952
logging.exception("error during get places")
952953

953-
def schedule_reservations(self):
954+
async def _schedule_reservations(self):
954955
# The primary information is stored in the reservations and the places
955956
# only have a copy for convenience.
956957

957958
# expire reservations
959+
assert self.lock.locked()
958960
for res in list(self.reservations.values()):
959961
if res.state is ReservationState.acquired:
960962
# acquired reservations do not expire
@@ -1076,7 +1078,7 @@ async def CreateReservation(self, request: labgrid_coordinator_pb2.CreateReserva
10761078
owner = self.clients[peer].name
10771079
res = Reservation(owner=owner, prio=request.prio, filters=fltrs)
10781080
self.reservations[res.token] = res
1079-
self.schedule_reservations()
1081+
await self._schedule_reservations()
10801082
return labgrid_coordinator_pb2.CreateReservationResponse(reservation=res.as_pb2())
10811083

10821084
@locked
@@ -1087,7 +1089,7 @@ async def CancelReservation(self, request: labgrid_coordinator_pb2.CancelReserva
10871089
if token not in self.reservations:
10881090
await context.abort(grpc.StatusCode.FAILED_PRECONDITION, f"Reservation {token} does not exist")
10891091
del self.reservations[token]
1090-
self.schedule_reservations()
1092+
await self._schedule_reservations()
10911093
return labgrid_coordinator_pb2.CancelReservationResponse()
10921094

10931095
@locked

0 commit comments

Comments
 (0)