forked from bendmorris/ncbi_taxonomy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathncbi_taxonomy.py
More file actions
173 lines (140 loc) · 6.92 KB
/
ncbi_taxonomy.py
File metadata and controls
173 lines (140 loc) · 6.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
'''
Looks up taxonomic information from NCBI's taxonomy database given a taxon id list.
Will download the taxonomy id database dump files from NCBI to a folder called
data in the current working directory.
This is a modified version of ncbi_taxonomy by Ben Morris (https://github.com/bendmorris/ncbi_taxonomy)
'''
import urllib2
import os
import tarfile
import sys
import argparse
import re
col_delimiter = '\t|\t'
row_delimiter = '\t|\n'
url = 'ftp://ftp.ncbi.nlm.nih.gov/pub/taxonomy/taxdump.tar.gz'
tree_filename = 'ncbi_taxonomy.newick'
data_dir = 'data/'
linnean=["superkingdom", "kingdom", "phylum", "class", "order", "family", "genus", "species"]
def readargs():
'''
Read in program arguments
'''
parser = argparse.ArgumentParser(description="Looks up the Genbank taxon ID and returns rankings")
parser.add_argument("FILE", help="input file with the taxon IDs.", type=str)
parser.add_argument("-c", "--column", help="the column in the input file with the taxon IDs (default: 1)", default=1, type=int)
parser.add_argument("-r", "--ranks", help="ranks to output. Options: all, linnean, named (for all that are NOT \'no rank\') or a list of ranks given in quotes. Examples: -r all; -r \"superkingdom order genus\" ", default="linnean", type=str)
parser.add_argument("-d", "--download", help="force a re-download of the current taxonomy database (WILL OVERWRITE PREVIOUS VERSION)", action="store_true")
args = parser.parse_args()
if args.ranks == "linnean":
args.ranks = "superkingdom kingdom phylum class order family genus species"
args.ranks = re.split('[\s,]+',args.ranks) #split rank list on commas or spaces
args.ranks = [x.lower() for x in args.ranks] #convert to lowercase- all ranks in the taxanomy db are lowercase
return args
def downloadtax():
'''
Download and extract ncbi taxonomy database dump
from ncbi_taxonomy by Ben Morris (https://github.com/bendmorris/ncbi_taxonomy)
'''
if not os.path.exists(data_dir):
os.mkdir(data_dir)
#Delete old versions if they're present
if os.path.isfile(data_dir+"nodes.dmp"): os.remove(data_dir+"nodes.dmp")
if os.path.isfile(data_dir+"names.dmp"): os.remove(data_dir+"names.dmp")
if os.path.isfile(data_dir+"taxdump.tar.gz"): os.remove(data_dir+"taxdump.tar.gz")
# download the taxonomy archive
filename = os.path.join(data_dir, url.split('/')[-1])
sys.stderr.write('Downloading %s...\n' % filename)
r = urllib2.urlopen(urllib2.Request(url))
assert r.geturl() == url
with open(filename, 'wb') as output_file:
output_file.write(r.read())
r.close()
# extract the text dump
for extract in ('nodes.dmp', 'names.dmp'):
sys.stderr.write('Extracting %s from %s...\n' % (extract, filename))
archive = tarfile.open(name=filename, mode='r:gz')
archive.extract(extract, path=data_dir)
archive.close()
def readintaxa (taxa):
'''
from ncbi_taxonomy by Ben Morris (https://github.com/bendmorris/ncbi_taxonomy)
'''
# get names for all tax_ids from names.dmp
sys.stderr.write('Reading names...\n')
with open(os.path.join(data_dir, 'names.dmp')) as names_file:
for line in names_file:
line = line.rstrip(row_delimiter)
values = line.split(col_delimiter)
tax_id, name_txt, _, name_type = values[:4]
if name_type == 'scientific name':
taxa[tax_id] = {}
taxa[tax_id]['name'] = name_txt
# read all node info from nodes.dmp
sys.stderr.write('Reading taxonomy...\n')
with open(os.path.join(data_dir, 'nodes.dmp')) as nodes_file:
for line in nodes_file:
line = line.rstrip(row_delimiter)
values = line.split(col_delimiter)
tax_id, parent_id, rank = values[:3]
taxa[tax_id]['parent'] = parent_id
taxa[tax_id]['rank'] = rank
def lookuptaxa(taxa, tax_id, rankings):
'''
Print name of tax_id and then get information on parent
'''
if tax_id in taxa:
rankings.append({'rank':taxa[tax_id]['rank'], 'value':taxa[tax_id]['name']})
#lookup info of parent unless at root
if taxa[tax_id]['parent'] != tax_id:
lookuptaxa(taxa, taxa[tax_id]['parent'], rankings)
def printranking(rankings, ranksToPrint):
'''
Print the generated ranking for the ranks selected
'''
for ranking in rankings:
if "all" in ranksToPrint or ranking['rank'] in ranksToPrint or ("named" in ranksToPrint and ranking['rank'] != "no rank"):
print "\t", ranking['value'],
def main():
args = readargs()
#print args
taxonomyread = False
#Parse input (filename given at program execution)
endl = os.linesep
linecount = 0
with open(args.FILE, 'r') as f:
for line in f:
linecount += 1
if not line.strip().startswith("#"): #Ignore lines that start with #
values = line.strip(endl).split("\t")
if len(values) < args.column:
sys.exit("ERROR: You said to look in column "+str(args.column)+" for taxon IDs, but line "+str(linecount)+" doesn\'t have that many columns, it has "+str(len(values)))
taxid = values[args.column-1].strip()
#If a blast result has >1 taxon associated with it, BLAST returns a semicolon seperated list of taxon IDs. The script will use the first and give a warning
if len(taxid.split(";")) > 1:
sys.stderr.write("WARNING: '"+str(taxid)+"' appears to contain >1 Taxon ID speperated by \';\', using the first\n")
taxid = taxid.split(";")[0]
taxid.strip()
#Download and read taxonomy
if not taxonomyread:
#Check that the input file exists before reading taxonomy (or exit)
if not os.path.isfile(args.FILE): sys.exit("ERROR: File \""+args.FILE+"\" does not exist")
#Download the taxonomy files if they're not present or user has chosen to re-download
if not os.path.isfile(data_dir+"names.dmp") or not os.path.isfile(data_dir+"nodes.dmp") or args.download: downloadtax()
taxa = {}
readintaxa(taxa)
taxonomyread = True
if taxid in taxa:
rankings=[]
lookuptaxa(taxa,taxid,rankings)
rankings.reverse()
print line.strip(endl),
printranking(rankings, args.ranks)
print
else:
sys.stderr.write("Taxa ID '"+str(taxid)+"' not found\n")
print line.strip(endl) #If Taxa ID isn't found, still output the original line
else:
print line.strip(endl)
if __name__ == '__main__':
main()