Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions luigi/format.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,32 +454,41 @@ class NewlineFormat(WrappedFormat):
wrapper_cls = NewlineWrapper


class GzipFormat(Format):
class CompressionFormat(Format):
input = "bytes"
output = "bytes"

def __init__(self, compression_level=None):
def __init__(self, decompress_cmd, compress_cmd, compression_level=None):
self.decompress_cmd = decompress_cmd
self.compress_cmd = compress_cmd
self.compression_level = compression_level

def pipe_reader(self, input_pipe):
return InputPipeProcessWrapper(["gunzip"], input_pipe)
return InputPipeProcessWrapper(self.decompress_cmd, input_pipe)

def pipe_writer(self, output_pipe):
args = ["gzip"]
args = list(self.compress_cmd)
if self.compression_level is not None:
args.append("-" + str(int(self.compression_level)))
return OutputPipeProcessWrapper(args, output_pipe)


class Bzip2Format(Format):
input = "bytes"
output = "bytes"
class GzipFormat(CompressionFormat):
def __init__(self, compression_level=None):
super().__init__(
decompress_cmd=["gunzip"],
compress_cmd=["gzip"],
compression_level=compression_level,
)

def pipe_reader(self, input_pipe):
return InputPipeProcessWrapper(["bzcat"], input_pipe)

def pipe_writer(self, output_pipe):
return OutputPipeProcessWrapper(["bzip2"], output_pipe)
class Bzip2Format(CompressionFormat):
def __init__(self, compression_level=None):
super().__init__(
decompress_cmd=["bzcat"],
compress_cmd=["bzip2"],
compression_level=compression_level,
)


Text = TextFormat()
Expand Down
10 changes: 7 additions & 3 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,11 +858,15 @@ def add_task(

task = self._state.get_task(task_id, setdefault=_default_task)

if task is None or (task.status != RUNNING and not worker.enabled):
task_does_not_exist = task is None
worker_disabled_and_task_not_running = task is not None and task.status != RUNNING and not worker.enabled
if task_does_not_exist or worker_disabled_and_task_not_running:
Comment on lines +861 to +863

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would argue this is change is harder to read. Negative variable names makes code more difficult to follow.

return

# Ignore claims that the task is PENDING if it very recently was marked as DONE.
if status == PENDING and task.status == DONE and (time.time() - task.updated) < self._config.stable_done_cooldown_secs:
claimed_pending = status == PENDING
currently_done = task.status == DONE
within_cooldown = (time.time() - task.updated) < self._config.stable_done_cooldown_secs
if claimed_pending and currently_done and within_cooldown:
return

# for setting priority, we'll sometimes create tasks with unset family and params
Expand Down
Loading