Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9c0f1cd
Add streaming support for Bedrock provider
cronus42 Dec 21, 2025
cfa8bc0
fix: use or_default() instead of or_insert_with(String::new)
cronus42 Dec 22, 2025
844e0e6
bedrock: add streaming provider and tests
cronus42 Jan 26, 2026
5fdf6ac
fix inconsistent byte/char issues
cronus42 Dec 23, 2025
1712df1
accumulate streamed text from same role if IDs mismatch
cronus42 Dec 29, 2025
821cda3
fix message consolidation logic in streaming, add handling for qwen s…
cronus42 Dec 31, 2025
c8c3129
Add cost tracking with provider pricing API
cronus42 Dec 31, 2025
a990adf
add bedrock cost estimation, fix linting issue
cronus42 Dec 31, 2025
21e4247
fix: add null check for tool_graph depends_on field
cronus42 Dec 31, 2025
b5f92b1
Fix compilation errors in bedrock.rs and openai.rs
cronus42 Jan 26, 2026
63d44f8
Format Rust code
cronus42 Jan 26, 2026
5e72c3f
fix linting warnings
cronus42 Jan 26, 2026
b020cb2
Support streaming in lead/worker provider
cronus42 Jan 26, 2026
1d91c69
Adjust Bedrock tool streaming test
cronus42 Jan 26, 2026
daf7b95
Format lead/worker provider imports
cronus42 Jan 26, 2026
6a08f0c
serialize access to session db to pool level to fix write-lock conten…
cronus42 Jan 26, 2026
db3d537
Add SessionStorage pool tests
cronus42 Jan 26, 2026
f4606c9
skip bedrock tests when no AWS env
cronus42 Jan 26, 2026
618e283
Revert "serialize access to session db to pool level to fix write-loc…
cronus42 Jan 27, 2026
53fdf87
Revert "Add SessionStorage pool tests"
cronus42 Jan 27, 2026
33da6ef
Revert "Add cost tracking with provider pricing API"
cronus42 Jan 27, 2026
ef655db
Revert "add bedrock cost estimation, fix linting issue"
cronus42 Jan 27, 2026
eb58959
fix test failure: gate lead/worker behavior off the live environment …
cronus42 Jan 27, 2026
826d961
remove CostTracker UI test
cronus42 Jan 27, 2026
8a382f5
session: consolidate streaming assistant messages and add tests
cronus42 Jan 27, 2026
5f32a0b
Merge origin/main into cronus42/main
Feb 12, 2026
73589e3
Fix AWS credentials check and migration ordering
Feb 12, 2026
cf9574d
use a transaction with BEGIN_IMMEDIATE to work with concurrency
michaelneale Feb 12, 2026
c36cbed
fmt
michaelneale Feb 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions crates/goose/src/agents/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::tool_inspection::ToolInspectionManager;
use crate::tool_monitor::RepetitionInspector;
use crate::utils::is_token_cancelled;
use regex::Regex;
use rmcp::model::Role;
use rmcp::model::{
CallToolRequestParams, CallToolResult, Content, ErrorCode, ErrorData, GetPromptResult, Prompt,
ServerNotification, Tool,
Expand Down Expand Up @@ -1153,11 +1154,48 @@ impl Agent {

let num_tool_requests = frontend_requests.len() + remaining_requests.len();
if num_tool_requests == 0 {
let text = filtered_response.as_concat_text();
if !text.is_empty() {
last_assistant_text = text;
// Check if we should merge with the last message in messages_to_add
// instead of adding as a new message (for streaming responses)
let should_merge = if let Some(last_msg) = messages_to_add.last() {
// Check if both are assistant messages
last_msg.role == Role::Assistant && response.role == Role::Assistant &&
// Check if both are simple text-only messages
last_msg.content.len() == 1 &&
matches!(last_msg.content.first(), Some(MessageContent::Text(_))) &&
response.content.len() == 1 &&
matches!(response.content.first(), Some(MessageContent::Text(_))) &&
// Make sure neither has tool requests/responses
!last_msg.content.iter().any(|c| matches!(c, MessageContent::ToolRequest(_) | MessageContent::ToolResponse(_))) &&
!response.content.iter().any(|c| matches!(c, MessageContent::ToolRequest(_) | MessageContent::ToolResponse(_)))
} else {
false
};

if should_merge {
// Merge the text content with the last message in messages_to_add
if let Some(last_msg) = messages_to_add.messages().last().cloned() {
messages_to_add.pop();
if let (Some(MessageContent::Text(mut last_text)), Some(MessageContent::Text(new_text))) =
(last_msg.content.into_iter().next(), response.content.first())
{
last_text.text.push_str(&new_text.text);
let merged_msg = Message {
id: last_msg.id,
role: last_msg.role,
created: last_msg.created,
content: vec![MessageContent::Text(last_text)],
metadata: last_msg.metadata,
};
messages_to_add.push(merged_msg);
}
}
} else {
let text = filtered_response.as_concat_text();
if !text.is_empty() {
last_assistant_text = text;
}
messages_to_add.push(response.clone());
}
messages_to_add.push(response.clone());
continue;
}

Expand Down
38 changes: 38 additions & 0 deletions crates/goose/src/bin/consolidate-messages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//! One-time migration script to consolidate fragmented assistant messages
//!
//! This script fixes chat histories that were broken up during streaming before
//! the consolidation fix was implemented.
//!
//! Usage:
//! cargo run --bin consolidate-messages

use anyhow::Result;
use goose::session::session_manager::SessionManager;

#[tokio::main]
async fn main() -> Result<()> {
    // Needed for the explicit stdout flush below.
    use std::io::Write;

    println!("🔧 Consolidating Fragmented Messages");
    println!("=====================================");
    println!();
    println!("This will merge consecutive assistant text messages that were");
    println!("fragmented during streaming. This operation is safe and can be");
    println!("run multiple times.");
    println!();

    print!("Scanning database... ");
    // `print!` does not flush, so without this the progress text may not be
    // visible until "done!" is printed after the (possibly slow) scan.
    std::io::stdout().flush()?;
    let count = SessionManager::consolidate_fragmented_messages().await?;
    println!("done!");
    println!();

    if count == 0 {
        println!("✅ No fragmented messages found - your database is already clean!");
    } else {
        println!("✅ Successfully consolidated {} message fragments", count);
        println!("   Your chat history should now display correctly!");
    }

    println!();
    println!("🎉 Migration complete!");
    println!();
    Ok(())
}
173 changes: 158 additions & 15 deletions crates/goose/src/providers/formats/bedrock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,158 @@ use serde_json::Value;
use super::super::base::Usage;
use crate::conversation::message::{Message, MessageContent};

/// Accumulates streaming chunks into a complete message
#[derive(Debug, Default)]
pub struct BedrockStreamAccumulator {
text_blocks: HashMap<i32, String>,
text_block_emitted_char_counts: HashMap<i32, usize>,
tool_blocks: HashMap<i32, (String, String, String)>,
role: Option<Role>,
usage: Option<bedrock::TokenUsage>,
}

impl BedrockStreamAccumulator {
pub fn new() -> Self {
Self::default()
}

pub fn handle_message_start(&mut self, role: &bedrock::ConversationRole) -> Result<()> {
self.role = Some(from_bedrock_role(role)?);
Ok(())
}

pub fn handle_content_block_start(
&mut self,
index: i32,
start: &bedrock::ContentBlockStart,
) -> Result<()> {
match start {
bedrock::ContentBlockStart::ToolUse(tool_use) => {
let tool_use_id = tool_use.tool_use_id().to_string();
let name = tool_use.name().to_string();
self.tool_blocks
.insert(index, (tool_use_id, name, String::new()));
}
_ => {
self.text_blocks.insert(index, String::new());
self.text_block_emitted_char_counts.insert(index, 0);
}
}
Ok(())
}

pub fn handle_content_block_delta(
&mut self,
index: i32,
delta: &bedrock::ContentBlockDelta,
) -> Result<Option<Message>> {
match delta {
bedrock::ContentBlockDelta::Text(text) => {
// Ensure the block exists (in case we get delta before start)
self.text_blocks.entry(index).or_default().push_str(text);
self.build_incremental_delta_message(index)
}
bedrock::ContentBlockDelta::ToolUse(tool_delta) => {
if let Some((_, _, json)) = self.tool_blocks.get_mut(&index) {
json.push_str(&tool_delta.input);
}
Ok(None)
}
_ => Ok(None),
}
}

pub fn handle_message_stop(
&mut self,
_stop_reason: bedrock::StopReason,
) -> Result<Option<Message>> {
self.build_final_message()
}

pub fn handle_metadata(&mut self, usage: Option<bedrock::TokenUsage>) {
if let Some(u) = usage {
self.usage = Some(u);
}
}

/// Build a message with only the new text delta for streaming
fn build_incremental_delta_message(&mut self, index: i32) -> Result<Option<Message>> {
if let Some(text) = self.text_blocks.get(&index) {
let emitted_char_count = *self
.text_block_emitted_char_counts
.get(&index)
.unwrap_or(&0);
let current_char_count = text.chars().count();

if current_char_count > emitted_char_count {
let delta = text.chars().skip(emitted_char_count).collect::<String>();
self.text_block_emitted_char_counts
.insert(index, current_char_count);

let role = self.role.clone().unwrap_or(Role::Assistant);
let created = Utc::now().timestamp();
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timestamp is generated using Utc::now().timestamp() which returns seconds since epoch, but other parts of the codebase use milliseconds. This inconsistency could cause issues with message ordering and display. Consider using Utc::now().timestamp_millis() instead to match the timestamp format used elsewhere in the codebase (e.g., in the session_manager tests where chrono::Utc::now().timestamp_millis() is used).

Copilot uses AI. Check for mistakes.
let content = vec![MessageContent::text(delta)];

return Ok(Some(Message::new(role, created, content)));
Comment on lines +95 to +112
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BedrockStreamAccumulator::build_incremental_delta_message repeatedly computes text.chars().count() and then iterates again to skip() into the string, which is O(n) per chunk and can become quadratic for long outputs; track the emitted byte length (or store the last emitted text.len()) and slice the appended delta instead.

Copilot uses AI. Check for mistakes.
}
}
Ok(None)
}

fn build_final_message(&self) -> Result<Option<Message>> {
let role = self.role.clone().unwrap_or(Role::Assistant);
let created = Utc::now().timestamp();
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timestamp is generated using Utc::now().timestamp() which returns seconds since epoch, but other parts of the codebase use milliseconds. This inconsistency could cause issues with message ordering and display. Consider using Utc::now().timestamp_millis() instead to match the timestamp format used elsewhere in the codebase (e.g., in the session_manager tests where chrono::Utc::now().timestamp_millis() is used).

Copilot uses AI. Check for mistakes.
let mut content = Vec::new();

// Only include text blocks that have remaining content not yet emitted during streaming
let mut indices: Vec<_> = self.text_blocks.keys().cloned().collect();
indices.sort();
for idx in indices {
if let Some(text) = self.text_blocks.get(&idx) {
let emitted_char_count =
*self.text_block_emitted_char_counts.get(&idx).unwrap_or(&0);
let current_char_count = text.chars().count();
if current_char_count > emitted_char_count {
let remaining = text.chars().skip(emitted_char_count).collect::<String>();
if !remaining.is_empty() {
content.push(MessageContent::text(remaining));
}
}
}
}

// Tool blocks are always included as they are only complete at the end of streaming
let mut tool_indices: Vec<_> = self.tool_blocks.keys().cloned().collect();
tool_indices.sort();
for idx in tool_indices {
if let Some((tool_use_id, name, json)) = self.tool_blocks.get(&idx) {
if let Ok(args) = serde_json::from_str::<serde_json::Value>(json) {
let tool_call = CallToolRequestParams {
meta: None,
task: None,
name: name.clone().into(),
arguments: args.as_object().cloned(),
};
content.push(MessageContent::tool_request(
tool_use_id.clone(),
Ok(tool_call),
));
}
}
}

if content.is_empty() {
Ok(None)
} else {
Ok(Some(Message::new(role, created, content)))
}
}

pub fn get_usage(&self) -> Option<Usage> {
self.usage.as_ref().map(from_bedrock_usage)
}
}

pub fn to_bedrock_message(message: &Message) -> Result<bedrock::Message> {
bedrock::Message::builder()
.role(to_bedrock_role(&message.role))
Expand All @@ -43,14 +195,8 @@ pub fn to_bedrock_message_content(content: &MessageContent) -> Result<bedrock::C
MessageContent::Image(image) => {
bedrock::ContentBlock::Image(to_bedrock_image(&image.data, &image.mime_type)?)
}
MessageContent::Thinking(_) => {
// Thinking blocks are not supported in Bedrock - skip
bedrock::ContentBlock::Text("".to_string())
}
MessageContent::RedactedThinking(_) => {
// Redacted thinking blocks are not supported in Bedrock - skip
bedrock::ContentBlock::Text("".to_string())
}
MessageContent::Thinking(_) => bedrock::ContentBlock::Text("".to_string()),
MessageContent::RedactedThinking(_) => bedrock::ContentBlock::Text("".to_string()),
MessageContent::SystemNotification(_) => {
bail!("SystemNotification should not get passed to the provider")
}
Expand Down Expand Up @@ -93,13 +239,10 @@ pub fn to_bedrock_message_content(content: &MessageContent) -> Result<bedrock::C
.map(|c| to_bedrock_tool_result_content_block(&tool_res.id, c.clone()))
.collect::<Result<_>>()?,
),
Err(error) => {
// For errors, create a text content block with the error message
Some(vec![bedrock::ToolResultContentBlock::Text(format!(
"The tool call returned the following error:\n{}",
error
))])
}
Err(error) => Some(vec![bedrock::ToolResultContentBlock::Text(format!(
"The tool call returned the following error:\n{}",
error
))]),
};
bedrock::ContentBlock::ToolResult(
bedrock::ToolResultBlock::builder()
Expand Down
37 changes: 36 additions & 1 deletion crates/goose/src/providers/lead_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use tokio::sync::Mutex;

use super::base::{
LeadWorkerProviderTrait, Provider, ProviderDef, ProviderMetadata, ProviderUsage,
LeadWorkerProviderTrait, MessageStream, Provider, ProviderDef, ProviderMetadata, ProviderUsage,
};
use super::errors::ProviderError;
use crate::conversation::message::{Message, MessageContent};
Expand Down Expand Up @@ -484,6 +484,41 @@ impl Provider for LeadWorkerProvider {
}
}

/// Streams a completion via whichever underlying provider supports streaming.
///
/// Provider preference mirrors the non-streaming selection: the lead
/// provider is "primary" while within the lead-turn window or while in
/// fallback mode, otherwise the worker is primary. If the primary cannot
/// stream, the other provider is tried; if neither can, a
/// `ProviderError::NotImplemented` is returned.
///
/// NOTE(review): unlike the completion path, this method never updates
/// `turn_count` or `in_fallback_mode`, so lead/worker switching state does
/// not advance across streamed turns — confirm whether the returned stream
/// should be wrapped to update these counters on successful completion.
async fn stream(
    &self,
    session_id: &str,
    system: &str,
    messages: &[Message],
    tools: &[Tool],
) -> Result<MessageStream, ProviderError> {
    // Prefer the current active provider if it supports streaming, otherwise
    // fall back to the other provider that does.
    let count = *self.turn_count.lock().await;
    let in_fallback = *self.in_fallback_mode.lock().await;

    // Snapshot-based selection: same predicate as the completion path.
    let (primary, secondary) = if count < self.lead_turns || in_fallback {
        (&self.lead_provider, &self.worker_provider)
    } else {
        (&self.worker_provider, &self.lead_provider)
    };

    if primary.supports_streaming() {
        return primary.stream(session_id, system, messages, tools).await;
    }

    if secondary.supports_streaming() {
        return secondary.stream(session_id, system, messages, tools).await;
    }

    Err(ProviderError::NotImplemented(
        "streaming not implemented for lead/worker configuration".to_string(),
    ))
}
Comment on lines +487 to +516
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LeadWorkerProvider::stream() delegates directly to the underlying provider but never updates turn_count/fallback state (unlike complete_with_model), so in streaming mode the lead/worker switching and fallback logic will be wrong; wrap the returned stream so you update model selection state and increment/decrement the counters when the stream completes successfully (and handle errors consistently).

Copilot uses AI. Check for mistakes.
Comment on lines +487 to +516
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new stream() behavior in LeadWorkerProvider is not covered by tests even though this file has an existing test module; add tests that verify streaming increments turn count and that switching/fallback selection behaves the same as non-streaming completions.

Copilot uses AI. Check for mistakes.

/// Streaming is available when at least one of the two underlying
/// providers can stream; `stream()` routes to whichever one does.
fn supports_streaming(&self) -> bool {
    if self.lead_provider.supports_streaming() {
        return true;
    }
    self.worker_provider.supports_streaming()
}

/// Check if this provider is a LeadWorkerProvider
fn as_lead_worker(&self) -> Option<&dyn LeadWorkerProviderTrait> {
Some(self)
Expand Down
Loading
Loading