Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
3a87524
feat(experiment-tag): add RTBT relay protocol and RelayClient
tyiuhc Jun 15, 2026
66b119f
fix(experiment-tag): harden RelayClient init and write paths
tyiuhc Jun 15, 2026
58aabf4
fix(experiment-tag): defer relay init and flush queued writes on ready
tyiuhc Jun 16, 2026
ae85a13
fix(experiment-tag): bound relay init timeout and guard body append
tyiuhc Jun 16, 2026
29477ce
fix: late iframe after init timeout and verify message source
tyiuhc Jun 16, 2026
320f547
fix: reset destroyed on re-init and drop writes after destroy
tyiuhc Jun 16, 2026
9e947b4
fix: clear pendingWrites on RelayClient destroy
tyiuhc Jun 16, 2026
7d3d2d4
fix: cancel whenBodyReady poll on RelayClient destroy
tyiuhc Jun 16, 2026
fa78dbc
refactor: drop unused dev flag from getRelayUrl
tyiuhc Jun 16, 2026
5be0c0f
Pass web_exp_id_v2 on relay RPCs for per-visitor storage.
tyiuhc Jun 16, 2026
3a567e0
fix(relay-client): reject pending RPCs on destroy
tyiuhc Jun 16, 2026
a8a88ef
feat(experiment-tag): dual-write RTBT events to relay (WEB-131)
tyiuhc Jun 16, 2026
4d3eff1
chore(experiment-tag): remove unused MockRelayClient type in tests
tyiuhc Jun 16, 2026
2536294
Revert "chore(experiment-tag): remove unused MockRelayClient type in …
tyiuhc Jun 16, 2026
56e8794
Revert "feat(experiment-tag): dual-write RTBT events to relay (WEB-131)"
tyiuhc Jun 16, 2026
845d4bc
chore(experiment-tag): drop WEB_EXP_ID_V2_PATTERN from relay protocol
tyiuhc Jun 16, 2026
b7b10a8
fix(experiment-tag): add waitForAvailable and in-flight write queue
tyiuhc Jun 16, 2026
0ed2ddf
fix(experiment-tag): keep pending writes until relay RPC confirms
tyiuhc Jun 17, 2026
0e9bab9
fix(experiment-tag): dedupe relay pending writes on writeEvent
tyiuhc Jun 17, 2026
bd419e3
fix(experiment-tag): match pending write removal by event id
tyiuhc Jun 17, 2026
45057fe
fix(experiment-tag): harden RelayClient re-init and ignore post-destr…
tyiuhc Jun 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/experiment-tag/src/behavioral-targeting/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ export {
BehavioralCondition,
BehavioralConditionSet,
} from './types';
export * from './relay-protocol';
export * from './relay-client';
349 changes: 349 additions & 0 deletions packages/experiment-tag/src/behavioral-targeting/relay-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,349 @@
import { whenBodyReady } from '../util/when-body-ready';

import {
RELAY_READY_MESSAGE,
RELAY_RPC_TIMEOUT_MS,
RelayEventRecord,
RelayEventStorage,
RelayRequest,
RelayResponse,
} from './relay-protocol';

export function getRelayUrl(apiKey: string): string {
return `https://cdn.amplitude.com/script/${apiKey}.relay.html`;
}

function isRelayReadyMessage(data: unknown): boolean {
if (data === RELAY_READY_MESSAGE) {
return true;
}
return (
typeof data === 'object' &&
data !== null &&
(data as { type?: string }).type === RELAY_READY_MESSAGE
);
}

function createRequestId(): string {
return `${Date.now()}-${Math.random().toString(36).slice(2)}`;
}

function isSameRelayEvent(a: RelayEventRecord, b: RelayEventRecord): boolean {
return (
a.id === b.id &&
a.event_type === b.event_type &&
a.timestamp === b.timestamp
);
}

export class RelayClient {
private iframe: HTMLIFrameElement | null = null;
private iframeWindow: Window | null = null;
private relayOrigin = '';
private ready = false;
private available = false;
private pendingWrites: RelayEventRecord[] = [];
private readonly pendingRequests = new Map<
string,
{
resolve: (response: RelayResponse) => void;
reject: (error: Error) => void;
}
>();

private messageListener: ((event: MessageEvent) => void) | null = null;
private initPromise: Promise<void> | null = null;
private initTimeoutId: number | null = null;
private initResolve: (() => void) | null = null;
private cancelBodyReadyPoll: (() => void) | null = null;
private destroyed = false;
private readonly availableWaiters: Array<() => void> = [];

constructor(
private readonly apiKey: string,
private readonly webExpIdV2: string,
private readonly relayUrl: string,
) {
this.relayOrigin = new URL(relayUrl).origin;
}

private createRelayRequest(
type: RelayRequest['type'],
payload?: unknown,
): RelayRequest {
return {
type,
requestId: createRequestId(),
apiKey: this.apiKey,
web_exp_id_v2: this.webExpIdV2,
payload,
};
}

get relayAvailable(): boolean {
return this.available;
}

private notifyAvailable(): void {
const waiters = [...this.availableWaiters];
this.availableWaiters.length = 0;
for (const waiter of waiters) {
waiter();
}
}

/**
* Resolves when the relay becomes available, or after timeout.
* Use after init() when the init timer may have fired before RELAY_READY.
*/
waitForAvailable(timeoutMs = RELAY_RPC_TIMEOUT_MS): Promise<boolean> {
if (this.destroyed) {
return Promise.resolve(false);
}
if (this.available) {
return Promise.resolve(true);
}

return new Promise((resolve) => {
let settled = false;
const finish = () => {
if (settled) {
return;
}
settled = true;
window.clearTimeout(timeoutId);
const idx = this.availableWaiters.indexOf(onAvailable);
if (idx !== -1) {
this.availableWaiters.splice(idx, 1);
}
resolve(this.available && !this.destroyed);
};
const onAvailable = () => finish();
this.availableWaiters.push(onAvailable);
const timeoutId = window.setTimeout(() => finish(), timeoutMs);
});
}

async init(): Promise<void> {
if (this.initPromise) {
return this.initPromise;
}
Comment thread
cursor[bot] marked this conversation as resolved.

// Reset transient state so a re-init never inherits a stale listener,
// iframe window, or availability flag from a prior lifecycle.
this.destroyed = false;
this.available = false;
this.ready = false;
this.iframeWindow = null;
if (this.messageListener) {
window.removeEventListener('message', this.messageListener);
this.messageListener = null;
}

this.initPromise = new Promise((resolve) => {
this.initResolve = resolve;

const finishInit = () => {
if (!this.initResolve) {
return;
}
if (this.initTimeoutId !== null) {
window.clearTimeout(this.initTimeoutId);
this.initTimeoutId = null;
}
this.initResolve = null;
this.ready = true;
resolve();
};

this.initTimeoutId = window.setTimeout(() => {
this.initTimeoutId = null;
finishInit();
}, RELAY_RPC_TIMEOUT_MS);

this.cancelBodyReadyPoll?.();
this.cancelBodyReadyPoll = whenBodyReady(() => {
if (this.destroyed || this.iframe) {
return;
}
Comment thread
cursor[bot] marked this conversation as resolved.
if (!document.body) {
return;
}

const iframe = document.createElement('iframe');
iframe.src = this.relayUrl;
iframe.style.display = 'none';
iframe.setAttribute('aria-hidden', 'true');
document.body.appendChild(iframe);
Comment thread
cursor[bot] marked this conversation as resolved.
this.iframe = iframe;

const onMessage = (event: MessageEvent) => {
if (this.destroyed) {
return;
}
if (event.origin !== this.relayOrigin) {
return;
}
if (event.source !== iframe.contentWindow) {
return;
}

if (!this.available && isRelayReadyMessage(event.data)) {
this.iframeWindow = iframe.contentWindow;
this.available = true;
this.flush();
this.notifyAvailable();
finishInit();
return;
}
Comment thread
cursor[bot] marked this conversation as resolved.

const response = event.data as RelayResponse;
if (!response?.requestId) {
return;
}
const pending = this.pendingRequests.get(response.requestId);
if (!pending) {
return;
}
this.pendingRequests.delete(response.requestId);
pending.resolve(response);
};

window.addEventListener('message', onMessage);
this.messageListener = onMessage;
Comment thread
cursor[bot] marked this conversation as resolved.
});
});
Comment thread
cursor[bot] marked this conversation as resolved.

return this.initPromise;
}

private sendRequest(request: RelayRequest): Promise<RelayResponse> {
return new Promise((resolve, reject) => {
if (!this.available || !this.iframeWindow) {
reject(new Error('relay unavailable'));
return;
}

this.pendingRequests.set(request.requestId, { resolve, reject });
this.iframeWindow.postMessage(request, this.relayOrigin);

window.setTimeout(() => {
if (!this.pendingRequests.has(request.requestId)) {
return;
}
this.pendingRequests.delete(request.requestId);
reject(new Error('relay rpc timeout'));
}, RELAY_RPC_TIMEOUT_MS);
});
}

async readEvents(): Promise<RelayEventStorage> {
const response = await this.sendRequest(
this.createRelayRequest('READ_EVENTS'),
);
if (!response.ok) {
throw new Error(response.error ?? 'read events failed');
}
return (response.payload as RelayEventStorage) ?? { events: [], nextId: 1 };
}

writeEvent(event: RelayEventRecord): void {
if (this.destroyed) {
return;
}

const alreadyQueued = this.pendingWrites.some((queued) =>
isSameRelayEvent(queued, event),
);
if (!alreadyQueued) {
this.pendingWrites.push(event);
}
this.sendPendingWrite(event);
}

private removeConfirmedWrite(event: RelayEventRecord): void {
const idx = this.pendingWrites.findIndex((queued) =>
isSameRelayEvent(queued, event),
);
if (idx !== -1) {
this.pendingWrites.splice(idx, 1);
}
}

private sendPendingWrite(event: RelayEventRecord): void {
if (!this.available || !this.iframeWindow) {
return;
Comment thread
cursor[bot] marked this conversation as resolved.
}

void this.sendRequest(this.createRelayRequest('WRITE_EVENT', { event }))
.then((response) => {
if (response.ok) {
this.removeConfirmedWrite(event);
Comment thread
cursor[bot] marked this conversation as resolved.
}
})
.catch(() => {
// Keep in pendingWrites for a later flush()
});
}

flush(): void {
if (!this.available || !this.iframeWindow) {
return;
}
for (const event of [...this.pendingWrites]) {
this.sendPendingWrite(event);
}
}
Comment thread
cursor[bot] marked this conversation as resolved.

async checkMigrated(origin: string): Promise<boolean> {
const response = await this.sendRequest(
this.createRelayRequest('CHECK_MIGRATED', { sourceOrigin: origin }),
);
if (!response.ok) {
throw new Error(response.error ?? 'check migrated failed');
}
return Boolean((response.payload as { migrated?: boolean })?.migrated);
}

async migrateEvents(origin: string, store: RelayEventStorage): Promise<void> {
const response = await this.sendRequest(
this.createRelayRequest('MIGRATE_EVENTS', {
sourceOrigin: origin,
store,
}),
);
if (!response.ok) {
throw new Error(response.error ?? 'migrate events failed');
}
}

destroy(): void {
this.destroyed = true;
this.cancelBodyReadyPoll?.();
this.cancelBodyReadyPoll = null;
if (this.initTimeoutId !== null) {
window.clearTimeout(this.initTimeoutId);
this.initTimeoutId = null;
}
if (this.initResolve) {
this.initResolve();
this.initResolve = null;
}
if (this.messageListener) {
window.removeEventListener('message', this.messageListener);
this.messageListener = null;
}
for (const pending of this.pendingRequests.values()) {
pending.reject(new Error('relay destroyed'));
}
this.pendingRequests.clear();
this.notifyAvailable();
this.iframe?.remove();
this.iframe = null;
this.iframeWindow = null;
this.available = false;
this.ready = false;
this.pendingWrites = [];
this.initPromise = null;
}
Comment thread
cursor[bot] marked this conversation as resolved.
Comment thread
cursor[bot] marked this conversation as resolved.
Comment thread
cursor[bot] marked this conversation as resolved.
Comment thread
cursor[bot] marked this conversation as resolved.
}
Loading
Loading