@@ -301,35 +301,41 @@ def __init__( # noqa: PLR0913
301
301
}
302
302
self .actor_ref = self .protocol .start (** protocol_kwargs )
303
303
304
- async def serve (self ) -> None :
305
- while not self .stopping :
306
- try :
307
- task = asyncio .create_task (self ._loop .sock_recv (self ._sock , 4096 ))
308
- done , _ = await asyncio .wait (
309
- {task }, timeout = self .timeout , return_when = asyncio .FIRST_COMPLETED
310
- )
311
-
312
- if not done :
313
- self .stop (
314
- f"Client inactive for { self .timeout :d} s; closing connection"
315
- )
316
- return
317
-
318
- data = task .result ()
319
- except OSError as exc :
320
- if exc .errno in (errno .EWOULDBLOCK , errno .EINTR ):
321
- continue
322
- self .stop (f"Unexpected client error: { exc } " )
323
- return
304
+ async def recv (self ) -> bool :
305
+ try :
306
+ task = asyncio .create_task (self ._loop .sock_recv (self ._sock , 4096 ))
307
+ tasks , _ = await asyncio .wait (
308
+ [task ], timeout = self .timeout , return_when = asyncio .FIRST_COMPLETED
309
+ )
324
310
325
- if not data :
326
- self .actor_ref . tell ({ "close" : True } )
327
- return
311
+ if not ( tasks and task in tasks ) :
312
+ self .stop ( f"Client inactive for { self . timeout :d } s; closing connection" )
313
+ return False
328
314
329
- try :
330
- self .actor_ref .tell ({"received" : data })
331
- except pykka .ActorDeadError :
332
- self .stop ("Actor is dead." )
315
+ data = task .result ()
316
+ except OSError as exc :
317
+ if exc .errno in (errno .EWOULDBLOCK , errno .EINTR ):
318
+ return True
319
+ self .stop (f"Unexpected client error: { exc } " )
320
+ return False
321
+
322
+ if not data :
323
+ self .actor_ref .tell ({"close" : True })
324
+ return False
325
+
326
+ try :
327
+ self .actor_ref .tell ({"received" : data })
328
+ except pykka .ActorDeadError :
329
+ self .stop ("Actor is dead." )
330
+ return False
331
+
332
+ return True
333
+
334
+ async def serve (self ) -> None :
335
+ while not self .stopping :
336
+ should_continue = await self .recv ()
337
+ if not should_continue :
338
+ break
333
339
334
340
def stop (self , reason : str , level : int = logging .DEBUG ) -> None :
335
341
if self .stopping :
@@ -353,13 +359,13 @@ async def queue_send(self, data: bytes) -> None:
353
359
await asyncio .wait ({task }, timeout = self .timeout )
354
360
self .send_buffer = b""
355
361
356
- async def send (self , data : bytes ):
362
+ async def send (self , data : bytes ) -> None :
357
363
"""Send data to client."""
358
364
try :
359
365
await self ._loop .sock_sendall (self ._sock , data )
360
366
except OSError as exc :
361
367
if exc .errno in (errno .EWOULDBLOCK , errno .EINTR ):
362
- return data
368
+ return
363
369
self .stop (f"Unexpected client error: { exc } " )
364
370
365
371
def timeout_callback (self ) -> bool :
0 commit comments