Skip to content

Commit 0f0f656

Browse files
committed
feat: streambuffer
Signed-off-by: Shane Utt <shaneutt@linux.com>
1 parent 3662e4e commit 0f0f656

24 files changed

Lines changed: 731 additions & 410 deletions

File tree

benchmarks/microbenchmarks/common.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ pub(crate) fn make_ctx(req: &Request) -> HttpFilterContext<'_> {
4242
health_registry: None,
4343
request: req,
4444
request_body_bytes: 0,
45+
request_body_mode: praxis_filter::BodyMode::Stream,
4546
request_start: std::time::Instant::now(),
4647
response_body_bytes: 0,
48+
response_body_mode: praxis_filter::BodyMode::Stream,
4749
response_header: None,
4850
response_headers_modified: false,
4951
rewritten_path: None,

docs/architecture.md

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,10 @@ flowchart TD
250250
Proto --> |"on Release or EOS: forward buffer"| Upstream
251251
```
252252

253-
Three delivery modes:
253+
Two delivery modes:
254254

255255
- **Stream**: chunks flow through filters as they arrive.
256256
Low latency, low memory.
257-
- **Buffer**: the protocol layer accumulates chunks into a
258-
`BodyBuffer`, then delivers the full body in a single
259-
call. Required when any filter needs the complete body.
260257
- **StreamBuffer**: chunks are delivered to filters
261258
incrementally (like Stream) but accumulated in a buffer
262259
and not forwarded to upstream until a filter returns
@@ -275,10 +272,9 @@ routing decisions. The pre-read body is stored and
275272
forwarded to the upstream after the connection is
276273
established.
277274

278-
Precedence: `Buffer` > `StreamBuffer` > `Stream`. If any
279-
filter requests `Buffer`, the entire pipeline buffers. If
280-
any filter requests `StreamBuffer` (and none requests
281-
`Buffer`), the pipeline uses stream-buffered mode.
275+
Precedence: `StreamBuffer` > `SizeLimit` > `Stream`. If
276+
any filter requests `StreamBuffer`, the pipeline uses
277+
stream-buffered mode.
282278
Global `body_limits.max_request_bytes` / `body_limits.max_response_bytes`
283279
config limits force buffer mode for size enforcement even
284280
when no filter requests body access.
@@ -411,7 +407,7 @@ praxis-filter Filter pipeline engine
411407
│ ├── access BodyAccess enum
412408
│ ├── buffer BodyBuffer and overflow handling
413409
│ ├── builder Pre-computed BodyCapabilities
414-
│ └── mode BodyMode enum (Stream, Buffer, StreamBuffer)
410+
│ └── mode BodyMode enum (Stream, StreamBuffer, SizeLimit)
415411
├── condition/ Condition evaluation for filter gating
416412
│ ├── request Request condition evaluation
417413
│ └── response Response condition evaluation

docs/configuration.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -545,8 +545,8 @@ Each rule has:
545545
| `negate` | bool | no | Invert match (default: false) |
546546

547547
Each rule must have either `contains` or `pattern`, not
548-
both. Body rules use Buffer mode (up to 1 MiB by default)
549-
to inspect the full request body.
548+
both. Body rules use StreamBuffer mode (up to 1 MiB by
549+
default) to inspect the full request body.
550550

551551
### CORS
552552

@@ -649,7 +649,7 @@ body hooks. A filter can have both `conditions` and
649649
## Payload Size Limits
650650

651651
Global hard ceilings on request and response payload
652-
size. These apply across all body modes (Stream, Buffer,
652+
size. These apply across all body modes (Stream,
653653
StreamBuffer). When a filter also declares a per-filter
654654
`max_bytes`, the smaller of the two limits is enforced.
655655
Requests exceeding the limit receive 413 (Payload Too

docs/extensions.md

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -326,15 +326,40 @@ chunks in place.
326326
- `Stream`: lowest latency; chunks flow through as they
327327
arrive. Best for filters that inspect headers only or
328328
process chunks independently.
329-
- `Buffer`: accumulates the entire body before
330-
delivering it. Use when your filter needs the complete
331-
body (e.g. signature verification). Set `max_bytes` to
332-
avoid unbounded memory growth.
333329
- `StreamBuffer`: chunks flow through filters
334330
incrementally but forwarding to upstream is deferred
335331
until `Release` or end-of-stream. Use when body
336-
content influences routing or when you need to inspect
337-
the full body before upstream selection.
332+
content influences routing, when you need the complete
333+
body (e.g. signature verification), or when you need
334+
to inspect the full body before upstream selection.
335+
Set `max_bytes` to avoid unbounded memory growth.
336+
337+
Two patterns for declaring `StreamBuffer`:
338+
339+
**Static declaration** (filter always needs the body):
340+
341+
```rust
342+
fn request_body_mode(&self) -> BodyMode {
343+
BodyMode::StreamBuffer { max_bytes: Some(1_048_576) }
344+
}
345+
```
346+
347+
**Per-request upgrade** (conditional buffering):
348+
349+
```rust
350+
async fn on_request(
351+
&self, ctx: &mut HttpFilterContext<'_>,
352+
) -> Result<FilterAction, FilterError> {
353+
if needs_body_inspection(ctx) {
354+
ctx.set_request_body_mode(
355+
BodyMode::StreamBuffer {
356+
max_bytes: Some(1_048_576),
357+
},
358+
);
359+
}
360+
Ok(FilterAction::Continue)
361+
}
362+
```
338363

339364
### `on_response_body` is synchronous
340365

docs/features.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@
4444
by default, opt-in buffered or stream-buffered payload
4545
access with configurable size limits.
4646
Stream mode passes chunks through as they arrive
47-
(lowest latency). Buffer mode accumulates the full
48-
body. StreamBuffer delivers chunks to filters
47+
(lowest latency). StreamBuffer delivers chunks to
48+
filters
4949
incrementally but defers upstream forwarding until
5050
release. See [Payload Processing][payload-processing]
5151
in the architecture docs.

docs/filters.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,10 +336,10 @@ fn request_body_access(&self) -> BodyAccess {
336336
| Mode | Behavior | Use case |
337337
| ----------------------------- | --------------- | ------------------------- |
338338
| `Stream` (default) | Per chunk | Logging, transforms |
339-
| `Buffer { max_bytes }` | Full body | JSON, payload routing |
340339
| `StreamBuffer { max_bytes }` | Deferred stream | Inspection before forward |
341340

342-
If any filter requests `Buffer`, the pipeline buffers.
341+
If any filter requests `StreamBuffer`, the pipeline
342+
defers upstream forwarding until release.
343343

344344
### StreamBuffer Mode
345345

filter/src/actions.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,17 @@ impl Rejection {
9090
}
9191
}
9292

93-
/// Add a header to the rejection response.
94-
pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
95-
self.headers.push((name.into(), value.into()));
96-
self
97-
}
98-
9993
/// Set the body of the rejection response.
10094
pub fn with_body(mut self, body: impl Into<Bytes>) -> Self {
10195
self.body = Some(body.into());
10296
self
10397
}
98+
99+
/// Add a header to the rejection response.
100+
pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
101+
self.headers.push((name.into(), value.into()));
102+
self
103+
}
104104
}
105105

106106
// -----------------------------------------------------------------------------

filter/src/body/builder.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@ use super::BodyMode;
2222
///
2323
/// let caps = BodyCapabilities {
2424
/// needs_request_body: true,
25-
/// request_body_mode: BodyMode::Buffer { max_bytes: 4096 },
25+
/// request_body_mode: BodyMode::StreamBuffer {
26+
/// max_bytes: Some(4096),
27+
/// },
2628
/// ..Default::default()
2729
/// };
2830
/// assert!(caps.needs_request_body);
2931
/// assert!(matches!(
3032
/// caps.request_body_mode,
31-
/// BodyMode::Buffer { max_bytes: 4096 }
33+
/// BodyMode::StreamBuffer {
34+
/// max_bytes: Some(4096)
35+
/// }
3236
/// ));
3337
/// assert!(!caps.needs_response_body, "unset fields stay at default");
3438
/// ```
@@ -38,12 +42,12 @@ pub struct BodyCapabilities {
3842
/// Whether any filter writes to the request body.
3943
pub any_request_body_writer: bool,
4044

41-
/// Whether any response condition references headers.
42-
pub any_response_condition_uses_headers: bool,
43-
4445
/// Whether any filter writes to the response body.
4546
pub any_response_body_writer: bool,
4647

48+
/// Whether any response condition references headers.
49+
pub any_response_condition_uses_headers: bool,
50+
4751
/// Whether any filter needs request body access.
4852
pub needs_request_body: bool,
4953

@@ -53,10 +57,10 @@ pub struct BodyCapabilities {
5357
/// Whether any filter needs response body access.
5458
pub needs_response_body: bool,
5559

56-
/// Resolved request body mode (Buffer if any filter requires it).
60+
/// Resolved request body mode (`StreamBuffer` if any filter requires it).
5761
pub request_body_mode: BodyMode,
5862

59-
/// Resolved response body mode (Buffer if any filter requires it).
63+
/// Resolved response body mode (`StreamBuffer` if any filter requires it).
6064
pub response_body_mode: BodyMode,
6165
}
6266

filter/src/body/mode.rs

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,15 @@
1515
/// let mode = BodyMode::default();
1616
/// assert!(matches!(mode, BodyMode::Stream));
1717
///
18-
/// let buffered = BodyMode::Buffer { max_bytes: 1024 };
19-
/// assert!(matches!(buffered, BodyMode::Buffer { max_bytes: 1024 }));
18+
/// let buffered = BodyMode::StreamBuffer {
19+
/// max_bytes: Some(1024),
20+
/// };
21+
/// assert!(matches!(
22+
/// buffered,
23+
/// BodyMode::StreamBuffer {
24+
/// max_bytes: Some(1024)
25+
/// }
26+
/// ));
2027
///
2128
/// let stream_buf = BodyMode::StreamBuffer { max_bytes: None };
2229
/// assert!(matches!(
@@ -41,6 +48,7 @@
4148
/// ));
4249
/// ```
4350
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
51+
#[non_exhaustive]
4452
pub enum BodyMode {
4553
/// Deliver chunks as they arrive. Low latency, low memory.
4654
///
@@ -53,19 +61,6 @@ pub enum BodyMode {
5361
#[default]
5462
Stream,
5563

56-
/// Buffer the entire body, then deliver it in a single call.
57-
///
58-
/// ```
59-
/// use praxis_filter::BodyMode;
60-
///
61-
/// let mode = BodyMode::Buffer { max_bytes: 8192 };
62-
/// assert!(matches!(mode, BodyMode::Buffer { max_bytes: 8192 }));
63-
/// ```
64-
Buffer {
65-
/// Maximum body size in bytes.
66-
max_bytes: usize,
67-
},
68-
6964
/// Deliver chunks incrementally (like [`Stream`]) but accumulate
7065
/// them and defer upstream forwarding until a filter returns
7166
/// [`FilterAction::Release`] or end-of-stream is reached.
@@ -141,16 +136,6 @@ mod tests {
141136
);
142137
}
143138

144-
#[test]
145-
fn body_mode_buffer_carries_limit() {
146-
let mode = BodyMode::Buffer { max_bytes: 4096 };
147-
148-
assert!(
149-
matches!(mode, BodyMode::Buffer { max_bytes: 4096 }),
150-
"Buffer variant should carry configured limit"
151-
);
152-
}
153-
154139
#[test]
155140
fn body_mode_stream_buffer_unlimited() {
156141
let mode = BodyMode::StreamBuffer { max_bytes: None };
@@ -179,21 +164,16 @@ mod tests {
179164
}
180165

181166
#[test]
182-
fn body_mode_size_limit_is_distinct_from_buffer() {
167+
fn body_mode_size_limit_is_distinct_from_stream_buffer() {
183168
assert_ne!(
184169
BodyMode::SizeLimit { max_bytes: 100 },
185-
BodyMode::Buffer { max_bytes: 100 },
186-
"SizeLimit and Buffer should be distinct even with same limit"
170+
BodyMode::StreamBuffer { max_bytes: Some(100) },
171+
"SizeLimit and StreamBuffer should be distinct even with same limit"
187172
);
188173
}
189174

190175
#[test]
191-
fn body_mode_stream_buffer_is_distinct() {
192-
assert_ne!(
193-
BodyMode::StreamBuffer { max_bytes: None },
194-
BodyMode::Buffer { max_bytes: 100 },
195-
"StreamBuffer and Buffer should be distinct variants"
196-
);
176+
fn body_mode_stream_buffer_is_distinct_from_stream() {
197177
assert_ne!(
198178
BodyMode::StreamBuffer { max_bytes: None },
199179
BodyMode::Stream,

filter/src/builtins/http/security/guardrails/filter.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,8 @@ impl HttpFilter for GuardrailsFilter {
199199

200200
fn request_body_mode(&self) -> BodyMode {
201201
if self.needs_body {
202-
BodyMode::Buffer {
203-
max_bytes: DEFAULT_MAX_BODY_BYTES,
202+
BodyMode::StreamBuffer {
203+
max_bytes: Some(DEFAULT_MAX_BODY_BYTES),
204204
}
205205
} else {
206206
BodyMode::Stream
@@ -224,8 +224,12 @@ impl HttpFilter for GuardrailsFilter {
224224
&self,
225225
ctx: &mut HttpFilterContext<'_>,
226226
body: &mut Option<Bytes>,
227-
_end_of_stream: bool,
227+
end_of_stream: bool,
228228
) -> Result<FilterAction, FilterError> {
229+
if !end_of_stream {
230+
return Ok(FilterAction::Continue);
231+
}
232+
229233
let Some(chunk) = body.as_ref() else {
230234
write_result(ctx, "passed");
231235
return Ok(FilterAction::Continue);

0 commit comments

Comments
 (0)