
Commit 3195dfb

Test invalid builder rates (#339)
* Use Oxford -ize spelling and expect in docs
* Add configurable DLQ logging and builder tests
* Propagate send error on closed queue
* Refactor push queue build configuration
* Split connection actor tests and refine docs
* Refactor DLQ constructor
* Derive PushQueueConfig Debug and fix docs
* Document unlimited builder and reuse test fixtures
* Refine DLQ logging and showcase unlimited builder
* Use shared builder in stream-end tests
* Fix rate limiter hang; switch push handle to log facade and adjust DLQ logging
  - Avoid reserving tokens for probe-only futures by replacing with a non-blocking loop plus short sleeps.
  - Use macros in the push handle so the `log` facade captures warnings.
  - Log every DLQ drop by default to make tests deterministic.
  - Update docs to describe the configurable DLQ logging cadence.
* Standardize logging on the log facade; add limiter regression test
  - Replace tracing macros with log across server/app/connection.
  - Emit both log and tracing panic events to satisfy tests.
  - Add regression test to guard rate limiter token reservation.
  - Keep DLQ logging default verbose for test determinism.

Co-authored-by: Payton McIntosh <pmcintosh@df12.net>
1 parent 745859e commit 3195dfb

36 files changed

Lines changed: 1946 additions & 1186 deletions

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -37,6 +37,7 @@ tracing-subscriber = "0.3"
 metrics = { version = "0.24.2", optional = true }
 metrics-exporter-prometheus = { version = "0.17.2", optional = true, features = ["http-listener"] }
 thiserror = "2.0.12"
+static_assertions = "1"

 [dev-dependencies]
 rstest = "0.26.1"

README.md

Lines changed: 35 additions & 1 deletion
@@ -140,7 +140,41 @@ big‑endian header【F:docs/rust-binary-router-library-design.md†L1082-L1123
 let app = WireframeApp::new()?;
 ```

-## Connection lifecycle
+## Push Queues
+
+Push queues buffer frames before they are written to a connection. Configure
+them with capacities, rate limits, and an optional dead-letter queue:
+
+```rust,no_run
+use tokio::sync::mpsc;
+use wireframe::push::PushQueues;
+
+# async fn demo() {
+let (dlq_tx, _dlq_rx) = mpsc::channel(8);
+let (_queues, _handle) = PushQueues::<u8>::builder()
+    .high_capacity(8)
+    .low_capacity(8)
+    .rate(Some(100))
+    .dlq(Some(dlq_tx)) // frames drop if the DLQ is absent or full
+    .build()
+    .expect("failed to build PushQueues");
+# drop((_queues, _handle));
+# }
+```
+
+Disable throttling with the `unlimited` convenience:
+
+```rust,no_run
+use wireframe::push::PushQueues;
+let (_queues, _handle) = PushQueues::<u8>::builder()
+    .high_capacity(8)
+    .low_capacity(8)
+    .unlimited()
+    .build()
+    .expect("failed to build PushQueues");
+```
+
+## Connection Lifecycle

 Protocol callbacks are consolidated under the `WireframeProtocol` trait,
 replacing the individual `on_connection_setup`/`on_connection_teardown`
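The README notes that frames drop when the dead-letter queue is absent or full, which implies some task must drain the DLQ receiver. The sketch below illustrates that consumer side using std's `mpsc` as a stand-in for the tokio channel; the `drain_dlq` helper is hypothetical, not part of wireframe's API.

```rust
use std::sync::mpsc;

// Hypothetical helper: collect whatever frames have been dead-lettered so far.
// The real DLQ receiver is a `tokio::sync::mpsc::Receiver`; std's `mpsc` is
// used here only to keep the sketch self-contained and synchronous.
fn drain_dlq(rx: &mpsc::Receiver<u8>) -> Vec<u8> {
    rx.try_iter().collect()
}

fn main() {
    let (dlq_tx, dlq_rx) = mpsc::sync_channel(8);
    // Pretend two frames failed to enqueue and were routed to the DLQ.
    dlq_tx.try_send(1u8).unwrap();
    dlq_tx.try_send(2u8).unwrap();
    println!("{:?}", drain_dlq(&dlq_rx)); // prints "[1, 2]"
}
```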

docs/asynchronous-outbound-messaging-design.md

Lines changed: 17 additions & 9 deletions
@@ -16,7 +16,7 @@ any part of an application—a request handler, a background timer, a separate
 worker task—to push frames to a live connection at any time.

 Earlier releases spawned a short-lived worker per request. This approach made
-persistent state awkward and required extra synchronisation when multiple tasks
+persistent state awkward and required extra synchronization when multiple tasks
 needed to write to the same socket. The new design promotes each connection to
 a **stateful actor** that owns its context for the lifetime of the session.
 Actor state keeps sequencing rules and push queues local to one task,
@@ -60,7 +60,7 @@ metadata, and pending pushes—without cross-task sharing. Handlers now send
 commands back to the actor instead of writing directly to the socket,
 centralizing all output in one place.

-### 3.1 Prioritised Message Queues
+### 3.1 Prioritized Message Queues

 To handle different classes of outbound messages, each connection actor will
 manage two distinct, bounded `tokio::mpsc` channels for pushed frames:
@@ -77,7 +77,7 @@ back-pressure mechanism. When a channel's buffer is full, any task attempting
 to push a new message will be asynchronously suspended until space becomes
 available.

-### 3.2 The Prioritised Write Loop
+### 3.2 The Prioritized Write Loop

 The connection actor's write logic will be implemented within a
 `tokio::select!` loop. Crucially, this loop will use the `biased` keyword to
@@ -155,7 +155,7 @@ under sustained high-priority load.

 <!-- markdownlint-disable MD033 -->

-The flow diagram below summarises the fairness logic.
+The flow diagram below summarizes the fairness logic.

 <description>The diagram shows how the actor yields to the low-priority queue
 after N high-priority frames.</description>
@@ -384,12 +384,20 @@ classDiagram
     class PushQueues~F~ {
         +high_priority_rx: mpsc::Receiver<F>
         +low_priority_rx: mpsc::Receiver<F>
-        +bounded(high_capacity: usize, low_capacity: usize): (PushQueues~F~, PushHandle~F~)
+        +builder(): PushQueuesBuilder~F~
         +recv(): Option<(PushPriority, F)>
     }
+    class PushQueuesBuilder~F~ {
+        +high_capacity(cap: usize): PushQueuesBuilder~F~
+        +low_capacity(cap: usize): PushQueuesBuilder~F~
+        +rate(rate: Option<usize>): PushQueuesBuilder~F~
+        +dlq(sender: Option<mpsc::Sender<F>>): PushQueuesBuilder~F~
+        +build(): (PushQueues~F~, PushHandle~F~)
+    }

     PushHandleInner <.. PushHandle~F~ : contains
-    PushQueues~F~ o-- PushHandle~F~ : bounded(high_capacity, low_capacity)
+    PushQueues~F~ o-- PushQueuesBuilder~F~ : builder()
+    PushQueuesBuilder~F~ o-- PushHandle~F~ : build()
     PushHandle --> PushPriority
     PushHandle --> PushPolicy
     PushHandle --> PushError
@@ -483,7 +491,7 @@ so pruning while collecting may contend more than the previous post-collection
 `remove_if` sweep. Maintenance tasks may instead invoke `prune()` to avoid this
 contention.

-The diagram below summarises the data structures and how they interact when
+The diagram below summarizes the data structures and how they interact when
 storing session handles. `SessionRegistry` maps `ConnectionId`s to weak
 references of `PushHandleInner<F>` so closed connections do not stay alive.
@@ -534,7 +542,7 @@ sequenceDiagram

 ### 4.3 Configuration via the `WireframeProtocol` Trait

-To provide a clean, organised, and extensible configuration API, all
+To provide a clean, organized, and extensible configuration API, all
 protocol-specific logic and callbacks will be encapsulated within a single
 `WireframeProtocol` trait. This is a significant ergonomic improvement over
 using a collection of individual closures.
@@ -733,7 +741,7 @@ sequenceDiagram
 This design is explicitly intended to work in concert with the other major
 features of the 1.0 release.

-- **Streaming Responses:** The prioritised write loop (Section 3.2) naturally
+- **Streaming Responses:** The prioritized write loop (Section 3.2) naturally
   handles the interleaving of pushed messages and streaming responses, ensuring
   that urgent pushes can interrupt a long-running data stream.
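The design doc's fairness rule (yield to the low-priority queue after N consecutive high-priority frames) can be sketched synchronously. This is an illustration only: the actor uses a biased `tokio::select!` over async channels, while the `next_frame` helper and `VecDeque` queues here are stand-ins chosen to keep the sketch self-contained.

```rust
use std::collections::VecDeque;

// Pick the next frame to write: drain the high-priority queue first, but
// after `n` consecutive high-priority frames service one low-priority frame
// so the low queue is never starved under sustained high-priority load.
fn next_frame(
    high: &mut VecDeque<u8>,
    low: &mut VecDeque<u8>,
    streak: &mut u32,
    n: u32,
) -> Option<u8> {
    if *streak >= n && !low.is_empty() {
        *streak = 0; // fairness yield: reset the high-priority streak
        return low.pop_front();
    }
    if let Some(frame) = high.pop_front() {
        *streak += 1;
        return Some(frame);
    }
    *streak = 0;
    low.pop_front() // high queue idle: low-priority frames flow freely
}

fn main() {
    let mut high: VecDeque<u8> = (0..3).collect();
    let mut low: VecDeque<u8> = VecDeque::from(vec![100]);
    let mut streak = 0;
    let order: Vec<u8> =
        std::iter::from_fn(|| next_frame(&mut high, &mut low, &mut streak, 2)).collect();
    println!("{order:?}"); // prints "[0, 1, 100, 2]"
}
```

With N = 2, the low-priority frame is interleaved after two high-priority frames rather than waiting for the high queue to empty.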

docs/hardening-wireframe-a-guide-to-production-resilience.md

Lines changed: 19 additions & 5 deletions
@@ -253,11 +253,20 @@ token-bucket algorithm is ideal.
 use wireframe::push::{PushQueues, MAX_PUSH_RATE};

 // Configure a connection to allow at most MAX_PUSH_RATE pushes per second.
-let (queues, handle) = PushQueues::<Frame>::bounded_with_rate(8, 8, Some(MAX_PUSH_RATE))
+let (queues, handle) = PushQueues::<Frame>::builder()
+    .high_capacity(8)
+    .low_capacity(8)
+    .rate(Some(MAX_PUSH_RATE))
+    .build()
     .expect("rate within supported bounds");

-// Passing `None` disables rate limiting entirely:
-let (_unlimited, _handle) = PushQueues::<Frame>::bounded_no_rate_limit(8, 8);
+// Calling `unlimited` disables rate limiting entirely:
+let (_unlimited, _handle) = PushQueues::<Frame>::builder()
+    .high_capacity(8)
+    .low_capacity(8)
+    .unlimited()
+    .build()
+    .expect("failed to build unlimited queues");

 // Inside PushHandle::push()
 async fn push(&self, frame: Frame) -> Result<(), PushError> {
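The token-bucket idea the hardening guide recommends can be sketched in a few lines of std-only Rust. This is not wireframe's implementation (which is async and, per this commit, avoids reserving tokens for probe-only futures); the `TokenBucket` type below is an illustrative stand-in whose non-blocking `try_acquire` mirrors that probe-without-reserving behaviour.

```rust
use std::time::Instant;

// Minimal token-bucket sketch: `rate` tokens replenish per second up to
// `burst`; `try_acquire` spends one token or reports that the caller should
// back off, without blocking or reserving anything.
struct TokenBucket {
    rate: f64,   // tokens added per second
    burst: f64,  // maximum stored tokens
    tokens: f64, // current balance
    last: Instant,
}

impl TokenBucket {
    fn new(rate: u32, burst: u32) -> Self {
        Self {
            rate: f64::from(rate),
            burst: f64::from(burst),
            tokens: f64::from(burst),
            last: Instant::now(),
        }
    }

    fn try_acquire(&mut self) -> bool {
        // Replenish based on elapsed wall time, capped at the burst size.
        let now = Instant::now();
        self.tokens =
            (self.tokens + now.duration_since(self.last).as_secs_f64() * self.rate).min(self.burst);
        self.last = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut bucket = TokenBucket::new(1, 2);
    assert!(bucket.try_acquire());
    assert!(bucket.try_acquire());
    // Burst exhausted; at 1 token/s the third immediate probe fails.
    assert!(!bucket.try_acquire());
}
```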
@@ -324,7 +333,7 @@ operation, rather than just abruptly closing the connection.

 When a stream concludes successfully, the connection actor calls the
 `stream_end_frame` hook to produce a terminator frame with no payload. This
-explicit marker lets clients recognise that the logical stream has ended and
+explicit marker lets clients recognize that the logical stream has ended and
 helps avoid lingering resources or stalled state machines. The terminator is
 only appended if the protocol supplies one (that is, the hook returns
 `Some(frame)`), and the frame passes through the `before_send` hook like any
@@ -349,7 +358,7 @@ match self.tx.try_send(frame) {
         if let Some(dlq_tx) = &self.dlq_tx {
             // Attempt to route the failed frame to the DLQ.
             if dlq_tx.try_send(failed_frame).is_err() {
-                tracing::error!("Push queue and DLQ are both full. Frame lost.");
+                tracing::warn!("Push queue and DLQ are both full. Frame lost.");
             }
         } else {
             // Default behaviour: drop the frame.
@@ -362,6 +371,11 @@ match self.tx.try_send(frame) {

 ```

+By default the library logs each DLQ drop to maximise visibility during
+development and testing. Applications can tune verbosity using the push queue
+builder's `dlq_log_every_n` and `dlq_log_interval` settings to throttle
+warnings per handle in production.
+
 A separate part of the application is then responsible for consuming from the
 DLQ's receiver to inspect, log, and re-process these failed messages, ensuring
 zero message loss even under transient high load.
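The fallback path in the hunk above (enqueue, spill to the DLQ on failure, drop only as a last resort) can be exercised end to end with std's bounded `sync_channel` standing in for tokio's mpsc. The `push_or_dead_letter` helper is a hypothetical sketch of that logic, not wireframe's `PushHandle::push`.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

// Route a frame to the main queue; on failure, spill it to the DLQ if one is
// configured. Returns false only when the frame is truly lost (both queues
// full, or the queue failed and no DLQ exists).
fn push_or_dead_letter(tx: &SyncSender<u8>, dlq: Option<&SyncSender<u8>>, frame: u8) -> bool {
    match tx.try_send(frame) {
        Ok(()) => true,
        Err(TrySendError::Full(failed)) | Err(TrySendError::Disconnected(failed)) => match dlq {
            Some(d) if d.try_send(failed).is_ok() => true,
            // Both queues full (or no DLQ configured): the frame is dropped.
            _ => false,
        },
    }
}

fn main() {
    let (tx, _rx) = sync_channel(1);
    let (dlq_tx, dlq_rx) = sync_channel(1);
    assert!(push_or_dead_letter(&tx, Some(&dlq_tx), 1)); // fits in the main queue
    assert!(push_or_dead_letter(&tx, Some(&dlq_tx), 2)); // spills into the DLQ
    assert!(!push_or_dead_letter(&tx, Some(&dlq_tx), 3)); // both full: lost
    assert_eq!(dlq_rx.try_recv().unwrap(), 2);
}
```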

docs/multi-packet-and-streaming-responses-design.md

Lines changed: 1 addition & 1 deletion
@@ -224,7 +224,7 @@ not hang.

 ## 7. Synergy with Other 1.0 Features

-- **Asynchronous Pushes:** The connection actor's prioritised write loop (as
+- **Asynchronous Pushes:** The connection actor's prioritized write loop (as
   defined in the outbound messaging design) will always poll for pushed
   messages *before* polling the response stream. This ensures that urgent,
   out-of-band messages are not starved by a long-running data stream.

src/app/connection.rs

Lines changed: 29 additions & 19 deletions
@@ -4,6 +4,7 @@ use std::{collections::HashMap, sync::Arc};

 use bytes::BytesMut;
 use futures::{SinkExt, StreamExt};
+use log::{debug, warn};
 use tokio::{
     io::{self, AsyncRead, AsyncWrite, AsyncWriteExt},
     time::{Duration, timeout},
@@ -138,7 +139,10 @@ where
             .clone();

         if let Err(e) = self.process_stream(stream, &routes).await {
-            tracing::warn!(correlation_id = ?None::<u64>, error = ?e, "connection terminated with error");
+            warn!(
+                "connection terminated with error: correlation_id={:?}, error={e:?}",
+                None::<u64>
+            );
         }

         if let (Some(teardown), Some(state)) = (&self.on_disconnect, state) {
@@ -181,7 +185,7 @@ where
                 Ok(Some(Err(e))) => return Err(e),
                 Ok(None) => break,
                 Err(_) => {
-                    tracing::debug!("read timeout elapsed; continuing to wait for next frame");
+                    debug!("read timeout elapsed; continuing to wait for next frame");
                 }
             }
         }
@@ -207,7 +211,10 @@ where
             }
             Err(EnvelopeDecodeError::Parse(e)) => {
                 *deser_failures += 1;
-                tracing::warn!(correlation_id = ?None::<u64>, error = ?e, "failed to parse message");
+                warn!(
+                    "failed to parse message: correlation_id={:?}, error={e:?}",
+                    None::<u64>
+                );
                 crate::metrics::inc_deser_errors();
                 if *deser_failures >= MAX_DESER_FAILURES {
                     return Err(io::Error::new(
@@ -219,7 +226,10 @@ where
             }
             Err(EnvelopeDecodeError::Deserialize(e)) => {
                 *deser_failures += 1;
-                tracing::warn!(correlation_id = ?None::<u64>, error = ?e, "failed to deserialize message");
+                warn!(
+                    "failed to deserialize message: correlation_id={:?}, error={e:?}",
+                    None::<u64>
+                );
                 crate::metrics::inc_deser_errors();
                 if *deser_failures >= MAX_DESER_FAILURES {
                     return Err(io::Error::new(
@@ -242,36 +252,36 @@ where
                 match self.serializer.serialize(&response) {
                     Ok(bytes) => {
                         if let Err(e) = framed.send(bytes.into()).await {
-                            tracing::warn!(
-                                id = env.id,
-                                correlation_id = ?correlation_id,
-                                error = ?e,
-                                "failed to send response",
+                            warn!(
+                                "failed to send response: id={}, correlation_id={:?}, \
+                                 error={e:?}",
+                                env.id, correlation_id
                             );
                             crate::metrics::inc_handler_errors();
                         }
                     }
                     Err(e) => {
-                        tracing::warn!(
-                            id = env.id,
-                            correlation_id = ?correlation_id,
-                            error = ?e,
-                            "failed to serialize response",
+                        warn!(
+                            "failed to serialize response: id={}, correlation_id={:?}, \
+                             error={e:?}",
+                            env.id, correlation_id
                        );
                         crate::metrics::inc_handler_errors();
                     }
                 }
             }
             Err(e) => {
-                tracing::warn!(id = env.id, correlation_id = ?env.correlation_id, error = ?e, "handler error");
+                warn!(
+                    "handler error: id={}, correlation_id={:?}, error={e:?}",
+                    env.id, env.correlation_id
+                );
                 crate::metrics::inc_handler_errors();
             }
         }
     } else {
-        tracing::warn!(
-            id = env.id,
-            correlation_id = ?env.correlation_id,
-            "no handler for message id"
+        warn!(
+            "no handler for message id: id={}, correlation_id={:?}",
+            env.id, env.correlation_id
         );
     }

src/app/envelope.rs

Lines changed: 4 additions & 4 deletions
@@ -138,11 +138,11 @@ impl PacketParts {
     pub fn inherit_correlation(mut self, source: Option<u64>) -> Self {
         let (next, mismatched) = Self::select_correlation(self.correlation_id, source);
         if mismatched && let (Some(found), Some(expected)) = (self.correlation_id, next) {
-            tracing::warn!(
-                id = self.id,
+            log::warn!(
+                "mismatched correlation id in response: id={}, expected={}, found={}",
+                self.id,
                 expected,
-                found,
-                "mismatched correlation id in response",
+                found
            );
         }
         self.correlation_id = next;
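The source diffs above replace tracing's structured key-value fields with formatted `log` messages, leaning on Rust's inline format captures (`{e:?}`) to keep the call sites compact. A self-contained sketch of that formatting idiom, with no logger wired up (the message text mirrors the diff; the values are made up):

```rust
use std::io;

fn main() {
    // Stand-in values for the fields the real code logs.
    let e = io::Error::new(io::ErrorKind::Other, "boom");
    let correlation_id: Option<u64> = Some(7);

    // `{e:?}` captures `e` from scope and formats it with Debug, so the
    // structured fields collapse into one human-readable line.
    let msg = format!(
        "handler error: id={}, correlation_id={:?}, error={e:?}",
        42, correlation_id
    );
    println!("{msg}");
}
```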
