skim_nanoaod.py
import os
import multiprocessing as mp
import argparse
import time
import json
import yaml
import glob
import ROOT
from PhysicsTools.NanoAODTools.postprocessing.framework.eventloop import Module
from PhysicsTools.NanoAODTools.postprocessing.framework.postprocessor import PostProcessor

class SkimEvents(Module):
    """Pass-through module: keep every event that survives the preselection cut and JSON mask."""
    def __init__(self):
        self.writeHistFile = True

    def beginJob(self, histFile=None, histDirName=None):
        Module.beginJob(self, histFile, histDirName)

    def analyze(self, event):
        # All event-level filtering is delegated to the PostProcessor's cut string.
        return True
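
# Illustrative sketch (not part of the original script): SkimEvents keeps every
# event, so the skim is defined entirely by the `cut` string and optional JSON
# mask. A module could instead filter in Python by returning False from
# analyze(); the example below assumes the standard NanoAOD muon-multiplicity
# branch `nMuon`.
class SkimMuonEventsExample(SkimEvents):
    def analyze(self, event):
        # Keep only events with at least one reconstructed muon.
        return event.nMuon > 0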

def worker(params, nfiles=None):
    """Run one skim job described by `params`, optionally limited to the first `nfiles` input files."""
    idx = params.get('idx')
    # Tag the output files with the chunk index when one is given.
    pf = f'_{idx}_skim' if idx is not None else '_skim'
    infiles = glob.glob(params['files'])
    if nfiles is not None:
        infiles = infiles[:nfiles]
    p = PostProcessor(
        params['output_path'],
        infiles,
        cut=params['preselection'],   # event preselection applied by the PostProcessor
        branchsel=None,               # keep all branches
        modules=[SkimEvents()],
        jsonInput=params['json'],     # optional luminosity-mask JSON (None disables it)
        prefetch=True,
        longTermCache=True,
        postfix=pf,
    )
    p.run()
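
# For reference, each job dict consumed by worker() carries the keys read above;
# the values in this sketch are placeholders, not taken from any real configuration:
#
#   {
#       'files': '/path/to/dataset/*.root',   # glob pattern for the input NanoAOD files
#       'output_path': '/path/to/output',
#       'preselection': 'nMuon >= 2',
#       'json': None,                         # optional luminosity-mask JSON file
#       'idx': 0,                             # optional chunk index used in the output postfix
#   }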

def main(args):
    with open(args.config, 'r') as f:
        cfg = yaml.safe_load(f)

    # Restrict to the datasets requested on the command line, if any were given.
    datasets = {k: v for k, v in cfg['datasets'].items() if k in args.datasets} if args.datasets else cfg['datasets']

    # Expand every dataset entry into one or more job configurations for worker().
    job_configs = []
    for params in datasets.values():
        if params['json_path'] is None:
            job_configs.append({
                'json': None,
                'output_path': params['output_path'],
                'files': params['files'],
                'preselection': params['preselection'],
            })
        else:
            output_path = os.path.join(params['output_path'], 'skims_local')
            # A list of file globs is split into separately indexed jobs.
            file_groups = params['files'] if isinstance(params['files'], list) else [params['files']]
            for idx, files in enumerate(file_groups):
                job_configs.append({
                    'json': params['json_path'],
                    'output_path': output_path,
                    'files': files,
                    'preselection': params['preselection'],
                    'idx': idx,
                })

    if 'mp' in cfg['config']['run_strategy']:
        # Launch one process per job; the OS schedules them across the available cores.
        n_cores = mp.cpu_count()
        if args.verbose:
            print(f'Distributing ~{len(job_configs)} jobs to {n_cores} cores...')
        start_time = time.perf_counter()
        jobs = []
        for params in job_configs:
            proc = mp.Process(target=worker, args=(params,))
            jobs.append(proc)
            proc.start()
        for p in jobs:
            p.join()
        finish_time = time.perf_counter()
        if args.verbose:
            print(f'Finished in {finish_time - start_time:.1f} seconds')
    else:
        # Sequential fallback: run each skim job in this process.
        for params in job_configs:
            worker(params)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', dest='config', type=str, default='skim_cfg.yml', help='skim configuration file (.yml)')
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='print progress to stdout')
    parser.add_argument('-d', '--datasets', dest='datasets', nargs='+', help='target datasets to run (from the cfg file)')
    args = parser.parse_args()
    main(args)
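
# Example usage (dataset names and paths are placeholders):
#
#   python skim_nanoaod.py -c skim_cfg.yml -v
#   python skim_nanoaod.py -c skim_cfg.yml -d DYJetsToLL -v
#
# A minimal configuration consistent with the keys this script reads might look
# like the sketch below; the script defaults to a file named skim_cfg.yml.
#
#   config:
#     run_strategy: mp              # any value containing 'mp' enables multiprocessing
#   datasets:
#     DYJetsToLL:
#       files: /store/mc/DYJetsToLL/*.root
#       output_path: /eos/user/skims/DYJetsToLL
#       preselection: "nMuon >= 2"
#       json_path: null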