
handle distributed.core.Server startup and shutdown excellently #6616

@graingert


Server startup and shutdown are currently confusing and error-prone; see #6615 and a8244bd (#6603).

Patterns involving concurrent and re-entrant close and cancellation are prone to deadlocks:

```python
try:
    await self.comm.read()  # close() cancels this task and waits for this task to finish
finally:
    await self.close()  # this waits for close() to finish
```
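The deadlock can be reproduced with plain asyncio. In this minimal sketch, the `Conn` class and its methods are hypothetical and `asyncio.sleep` stands in for `comm.read()`. `close()` cancels the reader task and waits for it, while the reader's `finally` re-enters `close()` and waits for the first close to finish, so neither can make progress:

```python
import asyncio


class Conn:
    """Hypothetical server with a reader task and a re-entrant close()."""

    def __init__(self):
        self.reader_task = None
        self.closing = False
        self.closed = asyncio.Event()

    async def read_loop(self):
        try:
            await asyncio.sleep(3600)  # stands in for `await self.comm.read()`
        finally:
            await self.close()  # re-entrant call: waits for the first close()

    async def close(self):
        if self.closing:
            # Re-entrant caller: wait until the first close() has finished.
            await self.closed.wait()
            return
        self.closing = True
        self.reader_task.cancel()
        # Wait for the reader task to finish; asyncio.wait() does not cancel it.
        await asyncio.wait({self.reader_task})
        self.closed.set()


async def main():
    conn = Conn()
    conn.reader_task = asyncio.create_task(conn.read_loop())
    await asyncio.sleep(0)  # let read_loop start awaiting its "read"
    try:
        await asyncio.wait_for(conn.close(), timeout=0.2)
    except asyncio.TimeoutError:
        return "deadlock"
    return "closed"


result = asyncio.run(main())
print(result)  # deadlock
```

The `wait_for` timeout exists only so the demo terminates; in a real server the two tasks would simply wait on each other forever.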

For example:

```python
@fail_hard
async def handle_scheduler(self, comm: Comm) -> None:
    await self.handle_stream(comm)
    logger.info(
        "Connection to scheduler broken. Closing without reporting. ID: %s Address %s Status: %s",
        self.id,
        self.address,
        self.status,
    )
    await self.close()
```
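Under the ownership rule proposed below, a handler like this would only *signal* that the server should close (the sketch's `request_close()`), leaving the actual close to the task that owns the server. A minimal self-contained illustration with plain asyncio; `SignallingConn`, `stream_loop` and `serve` are hypothetical names, and `asyncio.sleep` stands in for `handle_stream`:

```python
import asyncio


class SignallingConn:
    """Hypothetical server whose stream handler never awaits close() itself."""

    def __init__(self):
        self.close_requested = asyncio.Event()
        self.log = []

    def request_close(self):
        # A plain def that only sets an event: safe to call from any task.
        self.close_requested.set()

    async def stream_loop(self):
        try:
            await asyncio.sleep(0.01)  # stands in for `await self.handle_stream(comm)`
            self.log.append("stream broken")
        finally:
            self.request_close()  # signal, instead of `await self.close()`

    async def close(self, reader_task):
        reader_task.cancel()
        # Safe to wait here: the reader never waits on close().
        await asyncio.wait({reader_task})
        self.log.append("closed")


async def serve():
    conn = SignallingConn()
    reader_task = asyncio.create_task(conn.stream_loop())
    # Only the owning task waits for the request and performs the close.
    await asyncio.wait_for(conn.close_requested.wait(), timeout=1)
    await conn.close(reader_task)
    return conn.log


log = asyncio.run(serve())
print(log)  # ['stream broken', 'closed']
```

Because `request_close()` is synchronous, it cannot block the handler, so cancelling and awaiting the handler from the owner is always safe.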

I think we need a pattern where only the task that calls `async with Server(...): ...` is allowed to call `await self.finished()` or `await self.close()`.

A sketch:

```python
import asyncio
import warnings


class Server:
    # listen(), open_rpc_pool(), start() and abort() are left to subclasses
    # in this sketch.

    def __init__(self):
        self.__close_done = asyncio.Event()
        self.__start_event = asyncio.Event()
        self.__close_event = asyncio.Event()

    def request_close(self):
        # Safe to call from any task: it only sets events and never awaits.
        # Setting __start_event as well unblocks __aenter__ if a close is
        # requested before startup has finished.
        self.__start_event.set()
        self.__close_event.set()

    async def __lifecycle(self):
        try:
            async with self.listen(), self.open_rpc_pool():
                try:
                    await self.start()
                    self.__start_event.set()
                    await self.__close_event.wait()
                    await self.close()
                finally:
                    v = self.abort()  # abort comms by calling socket.close()
                    assert v is None  # abort must not be an async def
        finally:
            # Also set __start_event so __aenter__ never hangs if start() fails.
            self.__start_event.set()
            self.__close_done.set()

    async def __aenter__(self):
        self.__parent_task = asyncio.current_task()
        self.__lifecycle_task = asyncio.create_task(self.__lifecycle())
        await self.__start_event.wait()
        if self.__lifecycle_task.done():
            # The lifecycle already ended: re-raise any startup error in the
            # task that owns the server.
            self.__lifecycle_task.result()
        return self

    def __await__(self):
        warnings.warn(
            "await Server() is deprecated, use async with Server()",
            DeprecationWarning,
            stacklevel=2,
        )
        # ??? some background task magic here
        raise NotImplementedError

    async def __aexit__(self, exc_type, exc, tb):
        self.request_close()
        await self.finished()

    async def close(self):
        try:
            assert asyncio.current_task() is self.__lifecycle_task
            # close comms, wait for tasks to cancel
        finally:
            v = self.abort()  # abort comms by calling socket.close()
            assert v is None

    async def finished(self):
        assert asyncio.current_task() is self.__parent_task
        await self.__close_done.wait()
```
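To check that the control flow of the sketch holds together, here is a heavily trimmed, runnable variant. `MiniServer` is hypothetical and not distributed's API: `listen()`, `start()` and `abort()` are stubs that only record events, `open_rpc_pool()`, `close()` and `finished()` are omitted, and the owner awaits the lifecycle task directly instead of a separate close-done event.

```python
import asyncio
import contextlib


class MiniServer:
    """Hypothetical, trimmed-down version of the lifecycle sketch.

    listen(), start() and abort() are stubs that only record events;
    open_rpc_pool(), close() and finished() are omitted.
    """

    def __init__(self, fail_start=False):
        self.events = []
        self.fail_start = fail_start
        self._started = asyncio.Event()
        self._close_requested = asyncio.Event()

    @contextlib.asynccontextmanager
    async def listen(self):
        self.events.append("listen")
        try:
            yield
        finally:
            self.events.append("stop listening")

    async def start(self):
        if self.fail_start:
            raise RuntimeError("start failed")
        self.events.append("started")

    def abort(self):
        self.events.append("abort")  # synchronous, like socket.close()

    def request_close(self):
        # Never awaits, so any task (e.g. a stream handler) may call it.
        self._started.set()
        self._close_requested.set()

    async def _lifecycle(self):
        try:
            async with self.listen():
                try:
                    await self.start()
                    self._started.set()
                    await self._close_requested.wait()
                finally:
                    self.abort()
        finally:
            self._started.set()  # never leave __aenter__ waiting

    async def __aenter__(self):
        self._task = asyncio.create_task(self._lifecycle())
        await self._started.wait()
        if self._task.done():
            self._task.result()  # re-raise a startup failure in the owner
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.request_close()
        await self._task  # only the owning task waits for shutdown


async def handler(server):
    server.request_close()  # e.g. "connection to scheduler broken"


async def main():
    async with MiniServer() as server:
        await asyncio.create_task(handler(server))
    try:
        async with MiniServer(fail_start=True):
            pass
    except RuntimeError as exc:
        error = str(exc)
    return server.events, error


events, error = asyncio.run(main())
print(events)  # ['listen', 'started', 'abort', 'stop listening']
print(error)  # start failed
```

The handler task never awaits shutdown, and a failing `start()` surfaces as an exception from `async with` rather than a hang.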

Labels: asyncio, deprecation, enhancement, stability

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions