-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRunProcessManager.py
More file actions
executable file
·134 lines (96 loc) · 4.34 KB
/
RunProcessManager.py
File metadata and controls
executable file
·134 lines (96 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
__author__ = 'rodozov'
from CommandClasses import *
from Chain import Chain, EventsHandler
from Event import SimpleEvent
from RunlistManager import RunlistManager
import multiprocessing as mp
import Queue
from threading import Thread
import getpass
'''
'''
class RunProcessPool(Thread):
def __init__(self, runs_to_process_queue=None, processed_runs_queue=None, sequence_handler_object=None, options=None):
super(RunProcessPool, self).__init__()
self.options = options
self.pool = mp.Pool(mp.cpu_count(),maxtasksperchild=1)
self.toprocess = runs_to_process_queue
self.processed_runs_q = processed_runs_queue
self.sequence_handler = sequence_handler_object
self.stop_process_event = None
self.runsProcessThread = Thread
self.runChainProcessFunction = None
def processRuns(self, functoapply):
while True:
r_copy = self.toprocess.get()
results_folder = self.options['result_folder']
#run_status = None
for k in r_copy:
run_status = r_copy[k]['status']
#print k, run_status
sequence = self.sequence_handler.getSequenceForName(run_status)
#print 'sequence ', sequence
runChainArgs = {'rundetails':r_copy, 'commands': sequence, 'result_folder': results_folder}
result = self.pool.apply_async(functoapply, (runChainArgs, ))
self.processed_runs_q.put({k: result})
self.toprocess.task_done() # remove the run from the toprocess queue
if self.stop_process_event.is_set() and self.toprocess.empty():
self.pool.close()
self.pool.join()
print 'Exiting run proccess mngr'
break
def run(self):
self.processRuns(self.runChainProcessFunction)
def runForestRun(self):
''' For the lolz ! :) '''
self.start()
def processSingleRunChain(args=None):
'''
Function to run single runChain object
Setup the run chain object with the args
'''
e_handler = EventsHandler()
runchain = Chain(e_handler)
e_handler.addObserver(runchain)
run_num = None
for k in args['rundetails'].keys():
run_num = k
#print 'process is ', mp.current_process().name , ' for run ', run_num
runchain.commands = args['commands']
#print 'commands', runchain.commands
rfolder = args['result_folder']
initialEvent = SimpleEvent('init', True, {'run':run_num, 'result_folder':rfolder})
runchain.startChainWithEvent(initialEvent)
return runchain.getResult()
if __name__ == "__main__":
print os.getpid()
optionsObject = None
with open('resources/options_object.txt', 'r') as optobj: optionsObject = json.loads(optobj.read())
os.environ['LD_LIBRARY_PATH'] = optionsObject['paths']['cms_online_nt_machine']['root_path']
print os.environ['LD_LIBRARY_PATH']
p = getpass.getpass('lxplus passwd: ')
connections_dict = {}
connections_dict.update({'webserver':optionsObject['webserver_remote']})
connections_dict.update({'lxplus':optionsObject['lxplus_archive_remote']})
connections_dict['webserver']['ssh_credentials']['password'] = p
connections_dict['lxplus']['ssh_credentials']['password'] = p
print connections_dict
sshTransport = SSHTransportService(connections_dict)
db_obj = DBService(dbType='oracle+cx_oracle://',host= 'localhost',port= '1521',user= 'CMS_COND_RPC_NOISE',password= 'j6XFEznqH9f92WUf',schema= 'CMS_RPC_COND',dbName= 'cms_orcoff_prep')
#print alist
ssh_one = sshTransport.connections_dict['webserver']
ssh_two = sshTransport.connections_dict['lxplus']
#print ssh_one
#print ssh_two
runsToProcessQueue = Queue.Queue()
processedRunsQueue = Queue.Queue()
sequence_handler = CommandSequenceHandler('resources/SequenceDictionaries.json', 'resources/options_object.txt')
rpmngr = RunProcessPool(runsToProcessQueue, processedRunsQueue, sequence_handler, {'result_folder':'/rpctdata/CAF/'})
rlistMngr = RunlistManager('resources/runlist.json')
rlistMngr.toProcessQueue = runsToProcessQueue
stop = mp.Event()
#stop.set()
rpmngr.stop_process_event = stop
rpmngr.runChainProcessFunction = processSingleRunChain
runsToProcessQueue.put({'268457':rlistMngr.runlist['268457']})
rpmngr.start()