@@ -35,7 +35,8 @@ def __init__(self, callback_class):
3535 self ._can_write = asyncio .Event ()
3636 self ._can_write .set ()
3737
38- self .task = asyncio .create_task (self ._process_queue ())
38+ self ._pause_task = None
39+ self .task = asyncio .create_task (self ._guard_process_queue ())
3940
4041 def connection_made (self , transport ):
4142 self .transport = transport
@@ -56,6 +57,8 @@ def connection_lost(self, exc):
5657 if hasattr (self ._callback , "disconnect" ):
5758 self ._callback .disconnect (self .source )
5859 self .task .cancel ()
60+ if self ._pause_task :
61+ self ._pause_task .cancel ()
5962
6063 async def _check_closed (self ):
6164 while True :
@@ -145,58 +148,39 @@ def receive_data(self, queue, data):
145148
146149 return data .tobytes ()
147150
148- async def _process_queue (self ):
151+ async def _guard_process_queue (self ):
149152 while True :
150- data = await self ._queue .get ()
151-
152- if hasattr (self ._callback , "receive_raw" ):
153- try :
154- if await self ._callback .receive_raw (self .source , data ):
155- continue
156- except SocketClosed :
157- # The other side is closing the connection; it can happen
158- # there is still some writes in the buffer, so force a close
159- # on our side too to free the resources.
160- self .transport .abort ()
161- return
162- except asyncio .CancelledError :
163- # Our coroutine is cancelled, pass it on the the caller.
164- raise
165- except Exception :
166- log .exception ("Internal error: receive_raw triggered an exception" )
167- self .transport .abort ()
168- return
169-
170153 try :
171- packet_type , kwargs = self .receive_packet (self .source , data )
172- except PacketInvalid as err :
173- log .info ("Dropping invalid packet from %s:%d: %r" , self .source .ip , self .source .port , err )
174- self .transport .close ()
175- return
154+ await self ._process_queue ()
176155 except asyncio .CancelledError :
177156 # Our coroutine is cancelled, pass it on the the caller.
178157 raise
179- except Exception :
180- log .exception ("Internal error: receive_packet triggered an exception" )
181- self .transport .abort ()
182- return
183-
184- try :
185- await getattr (self ._callback , f"receive_{ packet_type .name } " )(self .source , ** kwargs )
186158 except SocketClosed :
187159 # The other side is closing the connection; it can happen
188160 # there is still some writes in the buffer, so force a close
189161 # on our side too to free the resources.
190162 self .transport .abort ()
191163 return
192- except asyncio .CancelledError :
193- # Our coroutine is cancelled, pass it on the the caller.
194- raise
195164 except Exception :
196- log .exception (f "Internal error: receive_ { packet_type . name } triggered an exception" )
165+ log .exception ("Internal error: process_queue triggered an exception" )
197166 self .transport .abort ()
198167 return
199168
169+ async def _process_queue (self ):
170+ data = await self ._queue .get ()
171+
172+ if hasattr (self ._callback , "receive_raw" ):
173+ if await self ._callback .receive_raw (self .source , data ):
174+ return
175+
176+ try :
177+ packet_type , kwargs = self .receive_packet (self .source , data )
178+ except PacketInvalid as err :
179+ log .info ("Dropping invalid packet from %s:%d: %r" , self .source .ip , self .source .port , err )
180+ raise SocketClosed
181+
182+ await getattr (self ._callback , f"receive_{ packet_type .name } " )(self .source , ** kwargs )
183+
200184 def receive_packet (self , source , data ):
201185 # Check length of packet
202186 length , data = read_uint16 (data )
0 commit comments