diff --git a/luigi/format.py b/luigi/format.py
index 0a4afe116f..aaceb358f7 100644
--- a/luigi/format.py
+++ b/luigi/format.py
@@ -454,32 +454,41 @@ class NewlineFormat(WrappedFormat):
     wrapper_cls = NewlineWrapper
 
 
-class GzipFormat(Format):
+class CompressionFormat(Format):
     input = "bytes"
     output = "bytes"
 
-    def __init__(self, compression_level=None):
+    def __init__(self, decompress_cmd, compress_cmd, compression_level=None):
+        self.decompress_cmd = decompress_cmd
+        self.compress_cmd = compress_cmd
         self.compression_level = compression_level
 
     def pipe_reader(self, input_pipe):
-        return InputPipeProcessWrapper(["gunzip"], input_pipe)
+        return InputPipeProcessWrapper(self.decompress_cmd, input_pipe)
 
     def pipe_writer(self, output_pipe):
-        args = ["gzip"]
+        args = list(self.compress_cmd)
         if self.compression_level is not None:
             args.append("-" + str(int(self.compression_level)))
         return OutputPipeProcessWrapper(args, output_pipe)
 
 
-class Bzip2Format(Format):
-    input = "bytes"
-    output = "bytes"
+class GzipFormat(CompressionFormat):
+    def __init__(self, compression_level=None):
+        super().__init__(
+            decompress_cmd=["gunzip"],
+            compress_cmd=["gzip"],
+            compression_level=compression_level,
+        )
 
-    def pipe_reader(self, input_pipe):
-        return InputPipeProcessWrapper(["bzcat"], input_pipe)
 
-    def pipe_writer(self, output_pipe):
-        return OutputPipeProcessWrapper(["bzip2"], output_pipe)
+class Bzip2Format(CompressionFormat):
+    def __init__(self, compression_level=None):
+        super().__init__(
+            decompress_cmd=["bzcat"],
+            compress_cmd=["bzip2"],
+            compression_level=compression_level,
+        )
 
 
 Text = TextFormat()
diff --git a/luigi/scheduler.py b/luigi/scheduler.py
index c3fef1040a..1f5291fb99 100644
--- a/luigi/scheduler.py
+++ b/luigi/scheduler.py
@@ -858,11 +858,16 @@ def add_task(
 
         task = self._state.get_task(task_id, setdefault=_default_task)
 
-        if task is None or (task.status != RUNNING and not worker.enabled):
+        task_does_not_exist = task is None
+        worker_disabled_and_task_not_running = task is not None and task.status != RUNNING and not worker.enabled
+        if task_does_not_exist or worker_disabled_and_task_not_running:
             return
 
         # Ignore claims that the task is PENDING if it very recently was marked as DONE.
-        if status == PENDING and task.status == DONE and (time.time() - task.updated) < self._config.stable_done_cooldown_secs:
+        claimed_pending = status == PENDING
+        currently_done = task.status == DONE
+        within_cooldown = (time.time() - task.updated) < self._config.stable_done_cooldown_secs
+        if claimed_pending and currently_done and within_cooldown:
             return
 
         # for setting priority, we'll sometimes create tasks with unset family and params
diff --git a/luigi/worker.py b/luigi/worker.py
index 017e44d744..2c4a311423 100644
--- a/luigi/worker.py
+++ b/luigi/worker.py
@@ -169,62 +169,68 @@ def _run_get_new_deps(self):
             # get the next generator result
             next_send = requires.paths
 
+    def _execute_external_task(self):
+        if self.check_complete(self.task):
+            return DONE, "", []
+        return FAILED, "Task is an external data dependency and data does not exist (yet?).", []
+
+    def _execute_regular_task(self):
+        with self._forward_attributes():
+            new_deps = self._run_get_new_deps()
+        if not new_deps:
+            if not self.check_complete_on_run:
+                # update the cache
+                if self.task_completion_cache is not None:
+                    self.task_completion_cache[self.task.task_id] = True
+                return DONE, "", []
+            elif self.check_complete(self.task):
+                return DONE, "", []
+            else:
+                raise TaskException("Task finished running, but complete() is still returning false.")
+        return PENDING, "", new_deps
+
+    def _check_unfulfilled_dependencies(self):
+        # Verify that all the tasks are fulfilled! For external tasks we
+        # don't care about unfulfilled dependencies, because we are just
+        # checking completeness of self.task so outputs of dependencies are
+        # irrelevant.
+        if not self.check_unfulfilled_deps or _is_external(self.task):
+            return []
+        missing = []
+        for dep in self.task.deps():
+            if not self.check_complete(dep):
+                nonexistent_outputs = [output for output in flatten(dep.output()) if not output.exists()]
+                if nonexistent_outputs:
+                    missing.append(f"{dep.task_id} ({', '.join(map(str, nonexistent_outputs))})")
+                else:
+                    missing.append(dep.task_id)
+        return missing
+
     def run(self):
         logger.info("[pid %s] Worker %s running %s", os.getpid(), self.worker_id, self.task)
 
         if self.use_multiprocessing:
             # Need to have different random seeds if running in separate processes
-            processID = os.getpid()
-            currentTime = time.time()
-            random.seed(processID * currentTime)
+            process_id = os.getpid()
+            current_time = time.time()
+            random.seed(process_id * current_time)
 
         status = FAILED
         expl = ""
         missing = []
         new_deps = []
         try:
-            # Verify that all the tasks are fulfilled! For external tasks we
-            # don't care about unfulfilled dependencies, because we are just
-            # checking completeness of self.task so outputs of dependencies are
-            # irrelevant.
-            if self.check_unfulfilled_deps and not _is_external(self.task):
-                missing = []
-                for dep in self.task.deps():
-                    if not self.check_complete(dep):
-                        nonexistent_outputs = [output for output in flatten(dep.output()) if not output.exists()]
-                        if nonexistent_outputs:
-                            missing.append(f"{dep.task_id} ({', '.join(map(str, nonexistent_outputs))})")
-                        else:
-                            missing.append(dep.task_id)
-                if missing:
-                    deps = "dependency" if len(missing) == 1 else "dependencies"
-                    raise RuntimeError("Unfulfilled %s at run time: %s" % (deps, ", ".join(missing)))
+            missing = self._check_unfulfilled_dependencies()
+            if missing:
+                deps = "dependency" if len(missing) == 1 else "dependencies"
+                raise RuntimeError("Unfulfilled %s at run time: %s" % (deps, ", ".join(missing)))
             self.task.trigger_event(Event.START, self.task)
             t0 = time.time()
-            status = None
 
             if _is_external(self.task):
-                # External task
-                if self.check_complete(self.task):
-                    status = DONE
-                else:
-                    status = FAILED
-                    expl = "Task is an external data dependency and data does not exist (yet?)."
+                status, expl, new_deps = self._execute_external_task()
             else:
-                with self._forward_attributes():
-                    new_deps = self._run_get_new_deps()
-                if not new_deps:
-                    if not self.check_complete_on_run:
-                        # update the cache
-                        if self.task_completion_cache is not None:
-                            self.task_completion_cache[self.task.task_id] = True
-                        status = DONE
-                    elif self.check_complete(self.task):
-                        status = DONE
-                    else:
-                        raise TaskException("Task finished running, but complete() is still returning false.")
-                else:
-                    status = PENDING
+                status, expl, new_deps = self._execute_regular_task()
 
             if new_deps:
                 logger.info("[pid %s] Worker %s new requirements %s", os.getpid(), self.worker_id, self.task)
@@ -535,6 +541,103 @@ def rpc_message_callback(fn):
     return fn
 
 
+def _validate_dependency(dependency):
+    if isinstance(dependency, Target):
+        raise Exception("requires() can not return Target objects. Wrap it in an ExternalTask class")
+    elif not isinstance(dependency, Task):
+        raise Exception("requires() must return Task objects but {} is a {}".format(dependency, type(dependency)))
+
+
+def _check_complete_value(is_complete):
+    if is_complete not in (True, False):
+        if isinstance(is_complete, TracebackWrapper):
+            raise AsyncCompletionException(is_complete.trace)
+        raise Exception("Return value of Task.complete() must be boolean (was %r)" % is_complete)
+
+
+class WorkerNotifier:
+    """Handles all notification and error-email responsibilities for a Worker."""
+
+    def __init__(self, scheduler, worker_id, host, config):
+        self._scheduler = scheduler
+        self._worker_id = worker_id
+        self.host = host
+        self._config = config
+
+    def log_complete_error(self, task, tb):
+        log_msg = "Will not run {task} or any dependencies due to error in complete() method:\n{tb}".format(task=task, tb=tb)
+        logger.warning(log_msg)
+
+    def log_dependency_error(self, task, tb):
+        log_msg = "Will not run {task} or any dependencies due to error in deps() method:\n{tb}".format(task=task, tb=tb)
+        logger.warning(log_msg)
+
+    def log_unexpected_error(self, task):
+        logger.exception("Luigi unexpected framework error while scheduling %s", task)  # needs to be called from within except clause
+
+    def announce_scheduling_failure(self, task, expl):
+        try:
+            self._scheduler.announce_scheduling_failure(
+                worker=self._worker_id,
+                task_name=str(task),
+                family=task.task_family,
+                params=task.to_str_params(only_significant=True),
+                expl=expl,
+                owners=task._owner_list(),
+            )
+        except Exception:
+            formatted_traceback = traceback.format_exc()
+            self.email_unexpected_error(task, formatted_traceback)
+            raise
+
+    def email_complete_error(self, task, formatted_traceback):
+        self.announce_scheduling_failure(task, formatted_traceback)
+        if self._config.send_failure_email:
+            self._email_error(
+                task,
+                formatted_traceback,
+                subject="Luigi: {task} failed scheduling. Host: {host}",
+                headline="Will not run {task} or any dependencies due to error in complete() method",
+            )
+
+    def email_dependency_error(self, task, formatted_traceback):
+        self.announce_scheduling_failure(task, formatted_traceback)
+        if self._config.send_failure_email:
+            self._email_error(
+                task,
+                formatted_traceback,
+                subject="Luigi: {task} failed scheduling. Host: {host}",
+                headline="Will not run {task} or any dependencies due to error in deps() method",
+            )
+
+    def email_unexpected_error(self, task, formatted_traceback):
+        # this sends even if failure e-mails are disabled, as they may indicate
+        # a more severe failure that may not reach other alerting methods such
+        # as scheduler batch notification
+        self._email_error(
+            task,
+            formatted_traceback,
+            subject="Luigi: Framework error while scheduling {task}. Host: {host}",
+            headline="Luigi framework error",
+        )
+
+    def email_task_failure(self, task, formatted_traceback):
+        if self._config.send_failure_email:
+            self._email_error(
+                task,
+                formatted_traceback,
+                subject="Luigi: {task} FAILED. Host: {host}",
+                headline="A task failed when running. Most likely run() raised an exception.",
+            )
+
+    def _email_error(self, task, formatted_traceback, subject, headline):
+        formatted_subject = subject.format(task=task, host=self.host)
+        formatted_headline = headline.format(task=task, host=self.host)
+        command = subprocess.list2cmdline(sys.argv)
+        message = notifications.format_task_error(formatted_headline, task, command, formatted_traceback)
+        notifications.send_error_email(formatted_subject, message, task.owner_email)
+
+
 class Worker:
     """
     Worker object communicates with a scheduler.
@@ -565,6 +668,7 @@ def __init__(self, scheduler=None, worker_id=None, worker_processes=1, assistant
         self._stop_requesting_work = False
 
         self.host = socket.gethostname()
+        self._notifier = WorkerNotifier(scheduler, self._id, self.host, self._config)
         self._scheduled_tasks = {}
         self._suspended_tasks = {}
         self._batch_running_tasks = {}
@@ -681,79 +785,6 @@ def _validate_task(self, task):
             # we can't get the repr of it since it's not initialized...
             raise TaskException("Task of class %s not initialized. Did you override __init__ and forget to call super(...).__init__?" % task.__class__.__name__)
 
-    def _log_complete_error(self, task, tb):
-        log_msg = "Will not run {task} or any dependencies due to error in complete() method:\n{tb}".format(task=task, tb=tb)
-        logger.warning(log_msg)
-
-    def _log_dependency_error(self, task, tb):
-        log_msg = "Will not run {task} or any dependencies due to error in deps() method:\n{tb}".format(task=task, tb=tb)
-        logger.warning(log_msg)
-
-    def _log_unexpected_error(self, task):
-        logger.exception("Luigi unexpected framework error while scheduling %s", task)  # needs to be called from within except clause
-
-    def _announce_scheduling_failure(self, task, expl):
-        try:
-            self._scheduler.announce_scheduling_failure(
-                worker=self._id,
-                task_name=str(task),
-                family=task.task_family,
-                params=task.to_str_params(only_significant=True),
-                expl=expl,
-                owners=task._owner_list(),
-            )
-        except Exception:
-            formatted_traceback = traceback.format_exc()
-            self._email_unexpected_error(task, formatted_traceback)
-            raise
-
-    def _email_complete_error(self, task, formatted_traceback):
-        self._announce_scheduling_failure(task, formatted_traceback)
-        if self._config.send_failure_email:
-            self._email_error(
-                task,
-                formatted_traceback,
-                subject="Luigi: {task} failed scheduling. Host: {host}",
-                headline="Will not run {task} or any dependencies due to error in complete() method",
-            )
-
-    def _email_dependency_error(self, task, formatted_traceback):
-        self._announce_scheduling_failure(task, formatted_traceback)
-        if self._config.send_failure_email:
-            self._email_error(
-                task,
-                formatted_traceback,
-                subject="Luigi: {task} failed scheduling. Host: {host}",
-                headline="Will not run {task} or any dependencies due to error in deps() method",
-            )
-
-    def _email_unexpected_error(self, task, formatted_traceback):
-        # this sends even if failure e-mails are disabled, as they may indicate
-        # a more severe failure that may not reach other alerting methods such
-        # as scheduler batch notification
-        self._email_error(
-            task,
-            formatted_traceback,
-            subject="Luigi: Framework error while scheduling {task}. Host: {host}",
-            headline="Luigi framework error",
-        )
-
-    def _email_task_failure(self, task, formatted_traceback):
-        if self._config.send_failure_email:
-            self._email_error(
-                task,
-                formatted_traceback,
-                subject="Luigi: {task} FAILED. Host: {host}",
-                headline="A task failed when running. Most likely run() raised an exception.",
-            )
-
-    def _email_error(self, task, formatted_traceback, subject, headline):
-        formatted_subject = subject.format(task=task, host=self.host)
-        formatted_headline = headline.format(task=task, host=self.host)
-        command = subprocess.list2cmdline(sys.argv)
-        message = notifications.format_task_error(formatted_headline, task, command, formatted_traceback)
-        notifications.send_error_email(formatted_subject, message, task.owner_email)
-
     def _handle_task_load_error(self, exception, task_ids):
         msg = "Cannot find task(s) sent by scheduler: {}".format(",".join(task_ids))
         logger.exception(msg)
@@ -806,9 +837,9 @@ def add(self, task, multiprocess=False, processes=0):
         except Exception as ex:
             self.add_succeeded = False
             formatted_traceback = traceback.format_exc()
-            self._log_unexpected_error(task)
+            self._notifier.log_unexpected_error(task)
             task.trigger_event(Event.BROKEN_TASK, task, ex)
-            self._email_unexpected_error(task, formatted_traceback)
+            self._notifier.email_unexpected_error(task, formatted_traceback)
             raise
         finally:
             pool.close()
@@ -829,73 +860,68 @@ def _add_task_batcher(self, task):
                 )
             self._batch_families_sent.add(family)
 
-    def _add(self, task, is_complete):
+    def _determine_task_status(self, task, is_complete):
         if self._config.task_limit is not None and len(self._scheduled_tasks) >= self._config.task_limit:
             logger.warning("Will not run %s or any dependencies due to exceeded task-limit of %d", task, self._config.task_limit)
-            deps = None
-            status = UNKNOWN
-            runnable = False
+            return None, UNKNOWN, False
 
-        else:
-            formatted_traceback = None
-            try:
-                self._check_complete_value(is_complete)
-            except KeyboardInterrupt:
-                raise
-            except AsyncCompletionException as ex:
-                formatted_traceback = ex.trace
-            except BaseException:
-                formatted_traceback = traceback.format_exc()
-
-            if formatted_traceback is not None:
-                self.add_succeeded = False
-                self._log_complete_error(task, formatted_traceback)
-                task.trigger_event(Event.DEPENDENCY_MISSING, task)
-                self._email_complete_error(task, formatted_traceback)
-                deps = None
-                status = UNKNOWN
-                runnable = False
-
-            elif is_complete:
-                deps = None
-                status = DONE
-                runnable = False
-                task.trigger_event(Event.DEPENDENCY_PRESENT, task)
-
-            elif _is_external(task):
-                deps = None
-                status = PENDING
-                runnable = self._config.retry_external_tasks
-                task.trigger_event(Event.DEPENDENCY_MISSING, task)
-                logger.warning("Data for %s does not exist (yet?). The task is an external data dependency, so it cannot be run from this luigi process.", task)
+        deps, status, runnable = self._classify_task(task, is_complete)
+        # The DISABLED override applies to every classification, but not to the
+        # task-limit early return above.
+        if task.disabled:
+            status = DISABLED
+        return deps, status, runnable
+
+    def _classify_task(self, task, is_complete):
+        formatted_traceback = None
+        try:
+            _check_complete_value(is_complete)
+        except KeyboardInterrupt:
+            raise
+        except AsyncCompletionException as ex:
+            formatted_traceback = ex.trace
+        except BaseException:
+            formatted_traceback = traceback.format_exc()
 
-            else:
-                try:
-                    deps = task.deps()
-                    self._add_task_batcher(task)
-                except Exception as ex:
-                    formatted_traceback = traceback.format_exc()
-                    self.add_succeeded = False
-                    self._log_dependency_error(task, formatted_traceback)
-                    task.trigger_event(Event.BROKEN_TASK, task, ex)
-                    self._email_dependency_error(task, formatted_traceback)
-                    deps = None
-                    status = UNKNOWN
-                    runnable = False
-                else:
-                    status = PENDING
-                    runnable = True
+        if formatted_traceback is not None:
+            self.add_succeeded = False
+            self._notifier.log_complete_error(task, formatted_traceback)
+            task.trigger_event(Event.DEPENDENCY_MISSING, task)
+            self._notifier.email_complete_error(task, formatted_traceback)
+            return None, UNKNOWN, False
+
+        if is_complete:
+            task.trigger_event(Event.DEPENDENCY_PRESENT, task)
+            return None, DONE, False
+
+        if _is_external(task):
+            task.trigger_event(Event.DEPENDENCY_MISSING, task)
+            logger.warning("Data for %s does not exist (yet?). The task is an external data dependency, so it cannot be run from this luigi process.", task)
+            return None, PENDING, self._config.retry_external_tasks
 
-            if task.disabled:
-                status = DISABLED
+        try:
+            deps = task.deps()
+            self._add_task_batcher(task)
+        except Exception as ex:
+            formatted_traceback = traceback.format_exc()
+            self.add_succeeded = False
+            self._notifier.log_dependency_error(task, formatted_traceback)
+            task.trigger_event(Event.BROKEN_TASK, task, ex)
+            self._notifier.email_dependency_error(task, formatted_traceback)
+            return None, UNKNOWN, False
 
-            if deps:
-                for d in deps:
-                    self._validate_dependency(d)
-                    task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d)
-                    yield d  # return additional tasks to add
+        return deps, PENDING, True
 
-                deps = [d.task_id for d in deps]
+    def _add(self, task, is_complete):
+        deps, status, runnable = self._determine_task_status(task, is_complete)
+
+        if deps:
+            for d in deps:
+                _validate_dependency(d)
+                task.trigger_event(Event.DEPENDENCY_DISCOVERED, task, d)
+                yield d  # return additional tasks to add
+
+            deps = [d.task_id for d in deps]
 
         self._scheduled_tasks[task.task_id] = task
         self._add_task(
@@ -914,18 +940,6 @@ def _add(self, task, is_complete):
             accepts_messages=task.accepts_messages,
         )
 
-    def _validate_dependency(self, dependency):
-        if isinstance(dependency, Target):
-            raise Exception("requires() can not return Target objects. Wrap it in an ExternalTask class")
-        elif not isinstance(dependency, Task):
-            raise Exception("requires() must return Task objects but {} is a {}".format(dependency, type(dependency)))
-
-    def _check_complete_value(self, is_complete):
-        if is_complete not in (True, False):
-            if isinstance(is_complete, TracebackWrapper):
-                raise AsyncCompletionException(is_complete.trace)
-            raise Exception("Return value of Task.complete() must be boolean (was %r)" % is_complete)
-
     def _add_worker(self):
         self._worker_info.append(("first_task", self._first_task))
         self._scheduler.add_worker(self._id, self._worker_info)
@@ -1106,7 +1120,7 @@ def _handle_next_task(self):
         # external task if run not implemented, retry-able if config option is enabled.
         external_task_retryable = _is_external(task) and self._config.retry_external_tasks
         if status == FAILED and not external_task_retryable:
-            self._email_task_failure(task, expl)
+            self._notifier.email_task_failure(task, expl)
 
         new_deps = []
         if new_requirements:
@@ -1167,24 +1181,29 @@ def _keep_alive(self, get_work_response):
         If worker-count-uniques is true, it will also
         require that one of the tasks is unique to this worker.
         """
         if not self._config.keep_alive:
             return False
-        elif self._assistant:
+        if self._assistant:
             return True
-        elif self._config.count_last_scheduled:
+
+        # When a count_* option is enabled its counter alone decides the answer,
+        # so these branches must return unconditionally instead of falling through.
+        if self._config.count_last_scheduled:
             return get_work_response.n_pending_last_scheduled > 0
-        elif self._config.count_uniques:
+        if self._config.count_uniques:
             return get_work_response.n_unique_pending > 0
-        elif get_work_response.n_pending_tasks == 0:
+
+        if get_work_response.n_pending_tasks == 0:
             return False
-        elif not self._config.max_keep_alive_idle_duration:
-            return True
-        elif not self._idle_since:
+
+        no_idle_duration_limit = not self._config.max_keep_alive_idle_duration
+        not_yet_idle = not self._idle_since
+        if no_idle_duration_limit or not_yet_idle:
             return True
-        else:
-            time_to_shutdown = self._idle_since + self._config.max_keep_alive_idle_duration - datetime.datetime.now()
-            logger.debug("[%s] %s until shutdown", self._id, time_to_shutdown)
-            return time_to_shutdown > datetime.timedelta(0)
+
+        time_to_shutdown = self._idle_since + self._config.max_keep_alive_idle_duration - datetime.datetime.now()
+        logger.debug("[%s] %s until shutdown", self._id, time_to_shutdown)
+        return time_to_shutdown > datetime.timedelta(0)
 
     def handle_interrupt(self, signum, _):
         """