44Replaces decombine.sh and decombine_viruses.sh.
55"""
66
7- import argparse
87import logging
98import re
9+ import sys
1010from pathlib import Path
11+ from typing import Optional
12+
13+ import typer
1114
1215from krakenparser .utils import ensure_output_dir
1316
1417_log = logging .getLogger (__name__ )
1518
# Typer application object for the "split" command. Shell-completion
# options are switched off and "-h" is accepted as an alias for "--help".
app = typer.Typer(
    name="split",
    add_completion=False,
    context_settings={"help_option_names": ["-h", "--help"]},
)
1624
1725_RANKS = [
1826 ("species" , "s__" , []),
2331 ("phylum" , "p__" , ["s__" , "g__" , "f__" , "o__" , "c__" ]),
2432]
2533
# Taxonomy-path segments treated as "human": one marker for each rank from
# species up to phylum. Kept as a frozenset so the constant is immutable and
# can be intersected directly with the segments of a taxonomy path.
# NOTE(review): the higher-rank markers (e.g. p__Chordata, c__Mammalia) also
# occur in the lineage of non-human taxa -- confirm that excluding those
# lineages as "human" is intended.
_HUMAN_MARKERS = frozenset(
    {
        "s__Homo_sapiens",
        "g__Homo",
        "f__Hominidae",
        "o__Primates",
        "c__Mammalia",
        "p__Chordata",
    }
)
3444
# Matches a sequencing-archive accession prefix (SRS/SRR/SRX, ERS/ERR/ERX,
# DRS/DRR/DRX), any digits that follow it, and the trailing hyphen.
_ACCESSION_RE = re.compile(r"(SRS|SRR|SRX|ERS|ERR|ERX|DRS|DRR|DRX)\d*-")
3646
3747
3848def _strip_path_prefix (line : str ) -> str :
39- """'d__X|p__Y|s__Z\t 10\t 20' → 's__Z\t 10\t 20'"""
4049 tab = line .find ("\t " )
4150 if tab == - 1 :
4251 return line
@@ -46,10 +55,20 @@ def _strip_path_prefix(line: str) -> str:
4655 return _ACCESSION_RE .sub ("" , segment + rest )
4756
4857
def _human_in_line(line: str) -> bool:
    """Return True when the taxonomy path of *line* contains a human marker.

    The taxonomy path is the text before the first tab, or the whole line
    when it contains no tab. The path is split on ``|`` and every segment is
    compared for exact equality with the entries of ``_HUMAN_MARKERS``, so a
    marker matches only a complete segment and never a substring of a longer
    taxon name.
    """
    # Cut at the first tab so the count columns never end up glued to the
    # last path segment (which would stop it from matching a marker).
    path, _, _ = line.partition("\t")
    return not _HUMAN_MARKERS.isdisjoint(path.split("|"))
63+
64+
4965def split_mpa (
5066 input_file : str ,
5167 output_dir : str ,
5268 viruses_only : bool = False ,
69+ bacteria_only : bool = False ,
70+ fungi_only : bool = False ,
71+ archaea_only : bool = False ,
5372 keep_human : bool = False ,
5473) -> None :
5574 in_path = Path (input_file )
@@ -58,17 +77,28 @@ def split_mpa(
5877 out_path = ensure_output_dir (output_dir , is_file = False )
5978 (out_path / "txt" ).mkdir (exist_ok = True )
6079
61- lines = in_path .read_text ().splitlines ()
62- data_lines = [ln for ln in lines if not ln .startswith ("#" ) and ln .strip ()]
80+ all_lines = [
81+ ln
82+ for ln in in_path .read_text ().splitlines ()
83+ if not ln .startswith ("#" ) and ln .strip ()
84+ ]
6385
86+ data_lines = all_lines .copy ()
6487 if viruses_only :
6588 data_lines = [ln for ln in data_lines if "d__Viruses" in ln ]
89+ if bacteria_only :
90+ data_lines = [ln for ln in data_lines if "d__Bacteria" in ln ]
91+ if fungi_only :
92+ data_lines = [ln for ln in data_lines if "k__Fungi" in ln ]
93+ if archaea_only :
94+ data_lines = [ln for ln in data_lines if "d__Archaea" in ln ]
6695
67- filter_human = not keep_human and not viruses_only
96+ if keep_human :
97+ human_lines = [ln for ln in all_lines if _human_in_line (ln )]
98+ data_lines = list (dict .fromkeys (data_lines + human_lines ))
6899
69100 for rank_name , rank_prefix , exclude_prefixes in _RANKS :
70101 result = []
71- human_pattern = _HUMAN_TAXA [rank_name ]
72102
73103 for line in data_lines :
74104 if rank_prefix not in line :
@@ -77,7 +107,7 @@ def split_mpa(
77107 continue
78108 if any (ep in line for ep in exclude_prefixes ):
79109 continue
80- if filter_human and human_pattern in line :
110+ if not keep_human and _human_in_line ( line ) :
81111 continue
82112 result .append (_strip_path_prefix (line ))
83113
@@ -87,33 +117,75 @@ def split_mpa(
87117 _log .info ("MPA file split successfully. Output stored in %s" , output_dir )
88118
89119
@app.callback(invoke_without_command=True)
def main(
    ctx: typer.Context,
    input_file: Optional[str] = typer.Option(
        None,
        "-i",
        "--input",
        help="Input combined MPA file.",
    ),
    output_dir: Optional[str] = typer.Option(
        None,
        "-o",
        "--output",
        help="Output directory.",
    ),
    viruses_only: bool = typer.Option(
        False,
        "--viruses",
        help="Extract only VIRUSES domain taxa.",
    ),
    bacteria_only: bool = typer.Option(
        False,
        "--bacteria",
        help="Extract only BACTERIA domain taxa.",
    ),
    fungi_only: bool = typer.Option(
        False,
        "--fungi",
        help="Extract only FUNGI kingdom taxa.",
    ),
    archaea_only: bool = typer.Option(
        False,
        "--archaea",
        help="Extract only ARCHAEA domain taxa.",
    ),
    keep_human: bool = typer.Option(
        False,
        "--keep-human",
        help="Retain human-related taxa (default: filtered out).",
    ),
) -> None:
    """Split a combined MPA table into per-rank TXT files."""
    # The docstring above doubles as the command's help text, so it is kept
    # to the single user-facing sentence; implementation notes live in
    # comments instead.
    #
    # Exit behaviour:
    #   * neither -i nor -o given -> print the help text, exit status 0
    #   * only one of them given  -> error on stderr, exit status 1
    #   * input file not found    -> error on stderr, exit status 1
    #
    # NOTE(review): split_mpa applies the --viruses / --bacteria / --fungi /
    # --archaea filters one after another, so passing more than one of them
    # keeps only lines that satisfy all of them -- confirm this is intended.
    logging.basicConfig(level=logging.INFO, format="%(message)s")

    # A bare invocation is treated as a request for usage, not as an error.
    if input_file is None and output_dir is None:
        print(ctx.get_help())
        raise typer.Exit()

    # -i / -o are declared optional so the bare invocation above can print
    # help; enforce them here and name exactly what is missing.
    if not input_file or not output_dir:
        missing = [
            flag
            for flag, value in (
                ("'-i / --input'", input_file),
                ("'-o / --output'", output_dir),
            )
            if not value
        ]
        noun = "option" if len(missing) == 1 else "options"
        print(
            f"Error: Missing required {noun} {' and '.join(missing)}.",
            file=sys.stderr,
        )
        raise typer.Exit(code=1)

    try:
        split_mpa(
            input_file,
            output_dir,
            viruses_only=viruses_only,
            bacteria_only=bacteria_only,
            fungi_only=fungi_only,
            archaea_only=archaea_only,
            keep_human=keep_human,
        )
    except FileNotFoundError as e:
        # Report a missing input as a one-line message instead of a traceback.
        print(f"Error: {e}", file=sys.stderr)
        raise typer.Exit(code=1) from e
116188
117189
if __name__ == "__main__":
    # Script entry point: hand control to the Typer application.
    app()
0 commit comments