1
1
import struct
2
2
from dataclasses import dataclass
3
- from typing import Tuple
3
+ from io import BytesIO
4
+ from typing import Tuple , Union
4
5
5
6
import numpy as np
6
7
@@ -35,6 +36,8 @@ class ConstStructs:
35
36
Struct format for unsigned integer"""
36
37
37
38
39
+
40
+
38
41
class BufferReader :
39
42
"""
40
43
Class is used to read binary data from buffer
@@ -45,11 +48,18 @@ class BufferReader:
45
48
buffer from which to read data
46
49
read_offset: int
47
50
current read offset in buffer
51
+ read_skipped: int
52
+ how many bytes were skipped during reading
48
53
"""
49
54
50
- def __init__ (self , buffer : bytes ):
51
- self .buffer = buffer
55
+ def __init__ (self , buffer : Union [bytearray , bytes ]):
56
+ self .buffer : bytearray = buffer
57
+ self .total_bytes_read = len (buffer )
52
58
self .read_offset = 0
59
+ self .read_skipped = 0
60
+
61
def expect_to_read(self, n: int):
    """
    Hook called by the unpack methods right before they read from the buffer.

    This implementation intentionally does nothing; a subclass can override
    it to make sure the data is available before it is accessed.

    Parameters
    ----------
    n: int
        number of bytes that are about to be read
    """
53
63
54
64
def bytes_left (self ):
55
65
"""
@@ -60,7 +70,7 @@ def bytes_left(self):
60
70
int
61
71
The number of bytes left to read.
62
72
"""
63
- return len (self .buffer ) - self .read_offset
73
+ return len (self .buffer ) - self .read_offset + self . read_skipped
64
74
65
75
def unpack_f (self , s_format : str ):
66
76
"""
@@ -97,7 +107,9 @@ def unpack_numpy(self, s: struct.Struct, shape: Tuple):
97
107
np.ndarray
98
108
The unpacked NumPy array.
99
109
"""
100
- arr = np .ndarray (shape , s .format , self .buffer , self .read_offset ).copy ()
110
+ self .expect_to_read (s .size * int (np .prod (shape )))
111
+
112
+ arr = np .ndarray (shape , s .format , self .buffer , self .read_offset - self .read_skipped ).copy ()
101
113
self .advance (s , int (np .prod (shape )))
102
114
return arr
103
115
@@ -155,7 +167,8 @@ def unpack(self, s: struct.Struct):
155
167
-------
156
168
Unpacked data as specified by the struct format.
157
169
"""
158
- unpack : tuple = s .unpack_from (self .buffer , self .read_offset )
170
+ self .expect_to_read (s .size )
171
+ unpack : tuple = s .unpack_from (self .buffer , self .read_offset - self .read_skipped )
159
172
self .advance (s )
160
173
if len (unpack ) == 1 :
161
174
return unpack [0 ]
@@ -174,6 +187,9 @@ def advance(self, s: struct.Struct, times=1):
174
187
"""
175
188
self .read_offset += s .size * times
176
189
190
def skip(self, s: struct.Struct, times=1):
    """
    Skips over data in the buffer without unpacking it.

    Delegates to ``advance``; kept as a separate method so that subclasses
    can treat skipped data differently from data that was read.

    Parameters
    ----------
    s: struct.Struct
        struct describing a single item to skip
    times: int
        number of consecutive items to skip
    """
    self.advance(s, times=times)
192
+
177
193
def unpack_str (self ) -> str :
178
194
"""
179
195
Unpacks a string from the buffer.
@@ -184,10 +200,34 @@ def unpack_str(self) -> str:
184
200
The unpacked string, encoded in UTF-8.
185
201
"""
186
202
length : int = self .unpack (ConstStructs .ushort )
203
+ self .expect_to_read (length )
187
204
bytes_ : bytes = self .unpack_f ("%ds" % length )
188
205
return bytes_ .decode ("utf-8" )
189
206
190
207
208
class BytesIOReader(BufferReader):
    """
    Buffer reader that fetches its data on demand from a seekable stream.

    The internal buffer starts out empty and grows as reads request more
    data. Skipped bytes are never stored in the buffer: ``read_offset`` is a
    position in the stream, while the matching index inside ``buffer`` is
    ``read_offset - read_skipped``.

    Attributes
    ----------
    reader: BytesIO
        stream from which the data is fetched
    """

    def __init__(self, reader: BytesIO):
        super().__init__(bytearray())
        self.reader = reader

    def skip(self, s: struct.Struct, times=1):
        """
        Skips over data in the stream without loading it into the buffer.

        Parameters
        ----------
        s: struct.Struct
            struct describing a single item to skip
        times: int
            number of consecutive items to skip
        """
        # Drop the bytes that were fetched but not used. The cut has to be
        # made at the buffer index of the read position, not at the absolute
        # stream offset, which is larger once something has been skipped.
        buffered_position = self.read_offset - self.read_skipped
        self.buffer = self.buffer[:buffered_position]
        self.read_skipped += s.size * times
        super().skip(s, times)

    def read_chunk(self, chunk_size: int):
        """
        Fetches the next ``chunk_size`` bytes from the stream into the buffer.

        Parameters
        ----------
        chunk_size: int
            number of bytes to request from the stream

        Raises
        ------
        EOFError
            If data was requested but the stream returned none.
        """
        # Continue right after the last buffered byte, so that bytes which
        # are already buffered are not fetched a second time.
        stream_position = self.read_skipped + len(self.buffer)
        self.reader.seek(stream_position, 0)  # 0 means absolute seek
        chunk = self.reader.read(chunk_size)

        if chunk_size and not chunk:
            raise EOFError("End of file reached")

        self.buffer.extend(chunk)
        # Count what was actually delivered; the stream may return fewer
        # bytes than requested.
        self.total_bytes_read += len(chunk)

    def expect_to_read(self, n: int):
        """
        Makes sure at least ``n`` unread bytes are buffered, fetching more
        from the stream if necessary.

        Parameters
        ----------
        n: int
            number of bytes that are about to be read
        """
        missing = n - self.bytes_left()
        if missing > 0:
            self.read_chunk(missing)
229
+
230
+
191
231
if __name__ == "__main__" :
192
232
from tqdm import tqdm
193
233
0 commit comments