Skip to content

Commit 4df61e1

Browse files
committed
refactor: remove ConnectStreamingRequest && update document
1 parent c7f9c7b commit 4df61e1

File tree

5 files changed

+28
-64
lines changed

5 files changed

+28
-64
lines changed

connectrpc-axum/src/handler.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{
1212
validate_unary_content_type,
1313
},
1414
error::ConnectError,
15-
message::{ConnectRequest, ConnectResponse, ConnectStreamingRequest, StreamBody, Streaming},
15+
message::{ConnectRequest, ConnectResponse, StreamBody, Streaming},
1616
};
1717
use futures::Stream;
1818
use prost::Message;
@@ -619,10 +619,10 @@ pub struct ConnectClientStreamHandlerWrapper<F>(pub F);
619619
/// Type alias for compatibility with generated code
620620
pub type ConnectClientStreamHandler<F> = ConnectClientStreamHandlerWrapper<F>;
621621

622-
impl<F, Fut, Req, Resp> Handler<(ConnectStreamingRequest<Req>,), ()>
622+
impl<F, Fut, Req, Resp> Handler<(ConnectRequest<Streaming<Req>>,), ()>
623623
for ConnectClientStreamHandlerWrapper<F>
624624
where
625-
F: Fn(ConnectStreamingRequest<Req>) -> Fut + Clone + Send + Sync + 'static,
625+
F: Fn(ConnectRequest<Streaming<Req>>) -> Fut + Clone + Send + Sync + 'static,
626626
Fut: Future<Output = Result<ConnectResponse<Resp>, ConnectError>> + Send + 'static,
627627
Req: Message + DeserializeOwned + Default + Send + 'static,
628628
Resp: Message + serde::Serialize + Send + Clone + Sync + 'static,
@@ -644,10 +644,11 @@ where
644644
}
645645

646646
// Extract the streaming request
647-
let streaming_req = match ConnectStreamingRequest::<Req>::from_request(req, &()).await {
648-
Ok(value) => value,
649-
Err(err) => return err.into_response(),
650-
};
647+
let streaming_req =
648+
match ConnectRequest::<Streaming<Req>>::from_request(req, &()).await {
649+
Ok(value) => value,
650+
Err(err) => return err.into_response(),
651+
};
651652

652653
// Call the handler function
653654
// Note: Timeout is enforced by ConnectLayer, not here
@@ -672,7 +673,7 @@ where
672673
/// Client streaming: client sends a stream of messages, server responds with one message.
673674
pub fn post_client_stream<F, Req, Resp, Fut>(f: F) -> MethodRouter<()>
674675
where
675-
F: Fn(ConnectStreamingRequest<Req>) -> Fut + Clone + Send + Sync + 'static,
676+
F: Fn(ConnectRequest<Streaming<Req>>) -> Fut + Clone + Send + Sync + 'static,
676677
Fut: Future<Output = Result<ConnectResponse<Resp>, ConnectError>> + Send + 'static,
677678
Req: Message + DeserializeOwned + Default + Send + 'static,
678679
Resp: Message + serde::Serialize + Send + Clone + Sync + 'static,
@@ -694,10 +695,10 @@ pub struct ConnectBidiStreamHandlerWrapper<F>(pub F);
694695
/// Type alias for compatibility with generated code
695696
pub type ConnectBidiStreamHandler<F> = ConnectBidiStreamHandlerWrapper<F>;
696697

697-
impl<F, Fut, Req, Resp, St> Handler<(ConnectStreamingRequest<Req>,), ()>
698+
impl<F, Fut, Req, Resp, St> Handler<(ConnectRequest<Streaming<Req>>,), ()>
698699
for ConnectBidiStreamHandlerWrapper<F>
699700
where
700-
F: Fn(ConnectStreamingRequest<Req>) -> Fut + Clone + Send + Sync + 'static,
701+
F: Fn(ConnectRequest<Streaming<Req>>) -> Fut + Clone + Send + Sync + 'static,
701702
Fut: Future<Output = Result<ConnectResponse<StreamBody<St>>, ConnectError>> + Send + 'static,
702703
St: Stream<Item = Result<Resp, ConnectError>> + Send + 'static,
703704
Req: Message + DeserializeOwned + Default + Send + 'static,
@@ -720,10 +721,11 @@ where
720721
}
721722

722723
// Extract the streaming request
723-
let streaming_req = match ConnectStreamingRequest::<Req>::from_request(req, &()).await {
724-
Ok(value) => value,
725-
Err(err) => return err.into_response(),
726-
};
724+
let streaming_req =
725+
match ConnectRequest::<Streaming<Req>>::from_request(req, &()).await {
726+
Ok(value) => value,
727+
Err(err) => return err.into_response(),
728+
};
727729

728730
// Call the handler function
729731
// Note: Timeout is enforced by ConnectLayer, not here
@@ -748,7 +750,7 @@ where
748750
/// Requires HTTP/2 for full-duplex communication.
749751
pub fn post_bidi_stream<F, Req, Resp, St, Fut>(f: F) -> MethodRouter<()>
750752
where
751-
F: Fn(ConnectStreamingRequest<Req>) -> Fut + Clone + Send + Sync + 'static,
753+
F: Fn(ConnectRequest<Streaming<Req>>) -> Fut + Clone + Send + Sync + 'static,
752754
Fut: Future<Output = Result<ConnectResponse<StreamBody<St>>, ConnectError>> + Send + 'static,
753755
St: Stream<Item = Result<Resp, ConnectError>> + Send + 'static,
754756
Req: Message + DeserializeOwned + Default + Send + 'static,

connectrpc-axum/src/lib.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,6 @@ pub mod prelude {
8080
ConnectStreamHandlerWrapper,
8181
};
8282
pub use crate::layer::{ConnectLayer, ConnectService};
83-
pub use crate::message::{
84-
ConnectRequest, ConnectResponse, ConnectStreamingRequest, StreamBody, Streaming,
85-
};
83+
pub use crate::message::{ConnectRequest, ConnectResponse, StreamBody, Streaming};
8684
pub use crate::service_builder::MakeServiceBuilder;
8785
}

connectrpc-axum/src/message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
mod request;
44
mod response;
55

6-
pub use request::{ConnectRequest, ConnectStreamingRequest, Streaming};
6+
pub use request::{ConnectRequest, Streaming};
77
pub use response::{ConnectResponse, StreamBody};

connectrpc-axum/src/message/request.rs

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -256,48 +256,6 @@ where
256256
Ok(ConnectRequest(message))
257257
}
258258

259-
/// Streaming request extractor for client streaming and bidi streaming RPCs.
260-
///
261-
/// Parses multiple frames from the request body into a `Stream`.
262-
/// Each frame follows the Connect protocol format: `[flags:1][length:4][payload]`
263-
///
264-
/// This extractor is designed for generated code and does not support
265-
/// additional Axum extractors like `ConnectRequest` does.
266-
pub struct ConnectStreamingRequest<T> {
267-
/// Stream of decoded messages from the request body.
268-
pub stream: Pin<Box<dyn Stream<Item = Result<T, ConnectError>> + Send>>,
269-
}
270-
271-
impl<S, T> FromRequest<S> for ConnectStreamingRequest<T>
272-
where
273-
S: Send + Sync,
274-
T: Message + DeserializeOwned + Default + Send + 'static,
275-
{
276-
type Rejection = ConnectError;
277-
278-
async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
279-
// Only POST is supported for streaming requests
280-
if *req.method() != Method::POST {
281-
return Err(ConnectError::new(
282-
Code::Unimplemented,
283-
"streaming requests only support POST method",
284-
));
285-
}
286-
287-
// Get context (with fallback to default if layer is missing)
288-
let ctx = get_context_or_default(&req);
289-
290-
let use_proto = ctx.protocol.is_proto();
291-
let request_encoding = ctx.compression.request_encoding;
292-
let body = req.into_body();
293-
294-
let stream = create_frame_stream::<T>(body, use_proto, ctx.limits, request_encoding);
295-
Ok(ConnectStreamingRequest {
296-
stream: Box::pin(stream),
297-
})
298-
}
299-
}
300-
301259
/// `FromRequest` implementation for streaming requests using the unified `ConnectRequest<Streaming<T>>` pattern.
302260
///
303261
/// This enables handlers to use the same `ConnectRequest` wrapper for both unary and streaming:

docs/guide/architecture.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ The `pipeline.rs` module provides the low-level functions used by extractors and
6363
- `decompress_bytes` - Decompress based on encoding
6464
- `decode_proto` / `decode_json` - Decode message from bytes
6565
- `unwrap_envelope` - Unwrap Connect streaming frame
66+
- `get_context_or_default` - Get `ConnectContext` from request extensions (with fallback)
67+
68+
**`RequestPipeline` methods:**
69+
- `decode` - Decode from HTTP request (read body, decompress, decode)
70+
- `decode_bytes` - Decode from raw bytes (decompress, check size, decode)
6671
- `decode_enveloped_bytes` - Decode from enveloped bytes (for `application/connect+json` or `application/connect+proto`)
6772

6873
**Response side:**
@@ -91,9 +96,8 @@ These are the types you interact with when building services:
9196
| Type | Purpose |
9297
|------|---------|
9398
| `ConnectRequest<T>` | Axum extractor - deserializes protobuf/JSON from request body |
94-
| `ConnectRequest<Streaming<T>>` | Extractor for client/bidi streaming requests (unified pattern) |
99+
| `ConnectRequest<Streaming<T>>` | Extractor for client/bidi streaming requests |
95100
| `Streaming<T>` | Stream of messages from client (similar to Tonic's `Streaming<T>`) |
96-
| `ConnectStreamingRequest<T>` | Extractor for client streaming requests (legacy) |
97101
| `ConnectResponse<T>` | Response wrapper - encodes per detected protocol |
98102
| `ConnectResponse<StreamBody<S>>` | Server streaming response wrapper |
99103
| `StreamBody<S>` | Marker for streaming response bodies |
@@ -272,12 +276,14 @@ Built-in implementations:
272276

273277
The `default_codec()` function returns the appropriate codec for a `CompressionEncoding`. Custom codecs (zstd, brotli, etc.) can implement the `Codec` trait.
274278

279+
Response compression negotiation follows RFC 7231: `negotiate_response_encoding()` parses `Accept-Encoding` headers respecting client preference order and `q=0` (not acceptable) values.
280+
275281
### message/ module
276282

277283
| Module | Purpose |
278284
|--------|---------|
279285
| `request.rs` | `ConnectRequest<T>` and `Streaming<T>` extractors |
280-
| `response.rs` | `ConnectResponse<T>` and `ConnectResponse<StreamBody<S>>` encoding |
286+
| `response.rs` | `ConnectResponse<T>` and streaming response encoding |
281287

282288
## Code Generation
283289

0 commit comments

Comments
 (0)