Merged
7 changes: 7 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ tracing-subscriber = "0.3"
metrics = { version = "0.24.2", optional = true }
metrics-exporter-prometheus = { version = "0.17.2", optional = true, features = ["http-listener"] }
thiserror = "2.0.12"
static_assertions = "1"

[dev-dependencies]
rstest = "0.26.1"
36 changes: 35 additions & 1 deletion README.md
@@ -140,7 +140,41 @@ big‑endian header【F:docs/rust-binary-router-library-design.md†L1082-L1123
let app = WireframeApp::new()?;
```

## Connection lifecycle
## Push Queues

Push queues buffer frames before they are written to a connection. Configure
them with capacities, rate limits, and an optional dead-letter queue:

```rust,no_run
use tokio::sync::mpsc;
use wireframe::push::PushQueues;

# async fn demo() {
let (dlq_tx, _dlq_rx) = mpsc::channel(8);
let (_queues, _handle) = PushQueues::<u8>::builder()
.high_capacity(8)
.low_capacity(8)
.rate(Some(100))
.dlq(Some(dlq_tx)) // frames drop if the DLQ is absent or full
.build()
.expect("failed to build PushQueues");
# drop((_queues, _handle));
# }
```

Disable throttling with the `unlimited` convenience method:

```rust,no_run
use wireframe::push::PushQueues;
let (_queues, _handle) = PushQueues::<u8>::builder()
.high_capacity(8)
.low_capacity(8)
.unlimited()
.build()
.expect("failed to build PushQueues");
```

## Connection Lifecycle

Protocol callbacks are consolidated under the `WireframeProtocol` trait,
replacing the individual `on_connection_setup`/`on_connection_teardown`
26 changes: 17 additions & 9 deletions docs/asynchronous-outbound-messaging-design.md
@@ -16,7 +16,7 @@ any part of an application—a request handler, a background timer, a separate
worker task—to push frames to a live connection at any time.

Earlier releases spawned a short-lived worker per request. This approach made
persistent state awkward and required extra synchronisation when multiple tasks
persistent state awkward and required extra synchronization when multiple tasks
needed to write to the same socket. The new design promotes each connection to
a **stateful actor** that owns its context for the lifetime of the session.
Actor state keeps sequencing rules and push queues local to one task,
@@ -60,7 +60,7 @@ metadata, and pending pushes—without cross-task sharing. Handlers now send
commands back to the actor instead of writing directly to the socket,
centralizing all output in one place.

### 3.1 Prioritised Message Queues
### 3.1 Prioritized Message Queues

To handle different classes of outbound messages, each connection actor will
manage two distinct, bounded `tokio::mpsc` channels for pushed frames:
@@ -77,7 +77,7 @@ back-pressure mechanism. When a channel's buffer is full, any task attempting
to push a new message will be asynchronously suspended until space becomes
available.
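The back-pressure contract above can be sketched with the standard library's bounded `sync_channel` (a std-only stand-in for illustration; the real queues are async `tokio::mpsc` channels, where a full buffer suspends the sender instead of returning an error):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// A bounded channel with capacity 2: the third non-blocking send is refused,
// which is the signal a pushing task uses to wait (or apply a push policy).
fn demo_backpressure() -> bool {
    let (tx, _rx) = sync_channel::<u8>(2);
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    // Buffer is full; try_send reports it instead of blocking.
    matches!(tx.try_send(3), Err(TrySendError::Full(3)))
}

fn main() {
    assert!(demo_backpressure());
    println!("third send rejected: buffer full");
}
```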

### 3.2 The Prioritised Write Loop
### 3.2 The Prioritized Write Loop

The connection actor's write logic will be implemented within a
`tokio::select!` loop. Crucially, this loop will use the `biased` keyword to
@@ -155,7 +155,7 @@ under sustained high-priority load.

<!-- markdownlint-disable MD033 -->

The flow diagram below summarises the fairness logic.
The flow diagram below summarizes the fairness logic.

<description>The diagram shows how the actor yields to the low-priority queue
after N high-priority frames.</description>
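The yield-after-N rule in the diagram can be sketched synchronously (the budget `N` and function shape are illustrative only; the real actor implements this inside a `biased` `tokio::select!` loop):

```rust
use std::collections::VecDeque;

/// Serve up to `n` high-priority frames, then offer the low-priority queue
/// one slot, mirroring the actor's anti-starvation budget.
fn next_frame(
    high: &mut VecDeque<u8>,
    low: &mut VecDeque<u8>,
    served_high: &mut u32,
    n: u32,
) -> Option<u8> {
    if *served_high >= n {
        *served_high = 0;
        if let Some(f) = low.pop_front() {
            return Some(f);
        }
    }
    if let Some(f) = high.pop_front() {
        *served_high += 1;
        return Some(f);
    }
    low.pop_front()
}

fn main() {
    let mut high: VecDeque<u8> = (0..5).collect();
    let mut low = VecDeque::from([100u8]);
    let mut served = 0;
    let order: Vec<u8> =
        std::iter::from_fn(|| next_frame(&mut high, &mut low, &mut served, 2)).collect();
    // Two high frames, then the low frame gets a turn, then the rest.
    assert_eq!(order, vec![0, 1, 100, 2, 3, 4]);
}
```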
@@ -384,12 +384,20 @@ classDiagram
class PushQueues~F~ {
+high_priority_rx: mpsc::Receiver<F>
+low_priority_rx: mpsc::Receiver<F>
+bounded(high_capacity: usize, low_capacity: usize): (PushQueues~F~, PushHandle~F~)
+builder(): PushQueuesBuilder~F~
+recv(): Option<(PushPriority, F)>
}
class PushQueuesBuilder~F~ {
+high_capacity(cap: usize): PushQueuesBuilder~F~
+low_capacity(cap: usize): PushQueuesBuilder~F~
+rate(rate: Option<usize>): PushQueuesBuilder~F~
+dlq(sender: Option<mpsc::Sender<F>>): PushQueuesBuilder~F~
+build(): (PushQueues~F~, PushHandle~F~)
}

PushHandleInner <.. PushHandle~F~ : contains
PushQueues~F~ o-- PushHandle~F~ : bounded(high_capacity, low_capacity)
PushQueues~F~ o-- PushQueuesBuilder~F~ : builder()
PushQueuesBuilder~F~ o-- PushHandle~F~ : build()
PushHandle --> PushPriority
PushHandle --> PushPolicy
PushHandle --> PushError
@@ -483,7 +491,7 @@ so pruning while collecting may contend more than the previous post-collection
`remove_if` sweep. Maintenance tasks may instead invoke `prune()` to avoid this
contention.
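The weak-reference cleanup can be sketched with std types (a simplified stand-in: `HandleInner` plays the role of `PushHandleInner<F>`, and a plain `HashMap` replaces the registry's concurrent map):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Weak};

type ConnectionId = u64;
struct HandleInner; // stand-in for PushHandleInner<F>

/// Drop registry entries whose connection handle has already been released,
/// so closed connections do not keep their slots alive.
fn prune(registry: &mut HashMap<ConnectionId, Weak<HandleInner>>) {
    registry.retain(|_, weak| weak.strong_count() > 0);
}

fn main() {
    let alive = Arc::new(HandleInner);
    let dead = Arc::new(HandleInner);
    let mut registry = HashMap::new();
    registry.insert(1, Arc::downgrade(&alive));
    registry.insert(2, Arc::downgrade(&dead));
    drop(dead); // connection 2 closes; its weak reference now dangles
    prune(&mut registry);
    assert!(registry.contains_key(&1) && !registry.contains_key(&2));
}
```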

The diagram below summarises the data structures and how they interact when
The diagram below summarizes the data structures and how they interact when
storing session handles. `SessionRegistry` maps `ConnectionId`s to weak
references of `PushHandleInner<F>` so closed connections do not stay alive.

@@ -534,7 +542,7 @@ sequenceDiagram

### 4.3 Configuration via the `WireframeProtocol` Trait

To provide a clean, organised, and extensible configuration API, all
To provide a clean, organized, and extensible configuration API, all
protocol-specific logic and callbacks will be encapsulated within a single
`WireframeProtocol` trait. This is a significant ergonomic improvement over
using a collection of individual closures.
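The ergonomic difference can be sketched in plain Rust (the trait and method names other than `before_send` are hypothetical, not wireframe's actual API; the point is that one trait object replaces a bag of individually wired closures):

```rust
// Hypothetical shape only: a single trait groups the protocol callbacks.
trait ProtocolSketch {
    fn on_setup(&self) {}
    fn before_send(&self, frame: &mut Vec<u8>) {
        let _ = frame; // default: leave frames untouched
    }
}

struct MyProto;

impl ProtocolSketch for MyProto {
    fn before_send(&self, frame: &mut Vec<u8>) {
        frame.insert(0, 0x01); // e.g. prepend a protocol tag byte
    }
}

fn main() {
    let proto = MyProto;
    let mut frame = vec![0xAA];
    proto.before_send(&mut frame);
    assert_eq!(frame, vec![0x01, 0xAA]);
}
```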
@@ -733,7 +741,7 @@ sequenceDiagram
This design is explicitly intended to work in concert with the other major
features of the 1.0 release.

- **Streaming Responses:** The prioritised write loop (Section 3.2) naturally
- **Streaming Responses:** The prioritized write loop (Section 3.2) naturally
handles the interleaving of pushed messages and streaming responses, ensuring
that urgent pushes can interrupt a long-running data stream.

24 changes: 19 additions & 5 deletions docs/hardening-wireframe-a-guide-to-production-resilience.md
@@ -253,11 +253,20 @@ token-bucket algorithm is ideal.
use wireframe::push::{PushQueues, MAX_PUSH_RATE};

// Configure a connection to allow at most MAX_PUSH_RATE pushes per second.
let (queues, handle) = PushQueues::<Frame>::bounded_with_rate(8, 8, Some(MAX_PUSH_RATE))
let (queues, handle) = PushQueues::<Frame>::builder()
.high_capacity(8)
.low_capacity(8)
.rate(Some(MAX_PUSH_RATE))
.build()
.expect("rate within supported bounds");

// Passing `None` disables rate limiting entirely:
let (_unlimited, _handle) = PushQueues::<Frame>::bounded_no_rate_limit(8, 8);
// Calling `unlimited` disables rate limiting entirely:
let (_unlimited, _handle) = PushQueues::<Frame>::builder()
.high_capacity(8)
.low_capacity(8)
.unlimited()
.build()
.expect("failed to build unlimited queues");

// Inside PushHandle::push()
async fn push(&self, frame: Frame) -> Result<(), PushError> {
@@ -324,7 +333,7 @@ operation, rather than just abruptly closing the connection.

When a stream concludes successfully, the connection actor calls the
`stream_end_frame` hook to produce a terminator frame with no payload. This
explicit marker lets clients recognise that the logical stream has ended and
explicit marker lets clients recognize that the logical stream has ended and
helps avoid lingering resources or stalled state machines. The terminator is
only appended if the protocol supplies one (that is, the hook returns
`Some(frame)`), and the frame passes through the `before_send` hook like any
@@ -349,7 +358,7 @@ match self.tx.try_send(frame) {
if let Some(dlq_tx) = &self.dlq_tx {
// Attempt to route the failed frame to the DLQ.
if dlq_tx.try_send(failed_frame).is_err() {
tracing::error!("Push queue and DLQ are both full. Frame lost.");
tracing::warn!("Push queue and DLQ are both full. Frame lost.");
}
} else {
// Default behaviour: drop the frame.
@@ -362,6 +371,11 @@ match self.tx.try_send(frame) {

```

By default the library logs each DLQ drop to maximize visibility during
development and testing. Applications can tune verbosity with the push queue
builder's `dlq_log_every_n` and `dlq_log_interval` settings, which throttle
warnings per handle in production.
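The every-N throttle can be sketched as a per-handle counter (std-only; the counter logic is an illustrative assumption, only the `dlq_log_every_n` setting name comes from the text above):

```rust
/// Log only every `every_n`-th DLQ drop, mirroring a `dlq_log_every_n(N)` setting.
struct DropLogThrottle {
    every_n: u64,
    dropped: u64,
}

impl DropLogThrottle {
    fn new(every_n: u64) -> Self {
        Self { every_n, dropped: 0 }
    }

    /// Record a drop; returns true when this drop should be logged.
    fn should_log(&mut self) -> bool {
        self.dropped += 1;
        self.every_n == 1 || self.dropped % self.every_n == 1
    }
}

fn main() {
    let mut throttle = DropLogThrottle::new(3);
    let logged: Vec<bool> = (0..6).map(|_| throttle.should_log()).collect();
    // First drop is logged, then every third drop after that.
    assert_eq!(logged, vec![true, false, false, true, false, false]);
}
```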

A separate part of the application is then responsible for consuming from the
DLQ's receiver to inspect, log, and re-process these failed messages, ensuring
zero message loss even under transient high load.
2 changes: 1 addition & 1 deletion docs/multi-packet-and-streaming-responses-design.md
@@ -224,7 +224,7 @@ not hang.

## 7. Synergy with Other 1.0 Features

- **Asynchronous Pushes:** The connection actor's prioritised write loop (as
- **Asynchronous Pushes:** The connection actor's prioritized write loop (as
defined in the outbound messaging design) will always poll for pushed
messages *before* polling the response stream. This ensures that urgent,
out-of-band messages are not starved by a long-running data stream.
48 changes: 29 additions & 19 deletions src/app/connection.rs
@@ -4,6 +4,7 @@ use std::{collections::HashMap, sync::Arc};

use bytes::BytesMut;
use futures::{SinkExt, StreamExt};
use log::{debug, warn};
use tokio::{
io::{self, AsyncRead, AsyncWrite, AsyncWriteExt},
time::{Duration, timeout},
@@ -138,7 +139,10 @@ where
.clone();

if let Err(e) = self.process_stream(stream, &routes).await {
tracing::warn!(correlation_id = ?None::<u64>, error = ?e, "connection terminated with error");
warn!(
"connection terminated with error: correlation_id={:?}, error={e:?}",
None::<u64>
);
}

if let (Some(teardown), Some(state)) = (&self.on_disconnect, state) {
@@ -181,7 +185,7 @@ where
Ok(Some(Err(e))) => return Err(e),
Ok(None) => break,
Err(_) => {
tracing::debug!("read timeout elapsed; continuing to wait for next frame");
debug!("read timeout elapsed; continuing to wait for next frame");
}
}
}
@@ -207,7 +211,10 @@ where
}
Err(EnvelopeDecodeError::Parse(e)) => {
*deser_failures += 1;
tracing::warn!(correlation_id = ?None::<u64>, error = ?e, "failed to parse message");
warn!(
"failed to parse message: correlation_id={:?}, error={e:?}",
None::<u64>
);
crate::metrics::inc_deser_errors();
if *deser_failures >= MAX_DESER_FAILURES {
return Err(io::Error::new(
@@ -219,7 +226,10 @@ where
}
Err(EnvelopeDecodeError::Deserialize(e)) => {
*deser_failures += 1;
tracing::warn!(correlation_id = ?None::<u64>, error = ?e, "failed to deserialize message");
warn!(
"failed to deserialize message: correlation_id={:?}, error={e:?}",
None::<u64>
);
crate::metrics::inc_deser_errors();
if *deser_failures >= MAX_DESER_FAILURES {
return Err(io::Error::new(
@@ -242,36 +252,36 @@ where
match self.serializer.serialize(&response) {
Ok(bytes) => {
if let Err(e) = framed.send(bytes.into()).await {
tracing::warn!(
id = env.id,
correlation_id = ?correlation_id,
error = ?e,
"failed to send response",
warn!(
"failed to send response: id={}, correlation_id={:?}, \
error={e:?}",
env.id, correlation_id
);
crate::metrics::inc_handler_errors();
}
}
Err(e) => {
tracing::warn!(
id = env.id,
correlation_id = ?correlation_id,
error = ?e,
"failed to serialize response",
warn!(
"failed to serialize response: id={}, correlation_id={:?}, \
error={e:?}",
env.id, correlation_id
);
crate::metrics::inc_handler_errors();
}
}
}
Err(e) => {
tracing::warn!(id = env.id, correlation_id = ?env.correlation_id, error = ?e, "handler error");
warn!(
"handler error: id={}, correlation_id={:?}, error={e:?}",
env.id, env.correlation_id
);
crate::metrics::inc_handler_errors();
}
}
} else {
tracing::warn!(
id = env.id,
correlation_id = ?env.correlation_id,
"no handler for message id"
warn!(
"no handler for message id: id={}, correlation_id={:?}",
env.id, env.correlation_id
);
}

8 changes: 4 additions & 4 deletions src/app/envelope.rs
@@ -138,11 +138,11 @@ impl PacketParts {
pub fn inherit_correlation(mut self, source: Option<u64>) -> Self {
let (next, mismatched) = Self::select_correlation(self.correlation_id, source);
if mismatched && let (Some(found), Some(expected)) = (self.correlation_id, next) {
tracing::warn!(
id = self.id,
log::warn!(
"mismatched correlation id in response: id={}, expected={}, found={}",
self.id,
expected,
found,
"mismatched correlation id in response",
found
);
}
self.correlation_id = next;
Expand Down