Skip to content

Commit a8a88ef

Browse files
tyiuhccursoragent
andcommitted
feat(experiment-tag): dual-write RTBT events to relay (WEB-131)
EventStorageManager writes to relay on addEvent, merges relay store on syncFromRelay (migrate if needed, relay wins on dedup), and flushes relay on unload/visibility. BehavioralTargetingManager exposes setRelayClient and syncFromRelay for Pass 2 evaluation. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 3a567e0 commit a8a88ef

3 files changed

Lines changed: 259 additions & 1 deletion

File tree

packages/experiment-tag/src/behavioral-targeting/behavioral-targeting-manager.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { BehavioralTargetingRules } from '../types';
22

33
import { BehavioralTargetingEvaluator } from './evaluator';
44
import { EventStorageManager } from './event-storage';
5+
import { RelayClient } from './relay-client';
56
import { SessionManager } from './session-manager';
67
import { BehavioralTargeting } from './types';
78

@@ -51,6 +52,24 @@ export class BehavioralTargetingManager {
5152
this.evaluateEvent(eventType);
5253
}
5354

55+
/**
56+
* Attach the relay client for cross-subdomain event dual-write.
57+
*/
58+
public setRelayClient(relayClient: RelayClient | null): void {
59+
this.eventStorage.setRelayClient(relayClient);
60+
}
61+
62+
/**
63+
* Pass 2: migrate local events to relay if needed, merge relay store, re-evaluate.
64+
*/
65+
public async syncFromRelay(): Promise<boolean> {
66+
const synced = await this.eventStorage.syncFromRelay();
67+
if (synced) {
68+
this.evaluateAll();
69+
}
70+
return synced;
71+
}
72+
5473
/**
5574
* Check if a flag has behavioral targeting rules.
5675
* @param flagKey The flag key to check

packages/experiment-tag/src/behavioral-targeting/event-storage.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
1+
import { RelayClient } from './relay-client';
2+
import { RelayEventStorage } from './relay-protocol';
13
import { SessionManager } from './session-manager';
24

5+
/**
6+
* Dedup key for cross-subdomain merge (matches relay.js MIGRATE_EVENTS).
7+
*/
8+
export function eventDedupKey(event: {
9+
event_type: string;
10+
timestamp: number;
11+
}): string {
12+
return `${event.event_type}:${event.timestamp}`;
13+
}
14+
315
/**
416
* Represents a stored event record.
517
*/
@@ -33,15 +45,18 @@ export class EventStorageManager {
3345
private hasPendingWrites = false; // Track if cache has unsaved changes
3446
private persistedEvents?: Set<string>; // Optional set of event types to persist
3547
private storageKey: string;
48+
private relayClient: RelayClient | null = null;
3649

3750
constructor(
3851
apiKey: string,
3952
sessionManager: SessionManager,
4053
persistedEvents?: Set<string>,
54+
relayClient?: RelayClient | null,
4155
) {
4256
this.storageKey = `EXP_${apiKey.slice(0, 10)}_rtbt_events`;
4357
this.sessionManager = sessionManager;
4458
this.persistedEvents = persistedEvents;
59+
this.relayClient = relayClient ?? null;
4560

4661
// Load from localStorage into memory on initialization
4762
this.memoryCache = this.loadFromLocalStorage();
@@ -86,6 +101,83 @@ export class EventStorageManager {
86101

87102
// Trigger debounced write to localStorage
88103
this.scheduleDebouncedWrite();
104+
105+
// Fire-and-forget relay write when cross-subdomain sync is enabled
106+
this.relayClient?.writeEvent(event);
107+
}
108+
109+
/**
110+
* Attach or detach the relay client for cross-subdomain dual-write.
111+
*/
112+
setRelayClient(relayClient: RelayClient | null): void {
113+
this.relayClient = relayClient;
114+
}
115+
116+
/**
117+
* Flushes pending relay writes (e.g. on page unload).
118+
*/
119+
flushRelay(): void {
120+
this.relayClient?.flush();
121+
}
122+
123+
/**
124+
* Merges relay events into the in-memory cache. Relay wins on dedup conflicts.
125+
*/
126+
mergeFromRelay(relayStore: RelayEventStorage): void {
127+
const byKey = new Map<string, EventRecord>();
128+
for (const event of this.memoryCache.events) {
129+
byKey.set(eventDedupKey(event), event);
130+
}
131+
for (const event of relayStore.events) {
132+
byKey.set(eventDedupKey(event), event);
133+
}
134+
135+
let events = Array.from(byKey.values()).sort(
136+
(a, b) => a.timestamp - b.timestamp,
137+
);
138+
if (events.length > EventStorageManager.MAX_EVENTS) {
139+
events = events.slice(-EventStorageManager.MAX_EVENTS);
140+
}
141+
142+
let nextId = Math.max(this.memoryCache.nextId, relayStore.nextId);
143+
for (const event of events) {
144+
if (event.id + 1 > nextId) {
145+
nextId = event.id + 1;
146+
}
147+
}
148+
149+
this.memoryCache = { events, nextId };
150+
this.hasPendingWrites = true;
151+
this.scheduleDebouncedWrite();
152+
}
153+
154+
/**
155+
* Pass 2 sync: migrate local store to relay if needed, then merge relay events.
156+
* Returns true when sync completed; false when relay unavailable or RPC failed.
157+
*/
158+
async syncFromRelay(): Promise<boolean> {
159+
const relay = this.relayClient;
160+
if (!relay?.relayAvailable) {
161+
return false;
162+
}
163+
164+
try {
165+
const origin = window.location.origin;
166+
const migrated = await relay.checkMigrated(origin);
167+
168+
if (!migrated && this.memoryCache.events.length > 0) {
169+
await relay.migrateEvents(origin, {
170+
events: [...this.memoryCache.events],
171+
nextId: this.memoryCache.nextId,
172+
});
173+
}
174+
175+
const relayStore = await relay.readEvents();
176+
this.mergeFromRelay(relayStore);
177+
return true;
178+
} catch {
179+
return false;
180+
}
89181
}
90182

91183
/**
@@ -148,6 +240,7 @@ export class EventStorageManager {
148240
*/
149241
flush(): void {
150242
this.flushToLocalStorage();
243+
this.flushRelay();
151244
}
152245

153246
/**
@@ -254,6 +347,7 @@ export class EventStorageManager {
254347
*/
255348
private handleBeforeUnload = (): void => {
256349
this.flushToLocalStorage();
350+
this.flushRelay();
257351
};
258352

259353
/**
@@ -262,6 +356,7 @@ export class EventStorageManager {
262356
private handleVisibilityChange = (): void => {
263357
if (document.visibilityState === 'hidden') {
264358
this.flushToLocalStorage();
359+
this.flushRelay();
265360
}
266361
};
267362

@@ -272,6 +367,7 @@ export class EventStorageManager {
272367
cleanup(): void {
273368
// Flush any pending writes
274369
this.flushToLocalStorage();
370+
this.flushRelay();
275371

276372
// Clear debounce timeout
277373
if (this.debouncedWriteTimeout) {

packages/experiment-tag/test/behavioral-targeting/event-storage.test.ts

Lines changed: 144 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,20 @@
1-
import { EventStorageManager } from 'src/behavioral-targeting/event-storage';
1+
import {
2+
EventStorageManager,
3+
eventDedupKey,
4+
} from 'src/behavioral-targeting/event-storage';
5+
import { RelayClient } from 'src/behavioral-targeting/relay-client';
26
import { SessionManager } from 'src/behavioral-targeting/session-manager';
37

8+
type MockRelayClient = Pick<
9+
RelayClient,
10+
| 'relayAvailable'
11+
| 'writeEvent'
12+
| 'flush'
13+
| 'readEvents'
14+
| 'checkMigrated'
15+
| 'migrateEvents'
16+
>;
17+
418
describe('EventStorageManager', () => {
519
let eventStorage: EventStorageManager;
620
let sessionManager: SessionManager;
@@ -411,4 +425,133 @@ describe('EventStorageManager', () => {
411425
expect(events[0].timestamp).toBeLessThanOrEqual(after);
412426
});
413427
});
428+
429+
describe('eventDedupKey', () => {
430+
test('uses event_type and timestamp', () => {
431+
expect(eventDedupKey({ event_type: 'click', timestamp: 1000 })).toBe(
432+
'click:1000',
433+
);
434+
});
435+
});
436+
437+
describe('relay dual-write', () => {
438+
const createMockRelay = (available = true) => ({
439+
relayAvailable: available,
440+
writeEvent: jest.fn(),
441+
flush: jest.fn(),
442+
readEvents: jest.fn().mockResolvedValue({ events: [], nextId: 1 }),
443+
checkMigrated: jest.fn().mockResolvedValue(true),
444+
migrateEvents: jest.fn().mockResolvedValue(undefined),
445+
});
446+
447+
test('writes events to relay on addEvent when relay is attached', () => {
448+
const relay = createMockRelay();
449+
eventStorage.setRelayClient(relay as unknown as RelayClient);
450+
eventStorage.addEvent('click', { page: 'home' });
451+
452+
expect(relay.writeEvent).toHaveBeenCalledWith(
453+
expect.objectContaining({
454+
event_type: 'click',
455+
properties: { page: 'home' },
456+
id: 1,
457+
}),
458+
);
459+
});
460+
461+
test('does not write to relay when relay is not attached', () => {
462+
const relay = createMockRelay();
463+
eventStorage.addEvent('click');
464+
expect(relay.writeEvent).not.toHaveBeenCalled();
465+
});
466+
467+
test('flush calls relay flush', () => {
468+
const relay = createMockRelay();
469+
eventStorage.setRelayClient(relay as unknown as RelayClient);
470+
eventStorage.flush();
471+
expect(relay.flush).toHaveBeenCalled();
472+
});
473+
});
474+
475+
describe('mergeFromRelay', () => {
476+
test('merges relay events and relay wins on dedup key conflict', () => {
477+
eventStorage.addEvent('click', { source: 'local' }, 1000);
478+
const local = eventStorage.getAllEvents()[0];
479+
480+
eventStorage.mergeFromRelay({
481+
events: [
482+
{
483+
...local,
484+
properties: { source: 'relay' },
485+
},
486+
{
487+
id: 99,
488+
event_type: 'view',
489+
timestamp: 1001,
490+
session_id: local.session_id,
491+
properties: {},
492+
},
493+
],
494+
nextId: 100,
495+
});
496+
497+
const events = eventStorage.getAllEvents();
498+
expect(events).toHaveLength(2);
499+
expect(events[0].properties).toEqual({ source: 'relay' });
500+
expect(events[1].event_type).toBe('view');
501+
});
502+
});
503+
504+
describe('syncFromRelay', () => {
505+
test('migrates local store when origin not migrated', async () => {
506+
eventStorage.addEvent('click', {}, 1000);
507+
const relay = {
508+
relayAvailable: true,
509+
writeEvent: jest.fn(),
510+
flush: jest.fn(),
511+
checkMigrated: jest.fn().mockResolvedValue(false),
512+
migrateEvents: jest.fn().mockResolvedValue(undefined),
513+
readEvents: jest.fn().mockResolvedValue({
514+
events: [
515+
{
516+
id: 1,
517+
event_type: 'click',
518+
timestamp: 1000,
519+
session_id: 's1',
520+
properties: {},
521+
},
522+
],
523+
nextId: 2,
524+
}),
525+
};
526+
eventStorage.setRelayClient(relay as unknown as RelayClient);
527+
528+
const synced = await eventStorage.syncFromRelay();
529+
530+
expect(synced).toBe(true);
531+
expect(relay.migrateEvents).toHaveBeenCalledWith(
532+
window.location.origin,
533+
expect.objectContaining({
534+
events: expect.arrayContaining([
535+
expect.objectContaining({ event_type: 'click' }),
536+
]),
537+
}),
538+
);
539+
expect(relay.readEvents).toHaveBeenCalled();
540+
});
541+
542+
test('returns false when relay is unavailable', async () => {
543+
const relay = {
544+
relayAvailable: false,
545+
writeEvent: jest.fn(),
546+
flush: jest.fn(),
547+
readEvents: jest.fn(),
548+
checkMigrated: jest.fn(),
549+
migrateEvents: jest.fn(),
550+
};
551+
eventStorage.setRelayClient(relay as unknown as RelayClient);
552+
553+
expect(await eventStorage.syncFromRelay()).toBe(false);
554+
expect(relay.readEvents).not.toHaveBeenCalled();
555+
});
556+
});
414557
});

0 commit comments

Comments
 (0)