
Commit 539c4b5

Merge pull request #1289 from btovar/taskvine_executor_backport
feat: Taskvine executor backport
2 parents 828617d + 9595c11

File tree

5 files changed: +1391 −3 lines

Diff for: .github/workflows/ci.yml

+24 lines

@@ -161,6 +161,30 @@ jobs:
           conda-pack --output coffea-env.tar.gz
           python wq.py coffea-env.tar.gz
 
+  testtaskvine:
+    runs-on: ubuntu-latest
+    timeout-minutes: 15
+    needs: pre-commit
+    strategy:
+      matrix:
+        python-version: ["3.11"]
+    name: test coffea-taskvine
+
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up Conda
+        uses: conda-incubator/[email protected]
+        with:
+          auto-update-conda: true
+          python-version: ${{ matrix.python-version }}
+      - name: Test TaskVine Executor
+        shell: bash -l {0}
+        run: |
+          conda create --yes --name coffea-env -c conda-forge python=${{ matrix.python-version }} ndcctools pytest 'setuptools<71'
+          conda activate coffea-env
+          python -m pip install --ignore-installed .
+          python -m pytest tests/test_taskvine_executor.py
+
 
   # testskyhookjob:
   #   runs-on: ubuntu-latest
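
For a quick local check of the same step, a minimal sketch in Python (assuming the coffea-env conda environment created by the workflow above is active and the repository root is the working directory):

    # Run the same test file the CI step invokes, from within Python.
    import sys

    import pytest

    # pytest.main returns an exit code; propagate it like the CI shell step would.
    sys.exit(pytest.main(["tests/test_taskvine_executor.py"]))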

Diff for: coffea/processor/__init__.py

+2 lines

@@ -13,6 +13,7 @@
     DaskExecutor,
     ParslExecutor,
     WorkQueueExecutor,
+    TaskVineExecutor,
     Runner,
     run_spark_job,
 )
@@ -128,6 +129,7 @@ def _run_x_job(
     "DaskExecutor",
     "ParslExecutor",
     "WorkQueueExecutor",
+    "TaskVineExecutor",
     "Runner",
     "run_spark_job",
     "accumulate",

Diff for: coffea/processor/executor.py

+190 −3 lines

@@ -453,6 +453,187 @@ def _wait_for_merges(FH: _FuturesHolder, executor: ExecutorBase) -> Accumulatabl
     )
 
 
+@dataclass
+class TaskVineExecutor(ExecutorBase):
+    """Execute using TaskVine
+
+    For more information, see :ref:`intro-coffea-vine`
+
+    Parameters
+    ----------
+    items : sequence or generator
+        Sequence of input arguments
+    function : callable
+        A function to be called on each input, which returns an accumulator instance
+    accumulator : Accumulatable
+        An accumulator to collect the output of the function
+    status : bool
+        If true (default), enable progress bar
+    unit : str
+        Label of progress bar unit
+    desc : str
+        Label of progress bar description
+    compression : int, optional
+        Compress accumulator outputs in flight with LZ4, at the level specified (default 1).
+        ``None`` sets the level to 1 (minimal compression).
+    # taskvine specific options:
+    cores : int
+        Maximum number of cores for a TaskVine task. If unset, use a whole worker.
+    memory : int
+        Maximum amount of memory (in MB) for a TaskVine task. If unset, use a whole worker.
+    disk : int
+        Maximum amount of disk space (in MB) for a TaskVine task. If unset, use a whole worker.
+    gpus : int
+        Number of GPUs to allocate to each task. If unset, use zero.
+    resource_monitor : str
+        If given, one of 'off', 'measure', or 'watchdog'. Default is 'off'.
+        - 'off': turns off resource monitoring. Overridden to 'watchdog' if resources_mode
+          is not set to 'fixed'.
+        - 'measure': turns on resource monitoring for TaskVine. The
+          resources used per task are measured.
+        - 'watchdog': in addition to measuring resources, tasks are terminated if they
+          go above the cores, memory, or disk specified.
+    resources_mode : str
+        One of 'fixed', 'max-seen', or 'max-throughput'. Default is 'max-seen'.
+        Sets the strategy to automatically allocate resources to tasks.
+        - 'fixed': allocate the cores, memory, and disk specified for each task.
+        - 'max-seen' or 'auto': use the cores, memory, and disk given as maximum values to
+          allocate, but first try each task by allocating the maximum values seen. Leads
+          to a good compromise between parallelism and number of retries.
+        - 'max-throughput': like 'max-seen', but first tries the task with an
+          allocation that maximizes overall throughput.
+        If resources_mode is other than 'fixed', preprocessing and
+        accumulation tasks always use the 'max-seen' strategy, as the
+        former always use the same resources and the latter have a
+        distribution of resources that increases over time.
+    split_on_exhaustion : bool
+        Whether to split a processing task in half according to its chunksize when it exhausts
+        the cores, memory, or disk allocated to it. If False, a task that exhausts resources
+        permanently fails. Default is True.
+    fast_terminate_workers : int
+        Terminate workers on which tasks have been running longer than average.
+        The time limit is computed by multiplying the average runtime of tasks
+        by the value of 'fast_terminate_workers'. Since there are
+        legitimately slow tasks, no task may trigger fast termination on
+        two distinct workers. Less than 1 disables it.
+    checkpoint_proportion : float
+        Controls when accumulation results are checkpointed back to the manager. If the
+        proportion of already-checkpointed inputs to an accumulation is less than this value,
+        the accumulation result is checkpointed. A value >= 1 forces all accumulations to be
+        checkpointed; <= 0 checkpoints none. Default is 0.5.
+
+    manager_name : str
+        Name to refer to this TaskVine manager.
+        Sets port to 0 (any available port) if port not given.
+    port : int or tuple(int, int)
+        Port number or range (inclusive) for the TaskVine manager program.
+        Defaults to 9123 if manager_name not given.
+    password_file : str
+        Location of a file containing a password used to authenticate workers.
+    ssl : bool or tuple(str, str)
+        Enable ssl encryption between manager and workers. If a tuple, then it
+        should be of the form (key, cert), where key and cert are paths to the files
+        containing the key and certificate in pem format. If True, a self-signed temporary
+        key and cert are generated for the session.
+
+    extra_input_files : list
+        A list of files in the current working directory to send along with each task.
+        Useful for small custom libraries and configuration files needed by the processor.
+    x509_proxy : str
+        Path to the X509 user proxy. If None (the default), use the value of the
+        environment variable X509_USER_PROXY, or fall back to the file /tmp/x509up_u${UID}
+        if it exists. If False, disables the default behavior and no proxy is sent.
+
+    environment_file : optional, str
+        Conda python environment tarball to use. If not given, assume that
+        the python environment is already set up at the execution site.
+
+    treereduction : int
+        Number of processed chunks per accumulation task. Default is 10.
+    concurrent_reads : int
+        Number of processed chunks concurrently read per accumulation task. Default is 2.
+    replicas : int
+        Number of replicas for temporary results in the cluster before checkpointing to the
+        manager with an accumulation. If less than 2, only one copy of a result is kept, which
+        reduces cluster disk usage, but results need to be regenerated if workers are lost.
+
+    verbose : bool
+        If true, emit a message on each task submission and completion.
+        Default is false.
+    print_stdout : bool
+        If true (default), print the standard output of each TaskVine task on completion.
+
+    filepath : str
+        Path to the parent directory where to create the staging directory.
+        Default is "." (current working directory).
+
+    custom_init : function, optional
+        A function that takes as an argument the executor's underlying manager object.
+        The function is called just before the first work unit is submitted
+        to the queue.
+    """
+
+    # Standard executor options:
+    compression: Optional[int] = 1
+    retries: int = 2  # task executes at most 3 times
+    # taskvine executor options:
+    manager_name: Optional[str] = None
+    port: Optional[Union[int, Tuple[int, int]]] = None
+    filepath: str = "."
+    events_total: Optional[int] = None
+    x509_proxy: Optional[str] = None
+    verbose: bool = False
+    print_stdout: bool = False
+    status_display_interval: Optional[int] = 10
+    password_file: Optional[str] = None
+    ssl: Union[bool, Tuple[str, str]] = False
+    environment_file: Optional[str] = None
+    extra_input_files: List = field(default_factory=list)
+    resource_monitor: Optional[str] = "off"
+    resources_mode: Optional[str] = "max-seen"
+    split_on_exhaustion: Optional[bool] = True
+    fast_terminate_workers: Optional[int] = None
+    checkpoint_proportion: Optional[float] = 0.5
+    cores: Optional[int] = None
+    memory: Optional[int] = None
+    disk: Optional[int] = None
+    gpus: Optional[int] = None
+    treereduction: int = 10
+    concurrent_reads: int = 2
+    replicas: int = 3
+    chunksize: int = 100000
+    dynamic_chunksize: Optional[Dict] = None
+    custom_init: Optional[Callable] = None
+
+    # deprecated
+    bar_format: Optional[str] = None
+    chunks_accum_in_mem: Optional[int] = None
+    master_name: Optional[str] = None
+    chunks_per_accum: Optional[int] = None
+    wrapper: Optional[str] = None
+    debug_log: Optional[str] = None
+    stats_log: Optional[str] = None
+    transactions_log: Optional[str] = None
+    tasks_accum_log: Optional[str] = None
+
+    def __call__(
+        self,
+        items: Iterable,
+        function: Callable,
+        accumulator: Accumulatable,
+    ):
+        from .taskvine_executor import run
+
+        return (
+            run(
+                self,
+                items,
+                function,
+                accumulator,
+            ),
+            0,
+        )
+
+
 @dataclass
 class WorkQueueExecutor(ExecutorBase):
     """Execute using Work Queue
@@ -1814,7 +1995,9 @@ def run(
             processor_instance=pi_to_send,
         )
 
-        if self.format == "root" and isinstance(self.executor, WorkQueueExecutor):
+        if self.format == "root" and isinstance(
+            self.executor, (TaskVineExecutor, WorkQueueExecutor)
+        ):
             # keep chunks in generator, use a copy to count number of events
             # this is cheap, as we are reading from the cache
             chunks_to_count = self.preprocess(fileset, treename)
@@ -1829,7 +2012,7 @@ def run(
             "unit": "chunk",
             "function_name": type(processor_instance).__name__,
         }
-        if isinstance(self.executor, WorkQueueExecutor):
+        if isinstance(self.executor, (TaskVineExecutor, WorkQueueExecutor)):
             exe_args.update(
                 {
                     "unit": "event",
@@ -1856,7 +2039,11 @@ def run(
         processor_instance.postprocess(wrapped_out["out"])
 
         if "metrics" in wrapped_out.keys():
-            wrapped_out["metrics"]["chunks"] = len(chunks)
+            if isinstance(self.executor, (TaskVineExecutor, WorkQueueExecutor)):
+                wrapped_out["metrics"]["chunks"] = len(wrapped_out["processed"])
+            else:
+                wrapped_out["metrics"]["chunks"] = len(chunks_to_count)
+
         for k, v in wrapped_out["metrics"].items():
             if isinstance(v, set):
                 wrapped_out["metrics"][k] = list(v)
