Skip to content

Add into_stream helper for Response#350

Merged
leynos merged 4 commits intomainfrom
codex/add-unified-stream-interface-for-multipacket-responses
Sep 8, 2025
Merged

Add into_stream helper for Response#350
leynos merged 4 commits intomainfrom
codex/add-unified-stream-interface-for-multipacket-responses

Conversation

@leynos
Copy link
Copy Markdown
Owner

@leynos leynos commented Sep 5, 2025

Summary

  • add into_stream helper to convert responses into frame streams
  • use new stream interface in multi-packet utilities and tests
  • document unified stream approach for multi-packet responses

Testing

  • make fmt
  • make lint
  • make test

https://chatgpt.com/codex/tasks/task_e_68bb302ff8488322b72730cfd6402ed7

Summary by Sourcery

Introduce a unified streaming interface for all Response variants by adding into_stream, migrate multi-packet code and tests to use it, and update documentation to describe the new approach

New Features:

  • Add Response::into_stream method to convert any Response variant into a FrameStream

Enhancements:

  • Refactor multi-packet utilities and world handling to consume responses via into_stream instead of manual channel matching

Documentation:

  • Document the unified into_stream approach for multi-packet and streaming responses in the design guide

Tests:

  • Update multi-packet tests to use into_stream with try_collect and simplify the drain_all helper

@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai Bot commented Sep 5, 2025

Reviewer's Guide

This PR introduces a unified streaming interface for all Response variants by adding the into_stream helper, refactors multi-packet utilities and tests to leverage this helper, and updates the design documentation to describe the new multi-packet streaming approach.

Class diagram for updated Response and FrameStream

classDiagram
class Response {
  +Single(frame: F)
  +Vec(frames: Vec<F>)
  +Stream(stream: FrameStream<F, E>)
  +MultiPacket(rx: Receiver<F>)
  +Empty
  +into_stream() FrameStream<F, E>
}
class FrameStream {
  <<type alias>>
}
Response --> FrameStream : into_stream()
Response : +from(Vec<F>)
Loading

Flow diagram for unified Response streaming via into_stream

flowchart TD
    A["Response (any variant)"] --> B["into_stream()"]
    B --> C["FrameStream<F, E>"]
    C --> D["Downstream code iterates frames"]
Loading

File-Level Changes

Change Details Files
Add Response::into_stream helper converting each variant to a FrameStream
  • Implement into_stream method with match arms for Single, Vec, Stream, MultiPacket, and Empty
  • Use Box::pin and futures::stream utilities (once, iter, unfold, empty) to build streams
src/response.rs
Refactor multi-packet tests to use the new stream helper
  • Replace manual channel drain logic with drain_all accepting a FrameStream and try_collect()
  • Update tests to call resp.into_stream() and import TryStreamExt
tests/multi_packet.rs
Update world example to use into_stream for draining
  • Remove manual MultiPacket matching and loop
  • Use resp.into_stream().try_collect() to gather frames and extend message list
tests/world.rs
Document unified streaming approach in design guide
  • Add section describing Response::into_stream and its benefits
  • Show how downstream code can consume frames without matching on variants
docs/multi-packet-and-streaming-responses-design.md

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Sep 5, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Note

Reviews paused

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Summary by CodeRabbit

  • New Features
    • Introduced a unified streaming API to consume any response as a stream, simplifying handling of single, multi-packet, vector, or empty responses.
  • Documentation
    • Added guidance and examples for using the new streaming convenience to iterate over response frames consistently.
  • Tests
    • Updated tests to use the new streaming approach, improving clarity and aligning with the unified API.

Walkthrough

Introduce Response::into_stream to convert all Response variants into a FrameStream. Update tests to consume responses via streams instead of matching MultiPacket. Add design documentation describing the behaviour and usage.

Changes

Cohort / File(s) Summary
Docs
docs/multi-packet-and-streaming-responses-design.md
Add design doc describing Response::into_stream and how it normalises all Response variants to FrameStream.
Core API
src/response.rs
Add impl<F: Send + 'static, E: Send + 'static> Response<F, E>::into_stream(self) -> FrameStream<F, E>. Map Single, Vec, Stream, MultiPacket, and Empty variants to a unified FrameStream. Include a doc example.
Tests
tests/* (tests/multi_packet.rs, tests/world.rs)
Import futures::StreamExt / futures::TryStreamExt. Replace manual MultiPacket handling with resp.into_stream(). Refactor drain helpers to consume FrameStream via stream collect/try_collect and mapping, and update comments.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Caller as Caller
  participant Resp as Response<F,E>
  participant API as Response::into_stream
  participant FS as FrameStream<F,E>
  participant Cons as Consumer

  Caller->>Resp: obtain response
  Caller->>API: into_stream(self)
  API->>FS: build FrameStream from variant
  note right of API #88ccee: Single → one Ok(frame)
  note right of API #88ccee: Vec → Ok(frame) per item
  note right of API #88ccee: Stream → passthrough
  note right of API #88ccee: MultiPacket → unfold(rx.recv)
  note right of API #88ccee: Empty → completes immediately
  API-->>Caller: FrameStream
  loop iterate frames
    Caller->>FS: next / poll
    FS-->>Cons: Result<Frame, E>
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

Streams now flow where packets stood,
One river from a branching wood.
Frames drift by with gentle gleam,
Tests sip truth from a single stream.
Channels hush, the docs proclaim—
Into the current, all the same. 🌊

✨ Finishing Touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch codex/add-unified-stream-interface-for-multipacket-responses

Comment @coderabbitai help to get the list of available commands and usage tips.

codescene-delta-analysis[bot]

This comment was marked as outdated.

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and found some issues that need to be addressed.


Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Sep 5, 2025

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Sep 5, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Sep 6, 2025

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Sep 6, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2f95cc0 and 65e2807.

📒 Files selected for processing (4)
  • docs/multi-packet-and-streaming-responses-design.md (1 hunks)
  • src/response.rs (1 hunks)
  • tests/multi_packet.rs (5 hunks)
  • tests/world.rs (2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.md

⚙️ CodeRabbit configuration file

**/*.md: * Avoid 2nd person or 1st person pronouns ("I", "you", "we")

  • Use en-GB-oxendict (-ize / -our) spelling and grammar
  • Headings must not be wrapped.
  • Documents must start with a level 1 heading
  • Headings must correctly increase or decrease by no more than one level at a time
  • Use GitHub-flavoured Markdown style for footnotes and endnotes.
  • Numbered footnotes must be numbered by order of appearance in the document.

Files:

  • docs/multi-packet-and-streaming-responses-design.md
**/*.rs

⚙️ CodeRabbit configuration file

**/*.rs: * Seek to keep the cyclomatic complexity of functions no more than 12.

  • Adhere to single responsibility and CQRS

  • Place function attributes after doc comments.

  • Do not use return in single-line functions.

  • Move conditionals with >2 branches into a predicate function.

  • Avoid unsafe unless absolutely necessary.

  • Every module must begin with a //! doc comment that explains the module's purpose and utility.

  • Comments and docs must follow en-GB-oxendict (-ize / -our) spelling and grammar

  • Lints must not be silenced except as a last resort.

    • #[allow] is forbidden.
    • Only narrowly scoped #[expect(lint, reason = "...")] is allowed.
    • No lint groups, no blanket or file-wide suppression.
    • Include FIXME: with link if a fix is expected.
  • Where code is only used by specific features, it must be conditionally compiled or a conditional expectation for unused_code applied.

  • Use rstest fixtures for shared setup and to avoid repetition between tests.

  • Replace duplicated tests with #[rstest(...)] parameterised cases.

  • Prefer mockall for mocks/stubs.

  • Prefer .expect() over .unwrap()

  • Ensure that any API or behavioural changes are reflected in the documentation in docs/

  • Ensure that any completed roadmap steps are recorded in the appropriate roadmap in docs/

  • Files must not exceed 400 lines in length

    • Large modules must be decomposed
    • Long match statements or dispatch tables should be decomposed by domain and collocated with targets
    • Large blocks of inline data (e.g., test fixtures, constants or templates) must be moved to external files and inlined at compile-time or loaded at run-time.
  • Environment access (env::set_var and env::remove_var) are always unsafe in Rust 2024 and MUST be marked as such

    • For testing of functionality depending upon environment variables, dependency injection and the mockable crate are the preferred option.
    • If mockable cannot be used, env mutations in tests ...

Files:

  • tests/world.rs
  • src/response.rs
  • tests/multi_packet.rs
🔇 Additional comments (7)
docs/multi-packet-and-streaming-responses-design.md (1)

113-116: Document the unified consumption path clearly (LGTM).

The addition succinctly explains the purpose of into_stream and avoids second-person phrasing. No further edits required for style or UK spelling.

tests/world.rs (1)

11-11: Import only the extension used (LGTM).

StreamExt is required for .next(). Keep as-is.

tests/multi_packet.rs (5)

3-3: Import StreamExt is correct for map/collect usage.
Keep this; it enables the combinators used below.


25-25: Adopt into_stream in the assertion path.
The new unified interface is exercised correctly.


35-35: Use into_stream for the empty-channel case.
This correctly asserts EOS behaviour.


46-46: Use into_stream for early sender drop.
Covers termination semantics; looks good.


60-60: Stream through capacity pressure scenario is correct.
Backpressure is exercised while preserving order.

Comment thread src/response.rs
Comment thread tests/multi_packet.rs
Comment thread tests/multi_packet.rs Outdated
Comment thread tests/world.rs Outdated
codescene-delta-analysis[bot]

This comment was marked as outdated.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (1)
tests/multi_packet.rs (1)

10-13: Generalise drain_all to any TryStream.

Widen the helper for reuse across tests.

-async fn drain_all(stream: wireframe::FrameStream<TestMsg, ()>) -> Vec<TestMsg> {
-    stream.try_collect::<Vec<_>>().await.expect("stream error")
-}
+async fn drain_all<S, E>(stream: S) -> Vec<TestMsg>
+where
+    S: futures::stream::TryStream<Ok = TestMsg, Error = E>,
+    E: std::fmt::Debug,
+{
+    use futures::TryStreamExt;
+    stream.try_collect::<Vec<_>>().await.expect("stream error")
+}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 65e2807 and 408670d.

📒 Files selected for processing (3)
  • src/response.rs (1 hunks)
  • tests/multi_packet.rs (5 hunks)
  • tests/world.rs (2 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

**/*.rs: Use precise names; boolean names should start with is/has/should
Use en-GB-oxendict spelling and grammar in comments
Function documentation must include clear examples; test documentation should omit redundant examples
Keep code files ≤ 400 lines; split long switch/dispatch logic by feature; move large test data to external files
Disallow Clippy warnings
Fix warnings emitted during tests in code rather than silencing them
Extract helper functions for long functions; adhere to separation of concerns and CQRS
Group related parameters into meaningful structs when functions have many parameters
Consider using Arc for large error returns to reduce data size
Each Rust module must begin with a module-level //! comment describing purpose and utility
Document public APIs with Rustdoc /// comments to enable cargo doc generation
Prefer immutable data; avoid unnecessary mut
Handle errors with Result instead of panicking where feasible
Avoid unsafe code unless necessary and document any usage clearly
Place function attributes after doc comments
Do not use return in single-line functions
Use predicate functions for conditional criteria with more than two branches
Do not silence lints except as a last resort
Lint suppressions must be tightly scoped and include a clear reason
Prefer #[expect(..)] over #[allow(..)] for lints
Prefer .expect() over .unwrap()
Use concat!() to combine long string literals rather than escaping newlines
Prefer single-line function bodies where appropriate (e.g., pub fn new(id: u64) -> Self { Self(id) })
Prefer semantic error enums deriving std::error::Error via thiserror for inspectable conditions

Files:

  • tests/world.rs
  • tests/multi_packet.rs
  • src/response.rs

⚙️ CodeRabbit configuration file

**/*.rs: * Seek to keep the cyclomatic complexity of functions no more than 12.

  • Adhere to single responsibility and CQRS

  • Place function attributes after doc comments.

  • Do not use return in single-line functions.

  • Move conditionals with >2 branches into a predicate function.

  • Avoid unsafe unless absolutely necessary.

  • Every module must begin with a //! doc comment that explains the module's purpose and utility.

  • Comments and docs must follow en-GB-oxendict (-ize / -our) spelling and grammar

  • Lints must not be silenced except as a last resort.

    • #[allow] is forbidden.
    • Only narrowly scoped #[expect(lint, reason = "...")] is allowed.
    • No lint groups, no blanket or file-wide suppression.
    • Include FIXME: with link if a fix is expected.
  • Where code is only used by specific features, it must be conditionally compiled or a conditional expectation for unused_code applied.

  • Use rstest fixtures for shared setup and to avoid repetition between tests.

  • Replace duplicated tests with #[rstest(...)] parameterised cases.

  • Prefer mockall for mocks/stubs.

  • Prefer .expect() over .unwrap()

  • Ensure that any API or behavioural changes are reflected in the documentation in docs/

  • Ensure that any completed roadmap steps are recorded in the appropriate roadmap in docs/

  • Files must not exceed 400 lines in length

    • Large modules must be decomposed
    • Long match statements or dispatch tables should be decomposed by domain and collocated with targets
    • Large blocks of inline data (e.g., test fixtures, constants or templates) must be moved to external files and inlined at compile-time or loaded at run-time.
  • Environment access (env::set_var and env::remove_var) are always unsafe in Rust 2024 and MUST be marked as such

    • For testing of functionality depending upon environment variables, dependency injection and the mockable crate are the preferred option.
    • If mockable cannot be used, env mutations in tests ...

Files:

  • tests/world.rs
  • tests/multi_packet.rs
  • src/response.rs
{src,tests}/**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

Write unit and behavioural tests for new functionality

Files:

  • tests/world.rs
  • tests/multi_packet.rs
  • src/response.rs
tests/**/*.rs

📄 CodeRabbit inference engine (AGENTS.md)

tests/**/*.rs: Use rstest fixtures for shared setup
Replace duplicated tests with #[rstest(...)] parameterised cases
Prefer mockall for mocks/stubs

Files:

  • tests/world.rs
  • tests/multi_packet.rs
🧬 Code graph analysis (1)
tests/multi_packet.rs (1)
tests/world.rs (1)
  • resp (220-223)
⏰ Context from checks skipped due to timeout of 120000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build-test
🔇 Additional comments (2)
src/response.rs (1)

116-152: LGTM: Unified streaming API is clean and bounds are correct.

The variant coverage and Send + 'static bounds look solid; Vec arm uses explicit Ok typing.

tests/multi_packet.rs (1)

3-3: LGTM: Tests align with into_stream and use error-aware draining.

Import of TryStreamExt, try_collect in drain_all, updated doc comment, and into_stream usage read well.

Also applies to: 10-13, 15-16, 25-25, 35-35, 46-46, 60-60

Comment thread src/response.rs
Comment thread src/response.rs Outdated
Comment thread tests/world.rs Outdated
Comment thread tests/world.rs
codescene-delta-analysis[bot]

This comment was marked as outdated.

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Sep 7, 2025

@coderabbitai pause

@leynos
Copy link
Copy Markdown
Owner Author

leynos commented Sep 7, 2025

@sourcery-ai review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Sep 7, 2025

✅ Actions performed

Reviews paused.

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and found some issues that need to be addressed.

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `tests/multi_packet.rs` </location>
<code_context>
+async fn multi_packet_handles_channel_capacity() {
</code_context>

<issue_to_address>
Consider adding a test for empty Response::Vec and Response::Empty variants.

Please add tests for Response::Vec with an empty vector and Response::Empty to ensure into_stream returns an empty stream for these cases.
</issue_to_address>

### Comment 2
<location> `src/response.rs:139` </location>
<code_context>
+    /// # }
+    /// ```
+    #[must_use]
+    pub fn into_stream(self) -> FrameStream<F, E> {
+        match self {
+            Response::Single(frame) => Box::pin(futures::stream::once(async move {
</code_context>

<issue_to_address>
Consider refactoring the method to box the stream only once and use standard stream adapters for clarity and efficiency.

Here’s a trimmed-down version that

• boxes only once,  
• uses `tokio_stream::wrappers::ReceiverStream` for `MultiPacket`,  
• and leverages common adapters (`map(Ok)`, `stream::once`, `stream::iter`):

```rust
use tokio_stream::wrappers::ReceiverStream;
use futures::{stream, StreamExt, TryStreamExt};

impl<F: Send + 'static, E: Send + 'static> Response<F, E> {
    pub fn into_stream(self) -> FrameStream<F, E> {
        let s = match self {
            Response::Single(f)      => stream::once(async move { Ok::<F, WireframeError<E>>(f) }),
            Response::Vec(frames)    => stream::iter(frames.into_iter().map(Ok)),
            Response::Stream(s)      => s,
            Response::MultiPacket(rx) => ReceiverStream::new(rx).map(Ok),
            Response::Empty           => stream::empty(),
        };
        Box::pin(s)
    }
}
```

Steps to apply:

1. Import:
   ```rust
   use tokio_stream::wrappers::ReceiverStream;
   use futures::{stream, StreamExt, TryStreamExt};
   ```
2. Move the `Box::pin(...)` call to wrap the entire `match` result instead of each arm.
3. Replace the manual `unfold` on the channel with `ReceiverStream::new(rx).map(Ok)`.
4. Use `stream::once` and `stream::iter(... .map(Ok))` for `Single` / `Vec`.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread tests/multi_packet.rs
Comment thread src/response.rs
@leynos leynos merged commit 966d9cc into main Sep 8, 2025
5 checks passed
@leynos leynos deleted the codex/add-unified-stream-interface-for-multipacket-responses branch September 8, 2025 22:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant