Skip to content

Commit 736a5e4

Browse files
committed
Improved timestamp handling
1 parent ee8e88d commit 736a5e4

3 files changed

Lines changed: 72 additions & 43 deletions

File tree

src/RaftAttributeHandler.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,18 @@ export default class AttributeHandler {
159159
// Handle the timestamps with increments if specified
160160
const timeIncUs: number = pollRespMetadata.us ? pollRespMetadata.us : 1000;
161161
const timestampsUs = Array(numNewDataPoints).fill(0);
162+
// Get the last timestamp in the timeline to ensure monotonicity
163+
const lastTimeUs = deviceTimeline.timestampsUs.length > 0
164+
? deviceTimeline.timestampsUs[deviceTimeline.timestampsUs.length - 1]
165+
: -Infinity;
162166
for (let i = 0; i < numNewDataPoints; i++) {
163-
timestampsUs[i] = timestampUs + i * timeIncUs;
167+
timestampsUs[i] = timestampUs + i * timeIncUs;
168+
// Ensure monotonically increasing timestamps
169+
if (i === 0 && timestampsUs[0] <= lastTimeUs) {
170+
timestampsUs[0] = lastTimeUs + 1;
171+
} else if (i > 0 && timestampsUs[i] <= timestampsUs[i - 1]) {
172+
timestampsUs[i] = timestampsUs[i - 1] + 1;
173+
}
164174
}
165175

166176
// Check if timeline points need to be discarded

src/RaftCustomAttrHandler.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,15 @@ export default class CustomAttrHandler {
4646
return attrValueVecs;
4747
}
4848

49-
// Provide only this poll block (bounded by pollRespMetadata.b) to avoid
50-
// decoding bytes that belong to subsequent records in the same frame.
51-
const buf = msgBuffer.slice(msgBufIdx, msgBufIdx + numMsgBytes);
52-
if (buf.length < numMsgBytes) {
49+
// Provide this poll block to the custom handler. Use the smaller of
50+
// pollRespMetadata.b and the bytes actually available — variable-length
51+
// samples may be shorter than b, and the last sample in a frame may not
52+
// have b bytes remaining in the buffer.
53+
const availableBytes = Math.min(numMsgBytes, msgBuffer.length - msgBufIdx);
54+
if (availableBytes <= 0) {
5355
return [];
5456
}
57+
const buf = msgBuffer.slice(msgBufIdx, msgBufIdx + availableBytes);
5558

5659
const fn = this.getOrCompileFunction(customFnDef);
5760
if (!fn) {

src/RaftDeviceManager.ts

Lines changed: 54 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -217,44 +217,60 @@ export class DeviceManager implements RaftDeviceMgrIF{
217217
// The rxMsg passed to this function has a 2-byte message type prefix (e.g. 0x0080)
218218
// added by the transport layer. After that prefix comes a devbin frame:
219219
//
220-
// Devbin envelope (2 bytes):
221-
// Byte 0: magic+version 0xDB = devbin v1 (valid range 0xDB–0xDF for v1–v5)
222-
// Byte 1: topicIndex 0x00–0xFE = topic index; 0xFF = no topic
220+
// Devbin envelope (3 bytes):
221+
// Byte 0: magic+version 0xDB (valid range 0xDB–0xDF)
222+
// Byte 1: topicIndex 0x00–0xFE = topic index; 0xFF = no topic
223+
// Byte 2: envelopeSeqNum uint8, wrapping — detects whole-frame drops
223224
//
224225
// Then zero or more per-device records, concatenated back-to-back:
225-
// Bytes 0-1: recordLen uint16 big-endian — number of body bytes that follow (min 7)
226-
// Byte 2: statusBus bit 7 = online flag, bit 6 = pending deletion, bits 5:4 = reserved, bits 3:0 = bus number (0-15)
227-
// Bytes 3-6: address uint32 big-endian — device address on the bus
228-
// Bytes 7-8: devTypeIdx uint16 big-endian — device type table index
229-
// Bytes 9+: pollData variable length (recordLen − 7 bytes) — device data
226+
// Bytes 0-1: recordLen uint16 big-endian — number of body bytes that follow (min 8)
227+
// Byte 2: statusBus bit 7 = online flag, bit 6 = pending deletion, bits 3:0 = bus number
228+
// Bytes 3-6: address uint32 big-endian — device address on the bus
229+
// Bytes 7-8: devTypeIdx uint16 big-endian — device type table index
230+
// Byte 9: deviceSeqNum uint8, wrapping — per-device drop detection
231+
// Bytes 10+: samples length-prefixed: [sampleLen(1B)][sampleData(sampleLen B)] × N
230232
//
231-
// Example message (with transport prefix):
232-
// 0080 DB01 0015 81 0000076a 000b bff10000ffffffff7a07d1f1221c 000e 80 00000000 001f bc340000030001
233-
// ^^^^ ^^^^ ^^^^
234-
// | | | || | | | Record 2 ...
235-
// | | | || | | pollData (14 bytes)
236-
// | | | || | devTypeIdx = 0x000b (11)
237-
// | | | || address = 0x0000076a (slot 7, I2C addr 0x6a)
238-
// | | | |busInfo = 0x81 (bus 1, online)
239-
// | | | recordLen = 0x0015 (21 bytes)
233+
// Example message (two device records; first record has two samples):
234+
// 0080 DB 01 07 0018 81 0000076a 000b 2a 07feff0000010008 07185707931400 01 000e 80 00000000 001f 05 05030001af01
235+
// ^^^^ ^^^^
236+
// | ^^ ^^ ^^ Record 2 ...
237+
// | | | envelopeSeqNum = 0x07 (same layout as Record 1)
240238
// | | topicIndex = 0x01
241239
// | magic+version = 0xDB (devbin v1)
242240
// msgType prefix (transport layer)
241+
//
242+
// Record 1 breakdown:
243+
// 0018 recordLen = 24 body bytes follow
244+
// 81 statusBus: online=1, pendDel=0, bus=1
245+
// 0000076a address = 0x0000076A (slot 7, I2C addr 0x6A)
246+
// 000b devTypeIdx = 11
247+
// 2a deviceSeqNum = 42
248+
// 07 feff0000010008 sample 1: sampleLen=7, 7 bytes of attribute data
249+
// 07 18570793140001 sample 2: sampleLen=7, 7 bytes of attribute data
250+
//
251+
// Record 2 breakdown:
252+
// 000e recordLen = 14 body bytes follow
253+
// 80 statusBus: online=1, pendDel=0, bus=0
254+
// 00000000 address = 0x00000000 (direct-connect)
255+
// 001f devTypeIdx = 31
256+
// 05 deviceSeqNum = 5
257+
// 05 030001af01 sample 1: sampleLen=5, 5 bytes of attribute data
243258

244259
// Debug
245260
// const debugMsgTime = Date.now();
246261
const debugMsgIndex = this._debugMsgIndex++;
247262

248263
// Message layout constants
249264
const msgTypeLen = 2; // Transport-layer message type prefix (first two bytes, e.g. 0x0080)
250-
const devbinEnvelopeLen = 2; // Devbin envelope: magic+version (1 byte) + topicIndex (1 byte)
265+
const devbinEnvelopeLen = 3; // Devbin envelope: magic+version (1B) + topicIndex (1B) + envelopeSeqNum (1B)
251266
const devbinMagicMin = 0xDB;
252267
const devbinMagicMax = 0xDF;
253268
const recordLenLen = 2; // Per-record length prefix (uint16 big-endian)
254269
const busInfoLen = 1; // statusBus byte: bit 7 = online, bit 6 = pending deletion, bits 3:0 = bus number
255270
const deviceAddrLen = 4; // Device address (uint32 big-endian)
256271
const devTypeIdxLen = 2; // Device type index (uint16 big-endian)
257-
const recordHeaderLen = busInfoLen + deviceAddrLen + devTypeIdxLen; // = 7, minimum record body
272+
const deviceSeqNumLen = 1; // Per-device sequence counter
273+
const recordHeaderLen = busInfoLen + deviceAddrLen + devTypeIdxLen + deviceSeqNumLen; // = 8, minimum record body
258274

259275
// console.log(`DevMan.handleClientMsgBinary debugIdx ${debugMsgIndex} rxMsg.length ${rxMsg.length} rxMsg ${RaftUtils.bufferToHex(rxMsg)}`);
260276

@@ -315,7 +331,13 @@ export class DeviceManager implements RaftDeviceMgrIF{
315331

316332
// Device type index (uint16 big-endian)
317333
const devTypeIdx = (rxMsg[recordPos] << 8) + rxMsg[recordPos + 1];
318-
let pollDataPos = recordPos + devTypeIdxLen;
334+
recordPos += devTypeIdxLen;
335+
336+
// Per-device sequence counter (reserved for future drop detection)
337+
// const deviceSeqNum = rxMsg[recordPos];
338+
recordPos += deviceSeqNumLen;
339+
340+
let pollDataPos = recordPos;
319341

320342
// Debug
321343
// console.log(`DevMan.handleClientMsgBinary debugIdx ${debugMsgIndex} overallLen ${rxMsg.length} recordStart ${msgPos} recordLen ${recordLen} ${pollDataPos} ${RaftUtils.bufferToHex(rxMsg.slice(msgPos, msgPos + recordLenLen + recordLen))}`);
@@ -391,45 +413,39 @@ export class DeviceManager implements RaftDeviceMgrIF{
391413
// Iterate over attributes in the group
392414
const pollRespMetadata = deviceState.deviceTypeInfo!.resp!;
393415

394-
// Process poll data (recordLen - recordHeaderLen bytes)
395-
const pollDataLen = recordLen - recordHeaderLen;
396-
const pollDataStartPos = pollDataPos;
416+
// Process length-prefixed samples within this record
417+
const samplesEndPos = msgPos + recordLenLen + recordLen;
397418
const attrLengthsBefore = this.snapshotAttrLengths(deviceState.deviceAttributes, pollRespMetadata);
398419
const timelineLenBefore = deviceState.deviceTimeline.timestampsUs.length;
399420
const totalSamplesBefore = deviceState.deviceTimeline.totalSamplesAdded;
400-
while (pollDataPos < pollDataStartPos + pollDataLen) {
421+
while (pollDataPos < samplesEndPos) {
401422

402-
// Add bounds checking
423+
// Read sample length prefix
403424
if (pollDataPos >= rxMsg.length) {
404425
console.warn(`DevMan.handleClientMsgBinary debugIdx ${debugMsgIndex} pollDataPos ${pollDataPos} exceeds message length ${rxMsg.length}`);
405426
break;
406427
}
428+
const sampleLen = rxMsg[pollDataPos];
429+
pollDataPos += 1;
430+
431+
if (sampleLen === 0 || pollDataPos + sampleLen > samplesEndPos) {
432+
break;
433+
}
407434

408435
const newMsgBufIdx = this._attributeHandler.processMsgAttrGroup(rxMsg, pollDataPos,
409436
deviceState.deviceTimeline, pollRespMetadata,
410437
deviceState.deviceAttributes,
411438
this._maxDatapointsToStore);
412439

413-
// console.log(`DevMan.handleClientMsgBinary decoded debugIdx ${debugMsgIndex} devType ${deviceState.deviceTypeInfo.name} pollDataLen ${pollDataLen} pollDataPos ${pollDataPos} recordLen ${recordLen} msgPos ${msgPos} rxMsgLen ${rxMsg.length} remainingLen ${remainingLen} pollRespMetadata ${JSON.stringify(pollRespMetadata)}`);
414-
415440
if (newMsgBufIdx < 0)
416441
{
417442
console.warn(`DevMan.handleClientMsgBinary debugIdx ${debugMsgIndex} processMsgAttrGroup failed newMsgBufIdx ${newMsgBufIdx}`);
418443
break;
419444
}
420-
421-
// Prevent infinite loops
422-
if (newMsgBufIdx <= pollDataPos) {
423-
console.warn(`DevMan.handleClientMsgBinary debugIdx ${debugMsgIndex} processMsgAttrGroup didn't advance position from ${pollDataPos} to ${newMsgBufIdx}`);
424-
break;
425-
}
426445

427-
pollDataPos = newMsgBufIdx;
446+
// Advance by sampleLen regardless of how much processMsgAttrGroup consumed
447+
pollDataPos += sampleLen;
428448
deviceState.stateChanged = true;
429-
430-
// console.log(`debugMsgTime ${debugMsgTime} newPt debugMsgIdx ${debugMsgIndex} rxMsgLen ${rxMsg.length} devType ${deviceState.deviceTypeInfo!.name} timestampsUs ${deviceState.deviceTimeline.timestampsUs[deviceState.deviceTimeline.timestampsUs.length - 1]} curTimelineLen ${deviceState.deviceTimeline.timestampsUs.length}`);
431-
432-
// console.log(`DevMan.handleClientMsgBinary group done debugIdx ${debugMsgIndex} pollDataPos ${pollDataPos} recordLen ${recordLen} msgPos ${msgPos} rxMsgLen ${rxMsg.length} remainingLen ${remainingLen}`);
433449
}
434450

435451
// Inform decoded-data callbacks

0 commit comments

Comments
 (0)