
Commit d77f77e

Merge pull request #78 from CoffeaTeam/topic_parsl
Parsl executor and corresponding tests with simple config
2 parents: e3a402c + cc1da9f

File tree: 18 files changed (+292, -188 lines)

MANIFEST.in

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+include README.rst
+include LICENSE
+include fnal_column_analysis_tools/processor/templates/*.tmpl

fnal_column_analysis_tools/processor/__init__.py

Lines changed: 0 additions & 1 deletion
@@ -7,7 +7,6 @@
 from .executor import (
     iterative_executor,
     futures_executor,
-    condor_executor,
     run_uproot_job,
     run_parsl_job,
     run_spark_job

fnal_column_analysis_tools/processor/dataframe.py

Lines changed: 2 additions & 2 deletions
@@ -11,9 +11,9 @@ class LazyDataFrame(MutableMapping):
     Simple delayed uproot reader (a la lazyarrays)
     Keeps track of values accessed, for later parsing.
     """
-    def __init__(self, tree, stride=None, index=None, preload_items=None):
+    def __init__(self, tree, stride=None, index=None, preload_items=None, flatten=False):
         self._tree = tree
-        self._branchargs = {'awkwardlib': awkward}
+        self._branchargs = {'awkwardlib': awkward, 'flatten': flatten}
         self._stride = None
         if (stride is not None) and (index is not None):
             self._stride = stride
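
As a usage sketch (not part of the diff): the new flatten flag is stored in self._branchargs and handed to uproot when a branch is read, so jagged branches come back flattened. The file name, tree name, and branch name below are hypothetical.

import uproot
from fnal_column_analysis_tools import processor

# hypothetical input file, tree, and branch, for illustration only
file = uproot.open('nano_dy.root')
tree = file['Events']

df = processor.LazyDataFrame(tree, stride=500000, index=0, flatten=True)
jet_pt = df['Jet_pt']  # lazily reads the branch with awkwardlib=awkward, flatten=True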

fnal_column_analysis_tools/processor/executor.py

Lines changed: 24 additions & 17 deletions
@@ -6,10 +6,10 @@
 from .accumulator import accumulator
 
 try:
-    from collections.abc import Mapping
+    from collections.abc import Mapping, Sequence
     from functools import lru_cache
 except ImportError:
-    from collections import Mapping
+    from collections import Mapping, Sequence
 
     def lru_cache(maxsize):
         def null_wrapper(f):
@@ -60,14 +60,6 @@ def futures_executor(items, function, accumulator, workers=2, status=True, unit=
     return accumulator
 
 
-def condor_executor(items, function, accumulator, workers, status=True, unit='items', desc='Processing'):
-    raise NotImplementedError
-
-
-def spark_executor(items, function, accumulator, config, status=True, unit='datasets', desc='Processing'):
-    raise NotImplementedError
-
-
 def _work_function(item):
     dataset, fn, treename, chunksize, index, processor_instance = item
     file = uproot.open(fn)
@@ -122,7 +114,7 @@ def run_uproot_job(fileset, treename, processor_instance, executor, executor_arg
     return output
 
 
-def run_parsl_job(fileset, treename, processor_instance, executor, executor_args={'config': None}, chunksize=500000):
+def run_parsl_job(fileset, treename, processor_instance, executor, data_flow=None, executor_args={'config': None}, chunksize=500000):
     '''
     A convenience wrapper to submit jobs for a file, which is a
     dictionary of dataset: [file list] entries. In this case using parsl.
@@ -149,8 +141,8 @@ def run_parsl_job(fileset, treename, processor_instance, executor, executor_args
 
     print('parsl version:', parsl.__version__)
 
-    from .parsl.detail import _parsl_work_function, _parsl_get_chunking
-    from .parsl.parsl_base_executor import ParslBaseExecutor
+    from .parsl.parsl_executor import ParslExecutor
+    from .parsl.detail import _parsl_initialize, _parsl_stop, _parsl_get_chunking
 
     if executor_args['config'] is None:
         executor_args.pop('config')
@@ -159,17 +151,32 @@
         raise ValueError("Expected fileset to be a mapping dataset: list(files)")
     if not isinstance(processor_instance, ProcessorABC):
         raise ValueError("Expected processor_instance to derive from ProcessorABC")
-    if not isinstance(executor, ParslBaseExecutor):
+    if not isinstance(executor, ParslExecutor):
         raise ValueError("Expected executor to derive from ParslBaseExecutor")
 
+    # initialize parsl if we need to
+    # if we initialize, then we deconstruct
+    # when we're done
+    killParsl = False
+    if data_flow is None:
+        data_flow = _parsl_initialize(**executor_args)
+        killParsl = True
+    else:
+        if not isinstance(data_flow, parsl.dataflow.dflow.DataFlowKernel):
+            raise ValueError("Expected 'data_flow' to be a parsl.dataflow.dflow.DataFlowKernel")
+
     items = []
     for dataset, filelist in tqdm(fileset.items(), desc='Preprocessing'):
         for chunk in _parsl_get_chunking(tuple(filelist), treename, chunksize):
-            items.append((dataset, chunk[0], treename, chunk[1], chunk[2], processor_instance))
+            items.append((dataset, chunk[0], treename, chunk[1], chunk[2]))
 
     output = processor_instance.accumulator.identity()
-    executor(items, _parsl_work_function, output, **executor_args)
+    executor(data_flow, items, processor_instance, output, **executor_args)
     processor_instance.postprocess(output)
+
+    if killParsl:
+        _parsl_stop(data_flow)
+
     return output
 
 
@@ -230,7 +237,7 @@ def run_spark_job(fileset, processor_instance, executor, executor_args={'config'
     if not isinstance(spark, pyspark.sql.session.SparkSession):
         raise ValueError("Expected 'spark' to be a pyspark.sql.session.SparkSession")
 
-    dfslist = _spark_make_dfs(spark, fileset, partitionsize, thread_workers)
+    dfslist = _spark_make_dfs(spark, fileset, partitionsize, processor_instance.columns, thread_workers)
 
     output = processor_instance.accumulator.identity()
     executor(spark, dfslist, processor_instance, output, thread_workers)
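
As a usage sketch (not from this commit), the reworked run_parsl_job could be driven as below. The fileset contents, the 'Events' tree name, and the MyProcessor class are hypothetical placeholders; with data_flow left unset, the function initializes parsl from the bundled default config and tears it down itself.

from fnal_column_analysis_tools import processor
from fnal_column_analysis_tools.processor.parsl.parsl_executor import parsl_executor

# hypothetical dataset -> file list mapping and ProcessorABC subclass defined elsewhere
fileset = {'DY': ['root://host//store/dy_part1.root', 'root://host//store/dy_part2.root']}
my_processor = MyProcessor()

# data_flow defaults to None, so run_parsl_job calls _parsl_initialize() with
# _default_cfg (one local HighThroughputExecutor) and _parsl_stop() when finished
output = processor.run_parsl_job(fileset, 'Events', my_processor,
                                 executor=parsl_executor,
                                 executor_args={'config': None},
                                 chunksize=500000)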

fnal_column_analysis_tools/processor/parsl/condor_executor.py

Lines changed: 0 additions & 23 deletions
This file was deleted.

fnal_column_analysis_tools/processor/parsl/detail.py

Lines changed: 12 additions & 13 deletions
@@ -1,12 +1,14 @@
 from concurrent.futures import as_completed
+import multiprocessing
+
+import parsl
 
 from parsl.app.app import python_app
 
 from parsl.providers import LocalProvider
 from parsl.channels import LocalChannel
 from parsl.config import Config
 from parsl.executors import HighThroughputExecutor
-from parsl.addresses import address_by_hostname
 
 try:
     from functools import lru_cache
@@ -17,33 +19,30 @@ def null_wrapper(f):
 
     return null_wrapper
 
-
-default_cfg = Config(
+_default_cfg = Config(
     executors=[
         HighThroughputExecutor(
             label="coffea_parsl_default",
-            address=address_by_hostname(),
-            prefetch_capacity=0,
-            worker_debug=True,
             cores_per_worker=1,
-            max_workers=1,
-            # max_blocks=200,
-            # workers_per_node=1,
-            worker_logdir_root='./',
             provider=LocalProvider(
                 channel=LocalChannel(),
                 init_blocks=1,
                 max_blocks=1,
-                nodes_per_block=1
             ),
         )
     ],
     strategy=None,
 )
 
 
-def _parsl_work_function():
-    raise NotImplementedError
+def _parsl_initialize(config=_default_cfg):
+    dfk = parsl.load(config)
+    return dfk
+
+
+def _parsl_stop(dfk):
+    dfk.cleanup()
+    parsl.clear()
 
 
 @python_app
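
A minimal sketch, assuming a user wants something other than the shipped single-worker default: build a parsl Config with the same structure as _default_cfg, load it with _parsl_initialize, and hand the resulting DataFlowKernel to run_parsl_job as data_flow. The label and worker counts below are arbitrary example values.

from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel

from fnal_column_analysis_tools.processor.parsl.detail import _parsl_initialize, _parsl_stop

# arbitrary example values, mirroring the shape of _default_cfg above
my_cfg = Config(
    executors=[
        HighThroughputExecutor(
            label='coffea_parsl_custom',
            cores_per_worker=1,
            max_workers=4,
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=1,
            ),
        )
    ],
    strategy=None,
)

dfk = _parsl_initialize(config=my_cfg)  # wraps parsl.load(my_cfg)
# ... pass dfk to run_parsl_job(..., data_flow=dfk), then clean up yourself:
_parsl_stop(dfk)                        # dfk.cleanup() followed by parsl.clear()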

fnal_column_analysis_tools/processor/parsl/parsl_base_executor.py

Lines changed: 0 additions & 16 deletions
This file was deleted.
fnal_column_analysis_tools/processor/parsl/parsl_executor.py

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+from fnal_column_analysis_tools import hist, processor
+from copy import deepcopy
+from concurrent.futures import as_completed
+
+from tqdm import tqdm
+import cloudpickle as cpkl
+import lz4.frame as lz4f
+import numpy as np
+import pandas as pd
+
+from parsl.app.app import python_app
+
+lz4_clevel = 1
+
+
+@python_app
+def coffea_pyapp(dataset, fn, treename, chunksize, index, procstr):
+    import uproot
+    import cloudpickle as cpkl
+    import lz4.frame as lz4f
+    from fnal_column_analysis_tools import hist, processor
+    from fnal_column_analysis_tools.processor.accumulator import accumulator
+
+    lz4_clevel = 1
+
+    # instrument xrootd source
+    if not hasattr(uproot.source.xrootd.XRootDSource, '_read_real'):
+
+        def _read(self, chunkindex):
+            self.bytesread = getattr(self, 'bytesread', 0) + self._chunkbytes
+            return self._read_real(chunkindex)
+
+        uproot.source.xrootd.XRootDSource._read_real = uproot.source.xrootd.XRootDSource._read
+        uproot.source.xrootd.XRootDSource._read = _read
+
+    processor_instance = cpkl.loads(lz4f.decompress(procstr))
+
+    file = uproot.open(fn)
+    tree = file[treename]
+
+    df = processor.LazyDataFrame(tree, chunksize, index, flatten=True)
+    df['dataset'] = dataset
+
+    vals = processor_instance.process(df)
+    vals['_bytesread'] = accumulator(file.source.bytesread if isinstance(file.source, uproot.source.xrootd.XRootDSource) else 0)
+    valsblob = lz4f.compress(cpkl.dumps(vals), compression_level=lz4_clevel)
+
+    return valsblob
+
+
+class ParslExecutor(object):
+
+    def __init__(self):
+        pass
+
+    def __call__(self, dfk, items, processor_instance, output, unit='items', desc='Processing'):
+        procstr = lz4f.compress(cpkl.dumps(processor_instance))
+
+        nitems = len(items)
+        ftr_to_item = set()
+        for dataset, fn, treename, chunksize, index in items:
+            ftr_to_item.add(coffea_pyapp(dataset, fn, treename, chunksize, index, procstr))
+
+        for ftr in tqdm(as_completed(ftr_to_item), total=nitems, unit='items', desc='Processing'):
+            blob = ftr.result()
+            ftrhist = cpkl.loads(lz4f.decompress(blob))
+            output.add(ftrhist)
+
+
+parsl_executor = ParslExecutor()
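
The payload format between driver and workers is simply cloudpickle wrapped in lz4 frames; a minimal round-trip sketch, with a toy dictionary standing in for a processor's output accumulator:

import cloudpickle as cpkl
import lz4.frame as lz4f

payload = {'cutflow': {'all events': 42}}  # toy stand-in, not a real accumulator

# what coffea_pyapp returns for each chunk
blob = lz4f.compress(cpkl.dumps(payload), compression_level=1)

# what ParslExecutor.__call__ does with each completed future's result
restored = cpkl.loads(lz4f.decompress(blob))
assert restored == payload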

fnal_column_analysis_tools/processor/parsl/slurm_executor.py

Lines changed: 0 additions & 23 deletions
This file was deleted.

fnal_column_analysis_tools/processor/spark/detail.py

Lines changed: 16 additions & 6 deletions
@@ -19,6 +19,8 @@ def _spark_initialize(config=_default_config, **kwargs):
         spark_progress = kwargs['spark_progress']
 
     cfg_actual = config
+    # get spark to not complain about missing log configs
+    cfg_actual = cfg_actual.config('spark.driver.extraJavaOptions', '-Dlog4jspark.root.logger=ERROR,console')
     if not spark_progress:
         cfg_actual = cfg_actual.config('spark.ui.showConsoleProgress', 'false')
 
@@ -36,22 +38,30 @@
 def _read_df(spark, files_or_dirs):
     if not isinstance(files_or_dirs, Sequence):
         raise ValueError("spark dataset file list must be a Sequence (like list())")
-    return spark.read.parquet(*files_or_dirs)
+    df = spark.read.parquet(*files_or_dirs)
+    count = df.count()
+    return df, count
 
 
-def _spark_make_dfs(spark, fileset, partitionsize, thread_workers):
+def _spark_make_dfs(spark, fileset, partitionsize, columns, thread_workers):
     dfs = {}
+    ana_cols = set(columns)
     with ThreadPoolExecutor(max_workers=thread_workers) as executor:
         future_to_ds = {executor.submit(_read_df, spark, fileset[dataset]): dataset for dataset in fileset.keys()}
         for ftr in tqdm(as_completed(future_to_ds), total=len(fileset), desc='loading', unit='datasets'):
             dataset = future_to_ds[ftr]
-            df = ftr.result()
+            df, count = ftr.result()
+            df_cols = set(df.columns)
+            cols_in_df = ana_cols.intersection(df_cols)
+            df = df.select(*cols_in_df)
+            missing_cols = ana_cols - cols_in_df
+            for missing in missing_cols:
+                df = df.withColumn(missing, fn.lit(0.0))
             df = df.withColumn('dataset', fn.lit(dataset))
-            count = df.count()
             npartitions = max(count // partitionsize, 1)
             if df.rdd.getNumPartitions() > npartitions:
-                df = df.coalesce(npartitions)
-            dfs[dataset] = df
+                df = df.repartition(npartitions)
+            dfs[dataset] = (df, count)
     return dfs
 
 
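To make the new column handling in _spark_make_dfs concrete, here is a standalone sketch of the same select-then-backfill logic on a toy DataFrame; the column names and values are made up:

from pyspark.sql import SparkSession
import pyspark.sql.functions as fn

spark = SparkSession.builder.master('local[1]').appName('column_backfill_demo').getOrCreate()

# toy stand-ins: the analysis wants three columns, the input provides only two
ana_cols = {'Jet_pt', 'Jet_eta', 'MET_pt'}
df = spark.createDataFrame([(50.0, 1.2), (30.0, -0.7)], ['Jet_pt', 'Jet_eta'])

cols_in_df = ana_cols.intersection(set(df.columns))   # columns actually present
df = df.select(*cols_in_df)
for missing in ana_cols - cols_in_df:                  # here: 'MET_pt'
    df = df.withColumn(missing, fn.lit(0.0))           # backfill with a constant 0.0 column

df.show()  # every requested column now exists, so the processor's reads always resolve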