Skip to content

Commit 2520c42

Browse files
author
David Stirling
authored
Create run_batch_general.py (#108)
1 parent 4f0a525 commit 2520c42

File tree

1 file changed

+155
-0
lines changed

1 file changed

+155
-0
lines changed

run_batch_general.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
import json
2+
import boto3
3+
import string
4+
import os
5+
import posixpath
6+
class JobQueue():
7+
8+
def __init__(self,name=None):
9+
self.sqs = boto3.resource('sqs')
10+
self.queue = self.sqs.get_queue_by_name(QueueName=name+'Queue')
11+
self.inProcess = -1
12+
self.pending = -1
13+
14+
def scheduleBatch(self, data):
15+
msg = json.dumps(data)
16+
response = self.queue.send_message(MessageBody=msg)
17+
print 'Batch sent. Message ID:',response.get('MessageId')
18+
19+
#project specific stuff
20+
topdirname='' #PROJECTNAME
21+
projectname='' #PROJECTNAME
22+
batchsuffix='' #BATCHNAME
23+
rows=list(string.ascii_uppercase)[0:8]
24+
columns=range(1,13)
25+
sites=range(1,18)
26+
platelist=[] # PLATEFOLDERNAMES
27+
illumpipename='illum.cppipe'
28+
qcpipename='qc.cppipe'
29+
analysispipename='analysis.cppipe'
30+
batchpipenameillum='Batch_data_illum.h5'
31+
batchpipename='Batch_data.h5'
32+
33+
#not project specific, unless you deviate from the structure
34+
startpath=posixpath.join('projects',topdirname)
35+
pipelinepath=posixpath.join(startpath,os.path.join('workspace/pipelines',batchsuffix))
36+
illumoutpath=posixpath.join(startpath,os.path.join(batchsuffix,'illum'))
37+
QCoutpath=posixpath.join(startpath,os.path.join('workspace/qc',os.path.join(batchsuffix,'results')))
38+
analysisoutpath=posixpath.join(startpath,os.path.join('workspace/analysis',batchsuffix))
39+
inputpath=posixpath.join(startpath,os.path.join('workspace/qc',os.path.join(batchsuffix,'rules')))
40+
datafilepath=posixpath.join(startpath,os.path.join('workspace/load_data_csv',batchsuffix))
41+
anlysisoutputstructure="Metadata_Plate/analysis/Metadata_Plate-Metadata_Well-Metadata_Site"
42+
batchpath=posixpath.join(startpath,os.path.join('workspace/batchfiles',batchsuffix))
43+
44+
def MakeIllumJobs(mode='repurp'):
45+
illumqueue = JobQueue(projectname+'_Illum')
46+
for toillum in platelist:
47+
if mode=='repurp':
48+
templateMessage_illum = {'Metadata': 'Metadata_Plate='+toillum,
49+
'pipeline': posixpath.join(pipelinepath,illumpipename),'output': illumoutpath,
50+
'input': inputpath, 'data_file':posixpath.join(datafilepath,toillum+'.csv')}
51+
elif mode=='batch':
52+
templateMessage_illum = {'Metadata': 'Metadata_Plate='+toillum,
53+
'pipeline': posixpath.join(batchpath,batchpipenameillum),
54+
'output': illumoutpath,
55+
'input':inputpath,
56+
'data_file': posixpath.join(batchpath,batchpipenameillum)
57+
}
58+
else:
59+
templateMessage_illum = {'Metadata': 'Metadata_Plate='+toillum,
60+
'pipeline': posixpath.join(pipelinepath,illumpipename),'output': illumoutpath,
61+
'input': inputpath, 'data_file':posixpath.join(datafilepath,untruncatedplatedict[toillum],'load_data.csv')}
62+
63+
illumqueue.scheduleBatch(templateMessage_illum)
64+
65+
print 'Illum job submitted. Check your queue'
66+
67+
def MakeQCJobs(repurp=False):
68+
qcqueue = JobQueue(projectname+'_QC')
69+
for toqc in platelist:
70+
for eachrow in rows:
71+
for eachcol in columns:
72+
if repurp==False:
73+
templateMessage_qc = {'Metadata': 'Metadata_Plate='+toqc+',Metadata_Well='+eachrow+'%02d' %eachcol,
74+
'pipeline': posixpath.join(pipelinepath,qcpipename),
75+
'output': QCoutpath,
76+
'input': inputpath,
77+
'data_file': posixpath.join(datafilepath,toqc+'.csv')
78+
}
79+
else:
80+
templateMessage_qc = {'Metadata': 'Metadata_Plate='+toqc+',Metadata_Well='+eachrow+'%02d' %eachcol,
81+
'pipeline': posixpath.join(pipelinepath,qcpipename),
82+
'output': QCoutpath,
83+
'input': inputpath,
84+
'data_file': posixpath.join(datafilepath,untruncatedplatedict[toqc],'load_data.csv')
85+
}
86+
qcqueue.scheduleBatch(templateMessage_qc)
87+
88+
print 'QC job submitted. Check your queue'
89+
90+
def MakeQCJobs_persite(repurp=False):
91+
qcqueue = JobQueue(projectname+'_QC')
92+
for toqc in platelist:
93+
for eachrow in rows:
94+
for eachcol in columns:
95+
for eachsite in sites:
96+
if repurp==False:
97+
templateMessage_qc = {'Metadata': 'Metadata_Plate='+toqc+',Metadata_Well='+eachrow+'%02d' %eachcol+',Metadata_Site='+str(eachsite),
98+
'pipeline': posixpath.join(pipelinepath,qcpipename),
99+
'output': QCoutpath,
100+
'input': inputpath,
101+
'data_file': posixpath.join(datafilepath,toqc+'.csv')
102+
}
103+
else:
104+
templateMessage_qc = {'Metadata': 'Metadata_Plate='+toqc+',Metadata_Well='+eachrow+'%02d' %eachcol+',Metadata_Site='+str(eachsite),
105+
'pipeline': posixpath.join(pipelinepath,qcpipename),
106+
'output': QCoutpath,
107+
'input': inputpath,
108+
'data_file': posixpath.join(datafilepath,untruncatedplatedict[toqc],'load_data.csv')
109+
}
110+
111+
qcqueue.scheduleBatch(templateMessage_qc)
112+
113+
print 'QC job submitted. Check your queue'
114+
115+
116+
def MakeAnalysisJobs(mode='repurp'):
117+
analysisqueue = JobQueue(projectname+'_Analysis')
118+
for toanalyze in platelist:
119+
for eachrow in rows:
120+
for eachcol in columns:
121+
for eachsite in sites:
122+
if mode=='repurp':
123+
templateMessage_analysis = {'Metadata': 'Metadata_Plate='+toanalyze+',Metadata_Well='+eachrow+'%02d' %eachcol+',Metadata_Site='+str(eachsite),
124+
'pipeline': posixpath.join(pipelinepath,analysispipename),
125+
'output': analysisoutpath,
126+
'output_structure':anlysisoutputstructure,
127+
'input':inputpath,
128+
'data_file': posixpath.join(datafilepath,toanalyze+'_with_illum.csv')
129+
}
130+
elif mode=='batch':
131+
templateMessage_analysis = {'Metadata': 'Metadata_Plate='+toanalyze+',Metadata_Well='+eachrow+'%02d' %eachcol+',Metadata_Site='+str(eachsite),
132+
'pipeline': posixpath.join(batchpath,batchpipename),
133+
'output': analysisoutpath,
134+
'output_structure':anlysisoutputstructure,
135+
'input':inputpath,
136+
'data_file': posixpath.join(batchpath,batchpipename)
137+
}
138+
else:
139+
templateMessage_analysis = {'Metadata': 'Metadata_Plate='+toanalyze+',Metadata_Well='+eachrow+'%02d' %eachcol+',Metadata_Site='+str(eachsite),
140+
'pipeline': posixpath.join(pipelinepath,analysispipename),
141+
'output': analysisoutpath,
142+
'output_structure':anlysisoutputstructure,
143+
'input':inputpath,
144+
'data_file': posixpath.join(datafilepath,untruncatedplatedict[toanalyze],'load_data_with_illum.csv')
145+
}
146+
147+
analysisqueue.scheduleBatch(templateMessage_analysis)
148+
149+
print 'Analysis job submitted. Check your queue'
150+
151+
#MakeIllumJobs(mode='batch')
152+
#MakeQCJobs(repurp=True)
153+
#MakeQCJobs_persite(repurp=True)
154+
#MakeAnalysisJobs(mode='batch')
155+

0 commit comments

Comments
 (0)