-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.nf
More file actions
216 lines (178 loc) · 5.42 KB
/
main.nf
File metadata and controls
216 lines (178 loc) · 5.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#!/usr/bin/env nextflow
/*
 * =============================================================
 * EPMC Ontology Annotator – Nextflow Pipeline
 * =============================================================
 *
 * Annotates EuropePMC abstracts with ontology terms using
 * ols_text_tagger, then loads results into SQLite.
 *
 * Input:  NDJSON citation files (citations_db_*.json)
 * Output: Per-chunk tagged entity TSVs, abstract TSVs, and
 *         a combined SQLite database.
 *
 * Usage:
 *   nextflow run main.nf \
 *     --input_dir /path/to/citation_chunks \
 *     --tagger_db /path/to/text_tagger_db.bin \
 *     --ontologies ecto,hpo,mondo \
 *     --outdir results
 * =============================================================
 */
nextflow.enable.dsl = 2

// --------------- Parameters ---------------
params.input_dir = null // Directory containing citations_db_*.json
params.tagger_db = null // Path to text_tagger_db.bin
params.outdir = 'results' // Where final outputs (SQLite DB, summary CSV) are published

// Tagger options
params.ontologies = null // Comma-separated ontology IDs (e.g. ecto,hpo,mondo). Null = all.
params.min_label_len = 6 // Forwarded to tag_chunk.py as --min-label-len
// Token delimiter set for the tagger.
// NOTE(review): declared here but never passed to any process in this file —
// confirm whether tag_chunk.py should receive a --delimiters flag.
params.delimiters = " ,.;:!?()[]{}\"'/\\\t\n-+="
params.progress = 100000 // Forwarded to tag_chunk.py as --progress (report interval)

// SQLite
params.db_name = 'epmc_tags.sqlite' // Filename of the combined database built by LOAD_SQLITE

// --------------- Input validation ---------------
// Fail fast, before any process is launched, if required inputs are missing.
if (!params.input_dir) {
    error "ERROR: --input_dir is required (directory with citations_db_*.json files)"
}
if (!params.tagger_db) {
    error "ERROR: --tagger_db is required (path to text_tagger_db.bin)"
}
// --------------- Processes ---------------
/*
 * TAG_CHUNK
 *
 * Runs tag_chunk.py on a single NDJSON citation chunk.
 * Produces two outputs:
 *   - tagged_<chunk_id>.tsv    (id, pmcid, ontology_id, term_iri, term_label)
 *   - abstracts_<chunk_id>.tsv (id, abstract)
 */
process TAG_CHUNK {
    container 'ghcr.io/ebispot/epmc_ontology_annotator:dev'
    tag "${chunk.simpleName}"
    memory '6 GB'
    cpus 1
    time '6h'

    input:
    path chunk     // one citations_db_*.json NDJSON file
    path tagger_db // prebuilt ols_text_tagger database (text_tagger_db.bin)

    output:
    path "tagged_*.tsv", emit: tagged
    path "abstracts_*.tsv", emit: abstracts

    script:
    // Bare chunk id (e.g. "0007" from citations_db_0007.json) keys the output names.
    def chunk_id = chunk.simpleName.replaceAll(/^citations_db_/, '')
    // Restrict tagging to the requested ontologies; omit the flag to tag all.
    def ont_flag = params.ontologies ? "--ontologies ${params.ontologies}" : ""
    // NOTE(review): params.delimiters is declared at the top of the file but is
    // not passed here — confirm whether tag_chunk.py supports a --delimiters flag.
    // File paths are quoted so staged names containing spaces cannot break the command.
    """
    tag_chunk.py \\
        -i "${chunk}" \\
        -o "tagged_${chunk_id}.tsv" \\
        --abstracts "abstracts_${chunk_id}.tsv" \\
        --tagger ols_text_tagger \\
        --db "${tagger_db}" \\
        --min-label-len ${params.min_label_len} \\
        --progress ${params.progress} \\
        ${ont_flag}
    """
}
/*
 * LOAD_SQLITE
 *
 * Collects all tagged entity TSVs and abstract TSVs, then loads
 * them into a single SQLite database.
 */
process LOAD_SQLITE {
container 'ghcr.io/ebispot/epmc_ontology_annotator:dev'
// Copy the finished database into the results directory.
publishDir "${params.outdir}", mode: 'copy'
memory '16 GB'
cpus 1
time '2h'
input:
path tagged_files // all tagged_*.tsv outputs of TAG_CHUNK, collected
path abstract_files // all abstracts_*.tsv outputs of TAG_CHUNK, collected
output:
path "${params.db_name}", emit: database
// The PRAGMAs disable journalling/synchronous writes for bulk-load speed; the
// database is rebuilt from scratch each run, so durability is not required.
// Indexes are created after the import so inserts are not slowed by them.
// NOTE(review): TAG_CHUNK documents the abstracts TSV as (id, abstract), but
// the abstracts table here has three columns (id, title, abstract) — confirm
// the real TSV layout; a column-count mismatch in .import fills/warns.
// NOTE(review): .import loads every row as data — assumes headerless TSVs.
script:
"""
cat tagged_*.tsv > all_tagged.tsv
cat abstracts_*.tsv > all_abstracts.tsv
sqlite3 ${params.db_name} <<'EOF'
PRAGMA journal_mode=OFF;
PRAGMA synchronous=OFF;
PRAGMA cache_size=-512000;
CREATE TABLE tagged_entities (
id TEXT NOT NULL,
pmcid TEXT,
ontology_id TEXT NOT NULL,
term_iri TEXT NOT NULL,
term_label TEXT
);
CREATE TABLE abstracts (
id TEXT,
title TEXT,
abstract TEXT
);
.mode tabs
.import all_tagged.tsv tagged_entities
.import all_abstracts.tsv abstracts
CREATE INDEX idx_entities_id ON tagged_entities(id);
CREATE INDEX idx_entities_ontology ON tagged_entities(ontology_id);
CREATE INDEX idx_entities_iri ON tagged_entities(term_iri);
CREATE INDEX idx_entities_label ON tagged_entities(term_label);
CREATE INDEX idx_abstracts_id ON abstracts(id);
EOF
rm -f all_tagged.tsv all_abstracts.tsv
"""
}
/*
 * SUMMARISE
 *
 * Produces a per-ontology label count CSV from the SQLite database.
 * Lightweight summary step useful for quick QC.
 */
process SUMMARISE {
    container 'ghcr.io/ebispot/epmc_ontology_annotator:dev'
    // Publish the summary CSV next to the database.
    publishDir "${params.outdir}", mode: 'copy'
    memory '4 GB'
    cpus 1
    time '30m'

    input:
    path database // SQLite database produced by LOAD_SQLITE

    output:
    path "ontology_summary.csv", emit: summary

    script:
    // The original mixed `\\` (shell continuation) with bare `\` (Groovy string
    // line-escape); here the SQL spans lines inside one quoted shell word, and
    // only `\\` is used for shell continuation — one consistent mechanism.
    // The database path is quoted to survive names containing spaces.
    """
    sqlite3 -header -csv "${database}" \\
        "SELECT ontology_id, term_label, COUNT(*) AS count
         FROM tagged_entities
         GROUP BY ontology_id, term_label
         ORDER BY count DESC" \\
        > ontology_summary.csv
    """
}
// --------------- Workflow ---------------
workflow {
    // Channels
    ch_chunks = Channel.fromPath("${params.input_dir}/citations_db_*.json")
        .ifEmpty { error "No citations_db_*.json files found in ${params.input_dir}" }
    // checkIfExists makes a bad --tagger_db path fail immediately with a clear
    // message, instead of failing later inside every TAG_CHUNK task.
    ch_db = Channel.value(file(params.tagger_db, checkIfExists: true))

    // 1. Tag each chunk in parallel
    TAG_CHUNK(ch_chunks, ch_db)

    // 2. Collect all tagged & abstract TSVs, load into SQLite
    LOAD_SQLITE(
        TAG_CHUNK.out.tagged.collect(),
        TAG_CHUNK.out.abstracts.collect()
    )

    // 3. Produce summary statistics
    SUMMARISE(LOAD_SQLITE.out.database)
}
// --------------- On-complete ---------------
// Log a short run summary (status, wall-clock duration, output directory) when
// the pipeline finishes — runs on both success and failure.
workflow.onComplete {
log.info """
======================
Pipeline complete
======================
Status   : ${workflow.success ? 'SUCCESS' : 'FAILED'}
Duration : ${workflow.duration}
Output   : ${params.outdir}
""".stripIndent()
}