1919import logging
2020from config import config
2121import threading
22+ import operator
2223
2324current_path = os .path .dirname (os .path .abspath (__file__ ))
2425python_path = os .path .abspath ( os .path .join (current_path , os .pardir , os .pardir , os .pardir , 'python27' , '1.0' ))
4142g_cacertfile = os .path .join (current_path , "cacert.pem" )
4243
4344
45+ class Connect_pool (object ):
46+ pool_lock = threading .Lock ()
47+ not_empty = threading .Condition (pool_lock )
48+ pool = {}
49+
50+ def qsize (self ):
51+ return len (self .pool )
52+
53+ def put (self , item ):
54+ speed , sock = item
55+ self .pool_lock .acquire ()
56+ try :
57+ self .pool [sock ] = speed
58+ self .not_empty .notify ()
59+ finally :
60+ self .pool_lock .release ()
61+
62+ def get (self , block = True ):
63+ self .not_empty .acquire ()
64+ try :
65+ if not block :
66+ if not self .qsize ():
67+ raise
68+ else :
69+ while not self .qsize ():
70+ self .not_empty .wait ()
71+ item = self ._get ()
72+ return item
73+ finally :
74+ self .not_empty .release ()
75+
76+ def get_nowait (self ):
77+ return self .get (False )
78+
79+ def _get (self ):
80+ #pool = sorted(self.pool.items(), key=operator.itemgetter(1))
81+ #k,v = pool[0]
82+ #self.pool.pop(k)
83+ #return (v, k)
84+
85+ fastest_time = 9999
86+ fastest_sock = None
87+ for sock in self .pool :
88+ time = self .pool [sock ]
89+ if time < fastest_time :
90+ fastest_time = time
91+ fastest_sock = sock
92+
93+ self .pool .pop (fastest_sock )
94+ return (fastest_time , fastest_sock )
95+
96+
97+
98+
4499class Https_connection_manager (object ):
45100
46101 thread_num_lock = threading .Lock ()
@@ -102,10 +157,10 @@ def __init__(self):
102157 self .max_retry = 3
103158 self .timeout = 3
104159 self .max_timeout = 5
105- self .max_thread_num = 30
106- self .min_connection_num = 30
160+ self .max_thread_num = 20
161+ self .min_connection_num = 20
107162
108- self .conn_pool = Queue .Queue ()
163+ self .conn_pool = Connect_pool () # Queue.PriorityQueue ()
109164
110165
111166 # set_ciphers as Modern Browsers
@@ -119,8 +174,15 @@ def __init__(self):
119174 if hasattr (OpenSSL .SSL , 'SESS_CACHE_BOTH' ):
120175 self .openssl_context .set_session_cache_mode (OpenSSL .SSL .SESS_CACHE_BOTH )
121176
122- def save_ssl_connection_for_reuse (self , socket ):
123- self .conn_pool .put ( (time .time (), socket ) )
177+ def save_ssl_connection_for_reuse (self , ssl_sock ):
178+ if self .conn_pool .qsize () > 5 :
179+ if ssl_sock .handshake_time > 200 and ssl_sock .create_time - time .time () > 10 :
180+ return
181+ if self .conn_pool .qsize () > self .min_connection_num - 5 :
182+ if ssl_sock .handshake_time > 300 :
183+ return
184+ ssl_sock .last_use_time = time .time ()
185+ self .conn_pool .put ( (ssl_sock .handshake_time , ssl_sock ) )
124186
125187 def create_ssl_connection (self ):
126188
@@ -160,9 +222,12 @@ def _create_ssl_connection(ip_port):
160222 handshake_time = int ((time_handshaked - time_connected ) * 1000 )
161223
162224 google_ip .update_ip (ip , handshake_time )
163-
225+ #logging.debug("create_ssl update ip:%s time:%d", ip, handshake_time)
164226 # sometimes, we want to use raw tcp socket directly(select/epoll), so setattr it to ssl socket.
227+ ssl_sock .ip = ip
165228 ssl_sock .sock = sock
229+ ssl_sock .create_time = time_begin
230+ ssl_sock .handshake_time = handshake_time
166231
167232 # verify SSL certificate issuer.
168233 def check_ssl_cert (ssl_sock ):
@@ -178,9 +243,10 @@ def check_ssl_cert(ssl_sock):
178243
179244 return ssl_sock
180245 except Exception as e :
181- logging .debug ("create_ssl %s fail:%s" , ip , e )
246+ # logging.debug("create_ssl %s fail:%s", ip, e)
182247 google_ip .report_connect_fail (ip )
183248
249+
184250 if ssl_sock :
185251 ssl_sock .close ()
186252 if sock :
@@ -197,10 +263,11 @@ def connect_thread():
197263 break
198264
199265 port = 443
200- logging .debug ("create ssl conn %s" , ip_str )
266+ # logging.debug("create ssl conn %s", ip_str)
201267 ssl_sock = _create_ssl_connection ( (ip_str , port ) )
202268 if ssl_sock :
203- self .conn_pool .put ((time .time (), ssl_sock ))
269+ ssl_sock .last_use_time = time .time ()
270+ self .conn_pool .put ((ssl_sock .handshake_time , ssl_sock ))
204271 finally :
205272 self .thread_num_lock .acquire ()
206273 self .thread_num -= 1
@@ -219,30 +286,31 @@ def create_more_connection():
219286
220287 while True :
221288 try :
222- ctime , sock = self .conn_pool .get_nowait ()
289+ handshake_time , ssl_sock = self .conn_pool .get_nowait ()
223290 except :
224- sock = None
291+ ssl_sock = None
225292 break
226293
227- if time .time () - ctime < 210 : # gws ssl connection can keep for 230s after created
294+ if time .time () - ssl_sock .last_use_time < 210 : # gws ssl connection can keep for 230s after created
295+ #logging.debug("ssl_pool.get:%s handshake:%d", ssl_sock.ip, handshake_time)
228296 break
229297 else :
230- sock .close ()
298+ ssl_sock .close ()
231299 continue
232300
233301 conn_num = self .conn_pool .qsize ()
234- logging .debug ("ssl conn_num:%d" , conn_num )
302+ # logging.debug("ssl conn_num:%d", conn_num)
235303 if conn_num < self .min_connection_num :
236304 create_more_connection ()
237305
238- if sock :
239- return sock
306+ if ssl_sock :
307+ return ssl_sock
240308 else :
241309 try :
242- ctime , sock = self .conn_pool .get ()
243- return sock
310+ handshake_time , ssl_sock = self .conn_pool .get ()
311+ return ssl_sock
244312 except Exception as e :
245- logging .warning ("get ssl_pool err:%s" , e )
313+ logging .error ("get ssl_pool err:%s" , e )
246314 return None
247315
248316
@@ -317,7 +385,9 @@ class Forward_connection_manager():
317385
318386 def create_connection (self , sock_life = 5 , cache_key = None ):
319387 connection_cache_key = cache_key
320- def _create_connection (ip_port , timeout , queobj ):
388+ def _create_connection (ip_port , timeout , queobj , delay = 0 ):
389+ if delay != 0 :
390+ time .sleep (delay )
321391 ip = ip_port [0 ]
322392 sock = None
323393 try :
@@ -339,14 +409,17 @@ def _create_connection(ip_port, timeout, queobj):
339409 # record TCP connection time
340410 conn_time = time .time () - start_time
341411 google_ip .update_ip (ip , conn_time * 2000 )
412+ logging .info ("create_tcp update ip:%s time:%d" , ip , conn_time * 2000 )
413+ logging .debug ("tcp conn %s time:%d" , ip , conn_time * 1000 )
342414
343415 # put ssl socket object to output queobj
344416 queobj .put (sock )
345417 except (socket .error , OSError ) as e :
346418 # any socket.error, put Excpetions to output queobj.
347419 queobj .put (e )
348-
420+ logging . debug ( "tcp conn %s fail" , ip )
349421 google_ip .report_connect_fail (ip )
422+ logging .info ("create_ssl report fail ip:%s" , ip )
350423 if sock :
351424 sock .close ()
352425
@@ -362,7 +435,7 @@ def recycle_connection(count, queobj):
362435 if connection_cache_key :
363436 try :
364437 ctime , sock = self .tcp_connection_cache [connection_cache_key ].get_nowait ()
365- if time .time () - ctime < 5 :
438+ if time .time () - ctime < sock_life :
366439 return sock
367440 except Queue .Empty :
368441 pass
@@ -376,17 +449,16 @@ def recycle_connection(count, queobj):
376449
377450 addrs = addresses
378451 queobj = Queue .Queue ()
452+ delay = 0
379453 for addr in addrs :
380- thread .start_new_thread (_create_connection , (addr , timeout , queobj ))
454+ thread .start_new_thread (_create_connection , (addr , timeout , queobj , delay ))
455+ delay += 0.01
381456 for i in range (len (addrs )):
382457 result = queobj .get ()
383458 if not isinstance (result , (socket .error , OSError )):
384459 thread .start_new_thread (recycle_connection , (len (addrs )- i - 1 , queobj ))
385460 return result
386- else :
387- if i == 0 :
388- # only output first error
389- logging .warning ('create_connection to %s return %r, try again.' , addrs , result )
461+ logging .warning ('create_connection to %s fail.' , addrs )
390462
391463
392464 def forward_socket (self , local , remote , timeout = 60 , tick = 2 , bufsize = 8192 ):
@@ -424,5 +496,38 @@ def forward_socket(self, local, remote, timeout=60, tick=2, bufsize=8192):
424496 logging .debug ("forward closed." )
425497
426498
499+
427500https_manager = Https_connection_manager ()
428501forwork_manager = Forward_connection_manager ()
502+
503+
504+ def test_pool ():
505+ pool = Connect_pool ()
506+ pool .put ((3 , "c" ))
507+ pool .put ((1 , "a" ))
508+ pool .put ((2 , "b" ))
509+
510+ t , s = pool .get ()
511+ print s
512+
513+ t , s = pool .get ()
514+ print s
515+
516+ t , s = pool .get ()
517+ print s
518+
519+
520+ def test_pool_speed ():
521+ pool = Connect_pool ()
522+ for i in range (100 ):
523+ pool .put ((i , "%d" % i ))
524+
525+ start = time .time ()
526+ t , s = pool .get ()
527+ print time .time () - start
528+ print s
529+ # sort time is 5ms for 10000
530+ # sort time is 0ms for 100
531+
532+ if __name__ == "__main__" :
533+ test_pool_speed ()
0 commit comments