Skip to content

Commit 1ee7edb

Browse files
ngseanpasino
andcommitted
fix(biometrics): address review feedback — shared parser, cleanup, recovery
- Extract _read_raw_record into modules/common/cbor_raw.py to eliminate duplication between piezo-processor and sleep-detector - Add file handle cleanup on shutdown to piezo-processor (was already present in sleep-detector) - Add consecutive-failure counter to both modules: after 5 failures at the same file position, skip forward 1 byte to resync past corrupt data instead of retrying forever - Narrow exception handler from bare `except Exception` to `except (ValueError, cbor2.CBORDecodeError, OSError)` so only parsing/IO errors are retried Original CBOR fix ported from throwaway31265/free-sleep#46 by @seanpasino — thank you! Co-Authored-By: seanpasino <seanpasino@users.noreply.github.com>
1 parent 615b6ef commit 1ee7edb

3 files changed

Lines changed: 132 additions & 154 deletions

File tree

modules/common/cbor_raw.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""
2+
Shared CBOR record reader for SleepyPod biometrics modules.
3+
4+
The cbor2 C extension (_cbor2) reads files in internal 4096-byte chunks,
5+
so cbor2.load(f) advances f.tell() by 4096 bytes regardless of the actual
6+
record size. Since RAW file records are typically 17-5000 bytes, this causes
7+
nearly every record to be skipped silently.
8+
9+
This module provides a manual parser for the outer {seq, data} CBOR wrapper
10+
that reads byte-by-byte using f.read(), keeping f.tell() accurate.
11+
12+
See: https://github.com/throwaway31265/free-sleep/pull/46
13+
"""
14+
15+
import struct
16+
17+
18+
def read_raw_record(f):
19+
"""Parse one outer {seq, data} CBOR record from file object *f*.
20+
21+
Returns the raw inner data bytes, or ``None`` for empty placeholder
22+
records (Pod firmware writes ``data=b''`` as sequence-number markers).
23+
24+
Raises:
25+
EOFError: End of file (no more data to read).
26+
ValueError: Malformed CBOR structure.
27+
"""
28+
b = f.read(1)
29+
if not b:
30+
raise EOFError
31+
if b[0] != 0xa2:
32+
raise ValueError('Expected outer map 0xa2, got 0x%02x' % b[0])
33+
34+
# "seq" key — text(3) "seq"
35+
if f.read(4) != b'\x63\x73\x65\x71':
36+
raise ValueError('Expected seq key')
37+
38+
# seq value — uint32 (0x1a) or uint64 (0x1b)
39+
hdr = f.read(1)
40+
if not hdr:
41+
raise EOFError
42+
if hdr[0] == 0x1a:
43+
seq_bytes = f.read(4)
44+
if len(seq_bytes) < 4:
45+
raise EOFError
46+
elif hdr[0] == 0x1b:
47+
seq_bytes = f.read(8)
48+
if len(seq_bytes) < 8:
49+
raise EOFError
50+
else:
51+
raise ValueError('Unexpected seq encoding: 0x%02x' % hdr[0])
52+
53+
# "data" key — text(4) "data"
54+
if f.read(5) != b'\x64\x64\x61\x74\x61':
55+
raise ValueError('Expected data key')
56+
57+
# data value — byte string length
58+
bs = f.read(1)
59+
if not bs:
60+
raise EOFError
61+
ai = bs[0] & 0x1f
62+
if ai <= 23:
63+
length = ai
64+
elif ai == 24:
65+
lb = f.read(1)
66+
if not lb:
67+
raise EOFError
68+
length = lb[0]
69+
elif ai == 25:
70+
lb = f.read(2)
71+
if len(lb) < 2:
72+
raise EOFError
73+
length = struct.unpack('>H', lb)[0]
74+
elif ai == 26:
75+
lb = f.read(4)
76+
if len(lb) < 4:
77+
raise EOFError
78+
length = struct.unpack('>I', lb)[0]
79+
else:
80+
raise ValueError('Unsupported length encoding: %d' % ai)
81+
82+
data = f.read(length)
83+
if len(data) < length:
84+
raise EOFError
85+
if not data:
86+
return None # empty placeholder record, caller should skip
87+
return data

modules/piezo-processor/main.py

Lines changed: 25 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@
2727
from collections import deque
2828
from typing import Optional
2929

30-
import struct
30+
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
3131

3232
import cbor2
33+
from common.cbor_raw import read_raw_record
3334
import numpy as np
3435
from scipy.signal import butter, filtfilt, welch
3536
import heartpy as hp
@@ -174,82 +175,12 @@ def compute_breathing_rate(samples: np.ndarray, fs: float = SAMPLE_RATE) -> Opti
174175
log.debug("Breathing rate computation failed: %s", e)
175176
return None
176177

177-
# ---------------------------------------------------------------------------
178-
# CBOR record reader
179-
# ---------------------------------------------------------------------------
180-
181-
def _read_raw_record(f):
182-
"""
183-
Manually parse one outer {seq, data} CBOR record using f.read().
184-
185-
The cbor2 C extension (_cbor2) reads files in internal 4096-byte chunks,
186-
so cbor2.load(f) advances f.tell() by 4096 bytes regardless of the actual
187-
record size. Since RAW file records are typically 17-5000 bytes, this causes
188-
nearly every record to be skipped silently.
189-
190-
This function parses the outer {seq: uint, data: bytes} wrapper byte-by-byte
191-
using f.read(), keeping f.tell() accurate after each record.
192-
193-
Returns the raw inner data bytes, or None for empty placeholder records
194-
(which the Pod firmware writes as sequence number markers with data=b'').
195-
Raises EOFError at end of file, ValueError on malformed data.
196-
"""
197-
b = f.read(1)
198-
if not b:
199-
raise EOFError
200-
if b[0] != 0xa2:
201-
raise ValueError('Expected outer map 0xa2, got 0x%02x' % b[0])
202-
if f.read(4) != b'\x63\x73\x65\x71':
203-
raise ValueError('Expected seq key')
204-
hdr = f.read(1)
205-
if not hdr:
206-
raise EOFError
207-
if hdr[0] == 0x1a:
208-
seq_bytes = f.read(4)
209-
if len(seq_bytes) < 4:
210-
raise EOFError
211-
elif hdr[0] == 0x1b:
212-
seq_bytes = f.read(8)
213-
if len(seq_bytes) < 8:
214-
raise EOFError
215-
else:
216-
raise ValueError('Unexpected seq encoding: 0x%02x' % hdr[0])
217-
if f.read(5) != b'\x64\x64\x61\x74\x61':
218-
raise ValueError('Expected data key')
219-
bs = f.read(1)
220-
if not bs:
221-
raise EOFError
222-
ai = bs[0] & 0x1f
223-
if ai <= 23:
224-
length = ai
225-
elif ai == 24:
226-
lb = f.read(1)
227-
if not lb:
228-
raise EOFError
229-
length = lb[0]
230-
elif ai == 25:
231-
lb = f.read(2)
232-
if len(lb) < 2:
233-
raise EOFError
234-
length = struct.unpack('>H', lb)[0]
235-
elif ai == 26:
236-
lb = f.read(4)
237-
if len(lb) < 4:
238-
raise EOFError
239-
length = struct.unpack('>I', lb)[0]
240-
else:
241-
raise ValueError('Unsupported length encoding: %d' % ai)
242-
data = f.read(length)
243-
if len(data) < length:
244-
raise EOFError
245-
if not data:
246-
return None # empty placeholder record, caller should skip
247-
return data
248-
249178
# ---------------------------------------------------------------------------
250179
# RAW file follower
251180
# ---------------------------------------------------------------------------
252181

182+
MAX_CONSECUTIVE_FAILURES = 5
183+
253184
class RawFileFollower:
254185
"""
255186
Follows the newest .RAW file in RAW_DATA_DIR, tailing it as new CBOR
@@ -261,6 +192,7 @@ def __init__(self, data_dir: Path):
261192
self._file = None
262193
self._path = None
263194
self._last_pos = 0
195+
self._consecutive_failures = 0
264196

265197
def _find_latest(self) -> Optional[Path]:
266198
candidates = sorted(self.data_dir.glob("*.RAW"), key=lambda p: p.stat().st_mtime, reverse=True)
@@ -281,22 +213,38 @@ def read_records(self):
281213
self._file = open(latest, "rb")
282214
self._path = latest
283215
self._last_pos = 0
216+
self._consecutive_failures = 0
284217

285218
try:
286-
data_bytes = _read_raw_record(self._file)
219+
data_bytes = read_raw_record(self._file)
287220
if data_bytes is None:
288221
self._last_pos = self._file.tell()
222+
self._consecutive_failures = 0
289223
continue # empty placeholder record
290224
inner = cbor2.loads(data_bytes)
291225
self._last_pos = self._file.tell()
226+
self._consecutive_failures = 0
292227
yield inner
293228
except EOFError:
294229
# No new data yet — poll
295230
time.sleep(0.01)
296-
except Exception as e:
297-
log.warning("Error reading RAW record: %s", e)
231+
except (ValueError, cbor2.CBORDecodeError, OSError) as e:
232+
self._consecutive_failures += 1
233+
if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
234+
log.warning("Skipping past corrupt data at offset %d after %d failures: %s",
235+
self._last_pos, self._consecutive_failures, e)
236+
self._last_pos += 1
237+
self._consecutive_failures = 0
238+
else:
239+
log.debug("Error reading RAW record (attempt %d): %s",
240+
self._consecutive_failures, e)
298241
self._file.seek(self._last_pos)
299-
time.sleep(1)
242+
time.sleep(0.1)
243+
244+
# Clean up file handle on shutdown
245+
if self._file:
246+
self._file.close()
247+
self._file = None
300248

301249
# ---------------------------------------------------------------------------
302250
# Per-side processor

modules/sleep-detector/main.py

Lines changed: 20 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@
2929
from dataclasses import dataclass, field
3030
from typing import Optional
3131

32-
import struct
32+
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
3333

3434
import cbor2
35+
from common.cbor_raw import read_raw_record
3536
import numpy as np
3637

3738
# ---------------------------------------------------------------------------
@@ -257,88 +258,19 @@ def _flush_movement(self, ts: float) -> None:
257258
self._movement_buf = []
258259
self._last_movement_write = ts
259260

260-
# ---------------------------------------------------------------------------
261-
# CBOR record reader
262-
# ---------------------------------------------------------------------------
263-
264-
def _read_raw_record(f):
265-
"""
266-
Manually parse one outer {seq, data} CBOR record using f.read().
267-
268-
The cbor2 C extension (_cbor2) reads files in internal 4096-byte chunks,
269-
so cbor2.load(f) advances f.tell() by 4096 bytes regardless of the actual
270-
record size. Since RAW file records are typically 17-5000 bytes, this causes
271-
nearly every record to be skipped silently.
272-
273-
This function parses the outer {seq: uint, data: bytes} wrapper byte-by-byte
274-
using f.read(), keeping f.tell() accurate after each record.
275-
276-
Returns the raw inner data bytes, or None for empty placeholder records
277-
(which the Pod firmware writes as sequence number markers with data=b'').
278-
Raises EOFError at end of file, ValueError on malformed data.
279-
"""
280-
b = f.read(1)
281-
if not b:
282-
raise EOFError
283-
if b[0] != 0xa2:
284-
raise ValueError('Expected outer map 0xa2, got 0x%02x' % b[0])
285-
if f.read(4) != b'\x63\x73\x65\x71':
286-
raise ValueError('Expected seq key')
287-
hdr = f.read(1)
288-
if not hdr:
289-
raise EOFError
290-
if hdr[0] == 0x1a:
291-
seq_bytes = f.read(4)
292-
if len(seq_bytes) < 4:
293-
raise EOFError
294-
elif hdr[0] == 0x1b:
295-
seq_bytes = f.read(8)
296-
if len(seq_bytes) < 8:
297-
raise EOFError
298-
else:
299-
raise ValueError('Unexpected seq encoding: 0x%02x' % hdr[0])
300-
if f.read(5) != b'\x64\x64\x61\x74\x61':
301-
raise ValueError('Expected data key')
302-
bs = f.read(1)
303-
if not bs:
304-
raise EOFError
305-
ai = bs[0] & 0x1f
306-
if ai <= 23:
307-
length = ai
308-
elif ai == 24:
309-
lb = f.read(1)
310-
if not lb:
311-
raise EOFError
312-
length = lb[0]
313-
elif ai == 25:
314-
lb = f.read(2)
315-
if len(lb) < 2:
316-
raise EOFError
317-
length = struct.unpack('>H', lb)[0]
318-
elif ai == 26:
319-
lb = f.read(4)
320-
if len(lb) < 4:
321-
raise EOFError
322-
length = struct.unpack('>I', lb)[0]
323-
else:
324-
raise ValueError('Unsupported length encoding: %d' % ai)
325-
data = f.read(length)
326-
if len(data) < length:
327-
raise EOFError
328-
if not data:
329-
return None # empty placeholder record, caller should skip
330-
return data
331-
332261
# ---------------------------------------------------------------------------
333262
# RAW file follower (same pattern as piezo-processor)
334263
# ---------------------------------------------------------------------------
335264

265+
MAX_CONSECUTIVE_FAILURES = 5
266+
336267
class RawFileFollower:
337268
def __init__(self, data_dir: Path):
338269
self.data_dir = data_dir
339270
self._file = None
340271
self._path = None
341272
self._last_pos = 0
273+
self._consecutive_failures = 0
342274

343275
def _find_latest(self):
344276
candidates = sorted(self.data_dir.glob("*.RAW"), key=lambda p: p.stat().st_mtime, reverse=True)
@@ -358,21 +290,32 @@ def read_records(self):
358290
self._file = open(latest, "rb")
359291
self._path = latest
360292
self._last_pos = 0
293+
self._consecutive_failures = 0
361294

362295
try:
363-
data_bytes = _read_raw_record(self._file)
296+
data_bytes = read_raw_record(self._file)
364297
if data_bytes is None:
365298
self._last_pos = self._file.tell()
299+
self._consecutive_failures = 0
366300
continue # empty placeholder record
367301
inner = cbor2.loads(data_bytes)
368302
self._last_pos = self._file.tell()
303+
self._consecutive_failures = 0
369304
yield inner
370305
except EOFError:
371306
time.sleep(0.5)
372-
except Exception as e:
373-
log.warning("Error reading RAW record: %s", e)
307+
except (ValueError, cbor2.CBORDecodeError, OSError) as e:
308+
self._consecutive_failures += 1
309+
if self._consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
310+
log.warning("Skipping past corrupt data at offset %d after %d failures: %s",
311+
self._last_pos, self._consecutive_failures, e)
312+
self._last_pos += 1
313+
self._consecutive_failures = 0
314+
else:
315+
log.debug("Error reading RAW record (attempt %d): %s",
316+
self._consecutive_failures, e)
374317
self._file.seek(self._last_pos)
375-
time.sleep(1)
318+
time.sleep(0.1)
376319

377320
# Clean up file handle on shutdown
378321
if self._file:

0 commit comments

Comments
 (0)