forked from google/perfetto
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathhttp_rpc_engine.ts
More file actions
226 lines (196 loc) · 7.02 KB
/
http_rpc_engine.ts
File metadata and controls
226 lines (196 loc) · 7.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
// Copyright (C) 2019 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import protos from '../protos';
import {fetchWithTimeout} from '../base/http_utils';
import {assertExists, reportError} from '../base/logging';
import {EngineBase} from '../trace_processor/engine';
// Timeout, in milliseconds, passed to fetchWithTimeout() when probing the
// RPC server's /status endpoint.
const RPC_CONNECT_TIMEOUT_MS = 2 * 1000;
// Delay, in milliseconds, before the first WebSocket reconnection attempt.
// The delay is also reset to this value after a successful connection.
const INITIAL_RETRY_DELAY_MS = 100;
// Upper bound, in milliseconds, for the reconnection delay.
const MAX_RETRY_DELAY_MS = 30 * 1000;
// Factor by which the reconnection delay grows after each scheduled retry.
const BACKOFF_MULTIPLIER = 2;
/**
 * Result of probing the HTTP RPC endpoint, as returned by
 * HttpRpcEngine.checkConnection().
 */
export interface HttpRpcState {
  // True only when the status request returned HTTP 200 and its body was
  // decoded successfully.
  connected: boolean;
  // Decoded status payload; set only on a successful probe.
  status?: protos.StatusResult;
  // Human-readable failure description: either "<status> - <statusText>" for
  // a non-200 response, or the stringified error thrown while probing.
  failure?: string;
}
/**
 * Engine that exchanges RPC bytes with a server listening on
 * 127.0.0.1:<port> through a WebSocket (ws://127.0.0.1:<port>/websocket).
 *
 * Requests issued while the socket is not open are buffered and flushed once
 * the connection is established. When the socket closes, a reconnection is
 * scheduled with exponential backoff (capped at MAX_RETRY_DELAY_MS) until the
 * engine is disposed.
 */
export class HttpRpcEngine extends EngineBase {
  readonly mode = 'HTTP_RPC';
  readonly id: string;
  // Outgoing requests buffered while the socket is not open.
  private requestQueue = new Array<Uint8Array>();
  private websocket?: WebSocket;
  // True between the socket's open event and its close/teardown.
  private connected = false;
  private disposed = false;
  // Incoming messages waiting to be decoded, in arrival order.
  private queue: Blob[] = [];
  private isProcessingQueue = false;
  // Delay that the next scheduled reconnection attempt will use.
  private retryDelayMs = INITIAL_RETRY_DELAY_MS;
  private retryTimeoutId?: ReturnType<typeof setTimeout>;

  // Can be changed by frontend/index.ts when passing ?rpc_port=1234 .
  static defaultRpcPort = '9001';

  /**
   * @param id Identifier exposed through the `id` property.
   * @param port Port of the RPC server on 127.0.0.1.
   */
  constructor(
    id: string,
    private port: string,
  ) {
    super();
    this.id = id;
  }

  /**
   * Sends a request to the RPC server, or buffers it until the socket opens.
   * Does nothing once the engine has been disposed.
   *
   * @param data Serialized request bytes.
   */
  rpcSendRequestBytes(data: Uint8Array): void {
    if (this.disposed) return;
    const websocket = this.getOrCreateWebSocket();
    // Checking readyState as well guards against calling send() on a socket
    // that is still CONNECTING, which throws InvalidStateError.
    if (this.connected && websocket.readyState === WebSocket.OPEN) {
      websocket.send(data);
    } else {
      this.requestQueue.push(data); // onWebsocketConnected() will flush this.
    }
  }

  /**
   * Returns the existing WebSocket if one exists and is not closed,
   * otherwise creates a new one (closing any stale socket first).
   */
  private getOrCreateWebSocket(): WebSocket {
    // If we have an active websocket that's not closed/closing, reuse it.
    if (
      this.websocket !== undefined &&
      this.websocket.readyState !== WebSocket.CLOSED &&
      this.websocket.readyState !== WebSocket.CLOSING
    ) {
      return this.websocket;
    }
    // Close any stale websocket before creating a new one.
    this.closeWebSocket();
    // A connection attempt is starting now, so a pending retry is redundant.
    // If this attempt fails, the close handler schedules a fresh one.
    this.clearRetryTimer();
    const wsUrl = `ws://${HttpRpcEngine.getHostAndPort(this.port)}/websocket`;
    const websocket = new WebSocket(wsUrl);
    websocket.onopen = () => this.onWebsocketConnected();
    websocket.onmessage = (e) => this.onWebsocketMessage(e);
    websocket.onclose = (e) => this.onWebsocketClosed(e);
    websocket.onerror = (e) => this.onWebsocketError(e);
    this.websocket = websocket;
    return websocket;
  }

  /**
   * Closes the current websocket if one exists, clearing event handlers
   * to prevent spurious callbacks.
   */
  private closeWebSocket(): void {
    // The close handler is detached below and will never run for this socket,
    // so the connected flag must be reset here. Otherwise a replacement socket
    // that is still connecting would be treated as ready for send().
    this.connected = false;
    if (this.websocket === undefined) return;
    // Clear handlers to prevent callbacks from a closing socket.
    this.websocket.onopen = null;
    this.websocket.onmessage = null;
    this.websocket.onclose = null;
    this.websocket.onerror = null;
    this.websocket.close();
    this.websocket = undefined;
  }

  /** Cancels the pending reconnection timer, if any. */
  private clearRetryTimer(): void {
    if (this.retryTimeoutId === undefined) return;
    clearTimeout(this.retryTimeoutId);
    this.retryTimeoutId = undefined;
  }

  /** Logs a socket error; reconnection is driven by the close handler. */
  private onWebsocketError(e: Event): void {
    if (this.disposed) return;
    const readyState = (e.target as WebSocket)?.readyState;
    console.warn(`WebSocket error rs=${readyState}, will retry with backoff`);
    // The close event will fire after this, which will trigger the retry logic
  }

  /**
   * Schedules a single reconnection attempt after the current retry delay and
   * then grows the delay for the following attempt.
   */
  private scheduleReconnect(): void {
    if (this.disposed) return;
    // Keep at most one timer alive so that an older, shorter timer cannot fire
    // ahead of the backoff schedule.
    this.clearRetryTimer();
    console.debug(
      `Scheduling WebSocket reconnection in ${this.retryDelayMs}ms`,
    );
    this.retryTimeoutId = setTimeout(() => {
      this.retryTimeoutId = undefined;
      if (this.disposed) return;
      console.debug('Attempting WebSocket reconnection...');
      this.getOrCreateWebSocket();
    }, this.retryDelayMs);
    // Exponential backoff with cap.
    this.retryDelayMs = Math.min(
      this.retryDelayMs * BACKOFF_MULTIPLIER,
      MAX_RETRY_DELAY_MS,
    );
  }

  /** Resets the backoff and flushes the requests buffered while offline. */
  private onWebsocketConnected(): void {
    // Reset retry delay on successful connection.
    this.retryDelayMs = INITIAL_RETRY_DELAY_MS;
    for (;;) {
      const queuedMsg = this.requestQueue.shift();
      if (queuedMsg === undefined) break;
      assertExists(this.websocket).send(queuedMsg);
    }
    console.debug('WebSocket (re)connected on port', this.port);
    this.connected = true;
  }

  /** Drops the closed socket and schedules a reconnection attempt. */
  private onWebsocketClosed(e: CloseEvent): void {
    if (this.disposed) return;
    // Always attempt to reconnect with backoff, regardless of close code.
    console.debug(
      `WebSocket closed (code=${e.code}, reason=${e.reason || 'none'}, wasConnected=${this.connected}), scheduling reconnect`,
    );
    this.websocket = undefined;
    this.connected = false;
    this.scheduleReconnect();
  }

  /** Enqueues an incoming message and kicks the sequential decoder. */
  private onWebsocketMessage(e: MessageEvent): void {
    const blob = assertExists(e.data as Blob);
    this.queue.push(blob);
    // Fire-and-forget: processQueue() reports per-message failures itself.
    void this.processQueue();
  }

  /**
   * Drains the incoming queue one message at a time so that responses are
   * handed to onRpcResponseBytes() in arrival order, even though reading each
   * blob is asynchronous. Re-entrant calls return immediately while a drain is
   * already in progress.
   */
  private async processQueue(): Promise<void> {
    if (this.isProcessingQueue) return;
    this.isProcessingQueue = true;
    try {
      while (this.queue.length > 0) {
        try {
          const blob = assertExists(this.queue.shift());
          const buf = await blob.arrayBuffer();
          super.onRpcResponseBytes(new Uint8Array(buf));
        } catch (e) {
          reportError(e);
        }
      }
    } finally {
      // Always release the flag; if it stayed set after an unexpected throw,
      // every later message would be queued but never processed.
      this.isProcessingQueue = false;
    }
  }

  /**
   * Probes http://127.0.0.1:<port>/status with a POST request.
   *
   * @param port Port of the RPC server to probe.
   * @returns A state whose `connected` is true only if the server answered
   *     with HTTP 200 and the body decoded as a StatusResult; otherwise
   *     `failure` describes what went wrong. Never rejects on probe errors.
   */
  static async checkConnection(port: string): Promise<HttpRpcState> {
    const RPC_URL = `http://${HttpRpcEngine.getHostAndPort(port)}/`;
    const httpRpcState: HttpRpcState = {connected: false};
    console.info(
      `It's safe to ignore the ERR_CONNECTION_REFUSED on ${RPC_URL} below. ` +
        `That might happen while probing the external native accelerator. The ` +
        `error is non-fatal and unlikely to be the culprit for any UI bug.`,
    );
    try {
      const resp = await fetchWithTimeout(
        RPC_URL + 'status',
        {method: 'post', cache: 'no-cache'},
        RPC_CONNECT_TIMEOUT_MS,
      );
      if (resp.status !== 200) {
        httpRpcState.failure = `${resp.status} - ${resp.statusText}`;
      } else {
        const buf = new Uint8Array(await resp.arrayBuffer());
        // Decode first and only then mark the state as connected, so that a
        // malformed body is reported as a failure rather than a connection.
        httpRpcState.status = protos.StatusResult.decode(buf);
        httpRpcState.connected = true;
      }
    } catch (err) {
      httpRpcState.failure = `${err}`;
    }
    return httpRpcState;
  }

  /**
   * Builds the "host:port" string of the RPC server.
   *
   * @param port Port to use; defaults to HttpRpcEngine.defaultRpcPort.
   */
  static getHostAndPort(port = HttpRpcEngine.defaultRpcPort): string {
    return `127.0.0.1:${port}`;
  }

  /**
   * Stops the engine: cancels any pending reconnection and closes the socket.
   * Subsequent requests are ignored.
   */
  [Symbol.dispose]() {
    this.disposed = true;
    this.connected = false;
    // Clear any pending retry timeout.
    this.clearRetryTimer();
    this.closeWebSocket();
  }
}