1+ import threading
12import time
23import random
34
@@ -204,6 +205,8 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb
204205 self .retry_task_cb = retry_task_cb
205206 self .idle_cb = idle_cb
206207 self .log_debug_data = log_debug_data
208+
209+ self ._lock = threading .Lock ()
207210 self .accept_task = True
208211 self .keep_running = True
209212 self .processed_tasks = 0
@@ -215,6 +218,7 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb
215218 self .last_send_time = self .ssl_sock .create_time
216219 self .life_end_time = self .ssl_sock .create_time + \
217220 random .randint (self .config .connection_max_life , int (self .config .connection_max_life * 1.5 ))
221+ # self.logger.debug("worker.init %s", self.ip_str)
218222
219223 def __str__ (self ):
220224 o = ""
@@ -260,24 +264,30 @@ def update_debug_data(self, rtt, sent, received, speed):
260264 # else:
261265 # self.rtt = rtt
262266
263- # self.log_debug_data(rtt, sent, received)
267+ self .log_debug_data (rtt , sent , received )
264268 return
265269
266270 def close (self , reason ):
267- if not self .keep_running :
268- self .logger .warn ("worker already closed %s" , self .ip_str )
269- return
270-
271- self .accept_task = False
272- self .keep_running = False
273- self .ssl_sock .close ()
274- if reason not in ["idle timeout" , "life end" ]:
275- now = time .time ()
276- inactive_time = now - self .last_recv_time
277- if inactive_time < self .config .http2_ping_min_interval :
278- self .logger .debug ("%s worker close:%s inactive:%d" , self .ip_str , reason , inactive_time )
279- self .ip_manager .report_connect_closed (self .ssl_sock .ip_str , self .ssl_sock .sni , reason )
280- self .close_cb (self )
271+ with self ._lock :
272+ if not self .keep_running :
273+ # self.logger.warn("worker %s already closed %s", self.ip_str, reason)
274+ return
275+
276+ # self.logger.debug("worker.close %s reason:%s", self.ip_str, reason)
277+ self .accept_task = False
278+ self .keep_running = False
279+ self .ssl_sock .close ()
280+ if reason not in ["idle timeout" , "life end" ]:
281+ now = time .time ()
282+ inactive_time = now - self .last_recv_time
283+ if inactive_time < self .config .http2_ping_min_interval :
284+ self .logger .debug ("%s worker close:%s inactive:%d" , self .ip_str , reason , inactive_time )
285+ self .ip_manager .report_connect_closed (self .ssl_sock .ip_str , self .ssl_sock .sni , reason )
286+ self .close_cb (self )
287+
288+ def __del__ (self ):
289+ # self.logger.debug("__del__ %s", self.ip_str)
290+ self .close ("__del__" )
281291
282292 def get_score (self ):
283293 # The smaller, the better
0 commit comments