-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpolling-observer.ts
133 lines (107 loc) · 3.87 KB
/
polling-observer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import { delayUntil } from '../delay-until/index';
import { PollingMeasure } from './polling-measure';
/**
 * Options accepted by `PollingObserver.observe()`.
 */
export interface PollingObserverOptions {
/**
 * Upper bound, in milliseconds, on the accumulated polling time tracked by
 * `observe()`. When it is not a number, or is less than 1, no timeout is
 * applied and polling only ends once the condition is met or the observer
 * is disconnected.
 */
timeout?: number;
/**
 * Interval, in milliseconds, allotted to each polling run. When it is omitted
 * or is not a number greater than 0, `observe()` substitutes 100.
 */
interval?: number;
}
/**
 * Function executed on every polling run. It may return the polled value
 * directly or a promise of it; `observe()` awaits the promise form.
 */
type PollingCallback<T> = () => T | Promise<T>;
/**
 * Predicate evaluated at the start of every polling iteration, before the
 * polling callback runs. Returning (or resolving to) a truthy value ends the
 * polling.
 *
 * - `data`: the value produced by the most recent polling run; `undefined`
 *   on the first iteration because nothing has been polled yet.
 * - `records`: the observer's array of measurements recorded so far.
 * - `object`: the observer instance that is running the polling.
 */
type ConditionCallback<T> = (
data: T | null | undefined,
records: PollingObserver<T>['_records'],
object: PollingObserver<T>
) => boolean | Promise<boolean>;
/**
 * Result handed to `onfinish` when polling ends without an error being thrown.
 */
export interface OnfinishFulfilled<T> {
/**
 * `'timeout'` when the accumulated polling time reached the configured
 * timeout; `'finish'` otherwise (condition met or observer disconnected).
 */
status: 'finish' | 'timeout';
/** Value produced by the last polling run; `undefined` if no run happened. */
value: T | null | undefined;
}
/**
 * Result handed to `onfinish` when something threw (or a promise rejected)
 * while polling.
 */
export interface OnfinishRejected {
/** Discriminant that distinguishes this result from `OnfinishFulfilled`. */
readonly status: 'error';
/** Whatever was caught by `observe()`, typed as an `Error`. */
reason: Error;
}
/**
 * Union of every result `onfinish` can receive; discriminate on `status`.
 */
export type OnfinishValue<T> = OnfinishFulfilled<T> | OnfinishRejected;
/**
 * Handler invoked by `observe()` once polling has ended.
 *
 * - `value`: the outcome of the polling (fulfilled or rejected variant).
 * - `records`: a shallow copy of the measurements, taken when polling ended.
 * - `object`: the observer instance that ran the polling.
 *
 * The return value is not used by `observe()`.
 */
type OnfinishCallback<T> = (
value: OnfinishValue<T>,
records: PollingObserver<T>['_records'],
object: PollingObserver<T>
) => unknown;
/**
 * Type guard reporting whether `r` is a thenable, i.e. a non-nullish value
 * exposing a callable `then` member.
 *
 * @param r Value that is either a plain `T` or a promise of `T`.
 * @returns `true` when `r` can be awaited as a promise, `false` otherwise
 * (including for `null` and `undefined`).
 */
function isPromise<T>(r: T | Promise<T>): r is Promise<T> {
  // Property access on `null`/`undefined` throws, so rule those out before
  // probing for `then`. Callbacks are allowed to return nullish values.
  return null != r && 'function' === typeof((r as Promise<T>).then);
}
/** High-resolution clock taken from the global scope; used to time each polling run. */
const { performance: perf } = globalThis;
/**
 * Runs a polling callback repeatedly until a condition callback reports that
 * the condition is met, the configured timeout is reached, or the observer is
 * disconnected. Every polling run is timed and stored as a `PollingMeasure`.
 */
export class PollingObserver<T> {
  /**
   * Optional handler called once each time `observe()` ends. It is read when
   * `observe()` starts, so it has to be assigned before calling `observe()`.
   */
  public onfinish?: OnfinishCallback<T>;

  /** Set by `disconnect()` to ask a running `observe()` loop to stop. */
  private _forceStop = false;
  /** Measurements of the polling runs recorded so far. */
  private _records: PollingMeasure[] = [];
  /** `true` while an `observe()` loop is in progress. */
  private _isPolling = false;

  /**
   * @param conditionCallback Predicate evaluated before every polling run;
   * a truthy result ends the polling.
   * @throws {TypeError} When `conditionCallback` is not a function.
   */
  constructor(public conditionCallback: ConditionCallback<T>) {
    if ('function' !== typeof(conditionCallback)) {
      throw new TypeError(`'conditionCallback' is not defined`);
    }
  }

  /**
   * Requests the polling to stop. When no polling is in progress the records
   * are cleared right away; otherwise the running `observe()` call clears
   * them once it notices the request and winds down.
   */
  public disconnect(): void {
    this._forceStop = true;

    if (!this._isPolling) this._records = [];
  }

  /**
   * Starts polling with `callback`.
   *
   * Each iteration first checks for a pending `disconnect()`, then evaluates
   * the condition callback and the timeout, and only then runs `callback`.
   * Errors thrown (or rejected) by either callback do not reject the returned
   * promise; they are reported to `onfinish` as `{ status: 'error', reason }`.
   *
   * @param callback Function executed on every polling run.
   * @param options Optional `timeout` and `interval`, both in milliseconds.
   * @returns A promise that settles after polling has ended and `onfinish`
   * (if any) has been called.
   */
  public async observe(
    callback: PollingCallback<T>,
    options?: PollingObserverOptions
  ): Promise<void> {
    /**
     * NOTE(motss): To ensure `this._forceStop` is always reset before start observing.
     */
    this._forceStop = false;

    const { interval, timeout }: PollingObserverOptions = options || {};
    const isValidInterval = 'number' === typeof(interval) && interval > 0;
    const obsTimeout = 'number' === typeof(timeout) ? +timeout : -1;
    const obsInterval = isValidInterval ? +(interval as number) : 100;
    const isInfinitePolling = obsTimeout < 1;

    const records = this._records;
    const onfinishCallback = this.onfinish;
    const conditionCallback = this.conditionCallback;

    let totalTime = 0;
    let value: T | null | undefined = undefined;
    let i = 0;
    let status: OnfinishFulfilled<T>['status'] = 'finish';
    let result: OnfinishValue<T>;

    try {
      for (;;) {
        if (this._forceStop) break;

        /** NOTE(motss): Set to indicate polling initiates */
        this._isPolling = true;

        const conditionResult = conditionCallback(value, records, this);
        const didConditionMeet = isPromise(conditionResult) ?
          await conditionResult : conditionResult;
        const didTimeout = !isInfinitePolling && totalTime >= obsTimeout;

        if (didTimeout || didConditionMeet) {
          if (didTimeout) status = 'timeout';
          break;
        }

        const startAt = perf.now();
        const r = callback();
        value = isPromise(r) ? await r : r;
        const duration = perf.now() - startAt;

        /**
         * Always wait out the remainder of the effective interval, including
         * the 100ms default. `totalTime` advances by at least one interval per
         * run, so skipping the wait would let the timeout fire long before
         * that much time has really elapsed, and a loop of synchronous
         * callbacks would never yield for `disconnect()` to take effect.
         */
        const timeLeft = obsInterval - duration;

        records.push(new PollingMeasure(`polling:${i}`, duration, startAt));

        /** A run that overshoots the interval counts for its real duration. */
        totalTime += Math.max(duration, obsInterval);
        i += 1;

        if (timeLeft > 0) await delayUntil(timeLeft);
      }

      result = { status, value };
    } catch (e) {
      result = { status: 'error', reason: e as Error };
    }

    /** Snapshot the records before a pending disconnect wipes them. */
    const recordsSlice = records.slice();

    if (this._forceStop) this._records = [];

    /** NOTE(motss): Reset flags */
    this._isPolling = this._forceStop = false;

    if ('function' === typeof(onfinishCallback)) {
      onfinishCallback(result, recordsSlice, this);
    }
  }

  /**
   * Returns the observer's internal records array itself (not a copy), so
   * the returned array reflects measurements pushed by later polling runs.
   */
  public takeRecords(): PollingMeasure[] {
    return this._records;
  }
}