@@ -392,6 +392,10 @@ def get_path(self, path):
392392
393393 def get (self , path , worker_id , default = None ):
394394 # type: (six.text_type, six.text_type, Optional[object]) -> Optional[object]
395+
396+ if path not in self .__paths :
397+ return default
398+
395399 return self .__paths [path ].get (worker_id , default )
396400
397401 def complement_keys (self , path , worker_ids ):
@@ -435,7 +439,13 @@ def keys(self):
435439
436440 def pop (self , path , worker_id , default = None ):
437441 # type: (six.text_type, six.text_type, Optional[object]) -> Optional[object]
438- return self .__paths [path ].pop (worker_id , default )
442+
443+ value = self .__paths [path ].pop (worker_id , default )
444+
445+ if not self .__paths [path ]:
446+ self .__paths .pop (path )
447+
448+ return value
439449
440450 def __len__ (self ):
441451 return len (list (self .keys ()))
@@ -509,6 +519,8 @@ def __init__(self, configuration, monitors):
509519 # log file has been processed yet
510520 self .__logs_pending_removal = PathWorkerIdDict ()
511521
522+ self .__logs_matcher_finishing = PathWorkerIdDict ()
523+
512524 # Dict[Tuple(str, str), Dict]
513525 # # a dict of (log path (str), processing CopyingManagerWorker ID (str)) -> Log config (Dict)
514526 # a dict of log_configs keyed by log_path for logs with configs that need to be reloaded.
@@ -530,7 +542,7 @@ def __init__(self, configuration, monitors):
530542
531543 # A lock that protects the status variables and the __log_matchers variable, the only variables that
532544 # are access in generate_status() which needs to be thread safe.
533- self .__lock = threading .Lock ()
545+ self .__lock = threading .RLock ()
534546
535547 # The last time we scanned for new files that match the __log_matchers.
536548 self .__last_new_file_scan_time = 0
@@ -694,9 +706,23 @@ def add_log_config(self, monitor_name, log_config, force_add=False):
694706 )
695707 self .__logs_pending_removal .pop (path , worker_id )
696708 return log_config
709+ elif force_add and self .__logs_matcher_finishing .contains (path , worker_id ):
710+ log .log (
711+ scalyr_logging .DEBUG_LEVEL_0 ,
712+ "Tried to add new log file (path='%s', worker_id='%s') for monitor '%s', but it is already being monitored by '%s' "
713+ "and scheduled for removal. Canceling scheduled removal and ensuring log file is continue "
714+ "to be monitored."
715+ % (
716+ path ,
717+ worker_id ,
718+ monitor_name ,
719+ self .__dynamic_paths .get (path , worker_id ),
720+ ),
721+ )
697722
723+ self .__remove_log_path_one_worker (monitor_name , path , worker_id )
698724 # Make sure the path isn't already being dynamically monitored
699- if self .__dynamic_paths .contains (path , worker_id ):
725+ elif self .__dynamic_paths .contains (path , worker_id ):
700726 log .log (
701727 scalyr_logging .DEBUG_LEVEL_0 ,
702728 "Tried to add new log file '%s' for monitor '%s', but it is already being monitored by '%s'"
@@ -861,6 +887,7 @@ def __remove_log_path_one_worker(self, monitor_name, log_path, worker_id):
861887
862888 self ._log_matchers [:] = matchers
863889 self .__logs_pending_removal .pop (log_path , worker_id )
890+ self .__logs_matcher_finishing .pop (log_path , worker_id )
864891 self .__logs_pending_reload .pop (log_path , worker_id )
865892 self .__dynamic_paths .pop (log_path , worker_id )
866893
@@ -1323,10 +1350,7 @@ def __remove_logs_scheduled_for_deletion(self):
13231350 # so we can iterate without a lock (remove_log_path also acquires the lock so best
13241351 # not to do that while the lock is already aquired
13251352 self .__lock .acquire ()
1326- try :
1327- pending_removal = self .__logs_pending_removal .copy ()
1328- finally :
1329- self .__lock .release ()
1353+ pending_removal = self .__logs_pending_removal .copy ()
13301354
13311355 # if we have a log matcher for the path, then set it to finished
13321356 for path , worker_id in pending_removal .keys ():
@@ -1336,13 +1360,11 @@ def __remove_logs_scheduled_for_deletion(self):
13361360 continue
13371361
13381362 matcher .finish ()
1363+ self .__logs_matcher_finishing .set (path , worker_id , matcher )
13391364
13401365 # remove from list of logs pending removal
1341- self .__lock .acquire ()
1342- try :
1343- self .__logs_pending_removal = PathWorkerIdDict ()
1344- finally :
1345- self .__lock .release ()
1366+ self .__logs_pending_removal = PathWorkerIdDict ()
1367+ self .__lock .release ()
13461368
13471369 def __purge_finished_log_matchers (self ):
13481370 # type: () -> int
@@ -1360,6 +1382,7 @@ def __purge_finished_log_matchers(self):
13601382 if m .is_finished ():
13611383 self .__remove_log_path_one_worker (SCHEDULED_DELETION , path , worker_id )
13621384 self .__dynamic_matchers .pop (path , worker_id , None )
1385+ self .__logs_matcher_finishing .pop (path , worker_id )
13631386 removed += 1
13641387
13651388 return removed
0 commit comments