-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathET.py
More file actions
284 lines (259 loc) · 11.6 KB
/
ET.py
File metadata and controls
284 lines (259 loc) · 11.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
import requests
import subprocess
import os
import glob
import json
import BRB.misc
from BRB.logger import log
import pandas as pd
from pathlib import Path
def getNReads(d):
    """
    Return (number of non-optical-duplicate reads, % optical duplicates)
    for directory d.

    If a *.duplicate.txt file exists it contains two whitespace-separated
    numbers: optical duplicates and total reads. Otherwise (machines where
    duplicates are not marked) the reads in the R1 FASTQ are counted and
    the optical-duplicate fraction is reported as 0.
    """
    dupFiles = glob.glob("{}/*.duplicate.txt".format(d))
    if dupFiles:
        # Original code left this file handle open; use a context manager.
        with open(dupFiles[0]) as f:
            optDupes, total = f.read().split()
        try:
            opt_frac = 100. * float(optDupes) / float(total)
        except ZeroDivisionError:
            # No reads at all: signal with a -1 sentinel rather than crash
            opt_frac = float(-1)
        return int(total) - int(optDupes), opt_frac
    # For machines in which we don't mark duplicates
    # Just count the number of reads in R1.
    # The FASTQ lives next to the FASTQC_ directory, hence the replace().
    CMD1 = ["zcat", glob.glob("{}/*_R1.fastq.gz".format(d.replace("FASTQC_", "")))[0]]
    CMD2 = ["wc", "-l"]
    c1 = subprocess.Popen(CMD1, stdout=subprocess.PIPE)
    res = subprocess.check_output(CMD2, stdin=c1.stdout)
    c1.stdout.close()  # allow zcat to receive SIGPIPE / avoid fd leak
    c1.wait()
    # 4 FASTQ lines per read. Use integer division: the original "/ 4"
    # returned a float read count under Python 3.
    return int(res) // 4, 0.
def getOffSpeciesRate(d, organism=None) -> float:
    """
    Parse the kraken report in directory d for the fraction of reads
    mapping to unexpected organisms.

    Returns 1 - (confidently-assigned fraction for the expected
    organism/group), or 0 when the organism is unknown, no report is
    present, or the organism's marker is absent from the report.
    (The original fell through and implicitly returned None in the last
    case, contradicting the -> float annotation.)
    """
    # match parkour org to kraken db organism/group
    org_map = {
        'Human (GRCh38)': 'humangrp',
        'Human (GRCh37 / hg19)': 'humangrp',
        'Mouse (GRCm38 / mm10)': 'mousegrp',
        'Mouse (GRCm39)': 'mousegrp',
        'mouse': 'mousegrp',
        'human': 'humangrp',
        'Escherichia phage Lambda': 'lambdaphage',
        'Caenorhabditis_elegans': 'c-elegans',
        'lamprey': 'sea-lamprey',
        'medaka': 'japanese-medaka',
        'zebrafish': 'zebrafish',
        'drosophila': 'flygrp',
    }
    if organism not in org_map:
        log.info(f"getOffSpeciesRate - organism {organism} is not in the org_map!")
        return 0
    reports = glob.glob("{}/*rep".format(d))
    if not reports:
        # Previously an unguarded [0] would raise IndexError here
        log.info(f"getOffSpeciesRate - no kraken report found in {d}!")
        return 0
    fname = reports[0]
    with open(fname) as f:
        for line in f:
            if org_map[organism] in line:
                # off-species actually means fraction of non-expected organism reads !
                # off-species reads vs of-species reads ;)
                off = 1 - (float(line.strip().split()[0]) / 100)
                log.info(f"confident reads for {fname} = {off}")
                return off
    # Marker never found in the report
    log.info(f"getOffSpeciesRate - {org_map[organism]} not found in {fname}")
    return 0
def getBaseStatistics(config, outputDir, samples_id, organism=None):
    """
    Collect per-library base sequencing statistics for a run.

    Returns a tuple (baseDict, s2l): baseDict maps library name ->
    [sample name, nReads, off-species rate, % optical duplicates], and
    s2l maps sample names back to library names.
    """
    baseDict = {}
    s2l = {}
    # outputDir sits under an "Analysis_<project>" directory; derive the
    # matching "FASTQC_Project_<project>" directory name from it.
    odir, adir = os.path.split(os.path.split(outputDir)[0])
    pdir = "FASTQC_Project_{}".format(adir[9:])
    base = config.get('Paths', 'baseData')
    runID = config.get('Options', 'runID')
    for sample in samples_id:
        pattern = "{}/{}/{}/Sample_{}".format(base, runID, pdir, sample)
        for d in glob.glob(pattern):
            libName = os.path.split(d)[1][7:]  # strip the "Sample_" prefix
            zips = glob.glob("{}/*_R1_fastqc.zip".format(d))
            if not zips:
                # No FastQC zip means the sample failed; skip it
                continue
            # Strip the trailing "_R1_fastqc.zip" (14 chars) to get the name
            sampleName = os.path.split(zips[0])[1][:-14]
            nReads, optDupes = getNReads(d)
            offRate = getOffSpeciesRate(d, organism)
            baseDict[libName] = [sampleName, nReads, offRate, optDupes]
            s2l[sampleName] = libName
    return baseDict, s2l
def DNA(config, outputDir, baseDict, sample2lib):
    """
    Parse a DNA-mapping output directory and extend baseDict with % mapped,
    % duplicated, and median insert size, then reformat into the
    list-of-dicts structure Parkour expects.

    For RELACS runs only the base sequencing metrics are returned, since
    demuxed RELACS samples can't be matched back to Parkour barcodes.
    """
    # If we have RELACS, the sample2lib won't match what we find here.
    # We can re-parse the sampleSheet to upload actual statistics of the RELACS demuxed samples.
    if Path(outputDir, 'RELACS_sampleSheet.txt').exists():
        # RELACS is a problem for parkour (matching is in sampleID / barcode level).
        # Just return a list of dicts with the previous info
        m = []
        for k, v in baseDict.items():
            m.append({'barcode': k,
                      'reads_pf_sequenced': v[1],
                      'confident_reads': v[2],
                      'optical_duplicates': v[3]})
        log.info(f"ET - DNA module detected RELACS. Returning {m}")
        return m
    # NOTE(review): the duplication and insert-size parsing below uses the
    # per-iteration sampleName, so it must live inside this loop; the
    # scraped source had lost its indentation — confirm against upstream.
    for fname in glob.glob("{}/Bowtie2/*.Bowtie2_summary.txt".format(outputDir)):
        sampleName = os.path.basename(fname).split(".Bowtie2_summary")[0]
        # % Mapped: the overall rate is on the last non-empty line,
        # before the '%' sign. Use a context manager (original leaked the fd).
        with open(fname) as f:
            lastLine = f.read().split("\n")[-2]
        mappedPercent = lastLine.split("%")[0]
        baseDict[sample2lib[sampleName]].append(float(mappedPercent))
        # % Duplicated, from the multiQC samtools flagstat table
        dup_info = glob.glob("{}/multiQC/multiqc_data/multiqc_samtools_flagstat.txt".format(outputDir))[0]
        dup_df = pd.read_csv(dup_info, sep="\t", usecols=["Sample", "total_passed", "duplicates_passed"])
        dup_df = dup_df.loc[dup_df["Sample"].astype(str) == sampleName]
        dup_rate = dup_df["duplicates_passed"].values / dup_df["total_passed"].values * 100
        baseDict[sample2lib[sampleName]].append(dup_rate[0])
        # Median insert size, from deepTools bamPEFragmentSize
        insert_size_info = os.path.join(outputDir, "deepTools_qc/bamPEFragmentSize/fragmentSize.metric.tsv")
        insert_size_df = pd.read_csv(insert_size_info, sep="\t")
        medInsertSize = insert_size_df.loc[insert_size_df["Unnamed: 0"] == "filtered_bam/" + sampleName + ".filtered.bam"]
        medInsertSize = medInsertSize["Frag. Len. Median"].values[0]
        baseDict[sample2lib[sampleName]].append(int(medInsertSize))
    log.info(f"ET - DNA module parsed {baseDict}")
    # Reformat into a matrix.
    # v indices: [0]=sample name, [1]=nReads, [2]=off-species, [3]=opt dupes,
    # [4]=% mapped, [5]=% dupped, [6]=median insert size (appended above).
    m = []
    for k, v in baseDict.items():
        m.append({'barcode': k,
                  'reads_pf_sequenced': v[1],
                  'confident_reads': v[2],
                  'optical_duplicates': v[3],
                  'dupped_reads': v[5],
                  'mapped_reads': v[4],
                  'insert_size': v[6]})
    return m
def RNA(config, outputDir, baseDict, sample2lib):
    """
    Parse an RNA-mapping output directory and extend baseDict with
    % mapped (total / unique / multimapped), % duplicated and % assigned
    reads, then reformat into the list-of-dicts Parkour expects.
    """
    # NOTE(review): the duplication/assigned parsing below uses the
    # per-iteration sampleName, so it must live inside this loop; the
    # scraped source had lost its indentation — confirm against upstream.
    for fname in glob.glob("{}/STAR/*/*.Log.final.out".format(outputDir)):
        tot = 0
        uniq = 0
        multimap = 0
        # STAR's Log.final.out reports percentages as "<tab>NN.NN%"
        # Use a context manager (original never closed the handle).
        with open(fname) as f:
            for line in f:
                if 'Uniquely mapped reads %' in line:
                    uniq = float(line.strip().split("\t")[-1][:-1])
                    tot += uniq
                elif '% of reads mapped to multiple loci' in line:
                    multimap = float(line.strip().split("\t")[-1][:-1])
                    tot += multimap
        sampleName = os.path.basename(fname).split(".")[0]
        baseDict[sample2lib[sampleName]].append(tot)
        baseDict[sample2lib[sampleName]].append(uniq)
        baseDict[sample2lib[sampleName]].append(multimap)
        # duplication, from the multiQC samtools flagstat table
        dup_info = glob.glob("{}/multiQC/multiqc_data/multiqc_samtools_flagstat.txt".format(outputDir))[0]
        dup_df = pd.read_csv(dup_info, sep="\t", usecols=["Sample", "total_passed", "duplicates_passed"])
        dup_df = dup_df.loc[dup_df["Sample"].astype(str) == sampleName]
        dup_rate = dup_df["duplicates_passed"].values / dup_df["total_passed"].values * 100
        baseDict[sample2lib[sampleName]].append(dup_rate[0])
        # assigned reads, from the multiQC featureCounts table
        assigned_info = glob.glob("{}/multiQC/multiqc_data/multiqc_featurecounts.txt".format(outputDir))[0]
        assigned_df = pd.read_csv(assigned_info, sep="\t", usecols=["Sample", "Total", "Assigned"])
        assigned_df = assigned_df.loc[assigned_df["Sample"].astype(str) == sampleName + ".filtered"]
        assigned_rate = assigned_df["Assigned"].values / assigned_df["Total"].values * 100
        baseDict[sample2lib[sampleName]].append(assigned_rate[0])
    log.info(f"ET - RNA module parsed {baseDict}")
    # Reformat into a matrix.
    # v indices: [0]=sample name, [1]=nReads, [2]=off-species, [3]=opt dupes,
    # [4]=% mapped total, [5]=% unique, [6]=% multimapped, [7]=% dupped,
    # [8]=% assigned (appended above, in that order).
    m = []
    for k, v in baseDict.items():
        m.append({'barcode': k,
                  'reads_pf_sequenced': v[1],
                  'confident_reads': v[2],
                  'optical_duplicates': v[3],
                  'mapped_reads': v[4],
                  'uniq_mapped': v[5],
                  'multi_mapped': v[6],
                  'dupped_reads': v[7],
                  'assigned_reads': v[8]})
    return m
def sendToParkour(config, msg):
    """
    POST the per-library metrics in msg to Parkour's results endpoint.

    The flowcell ID is extracted from the run ID; Aviti runs (detected via
    a RunManifest.csv under baseData) encode it at a different position
    than Illumina runs. Returns the requests response object.
    """
    basePath = config.get("Paths", "baseData")
    runID = config.get("Options", "runID")
    aviti_check = glob.glob(f"{basePath}/*/RunManifest.csv")
    FCID = runID.split("_")[2] if aviti_check else runID.split("_")[3][1:]
    if '-' in FCID:
        FCID = FCID.split('-')[-1]
    d = {'flowcell_id': FCID, 'sequences': json.dumps(msg)}
    log.info(f"sendToParkour: Sending {d} to Parkour")
    res = requests.post(config.get("Parkour", "ResultsURL"),
                        auth=(config.get("Parkour", "user"), config.get("Parkour", "password")),
                        data=d,
                        verify=config.get("Parkour", "cert"))
    log.info(f"sendToParkour return {res}")
    return res
def phoneHome(config, outputDir, pipeline, samples_tuples, organism, project, libType):
    """
    Return metrics to Parkour; the results live in outputDir and were
    produced by the given pipeline.

    Returns the status row
    [project, organism, libType, pipeline, 'success', parkourResponse].
    """
    samples_id = [row[0] for row in samples_tuples]
    baseDict, sample2lib = getBaseStatistics(config, outputDir, samples_id, organism)
    if pipeline == 'DNA':
        msg = DNA(config, outputDir, baseDict, sample2lib)
    elif pipeline == 'RNA':
        msg = RNA(config, outputDir, baseDict, sample2lib)
    else:
        # No dedicated parser for this pipeline: send base metrics only
        msg = [
            {'barcode': k,
             'reads_pf_sequenced': v[1],
             'confident_reads': v[2],
             'optical_duplicates': v[3]}
            for k, v in baseDict.items()
        ]
    log.info(f"phoneHome: got msg = {msg}")
    ret = sendToParkour(config, msg) if msg is not None else None
    return [project, organism, libType, pipeline, 'success', ret]
def telegraphHome(config, group, project, skipList, organism=None):
    """
    Send sequencing metrics back to Parkour for samples with no pipeline.

    skipList entries have the structure [library, sampleName, libraryType].
    Returns the status row
    [project, organism, libTypes, None, None, parkourResponse, False].
    """
    log.info(f"telegraphHome triggered for {project}")
    # Build a fake analysis output path; getBaseStatistics only uses it to
    # derive the FASTQC project directory name for this run.
    baseDir = Path(
        config.get('Paths', 'groupData'),
        BRB.misc.pacifier(group),
        BRB.misc.getLatestSeqdir(config.get('Paths', 'groupData'), group),
        config.get('Options', 'runID'),
        BRB.misc.pacifier(project)
    )
    outputDir = baseDir / "DNA_mouse"  # mock leaf; only the parents matter
    samples_id = [entry[0] for entry in skipList]
    baseDict, _ = getBaseStatistics(config, outputDir, samples_id, organism)
    # Reformat into the list-of-dicts structure Parkour expects
    payload = [
        {'barcode': lib,
         'reads_pf_sequenced': stats[1],
         'confident_reads': stats[2],
         'optical_duplicates': stats[3]}
        for lib, stats in baseDict.items()
    ]
    ret = sendToParkour(config, payload)
    # Format the libtypes (order of the joined set is unspecified)
    libTypes = ','.join({entry[2] for entry in skipList})
    # [project, organism, libtypes, workflow, workflow status, parkour status, sambaUpdate]
    return [project, organism, libTypes, None, None, ret, False]