Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 69 additions & 5 deletions src/stores/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ function toMs(ts: number): number {
// poll chat.history to surface intermediate tool-call turns.
let _historyPollTimer: ReturnType<typeof setTimeout> | null = null;

// Timer for delayed error finalization. When the Gateway reports a mid-stream
// error (e.g. "terminated"), it may retry internally and recover. We wait
// before committing the error to give the recovery path a chance.
let _errorRecoveryTimer: ReturnType<typeof setTimeout> | null = null;

function clearErrorRecoveryTimer(): void {
if (_errorRecoveryTimer) {
clearTimeout(_errorRecoveryTimer);
_errorRecoveryTimer = null;
}
}

function clearHistoryPoll(): void {
if (_historyPollTimer) {
clearTimeout(_historyPollTimer);
Expand Down Expand Up @@ -1137,6 +1149,7 @@ export const useChatStore = create<ChatState>((set, get) => ({
// entire agentic conversation finishes — the poll must run in parallel.
_lastChatEventAt = Date.now();
clearHistoryPoll();
clearErrorRecoveryTimer();

const POLL_START_DELAY = 3_000;
const POLL_INTERVAL = 4_000;
Expand Down Expand Up @@ -1245,6 +1258,7 @@ export const useChatStore = create<ChatState>((set, get) => ({

abortRun: async () => {
clearHistoryPoll();
clearErrorRecoveryTimer();
const { currentSessionKey } = get();
set({ sending: false, streamingText: '', streamingMessage: null, pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [] });
set({ streamingTools: [] });
Expand Down Expand Up @@ -1296,7 +1310,13 @@ export const useChatStore = create<ChatState>((set, get) => ({

switch (resolvedState) {
case 'delta': {
// Streaming update - store the cumulative message
// If we're receiving new deltas, the Gateway has recovered from any
// prior error — cancel the error finalization timer and clear the
// stale error banner so the user sees the live stream again.
if (_errorRecoveryTimer) {
clearErrorRecoveryTimer();
set({ error: null });
}
const updates = collectToolUpdates(event.message, resolvedState);
set((s) => ({
streamingMessage: (() => {
Expand All @@ -1311,6 +1331,8 @@ export const useChatStore = create<ChatState>((set, get) => ({
break;
}
case 'final': {
clearErrorRecoveryTimer();
if (get().error) set({ error: null });
// Message complete - add to history and clear streaming
const finalMsg = event.message as RawMessage | undefined;
if (finalMsg) {
Expand Down Expand Up @@ -1449,23 +1471,65 @@ export const useChatStore = create<ChatState>((set, get) => ({
break;
}
case 'error': {
clearHistoryPoll();
const errorMsg = String(event.errorMessage || 'An error occurred');
const wasSending = get().sending;

// Snapshot the current streaming message into messages[] so partial
// content ("Let me get that written down...") is preserved in the UI
// rather than being silently discarded.
const currentStream = get().streamingMessage as RawMessage | null;
if (currentStream && (currentStream.role === 'assistant' || currentStream.role === undefined)) {
const snapId = (currentStream as RawMessage).id
|| `error-snap-${Date.now()}`;
const alreadyExists = get().messages.some(m => m.id === snapId);
if (!alreadyExists) {
set((s) => ({
messages: [...s.messages, { ...currentStream, role: 'assistant' as const, id: snapId }],
}));
}
}

set({
error: errorMsg,
sending: false,
activeRunId: null,
streamingText: '',
streamingMessage: null,
streamingTools: [],
pendingFinal: false,
lastUserMessageAt: null,
pendingToolImages: [],
});

// Don't immediately give up: the Gateway often retries internally
// after transient API failures (e.g. "terminated"). Keep `sending`
// true for a grace period so that recovery events are processed and
// the agent-phase-completion handler can still trigger loadHistory.
if (wasSending) {
clearErrorRecoveryTimer();
const ERROR_RECOVERY_GRACE_MS = 15_000;
_errorRecoveryTimer = setTimeout(() => {
_errorRecoveryTimer = null;
const state = get();
if (state.sending && !state.streamingMessage) {
clearHistoryPoll();
// Grace period expired with no recovery — finalize the error
set({
sending: false,
activeRunId: null,
lastUserMessageAt: null,
});
// One final history reload in case the Gateway completed in the
// background and we just missed the event.
state.loadHistory(true);
}
}, ERROR_RECOVERY_GRACE_MS);
} else {
clearHistoryPoll();
set({ sending: false, activeRunId: null, lastUserMessageAt: null });
}
break;
}
case 'aborted': {
clearHistoryPoll();
clearErrorRecoveryTimer();
set({
sending: false,
activeRunId: null,
Expand Down
12 changes: 11 additions & 1 deletion src/stores/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,18 @@ export const useGatewayStore = create<GatewayState>((set, get) => ({
import('./chat')
.then(({ useChatStore }) => {
const state = useChatStore.getState();
// Always reload history on agent completion, regardless of
// the `sending` flag. After a transient error the flag may
// already be false, but the Gateway may have retried and
// completed successfully in the background.
state.loadHistory(true);
if (state.sending) {
state.loadHistory(true);
useChatStore.setState({
sending: false,
activeRunId: null,
pendingFinal: false,
lastUserMessageAt: null,
});
}
})
.catch(() => {});
Expand Down