66import logging
77import traceback
88from datetime import datetime
9- from concurrent .futures import ProcessPoolExecutor , as_completed
9+ from concurrent .futures import ThreadPoolExecutor , as_completed
1010
1111import click
1212from pyopenms import *
@@ -143,7 +143,7 @@ def ascore(
143143
144144 # Main processing loop
145145 start_time = time .time ()
146- processed_peptide_ids = []
146+ processed_peptide_ids = PeptideIdentificationList ()
147147
148148 # Process each PeptideIdentification (optionally in parallel)
149149 if max (1 , int (threads )) == 1 :
@@ -165,7 +165,7 @@ def ascore(
165165 )
166166
167167 if result ["status" ] == "success" :
168- processed_peptide_ids .append (result ["new_pid" ])
168+ processed_peptide_ids .push_back (result ["new_pid" ])
169169 stats ["processed" ] += 1
170170 phospho_count = len ([h for h in result ["new_pid" ].getHits ()
171171 if "(Phospho)" in h .getSequence ().toString ()])
@@ -187,13 +187,13 @@ def ascore(
187187 else :
188188 workers = max (1 , int (threads ))
189189 click .echo (
190- f"[{ time .strftime ('%H:%M:%S' )} ] Parallel execution with { workers } processes "
190+ f"[{ time .strftime ('%H:%M:%S' )} ] Parallel execution with { workers } threads "
191191 )
192192
193193 if debug :
194194 logger .info (f"Starting parallel processing with { workers } workers" )
195195
196- # Build serializable tasks in dict format
196+ # Build tasks - with threads we can pass objects directly (shared memory)
197197 params = {
198198 "fragment_mass_tolerance" : fragment_mass_tolerance ,
199199 "fragment_mass_unit" : fragment_mass_unit ,
@@ -208,7 +208,7 @@ def ascore(
208208 hit_payloads .append ({"sequence" : seq_str , "proforma" : proforma })
209209 tasks .append ({
210210 "idx" : idx ,
211- "mzml_path " : in_file ,
211+ "exp " : exp , # Pass spectrum object directly - shared between threads
212212 "params" : params ,
213213 "pid" : {
214214 "mz" : pid .getMZ (),
@@ -221,8 +221,8 @@ def ascore(
221221 logger .info (f"Created { len (tasks )} parallel tasks" )
222222
223223 indexed_results = {}
224- with ProcessPoolExecutor (max_workers = workers ) as executor :
225- futures = {executor .submit (_worker_process_pid , t ): t ["idx" ] for t in tasks }
224+ with ThreadPoolExecutor (max_workers = workers ) as executor :
225+ futures = {executor .submit (_worker_process_pid_threaded , t ): t ["idx" ] for t in tasks }
226226 for fut in as_completed (futures ):
227227 idx = futures [fut ]
228228 try :
@@ -277,7 +277,7 @@ def ascore(
277277 new_hits .append (new_hit )
278278
279279 new_pid .setHits (new_hits )
280- processed_peptide_ids .append (new_pid )
280+ processed_peptide_ids .push_back (new_pid )
281281 stats ["processed" ] += 1
282282 phospho_count = len ([h for h in new_pid .getHits () if "(Phospho)" in h .getSequence ().toString ()])
283283 stats ["phospho" ] += phospho_count
@@ -337,7 +337,7 @@ def load_identifications(idxml_file):
337337 """Load identification results"""
338338 print (f"[{ time .strftime ('%H:%M:%S' )} ] Loading identifications from { idxml_file } " )
339339 protein_ids = []
340- peptide_ids = []
340+ peptide_ids = PeptideIdentificationList ()
341341 IdXMLFile ().load (idxml_file , protein_ids , peptide_ids )
342342 print (f"Loaded { len (peptide_ids )} peptide identifications" )
343343 return protein_ids , peptide_ids
@@ -440,34 +440,24 @@ def find_spectrum_by_mz(exp, target_mz, rt=None, ppm_tolerance=10):
440440 return best_match
441441
442442
443- # ----------------------- Multiprocessing worker utilities -----------------------
444- _WORKER_EXP = None
443+ # ----------------------- Threading worker utilities -----------------------
444+ # Note: Using ThreadPoolExecutor instead of ProcessPoolExecutor allows threads
445+ # to share the spectrum data (exp object) directly without reloading the file.
446+ # This provides significant performance improvement for parallel processing.
445447
446448
447- def _worker_get_exp (mzml_file ):
448- global _WORKER_EXP
449- if _WORKER_EXP is None :
450- exp = MSExperiment ()
451- FileHandler ().loadExperiment (mzml_file , exp )
452- # Warm up spectrum index inside the worker for faster lookups
453- if exp .size () > 0 :
454- # Rebuild local cache for find_spectrum_by_mz in this process
455- if hasattr (find_spectrum_by_mz , "spectrum_list" ):
456- delattr (find_spectrum_by_mz , "spectrum_list" )
457- _ = find_spectrum_by_mz (exp , 0.0 , None )
458- _WORKER_EXP = exp
459- return _WORKER_EXP
449+ def _worker_process_pid_threaded (task ):
450+ """Thread-safe worker that uses shared spectrum data.
460451
461-
462- def _worker_process_pid (task ):
452+ Unlike process-based workers, threads share memory so we can pass
453+ the exp object directly without serialization or file reloading.
454+ """
463455 try :
464- mzml_path = task ["mzml_path" ]
456+ exp = task ["exp" ] # Shared spectrum object - no file reload needed
465457 pid_info = task ["pid" ]
466458 params = task ["params" ]
467459
468- exp = _worker_get_exp (mzml_path )
469-
470- # Find spectrum
460+ # Find spectrum (uses shared cache from main thread)
471461 spectrum = find_spectrum_by_mz (exp , pid_info ["mz" ], pid_info .get ("rt" ))
472462 if spectrum is None :
473463 return {"status" : "error" , "reason" : "spectrum_not_found" }
0 commit comments