2626from __future__ import print_function
2727
2828import queue
29- from concurrent .futures import ThreadPoolExecutor , CancelledError
30- from socketserver import ThreadingMixIn
3129
3230from scalyr_agent import compat
3331
5654from six .moves import range
5755import six .moves .socketserver
5856
59- from scalyr_agent .builtin_monitors .thread_pool import ExecutorMixIn
57+ if six .PY2 :
58+ from scalyr_agent .builtin_monitors .thread_pool_dummy import ExecutorMixIn
59+ else :
60+ from scalyr_agent .builtin_monitors .thread_pool import ExecutorMixIn
6061
6162try :
6263 # Only available for python >= 3.6
@@ -591,6 +592,7 @@ def __init__(self, request_processing_executor, request, client_address, server)
591592 else :
592593 six .moves .socketserver .BaseRequestHandler .__init__ (self , request , client_address , server )
593594
595+ @staticmethod
594596 def factory_method (request_processing_executor ):
595597 def create_handler (request , client_address , server ):
596598 return SyslogUDPHandler (request_processing_executor , request , client_address , server )
@@ -606,9 +608,13 @@ def handle(self):
606608 "destport" : self .server .server_address [1 ],
607609 }
608610
609- self .__request_processing_executor .submit (
610- self .server .syslog_handler .handle , data , extra
611- )
611+ if self .__request_processing_executor :
612+ self .__request_processing_executor .submit (
613+ self .server .syslog_handler .handle , data , extra
614+ )
615+ else :
616+ self .server .syslog_handler .handle (data , extra )
617+
612618
613619
614620class SocketNotReadyException (Exception ):
@@ -636,7 +642,9 @@ def read(self):
636642 if not data :
637643 self .is_closed = True
638644 raise SocketClosed ()
639- except (socket .timeout , * NON_BLOCKING_SOCKET_DATA_NOT_READY_EXCEPTIONS ) as e :
645+ except socket .timeout as e :
646+ raise SocketNotReadyException (e )
647+ except NON_BLOCKING_SOCKET_DATA_NOT_READY_EXCEPTIONS as e :
640648 raise SocketNotReadyException (e )
641649 except socket .error as e :
642650 if e .errno == errno .EAGAIN :
@@ -980,8 +988,6 @@ class SyslogTCPHandler(six.moves.socketserver.BaseRequestHandler):
980988
981989 def __init__ (self , * args , ** kwargs ):
982990 self .__request_processing_executor = kwargs .pop ("request_processing_executor" , None )
983- if not self .__request_processing_executor :
984- raise ValueError ("request_processing_executor is required" )
985991
986992 self .request_parser = kwargs .pop ("request_parser" , "default" )
987993 self .incomplete_frame_timeout = kwargs .pop ("incomplete_frame_timeout" , None )
@@ -1077,25 +1083,30 @@ def handle(self):
10771083 )
10781084
10791085 try :
1080- # Worker is responsible for processing the data read from one request.
1081- # Using the queue ensures the data is processed in the order it was read.
1082- DONE = "DONE"
1083- work_queue = queue .Queue ()
1084- def worker (queue , is_shutdown ):
1085- data = queue .get (block = True )
1086- while data != DONE :
1087- if is_shutdown ():
1088- global_log .info ("ThreadPool shutting down, skipping further request processing." )
1089- break
1090- self .__request_data_process (syslog_parser , data )
1086+ if self .__request_processing_executor :
1087+ # Worker is responsible for processing the data read from one request.
1088+ # Using the queue ensures the data is processed in the order it was read.
1089+ DONE = "DONE"
1090+ work_queue = queue .Queue ()
1091+ def worker (queue , is_shutdown ):
10911092 data = queue .get (block = True )
1093+ while data != DONE :
1094+ if is_shutdown ():
1095+ global_log .info ("ThreadPool shutting down, skipping further request processing." )
1096+ break
1097+ self .__request_data_process (syslog_parser , data )
1098+ data = queue .get (block = True )
10921099
1093- self .__request_processing_executor .submit (worker , work_queue , lambda : self .__request_processing_executor ._shutdown )
1100+ self .__request_processing_executor .submit (worker , work_queue , lambda : self .__request_processing_executor ._shutdown )
10941101
1095- for data in self .__request_stream_read (syslog_request , self .server .is_running ):
1096- work_queue .put (data )
1102+ for data in self .__request_stream_read (syslog_request , self .server .is_running ):
1103+ work_queue .put (data )
1104+
1105+ work_queue .put (DONE )
1106+ else :
1107+ for data in self .__request_stream_read (syslog_request , self .server .is_running ):
1108+ self .__request_data_process (syslog_parser , data )
10971109
1098- work_queue .put (DONE )
10991110
11001111 except Exception as e :
11011112 global_log .warning (
0 commit comments