Conversation
- Implemented stream() method in BedrockProvider
- Set supports_streaming() to return true
- Updated Cargo.toml to use futures and async-stream with proper features
- Refactored provider tests

Signed-off-by: James Loope <cronus@stolenshoe.com>
…tyle xml tool calls Signed-off-by: James Loope <cronus@stolenshoe.com>
- Add as_any() method to Provider trait for downcasting
- Implement model_info() for Bedrock provider with pricing data
- Add /providers/:provider/cost_info endpoint to server
- Fix UI cost calculations to account for per-million-token pricing
- Add test for Bedrock provider cost info
- Clean up Qwen regex formatting

The backend now exposes token pricing data per million tokens, and the UI correctly divides by 1_000_000 when calculating costs.

Signed-off-by: James Loope <cronus@stolenshoe.com>
…tion errors Signed-off-by: James Loope <cronus@stolenshoe.com>
…k contention errors" This reverts commit 6a08f0c. Signed-off-by: James Loope <cronus@stolenshoe.com>
This reverts commit db3d537. Signed-off-by: James Loope <cronus@stolenshoe.com>
This reverts commit c8c3129. Signed-off-by: James Loope <cronus@stolenshoe.com>
This reverts commit a990adf. Signed-off-by: James Loope <cronus@stolenshoe.com>
…variable Signed-off-by: James Loope <cronus@stolenshoe.com>
Resolved merge conflicts:
- crates/goose/Cargo.toml: Use workspace dependencies
- crates/goose/src/agents/agent.rs: Keep message merge logic and last_assistant_text tracking
- crates/goose/src/providers/init.rs: Add extensions parameter to create functions
- crates/goose/src/providers/lead_worker.rs: Import both MessageStream and ProviderDef
- crates/goose/src/session/session_manager.rs: Keep message consolidation logic with message_id support
- ui/desktop/src/components/settings/extensions/utils.ts: Use parseShellQuote and add combineCmdAndArgs

Additional fixes:
- Use ORDER BY id instead of created_timestamp in add_message queries to ensure consistent ordering for rapid message insertions (fixes flaky tests)
- Add consolidate_fragmented_messages method to SessionManager
- Update bedrock streaming tests to pass extensions parameter
Pull request overview
This PR replaces #6248 and focuses on improving streaming behavior (notably for Bedrock) while preventing/repairing chat history fragmentation caused by streamed assistant text arriving as multiple chunks.
Changes:
- Add message consolidation logic to merge adjacent streamed assistant text (runtime + one-time migration).
- Add/adjust streaming support paths (Lead/Worker provider stream selection and Bedrock streaming accumulator).
- Update desktop UI streaming merge behavior and a few supporting type/context definitions.
Reviewed changes
Copilot reviewed 8 out of 10 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| ui/desktop/src/types/shell-quote.d.ts | Adds local TS typings for shell-quote used by the desktop app. |
| ui/desktop/src/hooks/useChatStream.ts | Merges streamed assistant text chunks in the UI when IDs are missing/unstable. |
| ui/desktop/src/components/ToolCallWithResponse.tsx | Makes depends_on optional and guards its usage. |
| ui/desktop/src/components/ModelAndProviderContext.tsx | Exposes provider/model setters via context. |
| crates/goose/tests/bedrock_streaming_test.rs | Adds Bedrock streaming integration tests (env-gated). |
| crates/goose/src/session/session_manager.rs | Consolidates streamed assistant text during add_message and adds a consolidation migration. |
| crates/goose/src/providers/lead_worker.rs | Adds a stream() implementation that selects a streaming-capable provider. |
| crates/goose/src/providers/formats/bedrock.rs | Introduces a Bedrock streaming accumulator for incremental message deltas + final message/tool assembly. |
| crates/goose/src/bin/consolidate-messages.rs | Adds a one-time CLI to run the consolidation migration against the sessions DB. |
| crates/goose/src/agents/agent.rs | Merges adjacent assistant text messages in-memory while processing streaming responses. |
```rust
std::env::var("AWS_ACCESS_KEY_ID").is_ok()
    || std::env::var("AWS_PROFILE").is_ok()
    || std::env::var("AWS_SESSION_TOKEN").is_ok()
```
has_bedrock_credentials() treats AWS_SESSION_TOKEN alone as sufficient, which can cause these tests to run (and then fail) in environments where only a session token is set; consider checking for AWS_PROFILE OR (AWS_ACCESS_KEY_ID && AWS_SECRET_ACCESS_KEY) (and optionally AWS_SESSION_TOKEN) instead.
Suggested change:

```diff
-    std::env::var("AWS_ACCESS_KEY_ID").is_ok()
-        || std::env::var("AWS_PROFILE").is_ok()
-        || std::env::var("AWS_SESSION_TOKEN").is_ok()
+    let has_profile = std::env::var("AWS_PROFILE").is_ok();
+    let has_access_keys = std::env::var("AWS_ACCESS_KEY_ID").is_ok()
+        && std::env::var("AWS_SECRET_ACCESS_KEY").is_ok();
+    has_profile || has_access_keys
```
```rust
async fn stream(
    &self,
    session_id: &str,
    system: &str,
    messages: &[Message],
    tools: &[Tool],
) -> Result<MessageStream, ProviderError> {
    // Prefer the current active provider if it supports streaming, otherwise
    // fall back to the other provider that does.
    let count = *self.turn_count.lock().await;
    let in_fallback = *self.in_fallback_mode.lock().await;

    let (primary, secondary) = if count < self.lead_turns || in_fallback {
        (&self.lead_provider, &self.worker_provider)
    } else {
        (&self.worker_provider, &self.lead_provider)
    };

    if primary.supports_streaming() {
        return primary.stream(session_id, system, messages, tools).await;
    }

    if secondary.supports_streaming() {
        return secondary.stream(session_id, system, messages, tools).await;
    }

    Err(ProviderError::NotImplemented(
        "streaming not implemented for lead/worker configuration".to_string(),
    ))
}
```
LeadWorkerProvider::stream() delegates directly to the underlying provider but never updates turn_count/fallback state (unlike complete_with_model), so in streaming mode the lead/worker switching and fallback logic will be wrong; wrap the returned stream so you update model selection state and increment/decrement the counters when the stream completes successfully (and handle errors consistently).
The new stream() behavior in LeadWorkerProvider is not covered by tests even though this file has an existing test module; add tests that verify streaming increments turn count and that switching/fallback selection behaves the same as non-streaming completions.
```rust
let messages = sqlx::query_as::<_, (i64, String, String, i64)>(
    "SELECT id, role, content_json, created_timestamp
     FROM messages WHERE session_id = ?
     ORDER BY created_timestamp ASC",
```
migrate_consolidate_messages() orders messages by created_timestamp, but this code itself notes that created_timestamp can have duplicate values and SQLite does not guarantee stable ordering on ties, which can merge the wrong adjacent messages; order by id (or created_timestamp, id) to make consolidation deterministic and consistent with add_message’s merge logic.
Suggested change:

```diff
-     ORDER BY created_timestamp ASC",
+     ORDER BY created_timestamp ASC, id ASC",
```
```rust
fn build_incremental_delta_message(&mut self, index: i32) -> Result<Option<Message>> {
    if let Some(text) = self.text_blocks.get(&index) {
        let emitted_char_count = *self
            .text_block_emitted_char_counts
            .get(&index)
            .unwrap_or(&0);
        let current_char_count = text.chars().count();

        if current_char_count > emitted_char_count {
            let delta = text.chars().skip(emitted_char_count).collect::<String>();
            self.text_block_emitted_char_counts
                .insert(index, current_char_count);

            let role = self.role.clone().unwrap_or(Role::Assistant);
            let created = Utc::now().timestamp();
            let content = vec![MessageContent::text(delta)];

            return Ok(Some(Message::new(role, created, content)));
```
BedrockStreamAccumulator::build_incremental_delta_message repeatedly computes text.chars().count() and then iterates again to skip() into the string, which is O(n) per chunk and can become quadratic for long outputs; track the emitted byte length (or store the last emitted text.len()) and slice the appended delta instead.
- has_bedrock_credentials(): AWS_SESSION_TOKEN alone is not sufficient; require either AWS_PROFILE OR (AWS_ACCESS_KEY_ID AND AWS_SECRET_ACCESS_KEY)
- migrate_consolidate_messages(): Use ORDER BY id ASC instead of created_timestamp ASC for consistent ordering (id is auto-increment and guarantees insertion order, while created_timestamp can have duplicate values for rapid insertions)
@DOsinga pushed a fix due to failing concurrency tests that use a tx. The alternative was to set
```rust
.insert(index, current_char_count);

let role = self.role.clone().unwrap_or(Role::Assistant);
let created = Utc::now().timestamp();
```
The timestamp is generated using Utc::now().timestamp() which returns seconds since epoch, but other parts of the codebase use milliseconds. This inconsistency could cause issues with message ordering and display. Consider using Utc::now().timestamp_millis() instead to match the timestamp format used elsewhere in the codebase (e.g., in the session_manager tests where chrono::Utc::now().timestamp_millis() is used).
```rust
fn build_final_message(&self) -> Result<Option<Message>> {
    let role = self.role.clone().unwrap_or(Role::Assistant);
    let created = Utc::now().timestamp();
```
The timestamp is generated using Utc::now().timestamp() which returns seconds since epoch, but other parts of the codebase use milliseconds. This inconsistency could cause issues with message ordering and display. Consider using Utc::now().timestamp_millis() instead to match the timestamp format used elsewhere in the codebase (e.g., in the session_manager tests where chrono::Utc::now().timestamp_millis() is used).
```rust
#[tokio::test]
async fn test_bedrock_streaming_basic() -> Result<()> {
    dotenv().ok();

    if !has_bedrock_credentials() {
        eprintln!("Skipping Bedrock streaming test: no AWS credentials in environment");
        return Ok(());
    }

    // Check if streaming is supported
    // Note: Provider is registered as "aws_bedrock", not "bedrock"
    let provider = create_with_named_model("aws_bedrock", BEDROCK_DEFAULT_MODEL, vec![]).await?;

    assert!(
        provider.supports_streaming(),
        "Bedrock provider should support streaming"
    );

    // Test basic streaming
    let message = Message::user().with_text("Say hello in 3 words");
    let system_prompt = "You are a helpful assistant.";

    let mut stream = provider
        .stream("test-session-id", system_prompt, &[message], &[])
        .await?;

    let mut message_count = 0;
    let mut text_content = String::new();

    // Collect streamed messages
    while let Some(result) = stream.next().await {
        match result {
            Ok((Some(msg), usage_opt)) => {
                message_count += 1;

                // Print debug info
                println!(
                    "Stream chunk {}: role={:?}, content_items={}",
                    message_count,
                    msg.role,
                    msg.content.len()
                );

                // Collect text content
                for content in &msg.content {
                    if let goose::conversation::message::MessageContent::Text(text) = content {
                        text_content.push_str(&text.text);
                        println!("  Text chunk: {}", text.text);
                    }
                }

                // Print usage if available
                if let Some(usage) = usage_opt {
                    println!(
                        "  Usage - Input: {:?}, Output: {:?}",
                        usage.usage.input_tokens, usage.usage.output_tokens
                    );
                }
            }
            Ok((None, Some(usage))) => {
                println!(
                    "Final usage - Input: {:?}, Output: {:?}",
                    usage.usage.input_tokens, usage.usage.output_tokens
                );
            }
            Ok((None, None)) => {
                println!("Stream end marker received");
            }
            Err(e) => {
                eprintln!("Stream error: {:?}", e);
                return Err(e.into());
            }
        }
    }

    println!("\n=== Stream Summary ===");
    println!("Total chunks received: {}", message_count);
    println!("Final text: {}", text_content);

    assert!(
        message_count > 0,
        "Should receive at least one streamed message"
    );
    assert!(
        !text_content.is_empty(),
        "Should receive text content in stream"
    );

    Ok(())
}
```
These tests call provider.stream() and provider.supports_streaming() on the bedrock provider, but there's no implementation of these methods in the BedrockProvider in this PR. The tests will fail at runtime because the Provider trait's default stream() implementation returns a NotImplemented error and supports_streaming() returns false. Either this PR is incomplete and needs the actual streaming implementation added to bedrock.rs, or these tests should be removed/moved to a different PR that includes the implementation.
```rust
async fn test_bedrock_streaming_with_tools() -> Result<()> {
    dotenv().ok();

    if !has_bedrock_credentials() {
        eprintln!("Skipping Bedrock streaming-with-tools test: no AWS credentials in environment");
        return Ok(());
    }

    let provider = create_with_named_model("aws_bedrock", BEDROCK_DEFAULT_MODEL, vec![]).await?;

    // Create a simple tool
    let weather_tool = Tool::new(
        "get_weather",
        "Get the weather for a location",
        object!(
            {
                "type": "object",
                "required": ["location"],
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "The city name"
                    }
                }
            }
        ),
    );

    let message = Message::user().with_text("What's the weather in San Francisco?");
    let system_prompt = "You are a helpful weather assistant. Always use the get_weather tool to answer weather questions.";

    let mut stream = provider
        .stream(
            "test-session-id",
            system_prompt,
            &[message],
            &[weather_tool],
        )
        .await?;

    let mut received_tool_request = false;
    let mut message_count = 0;

    while let Some(result) = stream.next().await {
        match result {
            Ok((Some(msg), _)) => {
                message_count += 1;

                for content in &msg.content {
                    if matches!(
                        content,
                        goose::conversation::message::MessageContent::ToolRequest(_)
                    ) {
                        received_tool_request = true;
                        println!("✓ Received tool request in stream");
                    }
                }
            }
            Ok(_) => {}
            Err(e) => {
                eprintln!("Stream error: {:?}", e);
                return Err(e.into());
            }
        }
    }

    println!("Total chunks: {}", message_count);

    assert!(
        message_count > 0,
        "Streaming should return at least one chunk"
    );

    if !received_tool_request {
        println!(
            "Warning: no tool request seen in stream; provider may answer directly without tools"
        );
    }

    Ok(())
}
```
This test calls provider.stream() on the bedrock provider, but there's no implementation of this method in the BedrockProvider in this PR. The test will fail at runtime because the Provider trait's default stream() implementation returns a NotImplemented error. Either this PR is incomplete and needs the actual streaming implementation added to bedrock.rs, or this test should be removed/moved to a different PR that includes the implementation.
Suggested change (replace the entire test with a note):

```rust
// NOTE: A previous `test_bedrock_streaming_with_tools` lived here, but it relied on
// `BedrockProvider::stream`, which is not yet implemented and would always fail at runtime.
// Reintroduce that test in a future PR once Bedrock streaming support is available.
```
```rust
#[tokio::test]
async fn test_bedrock_streaming_vs_non_streaming_consistency() -> Result<()> {
    dotenv().ok();

    if !has_bedrock_credentials() {
        eprintln!(
            "Skipping Bedrock streaming vs non-streaming consistency test: no AWS credentials in environment"
        );
        return Ok(());
    }

    let provider = create_with_named_model("aws_bedrock", BEDROCK_DEFAULT_MODEL, vec![]).await?;

    let message = Message::user().with_text("What is 2+2?");
    let system_prompt = "Answer very briefly.";

    // Get non-streaming response
    let (non_stream_msg, non_stream_usage) = provider
        .complete(
            "test-session-id",
            system_prompt,
            std::slice::from_ref(&message),
            &[],
        )
        .await?;

    println!("Non-streaming response: {:?}", non_stream_msg.content);
    println!("Non-streaming usage: {:?}", non_stream_usage);

    // Get streaming response
    let mut stream = provider
        .stream("test-session-id", system_prompt, &[message], &[])
        .await?;

    let mut stream_text = String::new();
    let mut stream_usage: Option<ProviderUsage> = None;

    while let Some(result) = stream.next().await {
        match result {
            Ok((Some(msg), _)) => {
                for content in &msg.content {
                    if let goose::conversation::message::MessageContent::Text(text) = content {
                        stream_text.push_str(&text.text);
                    }
                }
            }
            Ok((None, Some(usage))) => {
                stream_usage = Some(usage);
            }
            Ok(_) => {}
            Err(e) => {
                eprintln!("Stream error: {:?}", e);
                return Err(e.into());
            }
        }
    }

    println!("Streaming response: {}", stream_text);
    println!("Streaming usage: {:?}", stream_usage);

    // Both should produce text
    let non_stream_text = non_stream_msg
        .content
        .iter()
        .filter_map(|c| {
            if let goose::conversation::message::MessageContent::Text(t) = c {
                Some(t.text.clone())
            } else {
                None
            }
        })
        .collect::<Vec<_>>()
        .join("");

    assert!(
        !non_stream_text.is_empty(),
        "Non-streaming response should have text"
    );
    assert!(
        !stream_text.is_empty(),
        "Streaming response should have text"
    );

    // Usage should be available
    assert!(
        non_stream_usage.usage.input_tokens.is_some(),
        "Non-streaming usage should have input tokens"
    );
    assert!(
        stream_usage.is_some(),
        "Streaming should provide usage information"
    );

    Ok(())
}
```
This test calls provider.stream() on the bedrock provider, but there's no implementation of this method in the BedrockProvider in this PR. The test will fail at runtime because the Provider trait's default stream() implementation returns a NotImplemented error. Either this PR is incomplete and needs the actual streaming implementation added to bedrock.rs, or this test should be removed/moved to a different PR that includes the implementation.
Suggested change: remove this test entirely.
thanks @michaelneale this does the trick, but I still think @cronus42 that we should find a different way of doing this and understand better why Conversation doesn't fix this for us out of the box.
/cc @tlongwell-block - could this help with what you were seeing?
/goose |
Code Review: PR #7182 - Cronus42/main

Summary: This PR attempts to fix fragmented assistant messages during streaming by consolidating them at multiple layers (agent, session storage, migration, and UI). However, it includes ~150 lines of dead code (BedrockStreamAccumulator) and a test file that will immediately fail. The core consolidation logic is sound but duplicated across three locations.

🔴 Blocking Issues

1. Dead Code: BedrockStreamAccumulator is never used [HIGH]

crates/goose/src/providers/formats/bedrock.rs:118-267

The PR adds a 150+ line `BedrockStreamAccumulator`:

```rust
pub struct BedrockStreamAccumulator {
    text_blocks: HashMap<i32, String>,
    text_block_emitted_char_counts: HashMap<i32, usize>,
    tool_blocks: HashMap<i32, (String, String, String)>,
    role: Option<Role>,
    usage: Option<bedrock::TokenUsage>,
}
```

Verification:

```shell
rg "BedrockStreamAccumulator" --type rust
# Only shows definition, no usages
```

2. Test File Will Fail Immediately [HIGH]

crates/goose/tests/bedrock_streaming_test.rs:31-34

```rust
assert!(
    provider.supports_streaming(),
    "Bedrock provider should support streaming"
);
```

This assertion will always fail: neither `supports_streaming()` nor `stream()` is implemented on the Bedrock provider.

Verification:

```shell
rg "supports_streaming|fn stream\(" crates/goose/src/providers/bedrock.rs
# Returns nothing - neither method is implemented
```

Recommendation: Either implement streaming for BedrockProvider (using the accumulator) or remove both the accumulator and the test file from this PR.

🟡 Warnings

3. Logic Duplication: "is_simple_assistant_text" check appears 3 times [MEDIUM]

The same merge-eligibility check appears in three places with slight variations. Example from agent.rs:

```rust
let should_merge = if let Some(last_msg) = messages_to_add.last() {
    last_msg.role == Role::Assistant && response.role == Role::Assistant &&
    last_msg.content.len() == 1 &&
    matches!(last_msg.content.first(), Some(MessageContent::Text(_))) &&
    // ... more conditions
```

Recommendation: Extract to a shared helper.

4. LeadWorkerProvider streaming falls back to wrong provider [MEDIUM]

crates/goose/src/providers/lead_worker.rs:343-349

```rust
if primary.supports_streaming() {
    return primary.stream(session_id, system, messages, tools).await;
}
if secondary.supports_streaming() {
    return secondary.stream(session_id, system, messages, tools).await;
}
```

If the primary provider (e.g., worker) doesn't support streaming but the secondary (lead) does, it silently uses the wrong provider. This breaks the lead/worker model selection semantics - a turn intended for the "worker" model could be handled by the "lead" model.

Recommendation: Either error if the intended provider doesn't support streaming, or document this behavior explicitly. Consider adding a debug log when falling back.

🟢 Suggestions

5. Scope creep: Unrelated TypeScript type addition

ui/desktop/src/shell-quote.d.ts - Adding type declarations for shell-quote appears unrelated to the message consolidation fix.

6. ToolGraphNode.depends_on changed to optional

```diff
- depends_on: string[];
+ depends_on?: string[];
```

This seems like a workaround for some upstream issue rather than a deliberate design change.

✅ Highlights

Review generated by goose
I thought about this some more and I don't think this is the way. I had goose compare this approach, how streaming works, and how it works in rig, and then come up with something: @cronus42 could you look at that and see if it makes sense?

I realize this might feel as if I am taking over your PR and then have you review it, but I'm just trying to find a good way out (and I don't have a way to test this). The basic idea, though, is that we keep the changes to just bedrock, we get the id when we start streaming and assign that to all messages, and then Conversation puts them back together.

Does this make sense?
ok, closing. this is not the way. let's find the way!

Summary
Replaces: #6248