Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/client/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@fal-ai/client",
"description": "The fal.ai client for JavaScript and TypeScript",
"version": "1.9.3",
"version": "1.9.4",
"license": "MIT",
"repository": {
"type": "git",
Expand Down
64 changes: 47 additions & 17 deletions libs/client/src/realtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
),
active: state(
transition("send", "active", reduce(sendMessage)),
transition("authenticated", "active", reduce(setToken)),
transition("unauthorized", "idle", reduce(expireToken)),
transition("connectionClosed", "idle", reduce(closeConnection)),
transition("close", "idle", reduce(closeConnection)),
Expand Down Expand Up @@ -539,6 +540,7 @@

let previousState: string | undefined;
let latestEnqueuedMessage: any;
let tokenRefreshTimer: ReturnType<typeof setTimeout> | undefined;

// Although the state machine is cached so we don't open multiple connections,
// we still need to update the callbacks so we can call the correct references
Expand Down Expand Up @@ -586,28 +588,52 @@
return getTemporaryAuthToken(app, config);
};

const effectiveExpiration = tokenProvider
? tokenExpirationSeconds
: TOKEN_EXPIRATION_SECONDS;

const scheduleTokenRefresh =
effectiveExpiration !== undefined
? () => {
clearTimeout(tokenRefreshTimer);
const refreshMs = Math.round(
effectiveExpiration * 0.9 * 1000,
);
tokenRefreshTimer = setTimeout(() => {
if (machine.current !== "active") {
return;
}
fetchToken()
.then((newToken) => {
if (machine.current !== "active") {
return;
}
queueMicrotask(() => {
send({ type: "authenticated", token: newToken });
});
scheduleTokenRefresh();
})
.catch(() => {
if (machine.current !== "active") {
return;
}
const retryMs = Math.round(
effectiveExpiration * 0.05 * 1000,
);
tokenRefreshTimer = setTimeout(() => {
scheduleTokenRefresh();
}, retryMs);
});
}, refreshMs);
}
: noop;

fetchToken()
.then((token) => {
// Use queueMicrotask to ensure the state machine processes
// this on a clean call stack, not nested inside onChange.
// robot3's interpret can lose track of state changes when
// send() is called synchronously inside an onChange handler.
queueMicrotask(() => {
send({ type: "authenticated", token });
});
// Only schedule token refresh if we know the expiration time.
// For custom tokenProvider without tokenExpirationSeconds, skip auto-refresh.
const effectiveExpiration = tokenProvider
? tokenExpirationSeconds
: TOKEN_EXPIRATION_SECONDS;
if (effectiveExpiration !== undefined) {
const tokenRefreshInterval = Math.round(
effectiveExpiration * 0.9 * 1000,
);
setTimeout(() => {
send({ type: "expireToken" });
}, tokenRefreshInterval);
}
scheduleTokenRefresh();
})
.catch((error) => {
queueMicrotask(() => {
Expand Down Expand Up @@ -648,7 +674,7 @@
}
send({ type: "connectionClosed", code: event.code });
};
ws.onerror = (event) => {

Check warning on line 677 in libs/client/src/realtime.ts

View workflow job for this annotation

GitHub Actions / build

'event' is defined but never used
// TODO specify error protocol for identified errors
const { onError = noop } = getCallbacks();
onError(new ApiError({ message: "Unknown error", status: 500 }));
Expand All @@ -669,6 +695,10 @@
});
};
}
if (previousState === "active" && machine.current !== "active") {
clearTimeout(tokenRefreshTimer);
tokenRefreshTimer = undefined;
}
previousState = machine.current;
},
);
Expand Down
Loading