Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/studio-pending-signal-badge.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@internal/playground': patch
---

Fixed Studio chat leaving a stale "pending" signal indicator above the message input. Sending a follow-up message while the agent was idle could leave one or more shimmering "pending: …" badges that lingered after the reply finished (and piled up across messages), only disappearing on a page refresh. The badge now clears reliably regardless of whether the send confirmation or its echo arrives first.
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// @vitest-environment jsdom
import type { MastraDBMessage } from '@mastra/core/agent/message-list';
import { useChat } from '@mastra/react';
import { act, render, waitFor } from '@testing-library/react';
import { act, cleanup, render, waitFor } from '@testing-library/react';
import type { ReactNode } from 'react';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

const mocks = vi.hoisted(() => ({
cancelRun: vi.fn(),
Expand Down Expand Up @@ -143,6 +143,12 @@ describe('MastraRuntimeProvider', () => {
delete (window as any).MASTRA_AGENT_SIGNALS;
});

// This package runs vitest with globals disabled, so @testing-library/react's
// auto-cleanup never registers. Unmount between tests explicitly, otherwise
// every render() leaks a mounted provider and getSignalCallbacks/threadRuntimeState
// (both last-writer-wins sinks) would read a stale instance.
afterEach(() => cleanup());

it('opts Playground into thread signals by default', () => {
render(
<MastraRuntimeProvider agentId="agent-1" threadId="thread-1" initialMessages={[]} modelVersion="v2">
Expand Down Expand Up @@ -228,6 +234,90 @@ describe('MastraRuntimeProvider', () => {
});
});

const getSignalCallbacks = () => {
const calls = vi.mocked(useChat).mock.calls;
const lastArgs = calls[calls.length - 1]?.[0] as
| { onSignalSent?: (id: string, preview: string) => void; onSignalEcho?: (id: string) => void }
| undefined;
if (!lastArgs?.onSignalSent || !lastArgs.onSignalEcho) {
throw new Error('useChat was not called with signal callbacks — render the provider before reading them');
}
return { onSignalSent: lastArgs.onSignalSent, onSignalEcho: lastArgs.onSignalEcho };
};

it('shows then clears a pending signal when the echo arrives after the send (normal order)', async () => {
render(
<MastraRuntimeProvider agentId="agent-1" threadId="thread-1" initialMessages={[]} modelVersion="v2">
<div />
</MastraRuntimeProvider>,
);

const { onSignalSent, onSignalEcho } = getSignalCallbacks();

await act(async () => {
onSignalSent('signal-1', 'ok');
});

expect(mocks.threadRuntimeState.hasPendingMessages).toBe(true);
expect(mocks.threadRuntimeState.pendingSignals).toEqual([{ id: 'signal-1', preview: 'ok' }]);

await act(async () => {
onSignalEcho('signal-1');
});

expect(mocks.threadRuntimeState.pendingSignals).toEqual([]);
expect(mocks.threadRuntimeState.hasPendingMessages).toBe(false);
});

it('clears a pending signal even when the echo arrives before the send announces it (race)', async () => {
render(
<MastraRuntimeProvider agentId="agent-1" threadId="thread-1" initialMessages={[]} modelVersion="v2">
<div />
</MastraRuntimeProvider>,
);

const { onSignalSent, onSignalEcho } = getSignalCallbacks();

// On the idle path the server starts the run (and emits the `data-user-message`
// echo over the already-open thread subscription) before the send-message HTTP
// response returns. So `onSignalEcho` can fire before `onSignalSent` has added
// the pending entry. The badge must not linger afterwards.
await act(async () => {
onSignalEcho('signal-1');
onSignalSent('signal-1', 'ok');
});

expect(mocks.threadRuntimeState.pendingSignals).toEqual([]);
expect(mocks.threadRuntimeState.hasPendingMessages).toBe(false);
});

it('tracks premature echoes per id without suppressing unrelated signals', async () => {
render(
<MastraRuntimeProvider agentId="agent-1" threadId="thread-1" initialMessages={[]} modelVersion="v2">
<div />
</MastraRuntimeProvider>,
);

const { onSignalSent, onSignalEcho } = getSignalCallbacks();

// signal-1 loses the race (echo first); signal-2 is a normal in-order send.
await act(async () => {
onSignalEcho('signal-1');
onSignalSent('signal-1', 'first');
onSignalSent('signal-2', 'second');
});

expect(mocks.threadRuntimeState.pendingSignals).toEqual([{ id: 'signal-2', preview: 'second' }]);
expect(mocks.threadRuntimeState.hasPendingMessages).toBe(true);

await act(async () => {
onSignalEcho('signal-2');
});

expect(mocks.threadRuntimeState.pendingSignals).toEqual([]);
expect(mocks.threadRuntimeState.hasPendingMessages).toBe(false);
});

it('restores OM progress when initial messages arrive after mount', async () => {
const progress = {
windows: {
Expand Down
39 changes: 33 additions & 6 deletions packages/playground/src/services/mastra-runtime-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { useMemoryConfig } from '@/domains/memory/hooks';
import { useTracingSettings } from '@/domains/observability/context/tracing-settings-context';
import { useAdapters } from '@/lib/ai-ui/hooks/use-adapters';
import { ThreadRuntimeStateProvider } from '@/lib/ai-ui/thread-runtime-state';
import type { PendingSignalMessage } from '@/lib/ai-ui/thread-runtime-state';
import type { ChatProps } from '@/types';

const getAppendMessageText = (message: AppendMessage) => {
Expand Down Expand Up @@ -114,30 +115,56 @@ export function MastraRuntimeProvider({
// `initialMessages` refreshes after a stream ends. Track them in a parallel
// state that survives those resets so the chat still surfaces the failure.
const [streamErrors, setStreamErrors] = useState<MastraDBMessage[]>([]);
const [pendingSignals, setPendingSignals] = useState<{ id: string; preview: string }[]>([]);
const [pendingSignals, setPendingSignals] = useState<PendingSignalMessage[]>([]);
// `onSignalSent` (add, via the send-message HTTP response) and `onSignalEcho`
// (remove, via the thread-subscription stream) are independent async channels
// with no ordering guarantee. On the idle path the server starts the run and
// emits the `data-user-message` echo over the already-open subscription before
// the send-message response returns, so the echo can arrive before the pending
// signal is added. Decisions are made against these synchronous id mirrors
// (not the deferred `setPendingSignals` updater) so add/remove are commutative:
// `pendingSignalIdsRef` mirrors the visible badges, and `prematurelyEchoedSignalIdsRef`
// remembers echoes that landed before their add so the late add is skipped.
const pendingSignalIdsRef = useRef<Set<string>>(new Set());
const prematurelyEchoedSignalIdsRef = useRef<Set<string>>(new Set());
const [threadSignalsUnsupported, setThreadSignalsUnsupported] = useState(false);
const threadSignalsUnsupportedRef = useRef(false);
const threadSignalsEnabled =
window.MASTRA_AGENT_SIGNALS !== 'false' &&
supportsMemory !== false &&
!settings?.modelSettings?.chatWithLegacyStream;

const clearPendingSignals = useCallback(() => {
pendingSignalIdsRef.current.clear();
prematurelyEchoedSignalIdsRef.current.clear();
setPendingSignals([]);
}, []);

const addPendingSignal = useCallback((signalId: string, preview: string) => {
// The echo already arrived before this add — consume it, don't resurrect a badge.
if (prematurelyEchoedSignalIdsRef.current.delete(signalId)) return;
pendingSignalIdsRef.current.add(signalId);
setPendingSignals(prev => [...prev.filter(signal => signal.id !== signalId), { id: signalId, preview }]);
}, []);

const removePendingSignal = useCallback((signalId: string) => {
setPendingSignals(prev => prev.filter(signal => signal.id !== signalId));
if (pendingSignalIdsRef.current.delete(signalId)) {
setPendingSignals(prev => prev.filter(signal => signal.id !== signalId));
return;
}
// The echo beat the add. Remember it so the pending add that follows is
// skipped instead of leaving a badge that never clears.
prematurelyEchoedSignalIdsRef.current.add(signalId);
}, []);

// Clear any persisted stream errors when switching threads or agents so they
// don't leak across conversations.
useEffect(() => {
setStreamErrors([]);
setPendingSignals([]);
clearPendingSignals();
threadSignalsUnsupportedRef.current = false;
setThreadSignalsUnsupported(false);
}, [agentId, threadId]);
}, [agentId, clearPendingSignals, threadId]);

const chatRequestContext = useMemo(() => {
if (!agentVersionId && !requestContext) return undefined;
Expand Down Expand Up @@ -177,7 +204,7 @@ export function MastraRuntimeProvider({
onThreadSignalsUnsupported: () => {
threadSignalsUnsupportedRef.current = true;
setThreadSignalsUnsupported(true);
setPendingSignals([]);
clearPendingSignals();
},
});

Expand Down Expand Up @@ -522,7 +549,7 @@ export function MastraRuntimeProvider({
const onCancel = async () => {
abortControllerRef.current?.abort();
abortControllerRef.current = null;
setPendingSignals([]);
clearPendingSignals();
// Reset OM streaming state in case observation was in progress
resetObservationalMemoryStreamState();
cancelRun?.();
Expand Down