@@ -39,6 +39,8 @@ def base_cl_options():
3939 help = "the JSON serialization library to use, default: {}" .format (DEFAULT_JSON ))
4040 parser .add_argument ('--include' , nargs = "+" , type = int , default = [],
4141 help = "list of SBP message IDs to include, empty means all" )
42+ parser .add_argument ("--unbuffered" , action = "store_true" ,
43+ help = "disable buffering when reading data from input (slower)" )
4244 parser .add_argument ('file' , nargs = '?' , metavar = 'FILE' , type = argparse .FileType ('rb' ),
4345 default = sys .stdin , help = "the input file, stdin by default" )
4446
@@ -61,6 +63,13 @@ def get_args():
6163 return args
6264
6365
66+ # return the read and expected CRCs from 'buf'
67+ def get_crcs (buf , payload_len ):
68+ crc_read , = struct .unpack ("<H" , buf [SBP_HEADER_LEN + payload_len :SBP_HEADER_LEN + payload_len + 2 ])
69+ crc_expected = binascii .crc_hqx (buf [1 :SBP_HEADER_LEN + payload_len ], 0 )
70+ return crc_read , crc_expected
71+
72+
6473def dump (args , res ):
6574 if 'json' == args .mode :
6675 sys .stdout .write (json .dumps (res .to_json_dict (),
@@ -72,8 +81,54 @@ def dump(args, res):
7281 sys .stdout .write ("\n " )
7382
7483
75- # generator to produce SBP messages from a file object
76- def iter_messages (fp ):
84+ # Generator which parses SBP messages from a file object.
85+ # Messages are read one at a time from the stream.
86+ def iter_messages_unbuffered (fp ):
87+ buf = b''
88+
89+ def read_bytes_until (target_len ):
90+ nonlocal buf
91+ while len (buf ) < target_len :
92+ read_bytes = fp .read (target_len - len (buf ))
93+ if len (read_bytes ) == 0 :
94+ raise IOError
95+ buf += read_bytes
96+
97+ while True :
98+ # read header
99+ try :
100+ read_bytes_until (SBP_HEADER_LEN )
101+ except IOError :
102+ return
103+
104+ # buf now contains at least SBP_HEADER_LEN bytes
105+
106+ preamble , msg_type , sender , payload_len = struct .unpack ("<BHHB" , buf [:SBP_HEADER_LEN ])
107+
108+ # check preamble
109+ if preamble != SBP_PREAMBLE :
110+ buf = buf [1 :] # drop first byte
111+ continue
112+
113+ # read payload and CRC
114+ try :
115+ read_bytes_until (SBP_HEADER_LEN + payload_len + 2 )
116+ except IOError :
117+ return
118+
119+ # check CRC
120+ crc_read , crc_expected = get_crcs (buf , payload_len )
121+ if crc_read == crc_expected :
122+ yield msg_type , sender , payload_len , buf [:SBP_HEADER_LEN + payload_len + 2 ], crc_read
123+ buf = buf [SBP_HEADER_LEN + payload_len + 3 :] # drop message
124+ else :
125+ sys .stderr .write ("CRC error: {} vs {} for msg type {}\n " .format (crc_read , crc_expected , msg_type ))
126+ buf = buf [1 :] # drop first byte
127+
128+
129+ # Generator which parses SBP messages from a file object.
130+ # Data is read from the stream in 4096 byte chunks.
131+ def iter_messages_buffered (fp ):
77132 buf = memoryview (bytearray (4096 ))
78133 unconsumed_offset = 0
79134 read_offset = 0
@@ -114,8 +169,7 @@ def iter_messages(fp):
114169 else :
115170 # check CRC
116171 b = b [:SBP_HEADER_LEN + payload_len + 2 ]
117- crc_read , = struct .unpack ("<H" , b [SBP_HEADER_LEN + payload_len :SBP_HEADER_LEN + payload_len + 2 ])
118- crc_expected = binascii .crc_hqx (b [1 :SBP_HEADER_LEN + payload_len ], 0 )
172+ crc_read , crc_expected = get_crcs (b , payload_len )
119173 if crc_read == crc_expected :
120174 yield msg_type , sender , payload_len , b , crc_read
121175 consumed = SBP_HEADER_LEN + payload_len + 2
@@ -132,8 +186,9 @@ def iter_messages(fp):
132186def sbp_main (args ):
133187 reader = io .open (args .file .fileno (), 'rb' )
134188 include = set (args .include )
189+ iter_fn = iter_messages_unbuffered if args .unbuffered else iter_messages_buffered
135190
136- for msg_type , sender , payload_len , buf , crc_read in iter_messages (reader ):
191+ for msg_type , sender , payload_len , buf , crc_read in iter_fn (reader ):
137192 msg_buf = buf [SBP_HEADER_LEN :SBP_HEADER_LEN + payload_len ]
138193 if not include or msg_type in include :
139194 try :
0 commit comments