-
Notifications
You must be signed in to change notification settings - Fork 71
Expand file tree
/
Copy pathusage-tracker.ts
More file actions
341 lines (298 loc) · 9.89 KB
/
usage-tracker.ts
File metadata and controls
341 lines (298 loc) · 9.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
import { waitUntil } from '@vercel/functions';
import { version } from '../../package.json';
// Shared resolved promise, reused so no-op flushes avoid a fresh allocation.
const RESOLVED_VOID: Promise<void> = Promise.resolve();

// Debug mode is on when the DEBUG env var mentions this package.
const isDebugMode = process.env.DEBUG?.includes('@vercel/flags-core');

// Console logger that is active only in debug mode.
const debugLog = (...args: any[]) => {
  if (isDebugMode) {
    console.log(...args);
  }
};
/**
 * Event emitted when the flags config is read (at most once per request —
 * see UsageTracker.trackRead, which deduplicates by request context).
 */
export interface FlagsConfigReadEvent {
  type: 'FLAGS_CONFIG_READ';
  /** Event creation time in milliseconds since the Unix epoch */
  ts: number;
  payload: {
    /** Taken from process.env.VERCEL_DEPLOYMENT_ID when set */
    deploymentId?: string;
    /** Taken from process.env.VERCEL_REGION when set */
    region?: string;
    /** Host header of the invocation, when request headers are available */
    invocationHost?: string;
    /** Value of the x-vercel-id request header, when available */
    vercelRequestId?: string;
    cacheStatus?: 'HIT' | 'MISS' | 'BYPASS' | 'STALE';
    cacheAction?: 'REFRESHING' | 'FOLLOWING' | 'NONE';
    cacheIsBlocking?: boolean;
    cacheIsFirstRead?: boolean;
    /** Duration in milliseconds (see TrackReadOptions.duration) */
    duration?: number;
    /** Timestamp when the config was last updated */
    configUpdatedAt?: number;
    configOrigin?: 'in-memory' | 'embedded' | 'poll' | 'stream' | 'constructor';
    mode?: 'poll' | 'stream' | 'build' | 'offline';
    /** Config revision, serialized to a string for the wire */
    revision?: string;
    /** VERCEL_ENV or NODE_ENV, when either is set */
    environment?: string;
  };
}
/**
 * Event emitted for an individual flag evaluation.
 */
export interface FlagEvaluationEvent {
  type: 'FLAG_EVALUATION';
  /** Event creation time in milliseconds since the Unix epoch */
  ts: number;
  payload: {
    /** Identifier of the evaluated flag */
    flagId: string;
    /** Identifier of the variant the evaluation resolved to */
    variantId: string;
    /** Caller-supplied reason for why this variant was chosen */
    reason: string;
    /** Taken from process.env.VERCEL_DEPLOYMENT_ID when set */
    deploymentId?: string;
    /** Taken from process.env.VERCEL_REGION when set */
    region?: string;
  };
}
/** Union of all event shapes sent to the ingest endpoint. */
export type IngestEvent = FlagsConfigReadEvent | FlagEvaluationEvent;
/** Internal state of the event batching queue between flushes. */
interface EventBatcher {
  /** Events queued since the last flush */
  events: IngestEvent[];
  /** Resolves the current wait period early (e.g., when batch size is reached) */
  resolveWait: (() => void) | null;
  /** Promise for flush operation */
  pending: null | Promise<void>;
}
/** Maximum delivery attempts per batch before the batch is dropped */
const MAX_RETRIES = 3;
/** Queue length at which a flush is triggered without waiting */
const MAX_BATCH_SIZE = 50;
/** Longest a queued event waits before being flushed, in milliseconds */
const MAX_BATCH_WAIT_MS = 5000;
/** Result of reading the Vercel request context off the global symbol. */
interface RequestContext {
  /** The request context object, or undefined when unavailable */
  ctx: object | undefined;
  /** Request headers from the context, when the context exposes them */
  headers: Record<string, string> | undefined;
}
// Well-known global symbol under which Vercel exposes the request context.
const SYMBOL_FOR_REQ_CONTEXT = Symbol.for('@vercel/request-context');
const fromSymbol = globalThis as typeof globalThis & {
  [key: symbol]:
    | { get?: () => { headers?: Record<string, string> } }
    | undefined;
};
/**
 * Reads the Vercel request context and its headers from the global symbol.
 * Never throws: any failure yields `{ ctx: undefined, headers: undefined }`.
 */
function getRequestContext(): RequestContext {
  try {
    const ctx = fromSymbol[SYMBOL_FOR_REQ_CONTEXT]?.get?.();
    // Only expose headers when the context object itself carries them.
    const headers =
      ctx && Object.hasOwn(ctx, 'headers')
        ? (ctx.headers as Record<string, string>)
        : undefined;
    return { ctx, headers };
  } catch {
    return { ctx: undefined, headers: undefined };
  }
}
/** Configuration for constructing a UsageTracker. */
export interface UsageTrackerOptions {
  /** SDK key sent as a Bearer token on ingest requests */
  sdkKey: string;
  /** Base URL of the ingest service (events POST to `${host}/v1/ingest`) */
  host: string;
  /** fetch implementation used for ingest requests */
  fetch: typeof fetch;
}
/** Options describing a single config read, passed to UsageTracker.trackRead. */
export interface TrackReadOptions {
  /** Whether the config was read from in-memory cache or embedded bundle */
  configOrigin: 'in-memory' | 'embedded';
  /** HIT when definitions exist in memory, MISS when not, BYPASS when using fallback as primary source */
  cacheStatus?: 'HIT' | 'MISS' | 'BYPASS';
  /** FOLLOWING when streaming, REFRESHING when polling, NONE otherwise */
  cacheAction?: 'REFRESHING' | 'FOLLOWING' | 'NONE';
  /** True for the very first getData call */
  cacheIsFirstRead?: boolean;
  /** Whether the cache read was blocking */
  cacheIsBlocking?: boolean;
  /** Duration in milliseconds from start of getData until trackRead */
  duration?: number;
  /** Timestamp when the config was last updated */
  configUpdatedAt?: number;
  /** The mode the SDK is operating in */
  mode?: 'poll' | 'stream' | 'build' | 'offline';
  /** Revision of the config (serialized to a string on the wire) */
  revision?: number;
}
/** Options describing a single flag evaluation, passed to trackEvaluation. */
export interface TrackEvaluationOptions {
  /** Identifier of the evaluated flag */
  flagId: string;
  /** Identifier of the variant the evaluation resolved to */
  variantId: string;
  /** Caller-supplied reason for why this variant was chosen */
  reason: string;
}
/**
 * Tracks usage events and batches them for submission to the ingest endpoint.
 *
 * Events are queued in memory and flushed either after MAX_BATCH_WAIT_MS or
 * as soon as MAX_BATCH_SIZE events have accumulated, whichever comes first.
 */
export class UsageTracker {
  // Monotonic counter used only to correlate retry log lines per flush.
  private flushCounter: number = 0;
  private options: UsageTrackerOptions;
  // Request-context objects that already produced a FLAGS_CONFIG_READ event.
  // WeakSet so finished requests can be garbage collected.
  private trackedRequests = new WeakSet<object>();
  // Single shared batch: the queued events plus the scheduled-flush state.
  private batcher: EventBatcher = {
    events: [],
    resolveWait: null,
    pending: null,
  };
  constructor(options: UsageTrackerOptions) {
    this.options = options;
  }
  /**
   * Triggers an immediate flush of any pending events.
   * Returns a promise that resolves when the flush completes.
   */
  flush(): Promise<void> {
    // A flush is already scheduled: end its wait period early and hand the
    // caller that flush's promise.
    if (this.batcher.pending) {
      this.batcher.resolveWait?.();
      return this.batcher.pending;
    }
    // No scheduled flush yet — flush directly if there are queued events
    if (this.batcher.events.length > 0) {
      return this.flushEvents();
    }
    // Nothing queued: reuse the shared resolved promise.
    // NOTE(review): if a flushEvents() started by scheduleFlush is still in
    // flight, `pending` has already been nulled and the queue is empty, so
    // this resolves before that flush finishes — confirm whether callers
    // rely on flush() guaranteeing full drainage.
    return RESOLVED_VOID;
  }
  /**
   * Tracks a config read event. Deduplicates by request context.
   */
  trackRead(options?: TrackReadOptions): void {
    try {
      const { ctx, headers } = getRequestContext();
      // Skip if request context can't be inferred
      if (!ctx) return;
      // Skip if we've already tracked this request
      if (this.trackedRequests.has(ctx)) return;
      this.trackedRequests.add(ctx);
      const event: FlagsConfigReadEvent = {
        type: 'FLAGS_CONFIG_READ',
        ts: Date.now(),
        payload: {
          deploymentId: process.env.VERCEL_DEPLOYMENT_ID,
          region: process.env.VERCEL_REGION,
        },
      };
      if (headers) {
        event.payload.vercelRequestId = headers['x-vercel-id'] ?? undefined;
        event.payload.invocationHost = headers.host ?? undefined;
      }
      // Copy only the options that were explicitly provided so absent
      // fields are omitted from the payload entirely.
      if (options) {
        event.payload.configOrigin = options.configOrigin;
        if (options.cacheStatus !== undefined) {
          event.payload.cacheStatus = options.cacheStatus;
        }
        if (options.cacheAction !== undefined) {
          event.payload.cacheAction = options.cacheAction;
        }
        if (options.cacheIsFirstRead !== undefined) {
          event.payload.cacheIsFirstRead = options.cacheIsFirstRead;
        }
        if (options.cacheIsBlocking !== undefined) {
          event.payload.cacheIsBlocking = options.cacheIsBlocking;
        }
        if (options.duration !== undefined) {
          event.payload.duration = options.duration;
        }
        if (options.configUpdatedAt !== undefined) {
          event.payload.configUpdatedAt = options.configUpdatedAt;
        }
        if (options.mode !== undefined) {
          event.payload.mode = options.mode;
        }
        if (options.revision !== undefined) {
          // The wire format carries the revision as a string.
          event.payload.revision = String(options.revision);
        }
      }
      const environment =
        process.env.VERCEL_ENV || process.env.NODE_ENV || undefined;
      if (environment) {
        event.payload.environment = environment;
      }
      this.batcher.events.push(event);
      this.scheduleFlush();
    } catch (error) {
      // trackRead should never throw, but log the error
      console.error('@vercel/flags-core: Failed to record event:', error);
    }
  }
  /**
   * Tracks a flag evaluation event. Unlike trackRead, this is not
   * deduplicated — every call queues an event.
   */
  trackEvaluation(options: TrackEvaluationOptions): void {
    try {
      const event: FlagEvaluationEvent = {
        type: 'FLAG_EVALUATION',
        ts: Date.now(),
        payload: {
          flagId: options.flagId,
          variantId: options.variantId,
          reason: options.reason,
          deploymentId: process.env.VERCEL_DEPLOYMENT_ID,
          region: process.env.VERCEL_REGION,
        },
      };
      this.batcher.events.push(event);
      this.scheduleFlush();
    } catch (error) {
      console.error(
        '@vercel/flags-core: Failed to record evaluation event:',
        error,
      );
    }
  }
  // Ensures a flush is scheduled; opens one batching window if none exists.
  private scheduleFlush(): void {
    if (!this.batcher.pending) {
      let timeout: null | ReturnType<typeof setTimeout> = null;
      const pending = (async () => {
        // Wait until either the batch window elapses or resolveWait is
        // invoked (batch-size threshold reached, or explicit flush()).
        // The executor runs synchronously, so resolveWait is set before
        // this method returns.
        await new Promise<void>((res) => {
          this.batcher.resolveWait = res;
          timeout = setTimeout(res, MAX_BATCH_WAIT_MS);
        });
        // Clear scheduling state before flushing so events arriving during
        // the flush open a fresh batching window.
        this.batcher.pending = null;
        this.batcher.resolveWait = null;
        if (timeout) clearTimeout(timeout);
        await this.flushEvents();
      })();
      // Use waitUntil to keep the function alive until flush completes
      // If `waitUntil` is not available this will be a no-op and leave
      // a floating promise that will be completed in the background
      try {
        waitUntil(pending);
      } catch {
        // waitUntil is best-effort; falling through leaves a floating promise
      }
      this.batcher.pending = pending;
    }
    // Trigger early flush if threshold was reached
    if (this.batcher.events.length >= MAX_BATCH_SIZE) {
      this.batcher.resolveWait?.();
    }
  }
  // Sends every queued event to the ingest endpoint, retrying on failure.
  private async flushEvents(): Promise<void> {
    if (this.batcher.events.length === 0) return;
    // Take all events and clear the queue
    const eventsToSend = this.batcher.events;
    this.batcher.events = [];
    const flushId = ++this.flushCounter;
    for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
      try {
        const response = await this.options.fetch(
          `${this.options.host}/v1/ingest`,
          {
            method: 'POST',
            headers: {
              'Content-Type': 'application/json',
              Authorization: `Bearer ${this.options.sdkKey}`,
              'User-Agent': `VercelFlagsCore/${version}`,
              ...(process.env.VERCEL_ENV
                ? { 'X-Vercel-Env': process.env.VERCEL_ENV }
                : null),
              ...(isDebugMode ? { 'x-vercel-debug-ingest': '1' } : null),
            },
            body: JSON.stringify(eventsToSend),
          },
        );
        debugLog(
          `@vercel/flags-core: Ingest response ${response.status} for ${eventsToSend.length} events on ${response.headers.get('x-vercel-id')}`,
        );
        if (response.ok) {
          break; // Break the loop if the request succeeded
        }
        // Non-2xx response: throw so the catch below logs and retries.
        throw new Error(
          `Ingest endpoint responded with status ${response.status} for ${eventsToSend.length} events on request ${response.headers.get('x-vercel-id')}.\n` +
            `Response body: ${await response.text().catch(() => null)}`,
        );
      } catch (error) {
        console.error(
          `@vercel/flags-core: Error sending events (attempt=${attempt}/${MAX_RETRIES} flushId=${flushId}):`,
          error,
        );
        if (attempt < MAX_RETRIES) {
          // Linear backoff: 100ms, then 200ms, between attempts.
          await new Promise((res) => setTimeout(res, attempt * 100));
        }
        // NOTE(review): after the final attempt the batch is dropped (beyond
        // the error log) — confirm this best-effort loss is intended.
      }
    }
  }
}