|
9 | 9 | _SIGN_BITS = [0] + [(1 << (bits - 1)) for bits in range(1, 65)] |
10 | 10 |
|
11 | 11 |
|
12 | | -def read_value(data, offset_bits, num_bits, is_signed): |
13 | | - offset_bytes, offset_extra_bits = divmod(offset_bits, 8) |
14 | | - total_bytes = (num_bits + 7) // 8 |
15 | | - |
16 | | - if num_bits == 1: |
17 | | - return int((data[offset_bytes] & (1 << offset_extra_bits)) != 0) |
18 | | - |
19 | | - result = int.from_bytes(data[offset_bytes: offset_bytes + total_bytes], byteorder="little") |
20 | | - result >>= offset_extra_bits |
21 | | - if (total_bytes * 8 - offset_extra_bits) < num_bits: |
22 | | - remainder = data[offset_bytes + total_bytes] |
23 | | - result |= remainder << (total_bytes * 8 - offset_extra_bits) |
def make_field_reader(offset_bits, num_bits, is_signed):
    """Build a specialized closure for reading a single bit-packed field.

    All per-field constants (byte offset, bit shift, mask, sign bit) are
    computed once here and captured by the returned closure, so each read
    only slices, shifts and masks.

    Args:
        offset_bits: Bit offset of the field from the start of the structure.
        num_bits: Width of the field in bits.
        is_signed: If true, the field is decoded as two's complement.
            One-bit fields are always returned as 0 or 1.

    Returns:
        A function ``reader(data, pos)`` that returns the field value as an
        ``int``, reading little-endian from ``data`` with the structure
        starting at byte position ``pos``.
    """
    offset_bytes, offset_extra = divmod(offset_bits, 8)
    total_bytes = (num_bits + 7) // 8
    end_byte = offset_bytes + total_bytes
    mask = (1 << num_bits) - 1
    # Number of field bits available in the first ``total_bytes`` bytes once
    # the leading ``offset_extra`` bits are shifted out.
    extra_shift = total_bytes * 8 - offset_extra
    # True when the field spills into one more byte past ``end_byte``.
    needs_extra = extra_shift < num_bits

    if num_bits == 1:
        bit_mask = 1 << offset_extra

        def reader(data, pos):
            return int((data[pos + offset_bytes] & bit_mask) != 0)

        return reader

    if is_signed:
        # Computed directly rather than looked up in a fixed-size table, so
        # signed fields of any width work. A zero-width field has no sign
        # bit and always reads as 0.
        sign_bit = (1 << (num_bits - 1)) if num_bits > 0 else 0

        # ``(x ^ sign_bit) - sign_bit`` sign-extends an already-masked
        # ``num_bits``-wide value: it subtracts 2**num_bits exactly when the
        # top bit is set.
        if needs_extra:
            def reader(data, pos):
                result = int.from_bytes(
                    data[pos + offset_bytes: pos + end_byte],
                    byteorder="little")
                result >>= offset_extra
                result |= data[pos + end_byte] << extra_shift
                return ((result & mask) ^ sign_bit) - sign_bit
        elif offset_extra:
            def reader(data, pos):
                result = (int.from_bytes(
                    data[pos + offset_bytes: pos + end_byte],
                    byteorder="little") >> offset_extra) & mask
                return (result ^ sign_bit) - sign_bit
        else:
            def reader(data, pos):
                result = int.from_bytes(
                    data[pos + offset_bytes: pos + end_byte],
                    byteorder="little") & mask
                return (result ^ sign_bit) - sign_bit
        return reader

    # Unsigned paths
    if needs_extra:
        def reader(data, pos):
            result = int.from_bytes(
                data[pos + offset_bytes: pos + end_byte],
                byteorder="little")
            result >>= offset_extra
            result |= data[pos + end_byte] << extra_shift
            return result & mask
    elif offset_extra:
        def reader(data, pos):
            return (int.from_bytes(
                data[pos + offset_bytes: pos + end_byte],
                byteorder="little") >> offset_extra) & mask
    else:
        def reader(data, pos):
            return int.from_bytes(
                data[pos + offset_bytes: pos + end_byte],
                byteorder="little") & mask
    return reader
67 | 77 |
|
68 | 78 |
|
69 | 79 | def read_field_vectorized(raw_bytes_2d, field_offset_bits, field_width_bits, is_signed): |
@@ -110,3 +120,49 @@ def read_field_vectorized(raw_bytes_2d, field_offset_bits, field_width_bits, is_ |
110 | 120 | result = np.where(result & sign_bit, signed, result.astype(np.int64)) |
111 | 121 |
|
112 | 122 | return result |
| 123 | + |
| 124 | + |
def read_value(data, offset_bits, num_bits, is_signed):
    """Read one bit-packed value from ``data`` at the given bit offset.

    Convenience wrapper for one-off reads: it builds a field reader with
    :func:`make_field_reader` and immediately applies it at byte position 0.
    When the same field is read repeatedly, build the reader once with
    ``make_field_reader`` and reuse it instead.
    """
    return make_field_reader(offset_bits, num_bits, is_signed)(data, 0)
| 134 | + |
| 135 | + |
def write_value(data, offset_bits, num_bits, is_signed, value):
    """Write a bit-packed value into ``data`` at the given bit offset.

    The field is stored little-endian. Only the ``num_bits`` bits of the
    field are modified; every other bit in the bytes it touches is kept.

    Args:
        data: Mutable byte buffer supporting integer index get/set
            (e.g. ``bytearray``). It is modified in place.
        offset_bits: Bit offset of the field from the start of ``data``.
        num_bits: Width of the field in bits (at most 64).
        is_signed: Kept for interface compatibility. Negative values are
            stored as two's complement by masking to ``num_bits``, so the
            flag does not affect the bytes written.
        value: Integer to store; bits above ``num_bits`` are discarded.

    Raises:
        AssertionError: If ``num_bits`` is greater than 64.
    """
    assert num_bits <= 64, 'Number of bits to write is greater than 64'

    offset_bytes, offset_extra_bits = divmod(offset_bits, 8)

    if num_bits == 1:
        # Only the exact value 1 sets the flag; anything else clears it.
        if value == 1:
            data[offset_bytes] |= 1 << offset_extra_bits
        else:
            data[offset_bytes] &= ~(1 << offset_extra_bits)
        return

    # Bytes the field touches, counting the in-byte offset.
    span_bytes = (offset_extra_bits + num_bits + 7) // 8
    field_mask = ((1 << num_bits) - 1) << offset_extra_bits

    # Read-modify-write so neighbouring fields sharing the first or last
    # byte are left untouched.
    current = 0
    for i in range(span_bytes):
        current |= int(data[offset_bytes + i]) << (8 * i)

    # Masking a negative Python int yields its two's complement low bits.
    updated = (current & ~field_mask) | ((value << offset_extra_bits) & field_mask)

    for i in range(span_bytes):
        data[offset_bytes + i] = (updated >> (8 * i)) & 0xFF
0 commit comments