Skip to content

Commit 119c267

Browse files
committed
StreamID previously could be 0
Before streaming was implemented the field used for streamID today used to be 0 always So streamID == 0 was made a special case for backward compatibility But raftjs in some situations still sent packets with streamID == 0 and this is fixed now
1 parent 6b07fea commit 119c267

6 files changed

Lines changed: 265 additions & 19 deletions

File tree

src/RaftConnector.ts

Lines changed: 112 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import RaftChannelWebSocket from "./RaftChannelWebSocket";
1414
import RaftChannelWebSerial from "./RaftChannelWebSerial";
1515
import RaftChannelSimulated from "./RaftChannelSimulated";
1616
import RaftCommsStats from "./RaftCommsStats";
17-
import { RaftEventFn, RaftOKFail, RaftFileSendType, RaftFileDownloadResult, RaftProgressCBType, RaftStreamDataProgressCBType, RaftBridgeSetupResp, RaftFileDownloadFn, RaftReportMsg } from "./RaftTypes";
17+
import { RaftEventFn, RaftOKFail, RaftFileSendType, RaftFileDownloadResult, RaftProgressCBType, RaftStreamDataProgressCBType, RaftBridgeSetupResp, RaftFileDownloadFn, RaftReportMsg, RaftRtStreamDataCBType, RaftRtStreamHandle, RaftRtStreamOptions, RaftRtStreamStartResp } from "./RaftTypes";
1818
import RaftSystemUtils from "./RaftSystemUtils";
1919
import RaftFileHandler from "./RaftFileHandler";
2020
import RaftStreamHandler from "./RaftStreamHandler";
@@ -87,6 +87,10 @@ export default class RaftConnector {
8787
// Update manager
8888
private _raftUpdateManager: RaftUpdateManager | null = null;
8989

90+
// Open-ended RT stream callbacks keyed by streamID
91+
private _rtStreamCallbacks = new Map<number, RaftRtStreamDataCBType>();
92+
private _fallbackRtStreamCallback: { streamID: number, callback: RaftRtStreamDataCBType } | null = null;
93+
9094
/**
9195
* RaftConnector constructor
9296
* @param getSystemTypeCB - callback to get system type
@@ -193,6 +197,22 @@ export default class RaftConnector {
193197
return this._commsStats;
194198
}
195199

200+
/**
201+
* getOperationQueueDepth
202+
* @returns number of high-level device operations queued or running
203+
*/
204+
getOperationQueueDepth(): number {
205+
return 0;
206+
}
207+
208+
/**
209+
* isOperationBusy
210+
* @returns true when a high-level device operation is queued or running
211+
*/
212+
isOperationBusy(): boolean {
213+
return false;
214+
}
215+
196216
/**
197217
* Get Raft message handler (to allow message sending and receiving)
198218
* @returns RaftMsgHandler - Raft message handler
@@ -390,18 +410,24 @@ export default class RaftConnector {
390410
*
391411
*/
392412
async sendRICRESTMsg(commandName: string, params: object,
413+
bridgeID: number | undefined = undefined): Promise<RaftOKFail> {
414+
return this._sendRICRESTMsg(commandName, params, bridgeID);
415+
}
416+
417+
private async _sendRICRESTMsg(commandName: string, params: object,
393418
bridgeID: number | undefined = undefined): Promise<RaftOKFail> {
394419
try {
395420
// Format the paramList as query string
396421
const paramEntries = Object.entries(params);
397422
let paramQueryStr = '';
398423
for (const param of paramEntries) {
399424
if (paramQueryStr.length > 0) paramQueryStr += '&';
400-
paramQueryStr += param[0] + '=' + param[1];
425+
paramQueryStr += `${encodeURIComponent(param[0])}=${encodeURIComponent(String(param[1]))}`;
401426
}
402427
// Format the url to send
403428
if (paramQueryStr.length > 0) commandName += '?' + paramQueryStr;
404-
return await this._raftMsgHandler.sendRICRESTURL<RaftOKFail>(commandName, bridgeID);
429+
const response = await this._raftMsgHandler.sendRICRESTURL<RaftOKFail | null>(commandName, bridgeID);
430+
return response ?? { rslt: 'fail' };
405431
} catch (error) {
406432
RaftLog.warn(`sendRICRESTMsg failed ${error}`);
407433
return { rslt: 'fail' };
@@ -455,6 +481,13 @@ export default class RaftConnector {
455481
fileBlockData: Uint8Array
456482
): void {
457483
// RaftLog.info(`onRxFileBlock filePos ${filePos} fileBlockData ${RaftUtils.bufferToHex(fileBlockData)}`);
484+
const streamID = (filePos >>> 24) & 0xff;
485+
const streamFilePos = filePos & 0x00ffffff;
486+
const streamCallback = this._rtStreamCallbacks.get(streamID);
487+
if (streamID !== 0 && streamCallback) {
488+
streamCallback(fileBlockData, streamFilePos, streamID);
489+
return;
490+
}
458491
this._raftFileHandler.onFileBlock(filePos, fileBlockData);
459492
}
460493

@@ -516,6 +549,82 @@ export default class RaftConnector {
516549
return false;
517550
}
518551

552+
/**
553+
* openRtStream - open an indefinite bidirectional RT stream.
554+
* The returned handle can send byte blocks and closes with ufEnd.
555+
*/
556+
async openRtStream(options: RaftRtStreamOptions): Promise<RaftRtStreamHandle> {
557+
const cmdMsg = JSON.stringify({
558+
cmdName: "ufStart",
559+
reqStr: "ufStart",
560+
fileType: "rtstream",
561+
fileName: options.fileName,
562+
endpoint: options.endpoint,
563+
fileLen: 0,
564+
});
565+
566+
const startResp = await this._raftMsgHandler.sendRICRESTCmdFrame<RaftRtStreamStartResp>(cmdMsg);
567+
if (!startResp || startResp.rslt !== "ok" || startResp.streamID === undefined) {
568+
throw new Error(`openRtStream failed ${startResp?.rslt ?? "no response"}`);
569+
}
570+
571+
const streamID = startResp.streamID;
572+
const maxBlockSize = startResp.maxBlockSize || this._raftStreamHandler.maxBlockSize;
573+
let txFilePos = 0;
574+
let sendQueue = Promise.resolve();
575+
this._rtStreamCallbacks.set(streamID, options.onData);
576+
this._fallbackRtStreamCallback = { streamID, callback: options.onData };
577+
578+
const sendBytes = async (bytes: Uint8Array): Promise<boolean> => {
579+
let sentOk = false;
580+
sendQueue = sendQueue
581+
.catch(() => {
582+
// Keep later terminal input flowing even if an earlier block failed.
583+
})
584+
.then(async () => {
585+
sentOk = await this._raftMsgHandler.sendStreamBlock(bytes, txFilePos, streamID);
586+
if (sentOk) {
587+
txFilePos = (txFilePos + bytes.length) & 0x00ffffff;
588+
}
589+
});
590+
await sendQueue;
591+
return sentOk;
592+
};
593+
594+
const close = async (): Promise<boolean> => {
595+
this._rtStreamCallbacks.delete(streamID);
596+
if (this._fallbackRtStreamCallback?.streamID === streamID) {
597+
this._fallbackRtStreamCallback = null;
598+
}
599+
const endMsg = JSON.stringify({
600+
cmdName: "ufEnd",
601+
reqStr: "ufEnd",
602+
streamID,
603+
});
604+
try {
605+
const endResp = await this._raftMsgHandler.sendRICRESTCmdFrame<RaftOKFail>(endMsg);
606+
return endResp?.rslt === "ok";
607+
} catch (error) {
608+
RaftLog.warn(`closeRtStream failed ${streamID}: ${error}`);
609+
return false;
610+
}
611+
};
612+
613+
// Some endpoints use an initial empty block as an attach signal.
614+
// Keep this opt-in because older endpoints reject zero-length ufBlock frames.
615+
if (options.sendInitialEmptyBlock) {
616+
await sendBytes(new Uint8Array());
617+
}
618+
619+
return {
620+
streamID,
621+
maxBlockSize,
622+
sendBytes,
623+
sendText: (text: string) => sendBytes(new TextEncoder().encode(text)),
624+
close,
625+
};
626+
}
627+
519628
/**
520629
* streamAudio - stream audio
521630
* @param streamContents audio data

src/RaftFileHandler.ts

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ export default class RaftFileHandler {
6565
private _ackedFilePos = 0;
6666
private _batchAckReceived = false;
6767
private _isTxCancelled = false;
68+
private _fileTxStreamID = 0;
6869

6970
// File receive info
7071
private _isRxCancelled = false;
@@ -78,7 +79,6 @@ export default class RaftFileHandler {
7879
private _fileRxLastAckTime = 0;
7980
private _fileRxLastBlockTime = 0;
8081
private _fileRxLastAckPos = 0;
81-
private OVERALL_FILE_TRANSFER_TIMEOUT_MS = 100000;
8282
private FILE_RX_ACK_RESEND_TIMEOUT_MS = 1000;
8383

8484
// RaftCommsStats
@@ -111,6 +111,7 @@ export default class RaftFileHandler {
111111
progressCallback: ((sent: number, total: number, progress: number) => void) | undefined,
112112
): Promise<boolean> {
113113
this._isTxCancelled = false;
114+
this._fileTxStreamID = 0;
114115

115116
// Send file start message
116117
if (!await this._sendFileStartMsg(fileName, fileType, fileDest, fileContents))
@@ -166,8 +167,8 @@ export default class RaftFileHandler {
166167
RaftLog.warn(`sendFileStartMsg error ${err}`);
167168
return false;
168169
}
169-
if (fileStartResp.rslt !== 'ok') {
170-
RaftLog.warn(`sendFileStartMsg error ${fileStartResp.rslt}`);
170+
if (!fileStartResp || fileStartResp.rslt !== 'ok') {
171+
RaftLog.warn(`sendFileStartMsg error ${fileStartResp?.rslt ?? 'no response'}`);
171172
return false;
172173
}
173174

@@ -182,12 +183,17 @@ export default class RaftFileHandler {
182183
} else {
183184
this._batchAckSize = this._requestedBatchAckSize;
184185
}
186+
const streamID = fileStartResp.streamID;
187+
this._fileTxStreamID = streamID !== undefined && streamID > 0 && streamID <= 0xff ? streamID : 0;
185188
RaftLog.debug(
186189
`_fileSendStartMsg fileBlockSize req ${this._requestedFileBlockSize} resp ${fileStartResp.batchMsgSize} actual ${this._fileBlockSize}`,
187190
);
188191
RaftLog.debug(
189192
`_fileSendStartMsg batchAckSize req ${this._requestedBatchAckSize} resp ${fileStartResp.batchAckSize} actual ${this._batchAckSize}`,
190193
);
194+
RaftLog.debug(
195+
`_fileSendStartMsg streamID ${this._fileTxStreamID}`,
196+
);
191197
return true;
192198
}
193199

@@ -203,7 +209,8 @@ export default class RaftFileHandler {
203209
? 'espfwupdate'
204210
: 'fileupload';
205211
const fileLen = fileContents.length;
206-
const cmdMsg = `{"cmdName":"ufEnd","reqStr":"${reqStr}","fileType":"${fileDest}","fileName":"${fileName}","fileLen":${fileLen}}`;
212+
const streamIDJson = this._fileTxStreamID !== 0 ? `,"streamID":${this._fileTxStreamID}` : '';
213+
const cmdMsg = `{"cmdName":"ufEnd","reqStr":"${reqStr}","fileType":"${fileDest}","fileName":"${fileName}","fileLen":${fileLen}${streamIDJson}}`;
207214

208215
// Await outstanding promises
209216
try {
@@ -224,12 +231,13 @@ export default class RaftFileHandler {
224231
RaftLog.warn(`sendFileEndMsg error ${err}`);
225232
return false;
226233
}
227-
return fileEndResp.rslt === 'ok';
234+
return fileEndResp?.rslt === 'ok';
228235
}
229236

230237
async _sendFileCancelMsg(): Promise<void> {
231238
// File cancel command message
232-
const cmdMsg = `{"cmdName":"ufCancel"}`;
239+
const streamIDJson = this._fileTxStreamID !== 0 ? `,"streamID":${this._fileTxStreamID}` : '';
240+
const cmdMsg = `{"cmdName":"ufCancel"${streamIDJson}}`;
233241

234242
// Await outstanding promises
235243
await this.awaitOutstandingMsgPromises(true);
@@ -368,7 +376,7 @@ export default class RaftFileHandler {
368376
await this.awaitOutstandingMsgPromises(false);
369377

370378
// Send
371-
const promRslt = this._msgHandler.sendFileBlock(fileContents.subarray(blockStart, blockEnd), blockStart);
379+
const promRslt = this._msgHandler.sendFileBlock(fileContents.subarray(blockStart, blockEnd), blockStart, this._fileTxStreamID);
372380
if (!promRslt) {
373381
return false;
374382
}
@@ -438,8 +446,8 @@ export default class RaftFileHandler {
438446
// Establish a bridge
439447
const bridgedDeviceSerialPort = "Serial" + fileSource.slice(bridgeSerialPrefix.length);
440448
const cmdResp = await this._msgHandler.createCommsBridge(bridgedDeviceSerialPort, "fileSource");
441-
if (cmdResp.rslt != "ok") {
442-
RaftLog.warn(`fileReceive - failed to setup bridge ${cmdResp.rslt}`);
449+
if (!cmdResp || cmdResp.rslt != "ok") {
450+
RaftLog.warn(`fileReceive - failed to setup bridge ${cmdResp?.rslt ?? 'no response'}`);
443451
return new RaftFileDownloadResult();
444452
}
445453
bridgeID = cmdResp.bridgeID;
@@ -498,7 +506,7 @@ export default class RaftFileHandler {
498506
return false;
499507
}
500508
RaftLog.info(`_receiveFileStartMsg rslt ${JSON.stringify(cmdResp)}`);
501-
if (cmdResp.rslt === 'ok') {
509+
if (cmdResp?.rslt === 'ok') {
502510
this._fileRxBatchMsgSize = cmdResp.batchMsgSize;
503511
this._fileRxBatchAckSize = cmdResp.batchAckSize;
504512
this._fileRxStreamID = cmdResp.streamID;
@@ -511,6 +519,10 @@ export default class RaftFileHandler {
511519
this._fileRxLastBlockTime = Date.now();
512520
this._fileRxActive = true;
513521
}
522+
if (!cmdResp) {
523+
RaftLog.warn(`_receiveFileStartMsg failed no response`);
524+
return false;
525+
}
514526
return cmdResp.rslt === 'ok';
515527
}
516528

@@ -581,6 +593,7 @@ export default class RaftFileHandler {
581593
`elapsed ${now - startTime}ms overallTimeout ${overallTimeoutMs}ms ` +
582594
`blockGap ${now - this._fileRxLastBlockTime}ms blockTimeout ${blockTimeoutMs}ms`);
583595
this._fileRxActive = false;
596+
this._sendFileRxCancelMsg(bridgeID);
584597
reject(new Error('fileReceive failed'));
585598
return;
586599
}
@@ -652,6 +665,12 @@ export default class RaftFileHandler {
652665
}
653666

654667
// Check deferred CRC if start response didn't include one
668+
if (!cmdResp) {
669+
RaftLog.warn(`_receiveFileEnd failed no response`);
670+
this._fileRxActive = false;
671+
return false;
672+
}
673+
655674
if (this._fileRxCrc16 < 0 && cmdResp.crc16) {
656675
const expectedCrc = parseInt(cmdResp.crc16, 16);
657676
const actualCrc = RaftMiniHDLC.crc16(this._fileRxBuffer);
@@ -684,8 +703,20 @@ export default class RaftFileHandler {
684703
): void {
685704
// RaftLog.info(`onFileBlock filePos ${filePos} fileBlockData ${RaftUtils.bufferToHex(fileBlockData)}`);
686705

706+
if (!this._fileRxActive) {
707+
RaftLog.verbose(`onFileBlock ignored inactive transfer filePos ${filePos} len ${fileBlockData.length}`);
708+
return;
709+
}
710+
711+
const streamID = (filePos >>> 24) & 0xff;
712+
const streamFilePos = filePos & 0x00ffffff;
713+
if (streamID !== 0 && streamID !== this._fileRxStreamID) {
714+
RaftLog.verbose(`onFileBlock ignored stale streamID ${streamID} active ${this._fileRxStreamID}`);
715+
return;
716+
}
717+
687718
// Check if this is the next block we are expecting
688-
if (filePos === this._fileRxBuffer.length) {
719+
if (streamFilePos === this._fileRxBuffer.length) {
689720

690721
// Add to buffer
691722
const tmpArray = new Uint8Array(this._fileRxBuffer.length + fileBlockData.length);
@@ -700,7 +731,7 @@ export default class RaftFileHandler {
700731
// RaftLog.info(`onFileBlock filePos ${filePos} fileBlockData ${RaftUtils.bufferToHex(fileBlockData)} added to buffer`);
701732

702733
} else {
703-
RaftLog.warn(`onFileBlock expected streamID ${this._fileRxStreamID} filePos ${filePos} fileBlockData ${RaftUtils.bufferToHex(fileBlockData)} out of sequence`);
734+
RaftLog.warn(`onFileBlock expected streamID ${this._fileRxStreamID} filePos ${streamFilePos} fileBlockData ${RaftUtils.bufferToHex(fileBlockData)} out of sequence`);
704735
}
705736
}
706737

0 commit comments

Comments
 (0)