Skip to content

Commit 2ba5927

Browse files
committed
feat(): integrate incoming event handling
1 parent 4bc8537 commit 2ba5927

File tree

2 files changed

+199
-79
lines changed

2 files changed

+199
-79
lines changed

examples/ws-public.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,17 @@ const wsClient = new WebsocketClient(
2626

2727
// Raw data will arrive on the 'update' event
2828
wsClient.on('update', (data) => {
29+
// console.log(
30+
// new Date(),
31+
// 'ws update (raw data received)',
32+
// JSON.stringify(data, null, 2),
33+
// );
34+
// console.log('ws update (raw data received)', JSON.stringify(data, null, 2));
2935
console.log(
3036
new Date(),
3137
'ws update (raw data received)',
32-
JSON.stringify(data, null, 2),
38+
JSON.stringify(data),
3339
);
34-
// console.log('ws update (raw data received)', JSON.stringify(data, null, 2));
3540
});
3641

3742
wsClient.on('open', (data) => {
@@ -49,7 +54,7 @@ wsClient.on('reconnect', ({ wsKey }) => {
4954
wsClient.on('reconnected', (data) => {
5055
console.log('ws has reconnected ', data?.wsKey);
5156
});
52-
wsClient.on('error', (data) => {
57+
wsClient.on('exception', (data) => {
5358
console.error('ws exception: ', data);
5459
});
5560

@@ -129,7 +134,7 @@ wsClient.subscribe({
129134
// Price limit channel
130135
wsClient.subscribe({
131136
channel: 'price-limit',
132-
instId: 'LTC-USD-190628',
137+
instId: 'BTC-USDT-SWAP',
133138
});
134139

135140
// Order book channel

src/websocket-client.ts

Lines changed: 190 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,8 @@ export class WebsocketClient extends BaseWebsocketClient<
326326
}),
327327
};
328328

329+
console.log('Prepared wsEVENT: ', wsEvent);
330+
329331
const midflightWsEvent: MidflightWsRequestEvent<
330332
WsRequestOperationOKX<object>
331333
> = {
@@ -470,8 +472,194 @@ export class WebsocketClient extends BaseWebsocketClient<
470472
wsKey: WsKey,
471473
event: MessageEventLike,
472474
): EmittableEvent[] {
473-
// TODO:
474-
return [];
475+
const results: EmittableEvent[] = [];
476+
477+
const logContext = {
478+
...WS_LOGGER_CATEGORY,
479+
wsKey,
480+
method: 'resolveEmittableEvents',
481+
};
482+
483+
try {
484+
const msg = JSON.parse(event.data);
485+
const emittableEvent = { ...msg, wsKey };
486+
487+
/**
488+
* WS API response handling
489+
*/
490+
// if (isWSAPIResponse(emittableEvent)) {
491+
// // const eg1 = {
492+
// // event: 'error',
493+
// // id: '1',
494+
// // code: '43012',
495+
// // msg: 'Insufficient balance',
496+
// // };
497+
498+
// const retCode = emittableEvent.code;
499+
// const reqId = emittableEvent.id;
500+
// const isError = retCode !== '0';
501+
502+
// const promiseRef = [emittableEvent.id].join('_');
503+
504+
// const loggableContext = {
505+
// wsKey,
506+
// promiseRef,
507+
// parsedEvent: emittableEvent,
508+
// };
509+
510+
// if (!reqId) {
511+
// this.logger.error(
512+
// 'WS API response is missing reqId - promisified workflow could get stuck. If this happens, please get in touch with steps to reproduce. Trace:',
513+
// loggableContext,
514+
// );
515+
// }
516+
517+
// if (isError) {
518+
// try {
519+
// this.getWsStore().rejectDeferredPromise(
520+
// wsKey,
521+
// promiseRef,
522+
// emittableEvent,
523+
// true,
524+
// );
525+
// } catch (e) {
526+
// this.logger.error('Exception trying to reject WSAPI promise', {
527+
// ...loggableContext,
528+
// error: e,
529+
// });
530+
// }
531+
532+
// results.push({
533+
// eventType: 'exception',
534+
// event: emittableEvent,
535+
// isWSAPIResponse: true,
536+
// });
537+
// return results;
538+
// }
539+
540+
// // WS API Success
541+
// try {
542+
// this.getWsStore().resolveDeferredPromise(
543+
// wsKey,
544+
// promiseRef,
545+
// emittableEvent,
546+
// true,
547+
// );
548+
// } catch (e) {
549+
// this.logger.error('Exception trying to resolve WSAPI promise', {
550+
// ...loggableContext,
551+
// error: e,
552+
// });
553+
// }
554+
555+
// results.push({
556+
// eventType: 'response',
557+
// event: emittableEvent,
558+
// isWSAPIResponse: true,
559+
// });
560+
561+
// return results;
562+
// }
563+
564+
if (isWsErrorEvent(msg)) {
565+
this.logger.error('WS error event: ', { ...msg, wsKey });
566+
567+
this.logger.error('WS Error received', {
568+
...logContext,
569+
wsKey,
570+
message: msg || 'no message',
571+
// messageType: typeof msg,
572+
// messageString: JSON.stringify(msg),
573+
event,
574+
});
575+
results.push({
576+
eventType: 'exception',
577+
event: emittableEvent,
578+
});
579+
return results;
580+
}
581+
582+
if (isWsDataEvent(msg)) {
583+
results.push({
584+
eventType: 'update',
585+
event: emittableEvent,
586+
});
587+
return results;
588+
}
589+
590+
if (isWsLoginEvent(msg)) {
591+
// Successfully authenticated
592+
if (msg.code === WS_EVENT_CODE_ENUM.OK) {
593+
this.logger.info(
594+
`Authenticated successfully on wsKey(${wsKey})`,
595+
logContext,
596+
);
597+
598+
results.push({
599+
eventType: 'response',
600+
event: emittableEvent,
601+
});
602+
results.push({
603+
eventType: 'authenticated',
604+
event: emittableEvent,
605+
});
606+
return results;
607+
}
608+
609+
this.logger.error('Authentication failed: ', {
610+
...logContext,
611+
...msg,
612+
wsKey,
613+
});
614+
results.push({
615+
eventType: 'exception',
616+
event: emittableEvent,
617+
});
618+
return results;
619+
}
620+
621+
if (isWsSubscribeEvent(msg) || isWsUnsubscribeEvent(msg)) {
622+
results.push({
623+
eventType: 'response',
624+
event: emittableEvent,
625+
});
626+
// this.logger.trace(`Ws subscribe reply:`, { ...msg, wsKey });
627+
return results;
628+
}
629+
630+
if (isConnCountEvent(msg)) {
631+
results.push({
632+
eventType: 'response',
633+
event: emittableEvent,
634+
});
635+
return results;
636+
}
637+
638+
this.logger.info('Unhandled/unrecognised ws event message', {
639+
...WS_LOGGER_CATEGORY,
640+
message: msg || 'no message',
641+
// messageType: typeof msg,
642+
// messageString: JSON.stringify(msg),
643+
event,
644+
wsKey,
645+
});
646+
647+
// fallback emit anyway
648+
results.push({
649+
eventType: 'update',
650+
event: emittableEvent,
651+
});
652+
return results;
653+
} catch (e) {
654+
this.logger.error('Failed to parse ws event message', {
655+
...WS_LOGGER_CATEGORY,
656+
error: e,
657+
event,
658+
wsKey,
659+
});
660+
}
661+
662+
return results;
475663
}
476664

477665
async sendWSAPIRequest(wsKey: WsKey): Promise<unknown> {
@@ -652,77 +840,4 @@ export class WebsocketClient extends BaseWebsocketClient<
652840
// this.logger.error(e, logContext);
653841
// }
654842
// }
655-
656-
// private onWsMessageLegacy(event: any, wsKey: WsKey, ws: WebSocket) {
657-
// const logContext = { ...WS_LOGGER_CATEGORY, wsKey, method: 'onWsMessage' };
658-
659-
// try {
660-
// // any message can clear the pong timer - wouldn't get a message if the ws dropped
661-
// this.clearPongTimer(wsKey);
662-
663-
// if (isWsPong(event)) {
664-
// this.logger.trace('Received pong', logContext);
665-
// return;
666-
// }
667-
668-
// const msg = JSON.parse(event?.data || event);
669-
670-
// if (isWsErrorEvent(msg)) {
671-
// this.logger.error('WS error event: ', { ...msg, wsKey });
672-
// return this.emit('exception', { ...msg, wsKey });
673-
// }
674-
675-
// if (isWsDataEvent(msg)) {
676-
// return this.emit('update', { ...msg, wsKey });
677-
// }
678-
679-
// if (isWsLoginEvent(msg)) {
680-
// // Successfully authenticated
681-
// if (msg.code === WS_EVENT_CODE_ENUM.OK) {
682-
// this.logger.info(
683-
// `Authenticated successfully on wsKey(${wsKey})`,
684-
// logContext,
685-
// );
686-
// this.emit('response', { ...msg, wsKey });
687-
688-
// const topics = [...this.getWsStore().getTopicsAsArray(wsKey)];
689-
690-
// // Since private topics have a dedicated WsKey, these are automatically all private topics (no filtering required before the subscribe call)
691-
// this.requestSubscribeTopics(wsKey, topics);
692-
693-
// return;
694-
// }
695-
696-
// this.logger.error('Authentication failed: ', {
697-
// ...logContext,
698-
// ...msg,
699-
// wsKey,
700-
// });
701-
// return this.emit('exception', { ...msg, wsKey });
702-
// }
703-
704-
// if (isWsSubscribeEvent(msg) || isWsUnsubscribeEvent(msg)) {
705-
// // this.logger.trace(`Ws subscribe reply:`, { ...msg, wsKey });
706-
// return this.emit('response', { ...msg, wsKey });
707-
// }
708-
709-
// if (isConnCountEvent(msg)) {
710-
// return this.emit('response', { ...msg, wsKey });
711-
// }
712-
713-
// this.logger.error('Unhandled/unrecognised ws event message', {
714-
// ...logContext,
715-
// eventName: msg.event,
716-
// msg: JSON.stringify(msg, null, 2),
717-
// wsKey,
718-
// });
719-
// } catch (e) {
720-
// this.logger.error('Failed to parse ws event message', {
721-
// ...logContext,
722-
// error: e,
723-
// event,
724-
// wsKey,
725-
// });
726-
// }
727-
// }
728843
}

0 commit comments

Comments
 (0)