99from exceptions import FirmwareUploadFail , InvalidUploadTool , UploadToolNotFound
1010
1111
12+ class StreamReader :
13+ """Reads a stream byte-by-byte, treating both \\ n and \\ r as line delimiters.
14+
15+ Automatically sends buffered data after 1 second of inactivity.
16+ """
17+
18+ def __init__ (
19+ self ,
20+ stream : asyncio .StreamReader ,
21+ stream_name : str ,
22+ output_callback : Optional [Callable [[str , str ], Awaitable [None ]]] = None ,
23+ timeout : float = 1.0 ,
24+ ) -> None :
25+ self .stream = stream
26+ self .stream_name = stream_name
27+ self .output_callback = output_callback
28+ self .timeout = timeout
29+ self .buffer = b""
30+ self .lines : list [str ] = []
31+
32+ async def _send_line (self , line : str ) -> None :
33+ """Send a decoded line through the callback and logger."""
34+ if line :
35+ logger .debug (f"[{ self .stream_name } ] { line } " )
36+ self .lines .append (line )
37+ if self .output_callback :
38+ await self .output_callback (self .stream_name , line )
39+
40+ async def _flush_buffer (self ) -> None :
41+ """Flush the current buffer and send as a line."""
42+ if self .buffer :
43+ decoded_line = self .buffer .decode ().rstrip ("\r \n " )
44+ await self ._send_line (decoded_line )
45+ self .buffer = b""
46+
47+ async def read_all (self ) -> None :
48+ """Read all data from the stream until it closes."""
49+ while True :
50+ try :
51+ # Try to read one byte with timeout
52+ chunk = await asyncio .wait_for (self .stream .read (1 ), timeout = self .timeout )
53+ if not chunk :
54+ # End of stream - send any remaining buffer
55+ await self ._flush_buffer ()
56+ break
57+
58+ self .buffer += chunk
59+
60+ # Check if we hit a line delimiter (\n or \r)
61+ if chunk in (b"\n " , b"\r " ):
62+ decoded_line = self .buffer .decode ().rstrip ("\r \n " )
63+ await self ._send_line (decoded_line )
64+ self .buffer = b""
65+ except asyncio .TimeoutError :
66+ # Timeout passed without new data - send buffer if non-empty
67+ await self ._flush_buffer ()
68+
69+
1270class FirmwareUploader :
1371 def __init__ (self ) -> None :
1472 self ._autopilot_port : pathlib .Path = pathlib .Path ("/dev/autopilot" )
@@ -60,30 +118,16 @@ async def upload(
60118 stderr = asyncio .subprocess .PIPE ,
61119 shell = True ,
62120 )
63- errors = []
121+ stdout_reader = StreamReader (process .stdout , "stdout" , output_callback ) if process .stdout else None
122+ stderr_reader = StreamReader (process .stderr , "stderr" , output_callback ) if process .stderr else None
64123
65124 async def read_stdout () -> None :
66- if process .stdout :
67- while True :
68- line = await process .stdout .readline ()
69- if not line :
70- break
71- decoded_line = line .decode ().rstrip ("\n " )
72- logger .debug (f"[stdout] { decoded_line } " )
73- if output_callback :
74- await output_callback ("stdout" , decoded_line )
125+ if stdout_reader :
126+ await stdout_reader .read_all ()
75127
76128 async def read_stderr () -> None :
77- if process .stderr :
78- while True :
79- line = await process .stderr .readline ()
80- if not line :
81- break
82- decoded_line = line .decode ().rstrip ("\n " )
83- logger .debug (f"[stderr] { decoded_line } " )
84- errors .append (decoded_line )
85- if output_callback :
86- await output_callback ("stderr" , decoded_line )
129+ if stderr_reader :
130+ await stderr_reader .read_all ()
87131
88132 try :
89133 # Run both stream readers and process wait concurrently with a single timeout
@@ -98,6 +142,7 @@ async def read_stderr() -> None:
98142 raise FirmwareUploadFail ("Unable to upload firmware to board." ) from error
99143 finally :
100144 return_code = process .returncode
145+ errors = stderr_reader .lines if stderr_reader else []
101146 if errors and return_code != 0 :
102147 raise FirmwareUploadFail (f"Upload process returned errors: { errors } return code: { return_code } " )
103148 if return_code != 0 :
0 commit comments