11import gzip
22import pathlib
3+ import select
34import subprocess
45import sys
56import textwrap
@@ -22,63 +23,69 @@ def get_project_root():
2223
2324
2425def execute_command (command , log_file_path = None ):
25- # Wrap command with proper error handling
2626 # set -e: exit on error, -u: exit on unset variable, -o pipefail: pipeline fails if any command fails
2727 prepared_command = f'set -euo pipefail; { textwrap .dedent (command )} '
28-
2928 logger .info ("Executing command: %s" , command .strip ())
3029
31- # Open the log file if provided
32- log_file = log_file_path .open ('a' , encoding = 'utf-8' ) if log_file_path else None
33-
34- # Prepare environment for subprocess (inherit current environment)
35- env = subprocess .os .environ .copy ()
36-
37- # Launch process with combined stdout and stderr streams, and line buffering enabled.
3830 process = subprocess .Popen (
3931 prepared_command ,
4032 shell = True ,
4133 executable = '/bin/bash' ,
4234 stdout = subprocess .PIPE ,
43- stderr = subprocess .STDOUT ,
35+ stderr = subprocess .PIPE ,
4436 text = True ,
4537 encoding = 'utf-8' ,
4638 bufsize = 1 , # line buffered
47- env = env
4839 )
4940
50- output_lines = []
51- # Iterate over each line as it becomes available
52- with process .stdout :
53- for line in iter (process .stdout .readline , '' ):
54- if line :
55- # Filter out bash libtinfo.so.6 warnings
56- if 'libtinfo.so.6: no version information available' not in line :
57- logger .info (line .strip ())
58- output_lines .append (line )
59- if log_file :
60- log_file .write (line )
61- log_file .flush () # flush immediately for real-time logging
62- process .wait () # wait for the process to complete
63-
64- if log_file :
65- log_file .close ()
66-
67- result = SimpleNamespace (
68- stdout = '' .join (output_lines ),
41+ stdout_lines = []
42+ stderr_lines = []
43+ stream_map = {
44+ process .stdout : (stdout_lines , logger .info ),
45+ process .stderr : (stderr_lines , logger .warning ),
46+ }
47+ open_streams = set (stream_map )
48+ log_file = log_file_path .open ('a' , encoding = 'utf-8' ) if log_file_path else None
49+
50+ try :
51+ # select multiplexes stdout and stderr in a single thread, preserving arrival order
52+ # and preventing pipe buffer deadlock without threading races on log_file writes
53+ while open_streams :
54+ readable , _ , _ = select .select (open_streams , [], [])
55+ for stream in readable :
56+ line = stream .readline ()
57+ if line :
58+ # Filter out bash libtinfo.so.6 warnings
59+ if 'libtinfo.so.6: no version information available' not in line :
60+ lines , log_fn = stream_map [stream ]
61+ log_fn (line .rstrip ())
62+ lines .append (line )
63+ if log_file :
64+ log_file .write (line )
65+ log_file .flush ()
66+ else :
67+ open_streams .discard (stream )
68+ finally :
69+ process .wait ()
70+ if log_file :
71+ log_file .close ()
72+
73+ if process .returncode != 0 :
74+ logger .error ("Command failed with return code %d: %s" , process .returncode , command .strip ())
75+ raise subprocess .CalledProcessError (
76+ process .returncode , command ,
77+ output = '' .join (stdout_lines ),
78+ stderr = '' .join (stderr_lines ),
79+ )
80+
81+ return SimpleNamespace (
82+ stdout = '' .join (stdout_lines ),
83+ stderr = '' .join (stderr_lines ),
6984 returncode = process .returncode ,
7085 pid = process .pid ,
71- command = command
86+ command = command ,
7287 )
7388
74- # Raise exception on non-zero return code
75- if result .returncode != 0 :
76- error_msg = f"Command failed with return code { result .returncode } : { command .strip ()} "
77- logger .error (error_msg )
78- raise subprocess .CalledProcessError (result .returncode , command , output = '' .join (output_lines ))
79-
80- return result
81-
8289def count_vcf_records (fp ):
8390 result = execute_command (f'bcftools view -H { fp } | wc -l' )
8491 return int (result .stdout .strip ())
0 commit comments