Skip to content

Commit c64371b

Browse files
committed
Refine DLQ logging and showcase unlimited builder
1 parent a6abe9f commit c64371b

13 files changed

Lines changed: 45 additions & 34 deletions

docs/efficiency-report.md

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,21 +89,22 @@ known.
8989

9090
### Frame Processing Pipeline
9191

92-
- **Bottleneck**: Frame decode/encode operations in high-throughput scenarios
93-
- **Critical Path**: `LengthPrefixedProcessor::decode` method
94-
- **Optimization Priority**: High—affects every incoming frame
92+
- **Bottleneck**: The frame decoding and encoding operations in
93+
high-throughput scenarios
94+
- **Critical path**: `LengthPrefixedProcessor::decode`
95+
- **Optimisation priority**: High—affects every incoming frame
9596

9697
### Connection Handling
9798

98-
- **Bottleneck**: Connection actor event loop and fairness tracking
99-
- **Critical Path**: `tokio::select!` in connection actor
100-
- **Optimization Priority**: Medium—affects per-connection performance
99+
- **Bottleneck**: The connection actor’s event loop and fairness tracking
100+
- **Critical path**: `tokio::select!` in the connection actor
101+
- **Optimisation priority**: Medium—affects per-connection performance
101102

102103
### Message Routing
103104

104-
- **Bottleneck**: HashMap lookups for route resolution
105-
- **Critical Path**: Route handler lookup in `WireframeApp`
106-
- **Optimization Priority**: Low—HashMap lookups are already efficient
105+
- **Bottleneck**: The `HashMap` lookups for route resolution
106+
- **Critical path**: Route handler lookup in `WireframeApp`
107+
- **Optimisation priority**: Low—`HashMap` lookups are already efficient
107108

108109
## Implemented Optimizations
109110

@@ -133,8 +134,8 @@ Ok(Some(src.split_to(len).freeze().to_vec()))
133134

134135
## Future Optimization Opportunities
135136

136-
1. **Frame Type Optimization**: Consider changing the frame type from `Vec<u8>`
137-
to `Bytes` to eliminate the final `.to_vec()` call entirely.
137+
1. **Frame type optimisation**: Replace `Vec<u8>` with `Bytes` end to end to
138+
eliminate the final `.to_vec()` copy on the hot path.
138139

139140
2. **Connection Actor Pooling**: Implement connection actor pooling to reduce
140141
setup/teardown overhead.

docs/hardening-wireframe-a-guide-to-production-resilience.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,11 @@ let (queues, handle) = PushQueues::<Frame>::builder()
260260
.build()
261261
.expect("rate within supported bounds");
262262

263-
// Passing `None` disables rate limiting entirely:
263+
// Calling `unlimited` disables rate limiting entirely:
264264
let (_unlimited, _handle) = PushQueues::<Frame>::builder()
265265
.high_capacity(8)
266266
.low_capacity(8)
267-
.rate(None)
267+
.unlimited()
268268
.build()
269269
.expect("failed to build unlimited queues");
270270

src/push/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
//! let (queues, handle) = PushQueues::<u8>::builder()
1212
//! .high_capacity(8)
1313
//! .low_capacity(8)
14+
//! .unlimited() // disables rate limiting
1415
//! .build()
1516
//! .expect("failed to build PushQueues");
1617
//! # drop((queues, handle));

src/push/queues/builder.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,11 @@ impl<F: FrameLike> PushQueuesBuilder<F> {
124124
}
125125

126126
/// Log dropped frames every `n` occurrences when the DLQ is full or closed.
127+
/// A value of `0` disables count-based logging; interval-based logging
128+
/// still applies.
127129
#[must_use]
128130
pub fn dlq_log_every_n(mut self, n: usize) -> Self {
131+
// Treat 0 as “disabled by count”; interval-based logging remains.
129132
self.dlq_log_every_n = n;
130133
self
131134
}
@@ -140,7 +143,8 @@ impl<F: FrameLike> PushQueuesBuilder<F> {
140143
/// Build the configured [`PushQueues`] and associated [`PushHandle`].
141144
///
142145
/// Validation of capacities and rate occurs only during the build step,
143-
/// not when setting individual fields.
146+
/// not when setting individual fields. A `dlq_log_every_n` of `0` disables
147+
/// count-based logging; interval-based logging still applies.
144148
///
145149
/// # Errors
146150
///

src/push/queues/errors.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use super::MAX_PUSH_RATE;
66

77
/// Errors that can occur when pushing a frame.
88
#[non_exhaustive]
9-
#[derive(Debug, Error)]
9+
#[derive(Debug, Error, Clone, Copy, PartialEq, Eq)]
1010
pub enum PushError {
1111
/// The queue was at capacity and the policy was `ReturnErrorIfFull`.
1212
#[error("push queue full")]
@@ -18,12 +18,12 @@ pub enum PushError {
1818

1919
/// Errors returned when creating push queues.
2020
#[non_exhaustive]
21-
#[derive(Debug, Error)]
21+
#[derive(Debug, Error, Clone, PartialEq, Eq)]
2222
pub enum PushConfigError {
23-
/// The provided rate was zero or exceeded [`MAX_PUSH_RATE`].
23+
/// The provided rate was zero or exceeded [`crate::push::queues::MAX_PUSH_RATE`].
2424
#[error("invalid rate {0}; must be between 1 and {max}", max = MAX_PUSH_RATE)]
2525
InvalidRate(usize),
2626
/// The provided capacities were zero.
27-
#[error("invalid capacities; high={high}, low={low}; each must be 1")]
27+
#[error("invalid capacities; high={high}, low={low}; each must be >= 1")]
2828
InvalidCapacity { high: usize, low: usize },
2929
}

src/push/queues/handle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ impl<F: FrameLike> PushHandle<F> {
154154
let dropped = self.0.dlq_drops.fetch_add(1, Ordering::Relaxed) + 1;
155155
let mut last = self.0.dlq_last_log.lock().expect("lock poisoned");
156156
let now = Instant::now();
157-
if dropped.is_multiple_of(log_every_n)
157+
if (log_every_n != 0 && dropped.is_multiple_of(log_every_n))
158158
|| now.duration_since(*last) > log_interval
159159
{
160160
warn!(

src/push/queues/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,7 @@ impl<F: FrameLike> PushQueues<F> {
149149
limiter,
150150
dlq_tx: dlq,
151151
dlq_drops: AtomicUsize::new(0),
152-
dlq_last_log: Mutex::new(
153-
Instant::now()
154-
.checked_sub(dlq_log_interval)
155-
.unwrap_or_else(Instant::now),
156-
),
152+
dlq_last_log: Mutex::new(Instant::now()),
157153
dlq_log_every_n,
158154
dlq_log_interval,
159155
};

tests/advanced/concurrency_loom.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ fn concurrent_push_delivery() {
2424
let (queues, handle) = PushQueues::<u8>::builder()
2525
.high_capacity(1)
2626
.low_capacity(1)
27+
.unlimited()
2728
.build()
2829
.expect("failed to build PushQueues");
2930
let token = CancellationToken::new();

tests/advanced/interaction_fuzz.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ use wireframe::{
1515
response::FrameStream,
1616
};
1717

18+
#[path = "../support.rs"]
19+
mod support;
20+
1821
#[derive(Debug, Clone)]
1922
enum Action {
2023
High(u8),
@@ -23,9 +26,10 @@ enum Action {
2326
}
2427

2528
async fn run_actions(actions: &[Action]) -> Vec<u8> {
26-
let (queues, handle) = PushQueues::<u8>::builder()
29+
let (queues, handle) = support::builder::<u8>()
2730
.high_capacity(16)
2831
.low_capacity(16)
32+
.unlimited()
2933
.build()
3034
.expect("failed to build PushQueues");
3135
let shutdown = CancellationToken::new();

tests/correlation_id.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ async fn stream_frames_carry_request_correlation_id() {
1818
let (queues, handle) = PushQueues::<Envelope>::builder()
1919
.high_capacity(1)
2020
.low_capacity(1)
21-
.rate(None)
21+
.unlimited()
2222
.build()
2323
.expect("failed to build PushQueues");
2424
let shutdown = CancellationToken::new();

0 commit comments

Comments
 (0)