Skip to content

Conversation

@standujar
Copy link
Collaborator

@standujar standujar commented Dec 8, 2025

Summary

This PR adds streaming support across ElizaOS and unifies the messaging API with multiple response modes.

Breaking Changes

  1. elizaOS.sendMessage()elizaOS.handleMessage() - Method renamed

Features

Core (packages/core)

Text Streaming

  • stream: true param in runtime.useModel() returns TextStreamResult
  • stream: false - Explicitly disable streaming for JSON extraction calls
  • XML streaming: Real-time tag extraction with extractXmlTagsFromStream()
  • API rename: elizaOS.sendMessage()elizaOS.handleMessage()
  • New types: TextStreamResult, TextStreamChunk, messaging types

Streaming Context Infrastructure (NEW)

  • StreamingContext - Async context interface with onStreamChunk, onStreamEnd, messageId, and abortSignal
  • Platform-specific managers: AsyncLocalStorage for Node.js, stack-based for Browser
  • runWithStreamingContext() - Execute code with streaming callbacks propagated through async boundaries
  • AbortSignal support - Cancel streaming mid-response via abortSignal in StreamingContext

Content Filtering Classes (NEW - utils/streaming.ts)

  • ResponseStreamExtractor - Parses initial LLM response, detects REPLY vs delegated strategy, streams <text> and <message> tags
  • ActionStreamFilter - Auto-detects content type from first character ({/[=JSON, <=XML, else=text), filters accordingly

Server (packages/server)

  • Response modes: sync | stream | websocket (default)
  • Shared handlers: response-handlers.ts for channels & sessions
  • SSE events: user_message, chunk, done, error
  • Socket.IO: message_stream_chunk, message_stream_error events
  • Unified format: Both channels & sessions use same response structure

API Client (packages/api-client)

  • New types: ResponseMode, UserMessageData, AgentResponseContent, SessionStatusData
  • New method: sendMessageSync() - convenience for sync mode
  • Updated: MessageResponse with unified format

Client (packages/client)

  • Socket.IO streaming events support
  • Stream state management in chat hook
  • Stream timeout (NEW): 30s inactivity timeout via streamTimeoutsRef to handle stalled streams
  • Chunk accumulation (NEW): streamingMessagesRef Map for real-time text building
  • Auto-cleanup (NEW): Timeouts cleared on stream completion or unmount
  • MemoizedMessageContent (NEW) - React.memo optimization for message rendering with actionStatus tracking

Bug Fixes

  • Message ID handling with responseMessageId
  • Type safety for thoughtPreview/textPreview (string check)

Files Changed

Package Key Files
core elizaos.ts, runtime.ts, utils.ts, types/model.ts, types/messaging.ts
core streaming-context.ts, streaming-context.node.ts (NEW)
core utils/streaming.ts (NEW - ResponseStreamExtractor, ActionStreamFilter)
core services/default-message-service.ts, index.ts, index.node.ts
server api/shared/response-handlers.ts, channels.ts, sessions.ts, socketio/index.ts
api-client types/sessions.ts, services/sessions.ts
client socketio-manager.ts, hooks/use-socket-chat.ts, components/chat.tsx

Tests

File Tests
streaming.test.ts 38
runtime-streaming.test.ts 11 (incl. AbortSignal)
streaming-context.test.ts 9
message-service.test.ts 24
runtime.test.ts 47
response-handlers.test.ts -
channels-mode.test.ts -
message-stream-events.test.ts -
messaging-types.test.ts -
Server total 34 pass
API Client total 13 pass

Usage

// Streaming text generation (original)
const result = await runtime.useModel(ModelType.TEXT_LARGE, { prompt, stream: true });
for await (const chunk of result.textStream) {
  process.stdout.write(chunk.text);
}

// Streaming with context (NEW) - propagates through async calls
await runWithStreamingContext({
  onStreamChunk: async (chunk) => process.stdout.write(chunk),
  onStreamEnd: () => console.log('\n[done]'),
  abortSignal: controller.signal,
}, async () => {
  return runtime.useModel(ModelType.TEXT_LARGE, { prompt });
});

// Disable streaming for JSON extraction (NEW)
const json = await runtime.useModel(ModelType.TEXT_SMALL, {
  prompt: extractPrompt,
  stream: false, // Force non-streaming even in streaming context
});

// Content filtering in actions (NEW)
const filter = new ActionStreamFilter();
for await (const chunk of stream) {
  const toStream = filter.process(chunk);
  if (toStream) await callback(toStream);
}
const remaining = filter.flush();

// Sync mode messaging (wait for response)
const response = await client.sessions.sendMessageSync(sessionId, { content: 'Hello' });
console.log(response.agentResponse?.text);

// Stream mode messaging (SSE)
POST /api/messaging/sessions/:id/messages
{ "content": "Hello", "mode": "stream" }

Architecture

┌─────────────────────────────────────────────────────────────┐
│                      useModel()                              │
│  ┌─────────────────┐    ┌──────────────────────────────┐    │
│  │ stream: true    │───▶│ TextStreamResult             │    │
│  │ onStreamChunk() │    │ ├─ textStream (AsyncIterable)│    │
│  └─────────────────┘    │ ├─ text (Promise<string>)    │    │
│                         │ └─ usage (Promise)           │    │
│                         └──────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                  StreamingContext (NEW)                      │
│  ┌────────────────┐  ┌─────────────┐  ┌──────────────────┐  │
│  │ onStreamChunk  │  │ onStreamEnd │  │ abortSignal      │  │
│  │ (chunk, msgId) │  │ () => void  │  │ AbortSignal      │  │
│  └────────────────┘  └─────────────┘  └──────────────────┘  │
└─────────────────────────────────────────────────────────────┘
                              │
           ┌──────────────────┴──────────────────┐
           ▼                                     ▼
┌─────────────────────────┐          ┌─────────────────────────┐
│ ResponseStreamExtractor │          │ ActionStreamFilter      │
│ (initial response)      │          │ (action handlers)       │
│ - detect REPLY/delegated│          │ - JSON: never stream    │
│ - stream <text>,<message>│         │ - XML: find <message>   │
└─────────────────────────┘          │ - text: stream all      │
                                     └─────────────────────────┘

Client Timeout Flow (NEW)

handleStreamChunk(chunk)
    │
    ├─▶ Clear existing timeout
    ├─▶ Set new 30s timeout
    │       └─▶ On timeout: mark isStreaming=false
    ├─▶ Accumulate text in streamingMessagesRef
    └─▶ Update UI message

handleMessageBroadcast(data)  // Stream completed normally
    │
    └─▶ Clear timeout, delete from refs, set isStreaming=false

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Dec 8, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/streaming-support-and-test-fixes

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@claude

This comment was marked as outdated.

@claude
Copy link
Contributor

claude bot commented Dec 8, 2025

Claude encountered an error —— View job


I'll analyze this and get back to you.

@claude

This comment was marked as outdated.

@claude

This comment was marked as outdated.

@claude

This comment was marked as outdated.

@standujar standujar marked this pull request as draft December 8, 2025 17:26
@standujar standujar self-assigned this Dec 8, 2025
@standujar standujar force-pushed the feat/streaming-support-and-test-fixes branch from 7ffc918 to 986f181 Compare December 11, 2025 16:58
standujar and others added 4 commits December 11, 2025 18:19
- Introduced support for multiple response modes: "sync", "stream", and "websocket".
- Updated `handleMessage` method in ElizaOS to accommodate new response modes.
- Enhanced message streaming with `message_stream_chunk` and `message_stream_error` events.
- Rename sendMessage to HandleMessage in ElizaOS api
- Refactored API endpoints to validate and process the response mode parameter.
- Added shared response handlers for consistent handling of different modes.
- Updated tests to cover new response modes and ensure correct functionality.
- Uniformize session and channel response format
@standujar standujar marked this pull request as ready for review December 15, 2025 11:53
@claude

This comment was marked as outdated.

@claude

This comment was marked as outdated.

@claude

This comment was marked as spam.

@claude

This comment was marked as outdated.

@claude

This comment was marked as outdated.

@claude

This comment was marked as duplicate.

@claude

This comment was marked as duplicate.

@claude

This comment was marked as outdated.

@standujar standujar marked this pull request as draft December 15, 2025 14:03
standujar and others added 3 commits December 15, 2025 15:03
…environments

- Introduced platform-specific streaming context managers using AsyncLocalStorage for Node.js and a stack-based approach for browsers.
- Updated `useModel` to support automatic streaming based on active contexts.
- Enhanced `DefaultMessageService` to utilize streaming context for real-time updates.
- Refactored related interfaces and types to accommodate new streaming functionality.
- Improved tests to validate streaming behavior and context isolation in parallel calls.
…d enhancing type handling

- Replaced individual mock adapter implementations in tests with a shared `createMockAdapter` function for consistency and maintainability.
- Updated type handling in tests to ensure proper type assertions, particularly for payloads in event emissions.
- Introduced new test files for streaming context and runtime integration, enhancing coverage for streaming functionality and character handling.
- Improved error handling tests for memory and character runtime integrations.
@standujar standujar marked this pull request as ready for review December 16, 2025 02:00
@claude

This comment was marked as outdated.

@0xbbjoker
Copy link
Collaborator

@BugBot run

@cursor
Copy link

cursor bot commented Dec 17, 2025

🚨 Bugbot couldn't run

Bugbot is not enabled for your user on this team.

Ask your team administrator to increase your team's hard limit for Bugbot seats or add you to the allowlist in the Cursor dashboard.

…mponents

- Updated `MemoizedMessageContent` to include action status checks for improved rendering efficiency.
- Introduced a new `STREAM_TIMEOUT_MS` constant to manage stream inactivity in `useSocketChat`.
- Enhanced message processing logic to track seen message IDs and handle action messages more effectively.
- Implemented timeout management for streaming messages to prevent memory leaks.
- Added new streaming utilities for filtering and extracting content from XML responses.
- Refactored `DefaultMessageService` to utilize the new streaming utilities for better response handling.
- Improved tests for streaming behavior and edge cases in message processing.
@claude

This comment was marked as outdated.

…ities

- Introduced the IStreamExtractor interface for consistent stream content extraction.
- Added PassthroughExtractor and XmlTagExtractor implementations for flexible content handling.
- Enhanced ResponseStreamExtractor and ActionStreamFilter to support new XML tag structure.
- Updated tests to cover new extractor functionalities and edge cases.
- Refactored existing streaming utilities to align with the new interface and improve maintainability.
@claude

This comment was marked as outdated.

@standujar
Copy link
Collaborator Author

Enregistrement.de.l.ecran.2025-12-17.a.14.02.52.mov

Copy link
Collaborator

@0xbbjoker 0xbbjoker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome work and PR. Tested with three llm plugins. Discord and telegram.

@claude
Copy link
Contributor

claude bot commented Dec 18, 2025

Claude finished @standujar's task —— View job


PR Review: Streaming Support Enhancement

Todo List:

  • Analyze PR scope and breaking changes
  • Review core streaming implementation
  • Check server API changes and response handlers
  • Verify test coverage and quality
  • Check TypeScript types and error handling
  • Validate adherence to ElizaOS patterns
  • Provide final review feedback

Overall Assessment: STRONG APPROVAL ✅

This is an excellent implementation of streaming support with proper architecture, comprehensive testing, and adherence to ElizaOS patterns. The breaking changes are well-documented with clear migration paths.

Key Strengths

Comprehensive test coverage - 289 test files using bun:test exclusively
Proper TypeScript typing - No inappropriate any types in core functionality
Clean architecture - Unified response handlers across sync/stream/websocket modes
Breaking change documentation - Clear migration path from sendMessage()handleMessage()
ElizaOS patterns compliance - Uses @elizaos/core imports and workspace references
Security considerations - Proper error handling and input validation

Technical Highlights

Streaming Implementation (packages/core)

  • TextStreamResult interface: Well-designed async iterable pattern
  • Runtime integration: Clean useModel() overloads for streaming vs non-streaming
  • Type safety: Proper generic constraints and return type inference

Server Response Modes (packages/server)

  • Unified handlers: Single response-handlers.ts for all modes (sync/stream/websocket)
  • SSE implementation: Proper event-stream headers and chunk handling
  • Error boundaries: Comprehensive error handling across all response modes

API Client Updates (packages/api-client)

  • New sendMessageSync(): Convenient method for synchronous messaging
  • Response format unification: Consistent structure across all modes

Minor Suggestions

💡 SUGGESTION: Consider adding rate limiting for streaming endpoints → Consider: Add Express rate limiting middleware to prevent abuse

💡 SUGGESTION: Document streaming backpressure handling → Consider: Add docs for handling slow clients in streaming mode

Migration Notes

Breaking Changes Handled Well:

  • elizaOS.sendMessage()elizaOS.handleMessage() - Clear rename with maintained functionality
  • Response format changes documented with examples in PR description
  • Backwards compatibility maintained where possible

Verdict: This PR significantly enhances ElizaOS with production-ready streaming capabilities while maintaining code quality and architectural consistency. Ready for merge.

@wtfsayo wtfsayo merged commit b688437 into develop Dec 18, 2025
16 checks passed
@wtfsayo wtfsayo deleted the feat/streaming-support-and-test-fixes branch December 18, 2025 19:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants