@@ -189,12 +189,14 @@ class TaskScheduler(threading.Thread):
189189
190190 __slots__ = (
191191 ' _task_manager' ,
192+ ' _mutex_lock' ,
192193 ' _shutdown' ,
193194 ' _task_queue'
194195 )
195196
196197 def __init__ (self , task_manager ):
197198 self ._task_manager = task_manager
199+ self ._mutex_lock = threading.RLock()
198200
199201 self ._shutdown = False
200202 self ._task_queue = collections.deque()
@@ -300,21 +302,23 @@ class TaskScheduler(threading.Thread):
300302 return
301303 elif result == TASK_WAIT:
302304 task.can_delay = True
305+ # since this task is delayed, we need to keep it on it's original
306+ # thread; otherwise our thread will be killed and the task will
307+ # float in oblivion forever...
308+ self ._task_queue.append(task)
303309 elif result == TASK_CONT:
304310 task.can_delay = False
311+ # add this task back to the task manager's queue so it can
312+ # be ran on another thread if another one is available and has
313+ # even less currently running tasks on it...
314+ self ._task_manager.task_queue.append(task)
305315 else :
306316 # check to see if we got any other result than what we
307317 # are expecting, tasks do not return values when they are called
308318 # like a normal function... So we should never expect this to be the case.
309319 raise TaskError(' Got invalid result <%r > when running task <%s >!' % (
310320 result, task.name))
311321
312- # the task want's to be placed back into the queue,
313- # instead of waiting for a new scheduler to be created,
314- # let's just add this task back to our own scheduler so we
315- # can save time between each execution...
316- self ._task_manager.task_queue.append(task)
317-
318322 # finally let's check to see if we have any tasks remaining
319323 # in the task queue, if we do not; then this means we have
320324 # served our purpose and is no longer needed...
@@ -329,16 +333,20 @@ class TaskScheduler(threading.Thread):
329333 """
330334
331335 while not self ._shutdown:
332- try :
333- self .update()
334- except (KeyboardInterrupt , SystemExit ):
335- break
336+ # acquire the mutex lock so we don't change the queue
337+ # during it's iteration on the other thread...
338+ with self ._mutex_lock:
339+ try :
340+ self .update()
341+ except (KeyboardInterrupt , SystemExit ):
342+ break
336343
337344 thread_wait()
338345
339346 self ._task_manager.remove_scheduler(self )
340347
341348 def __del__ (self ):
349+ self ._mutex_lock = None
342350 self ._task_queue = None
343351
344352
0 commit comments