Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 98 additions & 26 deletions ui/src/trace_processor/http_rpc_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import {assertExists, reportError} from '../base/logging';
import {EngineBase} from '../trace_processor/engine';

const RPC_CONNECT_TIMEOUT_MS = 2000;
const INITIAL_RETRY_DELAY_MS = 100;
const MAX_RETRY_DELAY_MS = 30000;
const BACKOFF_MULTIPLIER = 2;

export interface HttpRpcState {
connected: boolean;
Expand All @@ -34,6 +37,8 @@ export class HttpRpcEngine extends EngineBase {
private disposed = false;
private queue: Blob[] = [];
private isProcessingQueue = false;
private retryDelayMs = INITIAL_RETRY_DELAY_MS;
private retryTimeoutId?: ReturnType<typeof setTimeout>;

// Can be changed by frontend/index.ts when passing ?rpc_port=1234 .
static defaultRpcPort = '9001';
Expand All @@ -47,47 +52,109 @@ export class HttpRpcEngine extends EngineBase {
}

rpcSendRequestBytes(data: Uint8Array): void {
if (this.websocket === undefined) {
if (this.disposed) return;
const wsUrl = `ws://${HttpRpcEngine.getHostAndPort(this.port)}/websocket`;
this.websocket = new WebSocket(wsUrl);
this.websocket.onopen = () => this.onWebsocketConnected();
this.websocket.onmessage = (e) => this.onWebsocketMessage(e);
this.websocket.onclose = (e) => this.onWebsocketClosed(e);
this.websocket.onerror = (e) =>
super.fail(
`WebSocket error rs=${(e.target as WebSocket)?.readyState} (ERR:ws)`,
);
}
if (this.disposed) return;
const websocket = this.getOrCreateWebSocket();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much prefer this nomenclature. Thanks!


if (this.connected) {
this.websocket.send(data);
websocket.send(data);
} else {
this.requestQueue.push(data); // onWebsocketConnected() will flush this.
}
}

/**
* Returns the existing WebSocket if one exists and is not closed,
* otherwise creates a new one (closing any stale socket first).
*/
private getOrCreateWebSocket(): WebSocket {
// If we have an active websocket that's not closed/closing, reuse it
if (
this.websocket !== undefined &&
this.websocket.readyState !== WebSocket.CLOSED &&
this.websocket.readyState !== WebSocket.CLOSING
) {
return this.websocket;
}

// Close any stale websocket before creating a new one
this.closeWebSocket();

const wsUrl = `ws://${HttpRpcEngine.getHostAndPort(this.port)}/websocket`;
this.websocket = new WebSocket(wsUrl);
this.websocket.onopen = () => this.onWebsocketConnected();
this.websocket.onmessage = (e) => this.onWebsocketMessage(e);
this.websocket.onclose = (e) => this.onWebsocketClosed(e);
this.websocket.onerror = (e) => this.onWebsocketError(e);
return this.websocket;
}

/**
* Closes the current websocket if one exists, clearing event handlers
* to prevent spurious callbacks.
*/
private closeWebSocket(): void {
if (this.websocket === undefined) return;

// Clear handlers to prevent callbacks from a closing socket
this.websocket.onopen = null;
this.websocket.onmessage = null;
this.websocket.onclose = null;
this.websocket.onerror = null;
this.websocket.close();
this.websocket = undefined;
}

private onWebsocketError(e: Event): void {
if (this.disposed) return;
const readyState = (e.target as WebSocket)?.readyState;
console.warn(`WebSocket error rs=${readyState}, will retry with backoff`);
// The close event will fire after this, which will trigger the retry logic
}

private scheduleReconnect(): void {
if (this.disposed) return;

console.debug(
`Scheduling WebSocket reconnection in ${this.retryDelayMs}ms`,
);

this.retryTimeoutId = setTimeout(() => {
if (this.disposed) return;
console.debug('Attempting WebSocket reconnection...');
this.getOrCreateWebSocket();
}, this.retryDelayMs);

// Exponential backoff with cap
this.retryDelayMs = Math.min(
this.retryDelayMs * BACKOFF_MULTIPLIER,
MAX_RETRY_DELAY_MS,
);
}

private onWebsocketConnected() {
// Reset retry delay on successful connection
this.retryDelayMs = INITIAL_RETRY_DELAY_MS;

for (;;) {
const queuedMsg = this.requestQueue.shift();
if (queuedMsg === undefined) break;
assertExists(this.websocket).send(queuedMsg);
}
console.debug('WebSocket (re)connected on port', this.port);
this.connected = true;
}

private onWebsocketClosed(e: CloseEvent) {
if (this.disposed) return;
if (e.code === 1006 && this.connected) {
// On macbooks the act of closing the lid / suspending often causes socket
// disconnections. Try to gracefully re-connect.
console.log('Websocket closed, reconnecting');
this.websocket = undefined;
this.connected = false;
this.rpcSendRequestBytes(new Uint8Array()); // Triggers a reconnection.
} else {
super.fail(`Websocket closed (${e.code}: ${e.reason}) (ERR:ws)`);
}

// Always attempt to reconnect with backoff, regardless of close code
console.debug(
`WebSocket closed (code=${e.code}, reason=${e.reason || 'none'}, wasConnected=${this.connected}), scheduling reconnect`,
);

this.websocket = undefined;
this.connected = false;
this.scheduleReconnect();
}

private onWebsocketMessage(e: MessageEvent) {
Expand Down Expand Up @@ -147,8 +214,13 @@ export class HttpRpcEngine extends EngineBase {
[Symbol.dispose]() {
this.disposed = true;
this.connected = false;
const websocket = this.websocket;
this.websocket = undefined;
websocket?.close();

// Clear any pending retry timeout
if (this.retryTimeoutId !== undefined) {
clearTimeout(this.retryTimeoutId);
this.retryTimeoutId = undefined;
}

this.closeWebSocket();
}
}