Skip to content

Commit bf79eb3

Browse files
[ui] switch to asynch queue for trace loading (#164)
1 parent dc64278 commit bf79eb3

File tree

3 files changed

+296
-11
lines changed

3 files changed

+296
-11
lines changed

ui/src/base/async_queue.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright (C) 2025 The Android Open Source Project
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import {Deferred, defer} from './deferred';
16+
17+
type Callback<T> = () => Promise<T>;
18+
19+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
20+
interface Task<T = any> {
21+
deferred: Deferred<T>;
22+
work: Callback<T>;
23+
}
24+
25+
/**
26+
* A tiny task queue management utility that ensures async tasks are not
27+
* executed concurrently.
28+
*
29+
* If a task is run while a previous one is still running, it is enqueued and
30+
* run after the first task completes.
31+
*/
32+
export class AsyncQueue {
33+
private readonly taskQueue: Task[] = [];
34+
private isRunning: boolean = false;
35+
36+
/**
37+
* Schedule a task to be run.
38+
*
39+
* @param work An async function to schedule.
40+
* @returns A promise that resolves with the result of running of the task
41+
*/
42+
schedule<T>(work: Callback<T>): Promise<T> {
43+
const deferred = defer<T>();
44+
this.taskQueue.push({work, deferred});
45+
46+
if (!this.isRunning) {
47+
this.isRunning = true;
48+
this.runTaskQueue().finally(() => (this.isRunning = false));
49+
}
50+
51+
return deferred;
52+
}
53+
54+
private async runTaskQueue(): Promise<void> {
55+
let task: Task | undefined;
56+
57+
while ((task = this.taskQueue.shift())) {
58+
try {
59+
const result = await task.work();
60+
task.deferred.resolve(result);
61+
} catch (e) {
62+
task.deferred.reject(e);
63+
}
64+
}
65+
}
66+
}
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
// Copyright (C) 2025 The Android Open Source Project
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import { AsyncQueue } from './async_queue';
16+
import { defer } from './deferred';
17+
18+
test('no concurrent callbacks', async () => {
19+
const queue = new AsyncQueue();
20+
21+
const barrier = defer<void>();
22+
const mock1 = jest.fn();
23+
queue.schedule(async () => {
24+
await barrier;
25+
mock1();
26+
});
27+
expect(mock1).not.toHaveBeenCalled();
28+
29+
const mock2 = jest.fn();
30+
queue.schedule(async () => mock2());
31+
expect(mock2).not.toHaveBeenCalled();
32+
33+
barrier.resolve();
34+
await new Promise((resolve) => setTimeout(resolve, 0));
35+
36+
expect(mock1).toHaveBeenCalled();
37+
expect(mock2).toHaveBeenCalled();
38+
});
39+
40+
test('queueing', async () => {
41+
const queue = new AsyncQueue();
42+
43+
const mock1 = jest.fn();
44+
queue.schedule(async () => mock1());
45+
46+
const mock2 = jest.fn();
47+
await queue.schedule(async () => mock2());
48+
49+
expect(mock1).toHaveBeenCalled();
50+
expect(mock2).toHaveBeenCalled();
51+
});
52+
53+
test('multiple queuing executes all tasks', async () => {
54+
const queue = new AsyncQueue();
55+
56+
const mock1 = jest.fn();
57+
queue.schedule(async () => mock1());
58+
59+
const mock2 = jest.fn();
60+
queue.schedule(async () => mock2());
61+
62+
const mock3 = jest.fn();
63+
await queue.schedule(async () => mock3());
64+
65+
expect(mock1).toHaveBeenCalled();
66+
expect(mock2).toHaveBeenCalled();
67+
expect(mock3).toHaveBeenCalled();
68+
});
69+
70+
test('tasks execute in order', async () => {
71+
const queue = new AsyncQueue();
72+
const order: number[] = [];
73+
74+
queue.schedule(async () => order.push(1));
75+
queue.schedule(async () => order.push(2));
76+
queue.schedule(async () => order.push(3));
77+
await queue.schedule(async () => order.push(4));
78+
79+
expect(order).toEqual([1, 2, 3, 4]);
80+
});
81+
82+
test('returns value from task', async () => {
83+
const queue = new AsyncQueue();
84+
85+
const result = await queue.schedule(async () => 42);
86+
87+
expect(result).toBe(42);
88+
});
89+
90+
test('returns different values from different tasks', async () => {
91+
const queue = new AsyncQueue();
92+
93+
const promise1 = queue.schedule(async () => 'first');
94+
const promise2 = queue.schedule(async () => 'second');
95+
const promise3 = queue.schedule(async () => 'third');
96+
97+
expect(await promise1).toBe('first');
98+
expect(await promise2).toBe('second');
99+
expect(await promise3).toBe('third');
100+
});
101+
102+
test('error in callback bubbles up to caller', async () => {
103+
const queue = new AsyncQueue();
104+
const failingCallback = async () => {
105+
throw Error('test error');
106+
};
107+
108+
await expect(queue.schedule(failingCallback)).rejects.toThrow('test error');
109+
});
110+
111+
test('chain continues even when one callback fails', async () => {
112+
const queue = new AsyncQueue();
113+
114+
const failingCallback = async () => {
115+
throw Error();
116+
};
117+
queue.schedule(failingCallback).catch(() => { });
118+
119+
const mock = jest.fn();
120+
await queue.schedule(async () => mock());
121+
122+
expect(mock).toHaveBeenCalled();
123+
});
124+
125+
test('error in middle task does not affect other tasks', async () => {
126+
const queue = new AsyncQueue();
127+
128+
const mock1 = jest.fn();
129+
const promise1 = queue.schedule(async () => mock1());
130+
131+
const failingCallback = async () => {
132+
throw Error('middle error');
133+
};
134+
const promise2 = queue.schedule(failingCallback);
135+
136+
const mock3 = jest.fn();
137+
const promise3 = queue.schedule(async () => mock3());
138+
139+
await promise1;
140+
await expect(promise2).rejects.toThrow('middle error');
141+
await promise3;
142+
143+
expect(mock1).toHaveBeenCalled();
144+
expect(mock3).toHaveBeenCalled();
145+
});
146+
147+
test('handles async operations correctly', async () => {
148+
const queue = new AsyncQueue();
149+
const order: number[] = [];
150+
151+
const barrier1 = defer<void>();
152+
const barrier2 = defer<void>();
153+
154+
queue.schedule(async () => {
155+
await barrier1;
156+
order.push(1);
157+
});
158+
159+
queue.schedule(async () => {
160+
await barrier2;
161+
order.push(2);
162+
});
163+
164+
const promise3 = queue.schedule(async () => {
165+
order.push(3);
166+
});
167+
168+
expect(order).toEqual([]);
169+
170+
barrier1.resolve();
171+
await new Promise((resolve) => setTimeout(resolve, 0));
172+
expect(order).toEqual([1]);
173+
174+
barrier2.resolve();
175+
await promise3;
176+
expect(order).toEqual([1, 2, 3]);
177+
});
178+
179+
test('can schedule new tasks from within executing task', async () => {
180+
const queue = new AsyncQueue();
181+
const order: number[] = [];
182+
183+
await queue.schedule(async () => {
184+
order.push(1);
185+
queue.schedule(async () => order.push(3));
186+
order.push(2);
187+
});
188+
189+
await queue.schedule(async () => order.push(4));
190+
191+
expect(order).toEqual([1, 2, 3, 4]);
192+
});
193+
194+
test('multiple concurrent schedule calls wait for each other', async () => {
195+
const queue = new AsyncQueue();
196+
const barrier = defer<void>();
197+
const results: string[] = [];
198+
199+
queue.schedule(async () => {
200+
await barrier;
201+
results.push('first');
202+
});
203+
204+
const promises = [
205+
queue.schedule(async () => {
206+
results.push('second');
207+
return 'second';
208+
}),
209+
queue.schedule(async () => {
210+
results.push('third');
211+
return 'third';
212+
}),
213+
queue.schedule(async () => {
214+
results.push('fourth');
215+
return 'fourth';
216+
}),
217+
];
218+
219+
expect(results).toEqual([]);
220+
221+
barrier.resolve();
222+
const values = await Promise.all(promises);
223+
224+
expect(results).toEqual(['first', 'second', 'third', 'fourth']);
225+
expect(values).toEqual(['second', 'third', 'fourth']);
226+
});

ui/src/core/app_impl.ts

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
import {AsyncLimiter} from '../base/async_limiter';
15+
import {AsyncQueue} from '../base/async_queue';
1616
import {defer} from '../base/deferred';
1717
import {EvtSource} from '../base/events';
1818
import {assertExists, assertIsInstance, assertTrue} from '../base/logging';
@@ -103,7 +103,7 @@ export class AppContext {
103103
readonly initArgs: AppInitArgs;
104104
readonly embeddedMode: boolean;
105105
readonly testingMode: boolean;
106-
readonly openTraceAsyncLimiter = new AsyncLimiter();
106+
readonly openTraceAsyncQueue = new AsyncQueue();
107107
readonly settingsManager: SettingsManagerImpl;
108108

109109
// This is normally empty and is injected with extra google-internal packages
@@ -451,15 +451,13 @@ export class AppImpl implements App {
451451
}
452452
}
453453

454-
const result = defer<TraceImpl>();
455-
456454
// Rationale for asyncLimiter: openTrace takes several seconds and involves
457455
// a long sequence of async tasks (e.g. invoking plugins' onLoad()). These
458456
// tasks cannot overlap if the user opens traces in rapid succession, as
459457
// they will mess up the state of registries. So once we start, we must
460458
// complete trace loading (we don't bother supporting cancellations. If the
461459
// user is too bothered, they can reload the tab).
462-
await this.appCtx.openTraceAsyncLimiter.schedule(async () => {
460+
return this.appCtx.openTraceAsyncQueue.schedule(async () => {
463461
// Wait for extras parsing descriptors to be loaded
464462
// via is_internal_user.js. This prevents a race condition where
465463
// trace loading would otherwise begin before this data is available.
@@ -483,17 +481,12 @@ export class AppImpl implements App {
483481
// loadTrace to be finished before setting it because some internal
484482
// implementation details of loadTrace() rely on that trace to be current
485483
// to work properly (mainly the router hash uuid).
486-
487-
result.resolve(trace);
488-
} catch (error) {
489-
result.reject(error);
484+
return trace;
490485
} finally {
491486
this.appCtx.setTraceLoading(src, false);
492487
raf.scheduleFullRedraw();
493488
}
494489
});
495-
496-
return result;
497490
}
498491

499492
// Called by trace_loader.ts soon after it has created a new TraceImpl.

0 commit comments

Comments
 (0)