Add ComputeHandler as Middleware#3

Merged
marcomq merged 36 commits into main from dev on Dec 21, 2025
Conversation

@marcomq marcomq commented Dec 20, 2025

Owner

Summary by CodeRabbit

  • New Features

    • Fanout publishing; Compute, Retry, and RandomPanic middleware for publishers/consumers.
    • Message IDs expanded to 128-bit/UUID-backed identifiers with broader parsing; NATS sequence-aware IDs; per-endpoint MQTT QoS and client settings.
    • MongoDB persists message IDs as binary UUIDs.
  • Tests

    • Added Criterion-based end-to-end performance benchmarks and updated integration performance helpers.
  • Chores / Documentation

    • Package renamed to mq-bridge and README fully rewritten.

coderabbitai Bot commented Dec 20, 2025


Caution

Review failed

The pull request is closed.

Walkthrough

Expanded the CanonicalMessage ID to Option<u128> with UUID/hex/decimal/OID parsing and generation; added FanoutPublisher, Compute middleware (Compute trait + wrappers), RandomPanic and Retry middlewares, a DLQ backoff refactor, an APP_NAME constant, QoS and other endpoint config additions, Criterion benchmarks, and widespread endpoint/test call-site updates to match the new APIs.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Manifest & Bench: Cargo.toml, benches/performance_bench.rs | Package renamed to mq-bridge; added optional rustls-pemfile, dev-dep criterion, feature uuid in nats feature, and [[bench]] section; new Criterion-based benchmark file added. |
| Core constant: src/lib.rs | Added pub const APP_NAME: &str = env!("CARGO_PKG_NAME");. |
| CanonicalMessage API: src/canonical_message.rs | message_id: Option<u64> → Option<u128>; constructor now new(payload: Vec<u8>, message_id: Option<u128>); added set_id, gen_id, with_gen_id, from_struct, get_struct; JSON parsing accepts UUID/hex/decimal/OID into u128. |
| Fanout & endpoint registry: src/endpoints/fanout.rs, src/endpoints/mod.rs | New FanoutPublisher aggregating Arc<dyn MessagePublisher>; EndpointType::Fanout(Vec<Endpoint>); create_base_publisher handles Fanout by composing sub-publishers; added integration test. |
| Endpoint call-site updates: src/endpoints/*.rs | All call sites updated to CanonicalMessage::new(payload, Option<u128>). NATS: sequence-aware canonicalization and APP_NAME usage; AMQP: routing_key → queue; Kafka: async consumer ctor, composite u128 from partition+offset used as id/key; MQTT: per-instance QoS, APP_NAME client id, configurable keep_alive/clean_session; MongoDB: message_id becomes Option<Binary> and ObjectId/16-byte id logic; Memory: channel() helper and batch logging; File/HTTP/Static: constructor usage updated. |
| Middleware additions & DLQ refactor: src/middleware/compute.rs, src/middleware/mod.rs, src/middleware/retry.rs, src/middleware/random_panic.rs, src/middleware/dlq.rs | Added ComputeConsumer/ComputePublisher and wiring; added RetryPublisher and RandomPanic wrappers; DLQ now driven by DeadLetterQueueMiddleware config with next_backoff, config-driven retry/backoff, and adjusted return semantics. |
| Models & Traits: src/models.rs, src/traits.rs | Added ComputeHandler, EndpointType::Fanout, middleware variants (Retry, RandomPanic, Compute), RetryMiddleware, RandomPanicMiddleware, extended DLQ fields and endpoint/config options (NATS/AMQP/Mongo/MQTT); added async Compute trait with blanket impl. |
| Tests & integration changes: tests/integration/*, tests/integration_test.rs, tests/integration/common.rs, tests/*/docker-compose/*.yml | Tests updated to mq_bridge imports; Kafka/NATS constructors updated to async usage; delayed_ack flags toggled; performance helpers now return Duration; docker-compose container names updated; benchmark wiring added. |
| Misc / Minor: src/route.rs, src/middleware/{deduplication,metrics}.rs, tests/memory_test.rs, README.md | Branding/documentation updates (hot_queue → mq-bridge), README rewritten, small formatting/logging tweaks across files. |

Sequence Diagram(s)

sequenceDiagram
    participant Producer
    participant FanoutPub as FanoutPublisher
    participant PubA as Publisher A
    participant PubB as Publisher B

    Producer->>FanoutPub: send(CanonicalMessage)
    note right of FanoutPub: clones message per publisher
    FanoutPub->>PubA: send(cloned message)
    PubA-->>FanoutPub: Ok(None)
    FanoutPub->>PubB: send(cloned message)
    PubB-->>FanoutPub: Ok(None)
    FanoutPub-->>Producer: Ok(None)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

  • Pay extra attention to:
    • src/canonical_message.rs — parsing UUID/hex/decimal/OID → u128 and serialization edge cases.
    • src/endpoints/mongodb.rs — BSON Binary construction, ObjectId derivation and zero-padding to 16 bytes.
    • src/middleware/dlq.rs and src/middleware/retry.rs — backoff calculation, retry loops, and changed return semantics.
    • Async signature changes — Kafka and NATS constructors (ensure call sites await and pass new params).
    • benches/performance_bench.rs — async/Docker orchestration, feature gating, and public helpers.

Poem

"I nibble bytes and spin a UUID bright,
I fan out carrots left and right.
I compute a hop, then forward the cheer,
Retry, DLQ, and panics sometimes near.
Benches hum — the bridge grows light. 🐇"

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 27.27%, which is below the required threshold of 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The PR title 'Add ComputeHandler as Middleware' is specific and relates to a significant portion of the changeset, including the new Compute trait, ComputeHandler struct, ComputeConsumer/ComputePublisher middleware, and integration into the middleware system. |

📜 Recent review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ec6288d and 553dc50.

📒 Files selected for processing (1)
  • .github/workflows/ci.yml (1 hunks)


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/endpoints/memory.rs (1)

109-117: Clarify helper method behavior and potential test pitfalls.

The documentation states this helper "will create a separate Queue channel" for broadcast publishers, which is confusing and potentially dangerous for tests. If a test calls publisher.channel() expecting to inspect the same channel the publisher is writing to, but gets a different channel instead, the test will silently fail to observe the expected messages.

Consider:

  1. Clarifying when this returns the same vs. a different channel
  2. Adding a clearer warning about the broadcast case
  3. Or, removing this helper and requiring tests to use get_or_create_channel directly with the same config
🔎 Suggested documentation improvement
-    /// Note: This helper is primarily for tests expecting a Queue.    
-    /// If used on a broadcast publisher, it will create a separate Queue channel.
+    /// Helper to retrieve the memory channel for this publisher's topic.
+    /// 
+    /// ⚠️ **Warning for tests**: This retrieves or creates a channel using the topic name
+    /// with no explicit capacity. If this publisher was created with a specific capacity,
+    /// or is a broadcast-type publisher, this method may return a *different* channel
+    /// than the one the publisher is actually writing to, causing test assertions to fail.
+    /// 
+    /// For reliable testing, use `get_or_create_channel(&config)` with the exact same
+    /// `MemoryConfig` used to create the publisher.
     pub fn channel(&self) -> MemoryChannel {
🧹 Nitpick comments (3)
src/canonical_message.rs (1)

53-54: Casting negative i64 to u128 produces unexpected large values.

val.as_i64() can return negative numbers, and casting a negative i64 to u128 sign-extends it to 128 bits, which produces a very large value (e.g., -1i64 as u128 = 340282366920938463463374607431768211455, i.e. u128::MAX).

Consider handling this explicitly:

🔎 Proposed fix
             } else if let Some(n) = val.as_i64() {
-                message_id = Some(n as u128);
+                // Only convert non-negative values to avoid wrapping surprises
+                if n >= 0 {
+                    message_id = Some(n as u128);
+                }
             } else if let Some(n) = val.as_u64() {
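
To make the cast behavior concrete, here is a minimal sketch using only the standard library. The helper names `id_from_i64` and `wrapping_cast` are hypothetical and not part of the PR; `u128::try_from` is one possible alternative to the explicit `n >= 0` check in the proposed fix.

```rust
/// Hypothetical helper (not from the PR): convert a signed JSON number into
/// a message ID, rejecting negative values instead of letting `as` wrap them.
fn id_from_i64(n: i64) -> Option<u128> {
    // `try_from` fails for negative inputs, so no sign extension can occur.
    u128::try_from(n).ok()
}

/// Shows what a plain `as` cast does: the value is sign-extended to 128 bits.
fn wrapping_cast(n: i64) -> u128 {
    n as u128
}
```

With these helpers, `wrapping_cast(-1)` yields `u128::MAX`, while `id_from_i64(-1)` yields `None`.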
src/middleware/compute.rs (2)

61-70: Consider parallel processing for batch transformations.

Similar to ComputeConsumer::receive_batch, messages are processed sequentially. If the ComputeHandler is thread-safe and stateless, consider concurrent processing for better throughput on large batches.

The current implementation is correct but may not be optimal for high-volume scenarios.


26-36: Consider parallel processing for batch transformations.

Messages are processed sequentially in the loop. For large batches, this could create a performance bottleneck. The Compute trait's Send + Sync bounds indicate concurrent execution is safe. Consider processing messages concurrently using futures::future::join_all or similar.

However, if the handler has side effects or requires ordered processing, the current sequential approach is correct.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e54acb9 and 1e9171f.

📒 Files selected for processing (19)
  • Cargo.toml (1 hunks)
  • src/canonical_message.rs (1 hunks)
  • src/endpoints/amqp.rs (3 hunks)
  • src/endpoints/fanout.rs (1 hunks)
  • src/endpoints/file.rs (1 hunks)
  • src/endpoints/http.rs (2 hunks)
  • src/endpoints/kafka.rs (2 hunks)
  • src/endpoints/memory.rs (2 hunks)
  • src/endpoints/mod.rs (3 hunks)
  • src/endpoints/mongodb.rs (3 hunks)
  • src/endpoints/mqtt.rs (3 hunks)
  • src/endpoints/nats.rs (10 hunks)
  • src/endpoints/static_endpoint.rs (2 hunks)
  • src/lib.rs (1 hunks)
  • src/middleware/compute.rs (1 hunks)
  • src/middleware/mod.rs (3 hunks)
  • src/models.rs (6 hunks)
  • src/route.rs (1 hunks)
  • src/traits.rs (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
src/endpoints/fanout.rs (2)
src/canonical_message.rs (1)
  • new (21-27)
src/traits.rs (1)
  • send_batch_helper (110-140)
src/endpoints/nats.rs (1)
src/canonical_message.rs (2)
  • s (50-50)
  • new (21-27)
src/endpoints/kafka.rs (1)
src/models.rs (2)
  • new (45-47)
  • new (135-140)
src/middleware/mod.rs (2)
src/middleware/compute.rs (2)
  • new (13-15)
  • new (49-51)
src/models.rs (2)
  • new (45-47)
  • new (135-140)
src/middleware/compute.rs (3)
src/canonical_message.rs (1)
  • new (21-27)
src/models.rs (2)
  • new (45-47)
  • new (135-140)
src/traits.rs (2)
  • receive (55-65)
  • receive_batch (49-52)
🔇 Additional comments (45)
src/route.rs (1)

117-119: LGTM!

Formatting improvement with no functional changes.

src/lib.rs (1)

16-16: LGTM!

Exposing APP_NAME as a public constant is a good practice for consistent application naming across endpoints. Using env!("CARGO_PKG_NAME") ensures the name stays synchronized with Cargo.toml.

src/endpoints/mod.rs (2)

8-8: LGTM!

The fanout module is properly exposed for public use.


194-228: LGTM!

The integration test comprehensively validates the fanout behavior:

  • Creates two memory endpoints
  • Wraps them in a Fanout endpoint
  • Sends a message and verifies it reaches both destinations
  • Checks payload integrity
src/endpoints/static_endpoint.rs (1)

33-33: LGTM!

Both call sites correctly updated to use the new CanonicalMessage::new signature. Passing None for the message ID is appropriate for static endpoints.

Also applies to: 72-72

src/endpoints/file.rs (1)

123-123: LGTM!

Correctly updated to use the new CanonicalMessage::new signature. Passing None is appropriate for file-based messages.

src/endpoints/memory.rs (1)

131-134: LGTM!

Adding trace-level logging for batch operations improves observability without impacting performance.

src/endpoints/http.rs (1)

128-128: LGTM!

Both call sites correctly updated to use the new CanonicalMessage::new signature. Passing None is appropriate as HTTP requests/responses don't have inherent message IDs at construction time.

Also applies to: 249-249

Cargo.toml (1)

23-23: Remove the verification request for rustls-pemfile dependency—it is actively used.

The rustls-pemfile dependency is confirmed to be used in both src/endpoints/nats.rs and src/endpoints/mqtt.rs for parsing PEM-encoded certificates and keys. No action needed.

Likely an incorrect or invalid review comment.

src/endpoints/amqp.rs (3)

3-3: LGTM!

Import of APP_NAME for dynamic consumer tag naming is appropriate.


159-159: Good improvement: Dynamic consumer tag.

Using APP_NAME for the consumer tag prefix improves uniqueness and traceability across multiple instances.


228-232: LGTM!

The AMQP delivery_tag (u64) is safely widened to u128 for the new CanonicalMessage API. This preserves the delivery tag for potential downstream use (e.g., acknowledgments, tracing).

src/endpoints/kafka.rs (2)

146-151: LGTM!

Good approach: using the message_id bytes directly for the Kafka key when available provides deterministic partitioning, with UUID fallback for messages without an ID.


354-361: Composite message ID construction looks correct.

The approach of combining partition (high 64 bits) and offset (low 64 bits) into a u128 creates a unique identifier within a topic. Since Kafka partitions and offsets are non-negative in normal operation, the as u128 casts are safe.

Note: The comment accurately describes the bit layout, which aids maintainability.
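
The bit layout described above can be sketched as follows. This is an illustration under assumptions, not the PR's code: the function names are hypothetical, and the casts go through the unsigned type of the same width so that a negative input cannot sign-extend into the other half of the ID.

```rust
/// Sketch of the layout described in the review: partition in the high
/// 64 bits, offset in the low 64 bits. The function name is hypothetical.
fn composite_id(partition: i32, offset: i64) -> u128 {
    // Cast through the unsigned type of the same width so a negative
    // value cannot sign-extend into the other half of the ID.
    let high = (partition as u32) as u128;
    let low = (offset as u64) as u128;
    (high << 64) | low
}

/// Inverse of `composite_id`, splitting the ID back into its two parts.
fn split_id(id: u128) -> (i32, i64) {
    ((id >> 64) as u32 as i32, id as u64 as i64)
}
```

Because the two halves do not overlap, `split_id(composite_id(p, o))` returns `(p, o)` for any partition and offset.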

src/traits.rs (1)

142-156: Clean trait design with ergonomic blanket implementation.

The Compute trait provides a minimal abstraction for message transformation, and the blanket implementation for async closures enables flexible, inline compute handlers without boilerplate. The trait bounds (Send + Sync) are appropriate for the async middleware context.
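
As a rough illustration of the blanket-implementation pattern, the sketch below uses a simplified, synchronous trait. The PR's actual `Compute` trait is async and operates on `CanonicalMessage`; the `Message` type, the `String` error type, and `uppercase_handler` here are placeholders introduced only for this example.

```rust
/// Placeholder for `CanonicalMessage`; only the payload is modeled.
#[derive(Debug, Clone, PartialEq)]
struct Message {
    payload: Vec<u8>,
}

/// Simplified, synchronous stand-in for the PR's async `Compute` trait.
trait Compute: Send + Sync {
    fn compute(&self, msg: Message) -> Result<Message, String>;
}

/// Blanket implementation: any thread-safe closure with the right shape
/// is a `Compute` handler, so callers do not need a dedicated struct.
impl<F> Compute for F
where
    F: Fn(Message) -> Result<Message, String> + Send + Sync,
{
    fn compute(&self, msg: Message) -> Result<Message, String> {
        self(msg)
    }
}

/// Example handler built from a closure: upper-cases ASCII payload bytes.
fn uppercase_handler() -> impl Compute {
    |mut msg: Message| -> Result<Message, String> {
        msg.payload.make_ascii_uppercase();
        Ok(msg)
    }
}
```

The closure returned by `uppercase_handler` never names the trait, yet it can be passed anywhere a `Compute` implementation is expected.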

src/middleware/mod.rs (3)

11-11: LGTM!

Module declaration and imports for the new compute middleware are correctly added.

Also applies to: 18-18


44-46: Consistent middleware integration for consumers.

The Compute middleware is correctly integrated into the consumer middleware chain. The handler.clone() is efficient since ComputeHandler wraps an Arc.


76-78: Consistent middleware integration for publishers.

The Compute middleware is correctly integrated into the publisher middleware chain, following the same pattern as other middlewares.

src/endpoints/mqtt.rs (3)

7-7: LGTM!

Import of APP_NAME for dynamic client ID naming.


135-139: Excellent documentation on MQTT pkid limitations.

The comment clearly explains why pkid cannot be used for deduplication (it's reused by the broker), justifying the None value. This prevents subtle bugs from incorrect assumptions about message uniqueness.


252-252: LGTM!

Using APP_NAME as a prefix for the MQTT client ID improves uniqueness and makes it easier to identify clients in broker logs. The sanitize_for_client_id function ensures the resulting ID is valid.

src/endpoints/fanout.rs (4)

7-15: LGTM on the struct and constructor.

Clean and minimal design for the fanout publisher.


19-25: Early failure aborts fanout to remaining publishers.

The ? operator causes an early return on the first failure, meaning subsequent publishers in the list won't receive the message. Depending on your use case, you may want all publishers to attempt delivery regardless of individual failures, collecting errors at the end.

If partial delivery is acceptable, this is fine. If you need best-effort delivery to all targets, consider iterating through all publishers and aggregating errors.
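
A best-effort variant could look roughly like the sketch below. It is synchronous and uses a placeholder `Publisher` trait with a `&[u8]` payload, whereas the PR's `MessagePublisher` is async and takes a `CanonicalMessage`; `fanout_best_effort` and `FakePublisher` are hypothetical names.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Placeholder publisher trait for this sketch.
trait Publisher {
    fn send(&self, payload: &[u8]) -> Result<(), String>;
}

/// Best-effort fanout: every publisher is attempted, and failures are
/// collected instead of returning on the first error.
fn fanout_best_effort(
    publishers: &[Box<dyn Publisher>],
    payload: &[u8],
) -> Result<(), Vec<String>> {
    let errors: Vec<String> = publishers
        .iter()
        .filter_map(|p| p.send(payload).err())
        .collect();
    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}

/// Test double that counts calls on a shared counter and can be told to fail.
struct FakePublisher {
    fail: bool,
    calls: Rc<Cell<u32>>,
}

impl Publisher for FakePublisher {
    fn send(&self, _payload: &[u8]) -> Result<(), String> {
        self.calls.set(self.calls.get() + 1);
        if self.fail {
            Err("send failed".to_string())
        } else {
            Ok(())
        }
    }
}
```

The trade-off is that a caller receiving `Err` cannot assume that no publisher got the message, so downstream consumers may see it even though the send is reported as failed.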


27-37: LGTM on send_batch implementation.

Correctly delegates to send_batch_helper for consistent batch handling.


39-41: LGTM on as_any implementation.

Standard trait implementation for dynamic downcasting.

src/models.rs (4)

41-65: LGTM on ComputeHandler implementation.

Good design choices:

  • Arc<dyn Compute> allows shared ownership and dynamic dispatch
  • Custom Deserialize that always errors prevents accidental config-based instantiation
  • Debug provides useful output for logging

198-198: LGTM on Fanout endpoint type.

Clean addition to the enum allowing recursive endpoint definitions.


208-209: LGTM on Compute middleware variant.

Using #[serde(skip)] is the correct approach to exclude this from serialization/deserialization since it wraps the non-deserializable ComputeHandler.


596-629: LGTM on fanout deserialization test.

Good coverage for the new Fanout endpoint type, verifying nested endpoint parsing.

src/endpoints/nats.rs (6)

4-4: LGTM on new imports.

APP_NAME and Uuid imports support the new naming convention and ID parsing capabilities.

Also applies to: 16-16


169-174: LGTM on durable_name using APP_NAME.

Consistent naming convention across the codebase.


191-191: LGTM on queue_group using APP_NAME.

Maintains naming consistency with JetStream path.


201-239: Well-designed message ID resolution with graceful fallbacks.

Good implementation:

  • Prioritizes JetStream sequence (most reliable)
  • Falls back to Nats-Msg-Id header for Core NATS
  • Handles both UUID and raw u128 formats
  • Logs warning when parsing fails rather than erroring

250-251: LGTM on updated call sites.

All create_canonical_message calls correctly pass the sequence parameter.

Also applies to: 268-268, 294-295, 301-303, 335-335


312-324: LGTM on concurrent batch acknowledgment.

Using for_each_concurrent with a limit of 1000 is a reasonable balance between parallelism and resource consumption.

src/endpoints/mongodb.rs (4)

26-26: LGTM on type change.

Changing from i64 to Option<mongodb::bson::Binary> aligns with the u128 message_id representation.


41-47: LGTM on Binary to u128 conversion.

Right-aligned padding ensures smaller IDs are correctly zero-extended.


75-91: ObjectId derivation truncates first 4 bytes of u128.

Using the last 12 bytes of the 16-byte u128 representation for ObjectId means the first 4 bytes are discarded. For UUIDs (which use all 128 bits), this could theoretically cause _id collisions if two messages have the same lower 96 bits but different upper 32 bits.

In practice, UUID v4's randomness makes this extremely unlikely, but worth documenting. If message IDs are sequential or have patterns in the upper bits, consider an alternative approach like hashing the full 128 bits.
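
The two conversions discussed for this file can be sketched with plain byte arrays. The function names are hypothetical, and returning `None` for inputs longer than 16 bytes is an assumption of this sketch, not something the review states about the PR.

```rust
/// Right-align up to 16 bytes in a zero-filled buffer and read the result
/// as a big-endian `u128`. Returns `None` when the input is longer than
/// 16 bytes (an assumption of this sketch).
fn binary_to_u128(bytes: &[u8]) -> Option<u128> {
    if bytes.len() > 16 {
        return None;
    }
    let mut buf = [0u8; 16];
    buf[16 - bytes.len()..].copy_from_slice(bytes);
    Some(u128::from_be_bytes(buf))
}

/// Keep only the low 96 bits (the last 12 big-endian bytes), the size of a
/// MongoDB ObjectId. The top 32 bits are discarded.
fn object_id_bytes(id: u128) -> [u8; 12] {
    let mut out = [0u8; 12];
    out.copy_from_slice(&id.to_be_bytes()[4..]);
    out
}
```

Two IDs that differ only in their top 32 bits map to the same 12 bytes, which is the collision case the comment describes.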


95-107: LGTM on document construction and return value.

Clean document structure with proper BSON types. Returning Ok(Some(message)) preserves the original message for downstream use.

src/canonical_message.rs (4)

14-14: LGTM on message_id type upgrade.

Changing from u64 to u128 accommodates UUIDs and provides a larger ID space.


21-27: LGTM on constructor signature change.

The new signature aligns with the updated message_id type.


29-40: LGTM on ID generation methods.

Clean builder pattern with with_gen_id for fluent usage.


67-74: LGTM on struct serialization helpers.

Convenient methods for common serialization/deserialization patterns.

src/middleware/compute.rs (2)

1-6: LGTM!

Imports are well-organized and all dependencies are necessary for the middleware implementation.


77-137: LGTM!

The tests provide good coverage for both consumer and publisher paths. The use of lambda-based handlers effectively demonstrates the middleware's transformation capabilities, and the assertions correctly verify the expected behavior.

Comment thread src/endpoints/mod.rs
Comment thread src/models.rs Outdated
}
has_dlq = true;
}
&crate::models::Middleware::Compute(_) => todo!(),


⚠️ Potential issue | 🟡 Minor

Replace todo!() with a proper match arm or unreachable!().

The todo!() macro will panic at runtime if a Compute middleware is ever encountered in this test. Since Compute cannot be deserialized from config (as per your design), this branch should never execute. Consider using unreachable!("Compute middleware cannot be deserialized") or simply {} to make the test more robust.

🔎 Proposed fix
-                &crate::models::Middleware::Compute(_) => todo!(),
+                Middleware::Compute(_) => unreachable!("Compute middleware cannot be deserialized from config"),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- &crate::models::Middleware::Compute(_) => todo!(),
+ &crate::models::Middleware::Compute(_) => unreachable!("Compute middleware cannot be deserialized from config"),
🤖 Prompt for AI Agents
In src/models.rs around line 487, the match arm for
&crate::models::Middleware::Compute(_) currently calls todo!(), which will panic
if ever hit; replace it with unreachable!("Compute middleware cannot be
deserialized") to express that this branch must never occur (alternatively use
an empty block {} if you prefer to silently ignore), ensuring the test doesn't
use todo!() at runtime.

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (2)
src/middleware/dlq.rs (1)

128-183: Consider extracting duplicate backoff logic into a helper.

The exponential backoff calculation pattern is repeated multiple times (lines 130, 158-161, 169-172). While functional, this could be consolidated into a small helper method.

🔎 Example helper method
fn next_backoff(&self, current: u64) -> u64 {
    let next = (current as f64 * self.config.dlq_multiplier) as u64;
    next.min(self.config.dlq_max_interval_ms)
}

Then use: backoff_ms = self.next_backoff(backoff_ms);
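
To show what schedule such a helper produces, here is a self-contained sketch. `BackoffConfig` is a simplified stand-in for the PR's DLQ configuration that keeps only the two field names used in the helper above, and `schedule` is a hypothetical addition for illustration.

```rust
/// Simplified stand-in for the DLQ config; only the fields the helper needs.
struct BackoffConfig {
    dlq_multiplier: f64,
    dlq_max_interval_ms: u64,
}

impl BackoffConfig {
    /// Multiply the current interval and cap it at the configured maximum.
    fn next_backoff(&self, current: u64) -> u64 {
        let next = (current as f64 * self.dlq_multiplier) as u64;
        next.min(self.dlq_max_interval_ms)
    }

    /// Collect the intervals used by `steps` successive attempts,
    /// starting from `initial`.
    fn schedule(&self, initial: u64, steps: usize) -> Vec<u64> {
        let mut out = Vec::with_capacity(steps);
        let mut current = initial;
        for _ in 0..steps {
            out.push(current);
            current = self.next_backoff(current);
        }
        out
    }
}
```

With a multiplier of 2.0, a cap of 1000 ms, and an initial interval of 100 ms, six steps give 100, 200, 400, 800, 1000, 1000.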

src/models.rs (1)

274-278: Add validation for probability field.

The probability field in RandomPanicMiddleware has no constraints to ensure it's within the valid range [0.0, 1.0]. Consider adding validation during deserialization to catch configuration errors early.

🔎 Proposed validation approach

You can add a custom deserializer or a validation function:

 #[derive(Debug, Deserialize, Serialize, Clone)]
 #[serde(deny_unknown_fields)]
 pub struct RandomPanicMiddleware {
+    #[serde(deserialize_with = "deserialize_probability")]
     pub probability: f64,
 }
+
+fn deserialize_probability<'de, D>(deserializer: D) -> Result<f64, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let value = f64::deserialize(deserializer)?;
+    if !(0.0..=1.0).contains(&value) {
+        return Err(serde::de::Error::custom(
+            "probability must be between 0.0 and 1.0"
+        ));
+    }
+    Ok(value)
+}
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1e9171f and f0ae729.

📒 Files selected for processing (9)
  • src/endpoints/amqp.rs (4 hunks)
  • src/endpoints/mongodb.rs (4 hunks)
  • src/endpoints/mqtt.rs (11 hunks)
  • src/endpoints/nats.rs (10 hunks)
  • src/middleware/dlq.rs (10 hunks)
  • src/middleware/mod.rs (3 hunks)
  • src/middleware/random_panic.rs (1 hunks)
  • src/middleware/retry.rs (1 hunks)
  • src/models.rs (13 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/endpoints/mqtt.rs
🧰 Additional context used
🧬 Code graph analysis (4)
src/endpoints/nats.rs (1)
src/canonical_message.rs (2)
  • s (50-50)
  • new (21-27)
src/middleware/retry.rs (4)
src/endpoints/mongodb.rs (6)
  • new (63-69)
  • new (133-174)
  • send (74-108)
  • send_batch (110-118)
  • as_any (120-122)
  • as_any (251-253)
src/endpoints/mqtt.rs (6)
  • new (25-39)
  • new (100-121)
  • send (66-75)
  • send_batch (77-85)
  • as_any (87-89)
  • as_any (166-168)
src/endpoints/nats.rs (6)
  • new (32-69)
  • new (145-201)
  • send (87-116)
  • send_batch (118-127)
  • as_any (129-131)
  • as_any (343-345)
src/middleware/random_panic.rs (6)
  • new (14-19)
  • new (54-59)
  • send (70-73)
  • send_batch (75-81)
  • as_any (43-45)
  • as_any (83-85)
src/middleware/mod.rs (1)
src/middleware/compute.rs (1)
  • new (13-15)
src/models.rs (1)
src/endpoints/memory.rs (4)
  • new (34-37)
  • new (102-108)
  • new (154-161)
  • len (72-74)
🔇 Additional comments (27)
src/endpoints/amqp.rs (4)

3-3: LGTM!

The APP_NAME import enables consistent consumer naming across the codebase.


53-53: LGTM!

Using config.exchange.clone().unwrap_or_default() appropriately defaults to the AMQP default exchange (empty string) when not configured.


150-157: LGTM!

Good improvements:

  • Making prefetch_count configurable with a sensible default of 100 provides operational flexibility.
  • Using APP_NAME for the consumer tag improves observability and identification.

227-230: LGTM!

Passing the delivery_tag as the message ID enables downstream tracking and correlation. The cast from u64 to u128 is safe and lossless.

src/middleware/dlq.rs (4)

19-19: LGTM!

Storing the full DeadLetterQueueMiddleware config provides access to all backoff parameters, enabling more flexible retry behavior.

Also applies to: 39-39


51-69: LGTM!

The exponential backoff implementation with configurable initial interval, multiplier, and max interval is well-structured. Capping at dlq_max_interval_ms prevents unbounded wait times.


102-104: LGTM!

Returning Ok(None) after successful DLQ delivery correctly signals to upstream consumers that the message was "handled" and should be committed, preventing redelivery loops.


147-149: LGTM!

Returning an empty Vec::new() for the failed list after successful DLQ delivery correctly signals batch completion, allowing upstream consumers to commit the entire batch.

Also applies to: 206-206

src/endpoints/mongodb.rs (3)

41-47: Verify byte alignment for short binary values.

The logic copies bytes from the binary's start into the end of a 16-byte array for big-endian u128 conversion. If the binary has fewer than 16 bytes, this right-aligns the value (treating leading bytes as zeros). Ensure this matches how message_id was originally encoded (line 90 uses the full 16 bytes from to_be_bytes()).


74-91: LGTM with a note on ObjectId derivation.

The approach of using the last 12 bytes of the u128 for the ObjectId is reasonable since the lower bits of UUIDs/sequential IDs tend to be more unique. The fallback path correctly generates a new ObjectId and maps it back to a 16-byte array.


172-172: LGTM!

Making polling_interval configurable with a sensible 100ms default provides operational flexibility for tuning latency vs. load tradeoffs.

src/middleware/retry.rs (3)

9-17: LGTM!

Clean struct design with the inner publisher and retry configuration. Taking ownership of config (not a reference) is appropriate since it's used in async contexts.


60-100: Batch retry implementation looks correct.

The logic properly:

  • Accumulates successful responses across attempts
  • Narrows retries to only failed messages
  • Returns partial success with remaining failures after exhausting retries

One consideration: cloning current_messages on every iteration (line 73) could be memory-intensive for large batches. If inner publishers don't consume the messages, you could explore passing by reference, though this would require trait changes.
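
The narrowing behavior described above can be sketched as follows. This is a synchronous simplification under assumptions: messages are plain `String`s, the backoff sleep between attempts is omitted, and `BatchOutcome` and `send_batch_with_retry` are hypothetical names rather than the PR's types.

```rust
/// Outcome of one batch attempt in this sketch: the messages that were
/// accepted and the ones that failed and may be retried.
struct BatchOutcome {
    sent: Vec<String>,
    failed: Vec<String>,
}

/// Retry a batch up to `max_attempts` times, resending only the messages
/// that failed in the previous attempt.
fn send_batch_with_retry<F>(
    messages: Vec<String>,
    max_attempts: u32,
    mut send_batch: F,
) -> BatchOutcome
where
    F: FnMut(Vec<String>) -> BatchOutcome,
{
    let mut sent = Vec::new();
    let mut pending = messages;
    for _ in 0..max_attempts {
        if pending.is_empty() {
            break;
        }
        let outcome = send_batch(pending);
        // Accumulate successes across attempts.
        sent.extend(outcome.sent);
        // Narrow the next attempt to what failed this time.
        pending = outcome.failed;
    }
    // Anything still pending is reported as failed (partial success).
    BatchOutcome { sent, failed: pending }
}
```

Moving `pending` into the sender avoids a clone per attempt in this sketch, at the cost of requiring the sender to hand failed messages back.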


102-104: LGTM!

Standard as_any implementation for trait object downcasting.

src/endpoints/nats.rs (6)

4-4: LGTM!

The APP_NAME and uuid imports enable consistent naming and robust message ID parsing.

Also applies to: 16-16


167-178: LGTM!

Good changes:

  • max_ack_pending configurable via prefetch_count with a reasonable default of 10,000
  • durable_name now uses APP_NAME prefix for better identification across deployments

192-192: LGTM!

The queue group naming with APP_NAME prefix ensures isolation between different application deployments.


202-240: LGTM!

The message ID derivation logic is well-prioritized:

  1. JetStream sequence (most reliable for ordering)
  2. Nats-Msg-Id header parsed as UUID
  3. Nats-Msg-Id header parsed as raw u128
  4. Warning logged if unparseable (graceful degradation)

This provides flexibility while preferring the most reliable source.
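
The priority order above can be sketched with the standard library alone. The PR parses UUIDs with the `uuid` crate; here that step is approximated by a hypothetical `parse_uuid_like` that accepts only the hyphenated 8-4-4-4-12 hex form. The raw-u128 step is assumed to be a decimal parse, and the warning log is replaced by returning `None`.

```rust
/// Resolve a message ID using the priority order from the review:
/// JetStream sequence, then a UUID-shaped header, then a decimal u128.
fn resolve_message_id(sequence: Option<u64>, msg_id_header: Option<&str>) -> Option<u128> {
    if let Some(seq) = sequence {
        return Some(seq as u128);
    }
    let header = msg_id_header?;
    parse_uuid_like(header).or_else(|| header.parse::<u128>().ok())
}

/// Std-only approximation of UUID parsing: accepts the canonical
/// hyphenated 8-4-4-4-12 hex form and nothing else.
fn parse_uuid_like(s: &str) -> Option<u128> {
    let groups: Vec<&str> = s.split('-').collect();
    let lengths: Vec<usize> = groups.iter().map(|g| g.len()).collect();
    if lengths != [8, 4, 4, 4, 12] {
        return None;
    }
    // Reject signs or other non-hex characters before parsing.
    if !groups.iter().all(|g| g.bytes().all(|b| b.is_ascii_hexdigit())) {
        return None;
    }
    u128::from_str_radix(&groups.concat(), 16).ok()
}
```

Checking the sequence first means a header is never consulted for JetStream messages, which matches the stated preference for the most reliable source.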


313-325: LGTM!

Using for_each_concurrent with a limit of 1000 provides a good balance between parallelism and resource consumption. This is cleaner than the previous implementation style while maintaining concurrent ACK behavior.


251-252: LGTM!

All call sites correctly pass the sequence parameter to create_canonical_message:

  • JetStream paths extract and pass the sequence
  • Core NATS paths pass None (no sequence available)

Also applies to: 269-269, 295-296, 302-304, 336-336

src/middleware/mod.rs (1)

48-50: LGTM! Compute middleware integration is clean.

The Compute middleware is correctly wired for both consumers and publishers using handler.clone(), which efficiently clones the underlying Arc<dyn Compute>.

Also applies to: 82-84

src/models.rs (5)

58-82: LGTM! ComputeHandler design correctly prevents config deserialization.

The custom Deserialize implementation returning an error ensures that ComputeHandler can only be created programmatically via new(), which is the intended design pattern for dynamically registered compute functions.


253-259: LGTM! DLQ retry backoff configuration is well-designed.

The addition of exponential backoff parameters to DeadLetterQueueMiddleware improves reliability with sensible defaults.


339-339: LGTM! Configuration extensions are backward compatible.

The new configuration fields for NATS, AMQP, MongoDB, and MQTT are appropriately optional or have sensible defaults, ensuring backward compatibility with existing configurations.

Also applies to: 367-368, 395-395, 419-422


555-555: Past review comment addressed correctly.

The todo!() has been replaced with panic!("Compute middleware cannot be deserialized"), which correctly expresses that this branch should never execute in a test that deserializes from config.


666-699: LGTM! Fanout test provides solid coverage.

The new test validates fanout endpoint deserialization and structure, ensuring the feature works correctly with multiple child endpoints.

src/middleware/random_panic.rs (1)

21-25: No changes needed. The API rand::rng().random_bool(probability) is correctly implemented and available in rand 0.9.2.

Comment thread src/endpoints/mongodb.rs Outdated
Comment thread src/middleware/random_panic.rs
Comment thread src/middleware/retry.rs

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
src/middleware/dlq.rs (1)

65-68: Retry count semantics still inconsistent with RetryMiddleware.

The DLQ retry logic uses attempt < self.config.dlq_retry_attempts, meaning with dlq_retry_attempts=3, only 2 attempts occur before exhaustion (attempts 1 and 2 retry, attempt 3 falls through to error).

In contrast, retry.rs uses attempt >= max_attempts, so max_attempts=3 allows 3 total attempts.

This creates operator confusion where the same value (3) means different things. Consider aligning both to use the same semantics.

Also applies to: 153-156

🧹 Nitpick comments (1)
src/middleware/retry.rs (1)

60-100: Consider extracting duplicated backoff logic.

The backoff calculation (lines 94-98) is duplicated from retry_op. Consider extracting it to a helper method like next_backoff(&self, current: u64) -> u64 for consistency with the pattern used in DlqPublisher.

🔎 Proposed refactor
+    fn next_backoff(&self, current: u64) -> u64 {
+        let next = (current as f64 * self.config.multiplier) as u64;
+        std::cmp::min(next, self.config.max_interval_ms)
+    }
+
     async fn retry_op<F, Fut, T>(&self, operation: F) -> anyhow::Result<T>
     where
         F: Fn() -> Fut,
         Fut: std::future::Future<Output = anyhow::Result<T>>,
     {
         // ... existing code ...
-                    interval = (interval as f64 * self.config.multiplier) as u64;
-                    if interval > self.config.max_interval_ms {
-                        interval = self.config.max_interval_ms;
-                    }
+                    interval = self.next_backoff(interval);
         // ...
     }

Apply the same change in send_batch.
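
As a standalone illustration of the capped exponential backoff discussed above, here is a minimal sketch. The free-function form, the explicit parameters, and the `backoff_schedule` helper are assumptions made for the example; in the PR the multiplier and cap come from `self.config`.

```rust
/// Compute the next retry interval in milliseconds: multiply the current
/// interval by `multiplier`, then cap the result at `max_interval_ms`.
/// Hypothetical free-function form of the `next_backoff` helper.
fn next_backoff(current_ms: u64, multiplier: f64, max_interval_ms: u64) -> u64 {
    // Float-to-integer `as` casts saturate in Rust, so a huge product cannot wrap.
    let next = (current_ms as f64 * multiplier) as u64;
    next.min(max_interval_ms)
}

/// Build the list of sleep intervals used between `attempts` tries.
/// (Illustrative helper, not part of the PR.)
fn backoff_schedule(
    initial_ms: u64,
    multiplier: f64,
    max_interval_ms: u64,
    attempts: usize,
) -> Vec<u64> {
    let mut schedule = Vec::new();
    let mut interval = initial_ms;
    // There is one sleep between each pair of consecutive attempts.
    for _ in 1..attempts {
        schedule.push(interval);
        interval = next_backoff(interval, multiplier, max_interval_ms);
    }
    schedule
}
```

Keeping the calculation in one place means `retry_op` and `send_batch` cannot drift apart if the capping rule changes later.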

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f0ae729 and c5e9a76.

📒 Files selected for processing (5)
  • src/endpoints/mongodb.rs (4 hunks)
  • src/middleware/dlq.rs (10 hunks)
  • src/middleware/random_panic.rs (1 hunks)
  • src/middleware/retry.rs (1 hunks)
  • src/models.rs (13 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/middleware/retry.rs (2)
src/middleware/dlq.rs (4)
  • new (23-41)
  • send (94-115)
  • send_batch (117-233)
  • as_any (235-237)
src/middleware/random_panic.rs (6)
  • new (14-25)
  • new (60-71)
  • send (82-85)
  • send_batch (87-93)
  • as_any (49-51)
  • as_any (95-97)
🔇 Additional comments (12)
src/middleware/retry.rs (1)

19-47: LGTM on the retry logic implementation.

The retry_op helper cleanly implements exponential backoff with configurable parameters. The semantics are clear: max_attempts means total attempts (not retries), so max_attempts=3 allows attempts 1, 2, and 3 before giving up.

src/endpoints/mongodb.rs (2)

105-107: Previous feedback addressed: now returns Ok(None).

The MongoDB publisher now correctly returns Ok(None) after successful insertion, aligning with the fire-and-forget pattern used by other persistence publishers (AMQP, NATS, Kafka, File).


41-47: Byte ordering logic is correct and consistent.

The serialization uses to_be_bytes() to produce a full 16-byte big-endian representation, and deserialization correctly interprets it using from_be_bytes() after right-aligning. If the Binary contains the full 16 bytes (as from serialization), bytes[16 - 16..16].copy_from_slice() copies all bytes. If a shorter value were somehow stored, right-alignment with leading zeros correctly reconstructs the intended value under big-endian interpretation. The round-trip is sound.
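
The round-trip reasoning above can be sketched with the standard library alone. The function name and the choice to return `None` for inputs longer than 16 bytes are assumptions for this example, not the PR's actual code.

```rust
/// Interpret up to 16 bytes as a big-endian u128, right-aligning shorter
/// inputs so that missing high-order bytes are treated as zero.
/// Returns None when the input is longer than 16 bytes (an assumption of
/// this sketch; the reviewed code may handle that case differently).
fn u128_from_be_slice(bytes: &[u8]) -> Option<u128> {
    if bytes.len() > 16 {
        return None;
    }
    let mut buf = [0u8; 16];
    // Copy into the tail of the buffer so the value keeps its magnitude.
    buf[16 - bytes.len()..].copy_from_slice(bytes);
    Some(u128::from_be_bytes(buf))
}
```

Feeding the output of `to_be_bytes()` back through this function returns the original value, and a shorter slice such as `[0x01, 0x02]` is read as `0x0102`.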

src/middleware/dlq.rs (2)

43-46: Good refactoring: centralized backoff calculation.

The next_backoff helper cleanly encapsulates the exponential backoff logic with the configurable multiplier and maximum interval cap.


102-106: Clear semantics: Ok(None) signals successful DLQ handling.

The updated comment and return value correctly indicate that the message was handled (via DLQ) and the upstream consumer should commit, even though the primary send failed.

src/middleware/random_panic.rs (2)

14-25: Previous feedback addressed: probability validation added.

Both RandomPanicConsumer::new and RandomPanicPublisher::new now validate that probability is within [0.0, 1.0] and panic with a descriptive message if not. Additionally, deserialize_probability in models.rs provides validation at deserialization time.

Also applies to: 59-71
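
A minimal sketch of the range check described above, returning a `Result` rather than panicking. The function name and error text are hypothetical; the PR's constructors panic and its `deserialize_probability` reports the error through serde.

```rust
/// Accept a probability only if it lies in the closed interval [0.0, 1.0].
/// NaN is rejected because range containment is false for NaN.
fn validate_probability(p: f64) -> Result<f64, String> {
    if (0.0..=1.0).contains(&p) {
        Ok(p)
    } else {
        Err(format!("probability must be within [0.0, 1.0], got {}", p))
    }
}
```

The same predicate can back both the deserialization hook and the constructor check, so the two layers agree on what counts as valid.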


27-31: The code uses the correct rand 0.9.x API. The random_bool(&mut self, p: f64) -> bool method is properly available on the Rng trait, and in rand 0.9 rand::thread_rng() was renamed to rand::rng(). The implementation at lines 27-31 (and 73-77) correctly uses rand::rng().random_bool(probability) and will compile without issues.

src/models.rs (5)

58-82: Well-designed ComputeHandler wrapper.

The ComputeHandler implementation is clean:

  • Uses Arc<dyn Compute> for thread-safe sharing of compute handlers
  • Provides a simple new() constructor
  • Custom Debug impl avoids issues with trait objects
  • Deserialize always fails with a clear error, preventing accidental config-based instantiation

219-229: Good design: Compute variant skipped in serde.

The #[serde(skip)] attribute on Compute(ComputeHandler) correctly prevents this variant from being serialized/deserialized, since compute handlers must be registered programmatically rather than via configuration files.


281-292: Defense in depth: probability validated at both deserialization and construction.

The deserialize_probability function validates the range at config load time, while the RandomPanicConsumer/RandomPanicPublisher constructors also validate at runtime. This provides good protection against invalid values from any source.


569-569: Previous feedback addressed: replaced todo!() with explicit panic!().

The test now uses panic!("Compute middleware cannot be deserialized") instead of todo!(), providing a clear message if this branch is ever unexpectedly reached.


680-713: Good test coverage for Fanout endpoint.

The test verifies that the Fanout(Vec<Endpoint>) variant deserializes correctly from YAML, including nested memory endpoints with their topic configurations.

Comment thread src/endpoints/mongodb.rs
Comment on lines +75 to +87
let (object_id, message_id_bin) = if let Some(message_id) = &message.message_id {
    // An ObjectId is 12 bytes. A u128 is 16 bytes. We use the last 12 bytes
    // of the message_id to construct the ObjectId, as they are more likely to be unique.
    let bin_id = message_id.to_be_bytes();
    let id_bytes: [u8; 12] = bin_id[4..].try_into()?;
    let oid = mongodb::bson::oid::ObjectId::from(id_bytes);
    (oid, bin_id)
} else {
    let oid = mongodb::bson::oid::ObjectId::new();
    let mut id_bytes = [0u8; 16];
    id_bytes[4..16].copy_from_slice(&oid.bytes());
    (oid, id_bytes)
};

⚠️ Potential issue | 🟡 Minor

Potential data loss: high 4 bytes of message_id discarded.

When converting a 128-bit message_id to a 12-byte MongoDB ObjectId, the high 4 bytes are discarded (bin_id[4..]). If any message_id has significant high bytes, this could cause:

  1. Different message_ids mapping to the same ObjectId (collision)
  2. Loss of the original message_id on round-trip

If this is intentional for ObjectId compatibility, consider documenting this limitation. If full message_id preservation is required, you might store the _id separately from message_id.
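
To make the collision concrete, here is a small sketch that mirrors the `bin_id[4..]` truncation using only the standard library. The function name is hypothetical and the `mongodb` crate is deliberately not used.

```rust
/// Derive 12 ObjectId-sized bytes from a 128-bit ID by dropping the four
/// most significant bytes, mirroring `bin_id[4..]` in the reviewed code.
fn truncated_id_bytes(message_id: u128) -> [u8; 12] {
    let full = message_id.to_be_bytes();
    let mut out = [0u8; 12];
    out.copy_from_slice(&full[4..]);
    out
}
```

Two IDs that differ only above bit 95, for example `42` and `(1 << 96) | 42`, produce identical 12-byte values, which is the duplicate-key scenario described above.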

🤖 Prompt for AI Agents
In src/endpoints/mongodb.rs around lines 75 to 87, the code discards the high 4
bytes of a 128-bit message_id when constructing a 12-byte MongoDB ObjectId which
can cause collisions and prevents round-trip preservation; instead, preserve the
full message_id by not truncating it — either use the generated ObjectId only
for the MongoDB _id and store the full u128 message_id in a separate field
(e.g., message_id as a binary or hex string), or make the message_id the
document _id by encoding the full u128 (binary subtype or hex) so no bytes are
dropped; update the code to write message_id into its own field (or change _id
serialization) and add a short comment documenting the chosen approach and the
reason for not truncating bytes.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (2)
src/endpoints/mod.rs (1)

174-181: Potential infinite recursion with circular Fanout references remains unaddressed.

The recursive call to create_publisher_from_route for each fanout endpoint could cause a stack overflow if fanout configurations form a cycle (e.g., Fanout A → Fanout B → Fanout A). Consider adding cycle detection or a recursion depth limit.
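
One way to bound the recursion is a depth counter threaded through the recursive call. The sketch below uses a simplified, hypothetical `Endpoint` enum, an arbitrary `MAX_FANOUT_DEPTH`, and a counting function in place of the real `create_publisher_from_route`, so it shows the guard only, not the PR's actual types.

```rust
/// Hypothetical, simplified endpoint model used only for this sketch.
enum Endpoint {
    Memory(String),
    Fanout(Vec<Endpoint>),
}

/// Illustrative cap on fanout nesting depth (arbitrary value).
const MAX_FANOUT_DEPTH: usize = 8;

/// Count the leaf publishers reachable from `endpoint`, refusing to descend
/// through more than MAX_FANOUT_DEPTH nested fanouts.
fn count_publishers(endpoint: &Endpoint, depth: usize) -> Result<usize, String> {
    match endpoint {
        Endpoint::Memory(_) => Ok(1),
        Endpoint::Fanout(children) => {
            if depth >= MAX_FANOUT_DEPTH {
                return Err(format!(
                    "fanout nesting exceeds {} levels",
                    MAX_FANOUT_DEPTH
                ));
            }
            let mut total = 0;
            for child in children {
                // Each nested fanout consumes one level of the depth budget.
                total += count_publishers(child, depth + 1)?;
            }
            Ok(total)
        }
    }
}
```

A depth limit turns a potential stack overflow into an ordinary configuration error; tracking visited route names would additionally identify which fanout closes the cycle.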

src/endpoints/mongodb.rs (1)

75-87: Previous concern still applies: high 4 bytes of message_id discarded when constructing ObjectId.

The truncation of u128 to 12-byte ObjectId (using bin_id[4..]) remains. If two message_ids differ only in their high 4 bytes, they will produce the same ObjectId, causing a duplicate key error on insert.

Consider documenting this limitation or storing the full message_id in a separate field while using the ObjectId only for _id.

🧹 Nitpick comments (8)
src/middleware/retry.rs (1)

97-101: Consider extracting backoff logic to reduce duplication.

The exponential backoff calculation (sleep + interval update with cap) is duplicated from retry_op (lines 39-43). Extracting this into a small helper method would improve maintainability.

tests/integration/common.rs (3)

567-568: Consider using assert_eq! instead of debug_assert_eq! for test reliability.

Using debug_assert_eq! means this verification won't run in release builds. For integration tests where you want to catch regressions regardless of build profile, a regular assert_eq! provides stronger guarantees.

🔎 Proposed fix
-    debug_assert_eq!(final_count, num_messages);
+    assert_eq!(final_count, num_messages);
     start_time.elapsed()

619-622: Consider using assert_eq! here as well for consistency with test validation.

Same concern as the batch read function - debug_assert_eq! won't validate in release builds.

🔎 Proposed fix
-    debug_assert_eq!(
+    assert_eq!(
         final_count.load(std::sync::atomic::Ordering::Relaxed),
         num_messages
     );

650-651: Same concern: prefer assert_eq! over debug_assert_eq! in tests.

🔎 Proposed fix
-    debug_assert_eq!(final_count, num_messages);
+    assert_eq!(final_count, num_messages);
     start_time.elapsed()
tests/integration/nats.rs (1)

79-79: Unnecessary clone - result is not used after this call.

The result.clone() is redundant since result is not used after being passed to add_performance_result. You can pass ownership directly.

🔎 Proposed fix
-        add_performance_result(result.clone());
+        add_performance_result(result);
src/endpoints/kafka.rs (1)

354-358: Potential sign extension issue with i32/i64 to u128 casts.

message.partition() returns i32 and message.offset() returns i64. When cast to u128, negative values (though unlikely in practice) would produce unexpected large positive values due to sign extension. Consider using explicit unsigned casts for clarity and safety.

🔎 Proposed fix
     // Combine partition and offset for a unique ID within a topic.
     // A u128 is used to hold both values, with the partition in the high 64 bits
     // and the offset in the low 64 bits.
-    let message_id = ((message.partition() as u128) << 64) | (message.offset() as u128);
+    let message_id = ((message.partition() as u32 as u128) << 64) | (message.offset() as u64 as u128);
     let mut canonical_message = CanonicalMessage::new(payload.to_vec(), Some(message_id));
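
The effect of the intermediate unsigned casts can be checked in isolation. This sketch assumes a free function named `kafka_message_id`, which is hypothetical; the expression itself is the one from the proposed fix.

```rust
/// Pack a Kafka partition and offset into one u128: partition in the high
/// 64 bits, offset in the low 64 bits. Casting through the unsigned type of
/// the same width first avoids sign extension for negative inputs.
fn kafka_message_id(partition: i32, offset: i64) -> u128 {
    ((partition as u32 as u128) << 64) | (offset as u64 as u128)
}
```

Without the intermediate casts, a negative value such as `-1i64 as u128` sign-extends to `u128::MAX` and would overwrite the partition bits when combined with `|`.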
src/endpoints/nats.rs (1)

293-304: Consider lowering the concurrency limit for batch acknowledgments.

A concurrency limit of 1000 for for_each_concurrent is quite high and could overwhelm the NATS server or exhaust local resources if there are many large batches in flight. Consider a more conservative limit (e.g., 100) or making it configurable.

🔎 Suggested adjustment
-                        futures::stream::iter(jetstream_messages)
-                            .for_each_concurrent(Some(1000), |message| async move {
+                        // Limit concurrent acks to avoid overwhelming the server
+                        futures::stream::iter(jetstream_messages)
+                            .for_each_concurrent(Some(100), |message| async move {
benches/performance_bench.rs (1)

88-105: Add collection cleanup to MongoDB benchmark functions for consistency.

The integration tests drop the MongoDB collection before each test run, but the benchmark functions assume the environment is clean or append to existing collections. For reliable and reproducible benchmark results across runs, drop the collection before each benchmark iteration, similar to the pattern in tests/integration/mongodb.rs:65-67.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c5e9a76 and a0dcc62.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (15)
  • Cargo.toml (2 hunks)
  • benches/performance_bench.rs (1 hunks)
  • src/canonical_message.rs (1 hunks)
  • src/endpoints/amqp.rs (7 hunks)
  • src/endpoints/kafka.rs (3 hunks)
  • src/endpoints/mod.rs (5 hunks)
  • src/endpoints/mongodb.rs (4 hunks)
  • src/endpoints/nats.rs (10 hunks)
  • src/middleware/random_panic.rs (1 hunks)
  • src/middleware/retry.rs (1 hunks)
  • tests/integration/amqp.rs (1 hunks)
  • tests/integration/common.rs (11 hunks)
  • tests/integration/kafka.rs (2 hunks)
  • tests/integration/nats.rs (2 hunks)
  • tests/integration_test.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/canonical_message.rs
🧰 Additional context used
🧬 Code graph analysis (8)
src/middleware/retry.rs (1)
src/canonical_message.rs (1)
  • new (21-27)
tests/integration/nats.rs (1)
src/endpoints/nats.rs (2)
  • new (31-63)
  • new (126-180)
src/endpoints/nats.rs (1)
src/canonical_message.rs (2)
  • new (21-27)
  • s (54-54)
tests/integration/kafka.rs (1)
src/endpoints/kafka.rs (2)
  • new (28-108)
  • new (199-254)
src/endpoints/mongodb.rs (1)
src/canonical_message.rs (1)
  • new (21-27)
src/middleware/random_panic.rs (2)
src/canonical_message.rs (1)
  • new (21-27)
src/models.rs (2)
  • new (62-64)
  • new (152-157)
src/endpoints/mod.rs (4)
src/endpoints/amqp.rs (2)
  • new (30-58)
  • new (122-156)
src/endpoints/kafka.rs (2)
  • new (28-108)
  • new (199-254)
src/endpoints/mqtt.rs (2)
  • new (25-39)
  • new (100-121)
src/endpoints/memory.rs (5)
  • new (34-37)
  • new (102-108)
  • new (154-161)
  • get_or_create_channel (83-92)
  • len (72-74)
benches/performance_bench.rs (1)
tests/integration/common.rs (9)
  • format_pretty (503-528)
  • measure_read_performance (530-569)
  • measure_single_read_performance (626-652)
  • measure_single_write_performance (571-624)
  • measure_write_performance (381-472)
  • new (51-55)
  • new (122-138)
  • down (77-91)
  • up (57-75)
🔇 Additional comments (42)
src/middleware/retry.rs (3)

19-47: LGTM!

The retry logic correctly implements exponential backoff with configurable max_attempts semantics. The interval capping at max_interval_ms prevents unbounded growth.


52-58: LGTM!

The message cloning inside the closure is necessary for retry semantics since the closure may be invoked multiple times.


105-107: LGTM!

Standard as_any implementation for trait object downcasting.

src/middleware/random_panic.rs (6)

1-6: LGTM!

All imports are appropriate and necessary for the middleware implementation.


14-25: Validation logic correctly implemented.

The probability validation addresses the previous review concern and correctly enforces the [0.0, 1.0] range at construction time.


34-52: Implementation pattern is appropriate for chaos testing.

The middleware correctly panics before the operation, ensuring that error handling and recovery mechanisms are tested. The delegation pattern is clean and consistent.


59-71: Consistent validation with RandomPanicConsumer.

The validation logic mirrors the consumer implementation, maintaining consistency across both middleware wrappers.


80-98: LGTM!

The publisher implementation mirrors the consumer pattern, maintaining consistency. The panic-before-send behavior is appropriate for testing error recovery in publishing flows.


27-31: This review comment is incorrect. The repository uses Rand 0.9.2, where random_bool is a valid method on the Rng trait. Recent Rand versions renamed gen_bool to random_bool, making the current code correct. The code at lines 28 and 74 already uses the correct method name. Applying the suggested fix to change random_bool to gen_bool would break the code.

Likely an incorrect or invalid review comment.

tests/integration/common.rs (3)

25-32: LGTM!

Adding Default derive to PerformanceResult is a sensible improvement for test scaffolding.


501-528: LGTM!

The format_pretty helper provides clean thousand-separator formatting with proper handling of both integers and floating-point numbers. Truncating to 2 decimal places is reasonable for performance metrics display.


386-471: LGTM!

The refactored measure_write_performance with atomic counting and retry logic is well-structured. The batch retry mechanism with proper success counting provides reliable throughput measurement.

tests/integration/amqp.rs (1)

53-57: LGTM!

Setting delayed_ack: false for performance tests is appropriate to measure maximum throughput without waiting for broker acknowledgments.

tests/integration_test.rs (1)

86-100: LGTM!

The reordering of MQTT and NATS direct performance tests with proper feature gates is a clean organizational change. Each backend now has its own clearly gated test block.

tests/integration/kafka.rs (1)

61-71: LGTM!

The test correctly adapts to the updated KafkaConsumer::new async constructor signature and uses delayed_ack: false for high-throughput performance testing.

tests/integration/nats.rs (1)

64-67: LGTM!

The updated NatsPublisher::new call correctly uses the new signature (config, stream_name, subject) matching the API changes in src/endpoints/nats.rs.

src/endpoints/kafka.rs (2)

146-150: LGTM!

The key derivation logic is clean: use existing message ID bytes when available, otherwise generate a new UUID. This ensures consistent partitioning for messages with IDs while still providing uniqueness for new messages.


199-254: LGTM!

The KafkaConsumer::new constructor is now correctly marked as async, which is consistent with other endpoint constructors and aligns with the async initialization patterns used throughout the codebase.

Cargo.toml (2)

74-74: LGTM!

Adding uuid to the nats feature is appropriate since NATS operations may need UUID generation for durable consumer naming or message identification.


87-92: LGTM!

The criterion dev-dependency with async/tokio features and the bench configuration are correctly set up for async performance benchmarking. The benchmark file exists at the expected location.

src/endpoints/mod.rs (3)

8-8: LGTM!

The fanout module is correctly exposed publicly to enable fanout publishing functionality.


45-47: LGTM!

The Kafka consumer creation correctly awaits the now-async KafkaConsumer::new constructor.


191-232: LGTM!

The integration test for fanout publisher is well-structured: it creates two memory channels, sends a message through the fanout publisher, and verifies that both channels received the correct payload.

src/endpoints/mongodb.rs (4)

41-47: LGTM on the Binary-to-u128 conversion logic.

The zero-padding approach correctly handles variable-length Binary data by right-aligning bytes into a 16-byte array before converting to u128. This ensures proper round-trip for message IDs that were originally u128.


88-102: LGTM on the BSON document construction.

The message_id is now correctly stored as a BSON Binary with the full 16 bytes, preserving the complete message_id for retrieval. The metadata serialization and payload handling are appropriate.


107-107: LGTM - Now returns Ok(None) for consistency.

This aligns with other persistence publishers (AMQP, NATS, Kafka, File) that follow the fire-and-forget pattern.


172-172: LGTM on polling interval configuration.

The unwrap_or(100) provides a sensible default of 100ms when polling_interval_ms is not configured.

src/endpoints/amqp.rs (5)

4-4: LGTM on APP_NAME import.

Using a centralized application name constant for consumer identification improves consistency across endpoints.


24-24: LGTM on the routing_key to queue rename.

The rename from routing_key to queue is applied consistently throughout the struct and method signatures. This improves clarity since the value is used as both the routing key and queue name for the default exchange pattern.

Also applies to: 30-30, 39-42, 54-54, 87-87


53-53: LGTM on exchange configuration.

Deriving the exchange from config.exchange with unwrap_or_default() allows users to specify a custom exchange while defaulting to the empty string (direct routing to queue).


140-149: LGTM on prefetch and consumer naming.

The prefetch_count with unwrap_or(100) provides a reasonable default. Using APP_NAME in the consumer tag aids debugging and monitoring in multi-application environments.


219-220: LGTM on message_id from delivery_tag.

Casting delivery_tag (u64) to u128 is safe and provides a reliable message identifier for AMQP messages. This aligns with the updated CanonicalMessage::new signature.

src/endpoints/nats.rs (4)

4-4: LGTM on imports.

Adding APP_NAME for naming consistency and Uuid for message ID parsing aligns with the PR's goal of unified ID handling.

Also applies to: 16-16


31-48: LGTM on stream creation with stream_name parameter.

The updated NatsPublisher::new signature now explicitly takes stream_name, and the stream subjects pattern {stream_name}.> is correctly configured.


148-159: LGTM on consumer configuration with APP_NAME prefix.

The durable_name format {APP_NAME}-{stream_name}-{subject} ensures uniqueness across applications. Using max_ack_pending from config is a good configurability improvement.


181-219: LGTM on create_canonical_message with sequence fallback logic.

The prioritization is sensible: JetStream sequence (most reliable) → Nats-Msg-Id header (UUID or u128) → None. The warning on parse failure aids debugging without breaking message flow.
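
The fallback order can be sketched without the NATS or `uuid` crates. The function names are hypothetical, and UUID handling is simplified to stripping hyphens and reading the remaining characters as hexadecimal.

```rust
/// Parse a header value as either a hyphenated UUID (read as 128-bit hex)
/// or a plain decimal u128. Hyphen positions are not validated here.
fn parse_header_id(value: &str) -> Option<u128> {
    let value = value.trim();
    if value.len() == 36 && value.matches('-').count() == 4 {
        let hex: String = value.chars().filter(|c| *c != '-').collect();
        return u128::from_str_radix(&hex, 16).ok();
    }
    value.parse::<u128>().ok()
}

/// Prefer the JetStream sequence, then the header value, otherwise no ID.
fn resolve_message_id(sequence: Option<u64>, header: Option<&str>) -> Option<u128> {
    sequence
        .map(u128::from)
        .or_else(|| header.and_then(parse_header_id))
}
```

An unparseable header yields `None` rather than an error, matching the described behavior of logging a warning without interrupting message flow.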

benches/performance_bench.rs (6)

11-18: LGTM on integration module reuse.

Using #[path] to include the integration test module allows sharing test utilities (DockerCompose, measure_* functions) without duplication. This is a practical approach for benchmark code.


20-24: LGTM on constants and shared state.

PERF_TEST_MESSAGE_COUNT = 1000 and DEFAULT_SLEEP = 50ms are reasonable defaults. Using Lazy<Mutex<HashMap>> for result accumulation across async benchmarks is appropriate.


203-212: LGTM on Criterion configuration for integration benchmarks.

The settings (sample_size(10), measurement_time(10s), warm_up_time(1s)) are appropriate for network/disk-bound integration tests where each iteration is expensive.


223-225: LGTM on Docker environment management.

Calling down() before up() is defensive and ensures a clean slate even if a previous run crashed. This pattern helps avoid port conflicts or stale container state.


227-256: LGTM on single_write benchmark structure.

The pattern of measuring writes, then cleaning up by reading, ensures queue state consistency across iterations. The per-iteration cleanup is essential for accurate, isolated measurements.


398-424: LGTM on consolidated results reporting.

Using blocking_lock() after group.finish() is safe since all async work is complete. The formatted table output provides clear visibility into benchmark results across backends.

Comment thread src/endpoints/mod.rs
Comment on lines +127 to +131
let stream_name = cfg.stream.as_deref().unwrap_or_default();
Ok(
    Box::new(nats::NatsPublisher::new(&cfg.config, stream_name, subject).await?)
        as Box<dyn MessagePublisher>,
)

⚠️ Potential issue | 🟡 Minor

Empty stream_name may cause NATS JetStream issues.

Using unwrap_or_default() for stream_name will result in an empty string if no stream is configured. This differs from the consumer path (lines 52-57) which requires a stream and returns an error if not provided. An empty stream name could cause NATS JetStream operations to fail or behave unexpectedly.

🔎 Suggested fix for consistency with consumer validation
         EndpointType::Nats(cfg) => {
             let subject = cfg.subject.as_deref().unwrap_or(route_name);
-            let stream_name = cfg.stream.as_deref().unwrap_or_default();
+            let stream_name = cfg.stream.as_deref().unwrap_or(route_name);
             Ok(
                 Box::new(nats::NatsPublisher::new(&cfg.config, stream_name, subject).await?)
                     as Box<dyn MessagePublisher>,
             )
         }

Alternatively, require a stream for publishers as well and return an error if not provided.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-            let stream_name = cfg.stream.as_deref().unwrap_or_default();
-            Ok(
-                Box::new(nats::NatsPublisher::new(&cfg.config, stream_name, subject).await?)
-                    as Box<dyn MessagePublisher>,
-            )
+            let stream_name = cfg.stream.as_deref().unwrap_or(route_name);
+            Ok(
+                Box::new(nats::NatsPublisher::new(&cfg.config, stream_name, subject).await?)
+                    as Box<dyn MessagePublisher>,
+            )
🤖 Prompt for AI Agents
In src/endpoints/mod.rs around lines 127 to 131, the code uses
cfg.stream.as_deref().unwrap_or_default() which yields an empty stream name when
none is configured; change this to require a configured stream and return an Err
if missing (consistent with the consumer path at lines 52-57) so you do not call
NatsPublisher::new with an empty stream. Validate cfg.stream, return a
descriptive error when absent, and only call NatsPublisher::new with a non-empty
stream_name.

Comment thread src/middleware/retry.rs
Comment on lines +87 to +95
Err(e) => {
    if attempt >= self.config.max_attempts {
        return Err(e);
    }
    warn!(
        "Batch send failed (attempt {}/{}): {}. Retrying...",
        attempt, self.config.max_attempts, e
    );
}

⚠️ Potential issue | 🟠 Major

Partial successes are lost on final batch error.

If earlier attempts had partial success (some messages in all_responses), returning Err(e) discards those successful responses. Consider preserving partial successes:

🔎 Proposed fix
 Err(e) => {
     if attempt >= self.config.max_attempts {
-        return Err(e);
+        if all_responses.is_empty() {
+            return Err(e);
+        } else {
+            // Preserve partial successes; return remaining as failed
+            return Ok((Some(all_responses), current_messages));
+        }
     }
     warn!(
         "Batch send failed (attempt {}/{}): {}. Retrying...",
         attempt, self.config.max_attempts, e
     );
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-Err(e) => {
-    if attempt >= self.config.max_attempts {
-        return Err(e);
-    }
-    warn!(
-        "Batch send failed (attempt {}/{}): {}. Retrying...",
-        attempt, self.config.max_attempts, e
-    );
-}
+Err(e) => {
+    if attempt >= self.config.max_attempts {
+        if all_responses.is_empty() {
+            return Err(e);
+        } else {
+            // Preserve partial successes; return remaining as failed
+            return Ok((Some(all_responses), current_messages));
+        }
+    }
+    warn!(
+        "Batch send failed (attempt {}/{}): {}. Retrying...",
+        attempt, self.config.max_attempts, e
+    );
+}
🤖 Prompt for AI Agents
In src/middleware/retry.rs around lines 87 to 95, the final Err(e) path discards
any previously accumulated successful responses in all_responses; change the
logic so that when max attempts is reached and all_responses is non-empty, the
function returns Ok with the accumulated responses together with the remaining
messages marked as failed (preserving partial successes), and returns Err(e)
only when there are no successful responses to return. Keep the warning log for
non-final attempts, and check all_responses.is_empty() on the final attempt to
decide between the two outcomes.

Comment thread tests/integration/common.rs

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tests/integration/common.rs (1)

82-89: Replace docker-compose command with docker compose.

The CI error (NotFound when stopping docker-compose) indicates that the standalone docker-compose binary is not installed. Docker Compose v2 ships as a Docker CLI plugin and is invoked as docker compose (two words). Update the command at line 82 to use docker compose so the tests run in current Docker environments.
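
A minimal sketch of building the invocation through the Docker CLI plugin. The helper name and the `-f <file> <action>` argument layout are assumptions for illustration; the actual arguments used in tests/integration/common.rs are not shown in this review.

```rust
use std::process::Command;

/// Build a `docker compose -f <file> <action>` invocation (Compose v2 CLI
/// plugin) instead of calling the standalone `docker-compose` binary.
/// The argument layout is an assumption made for this sketch.
fn compose_command(compose_file: &str, action: &str) -> Command {
    let mut cmd = Command::new("docker");
    cmd.arg("compose").arg("-f").arg(compose_file).arg(action);
    cmd
}
```

Constructing the `Command` in one helper keeps `up` and `down` consistent, so a future switch of binary or flags happens in a single place.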

♻️ Duplicate comments (4)
src/endpoints/mod.rs (2)

127-131: Empty stream_name inconsistency with consumer path.

This concern was previously flagged. The publisher path uses unwrap_or_default() yielding an empty string, while the consumer path (lines 52-57) requires an explicit stream and returns an error if not provided. This asymmetry could cause unexpected NATS JetStream behavior.


174-181: Potential infinite recursion with circular Fanout references.

This concern was previously flagged. Recursive create_publisher_from_route calls without cycle detection could cause stack overflow if a Fanout contains itself directly or transitively.

src/models.rs (1)

569-569: Addressed previous review feedback.

The todo!() has been replaced with panic!() with a descriptive message, which is appropriate here since Compute middleware cannot be deserialized from config.

tests/integration/common.rs (1)

553-553: Critical bug: Increment by actual batch size, not 1.

This issue has already been flagged in a previous review. The final_count should be incremented by msgs.len() to reflect the actual number of messages received in the batch.

🔎 Proposed fix
-                final_count += 1;
+                final_count += msgs.len();
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a0dcc62 and 62eedd0.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (29)
  • Cargo.toml (3 hunks)
  • README.md (1 hunks)
  • benches/performance_bench.rs (1 hunks)
  • src/canonical_message.rs (1 hunks)
  • src/endpoints/file.rs (2 hunks)
  • src/endpoints/http.rs (3 hunks)
  • src/endpoints/memory.rs (3 hunks)
  • src/endpoints/mod.rs (5 hunks)
  • src/endpoints/static_endpoint.rs (3 hunks)
  • src/lib.rs (2 hunks)
  • src/middleware/deduplication.rs (1 hunks)
  • src/middleware/dlq.rs (11 hunks)
  • src/middleware/metrics.rs (1 hunks)
  • src/middleware/mod.rs (3 hunks)
  • src/models.rs (14 hunks)
  • src/route.rs (2 hunks)
  • src/traits.rs (2 hunks)
  • tests/integration/amqp.rs (2 hunks)
  • tests/integration/common.rs (12 hunks)
  • tests/integration/docker-compose/amqp.yml (1 hunks)
  • tests/integration/docker-compose/kafka.yml (2 hunks)
  • tests/integration/docker-compose/mongodb.yml (1 hunks)
  • tests/integration/docker-compose/mqtt.yml (1 hunks)
  • tests/integration/docker-compose/nats.yml (1 hunks)
  • tests/integration/kafka.rs (3 hunks)
  • tests/integration/mongodb.rs (2 hunks)
  • tests/integration/mqtt.rs (2 hunks)
  • tests/integration/nats.rs (4 hunks)
  • tests/memory_test.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • src/middleware/deduplication.rs
🚧 Files skipped from review as they are similar to previous changes (6)
  • src/endpoints/file.rs
  • src/route.rs
  • tests/integration/nats.rs
  • tests/integration/kafka.rs
  • src/endpoints/static_endpoint.rs
  • src/lib.rs
🧰 Additional context used
🧬 Code graph analysis (8)
src/middleware/mod.rs (1)
src/models.rs (2)
  • new (62-64)
  • new (152-157)
tests/memory_test.rs (3)
tests/integration/common.rs (1)
  • format_pretty (503-528)
tests/integration/mod.rs (2)
  • amqp (2-2)
  • mqtt (10-10)
tests/integration_test.rs (1)
  • integration (4-4)
src/canonical_message.rs (1)
src/endpoints/kafka.rs (3)
  • new (28-108)
  • new (199-254)
  • process_message (346-377)
tests/integration/mqtt.rs (1)
src/endpoints/mqtt.rs (4)
  • MqttConsumer (87-92)
  • MqttPublisher (16-20)
  • MessageConsumer (126-159)
  • MqttConsumer (94-116)
benches/performance_bench.rs (2)
tests/integration/common.rs (8)
  • format_pretty (503-528)
  • measure_read_performance (530-569)
  • measure_single_read_performance (626-652)
  • measure_write_performance (381-472)
  • new (51-55)
  • new (122-138)
  • down (77-91)
  • up (57-75)
tests/integration_test.rs (1)
  • test_all_performance_direct (79-113)
tests/integration/mongodb.rs (1)
src/endpoints/mongodb.rs (3)
  • MongoDbPublisher (54-62)
  • MongoDbConsumer (249-441)
  • MongoDbConsumer (119-123)
Cargo.toml (2)
tests/integration/mod.rs (2)
  • amqp (2-2)
  • mqtt (10-10)
src/endpoints/mqtt.rs (1)
  • rustls (321-356)
src/endpoints/mod.rs (1)
src/endpoints/memory.rs (5)
  • new (34-37)
  • new (102-108)
  • new (154-161)
  • get_or_create_channel (83-92)
  • len (72-74)
🪛 GitHub Actions: Continuous Integration
tests/integration/common.rs

[error] 89-89: Failed to stop docker-compose: Os { code: 2, kind: NotFound, message: "No such file or directory" }

🪛 markdownlint-cli2 (0.18.1)
README.md

9-9: Unordered list indentation
Expected: 2; Actual: 4

(MD007, ul-indent)


10-10: Unordered list indentation
Expected: 2; Actual: 4

(MD007, ul-indent)


11-11: Unordered list indentation
Expected: 2; Actual: 4

(MD007, ul-indent)

🔇 Additional comments (41)
src/middleware/metrics.rs (1)

1-4: LGTM!

The header comments are correctly updated to reflect the repository rename from hot_queue to mq-bridge.

tests/integration/docker-compose/nats.yml (1)

4-4: LGTM! Container name updated consistently.

The container name change from nats-itest-mq-multi-bridge to nats-itest-mq-bridge aligns with the repository-wide rename to mq-bridge.

tests/integration/docker-compose/mqtt.yml (1)

4-4: LGTM! Container name updated consistently.

The container name change aligns with the repository-wide rename to mq-bridge.

tests/integration/docker-compose/amqp.yml (1)

4-4: LGTM! Container name updated consistently.

The container name change aligns with the repository-wide rename to mq-bridge.

README.md (1)

26-71: Excellent documentation with clear usage example.

The code example effectively demonstrates the basic usage pattern with memory channels and provides clear expectations for users.

src/traits.rs (1)

142-156: Well-designed Compute trait with ergonomic blanket implementation.

The new Compute trait provides a clean abstraction for message transformation middleware. The blanket implementation allows users to pass closures directly, which is idiomatic and user-friendly.
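
As a rough illustration of the blanket-implementation pattern praised here, the following is a simplified, synchronous sketch. The trait shape, the `Vec<u8>` payload type, the `String` error type, and the helper `run_compute` are assumptions made for this example; the actual trait in src/traits.rs may differ (for instance by being async or by operating on `CanonicalMessage`).

```rust
/// Simplified, synchronous stand-in for a `Compute`-style trait (assumed shape).
pub trait Compute: Send + Sync {
    fn compute(&self, payload: Vec<u8>) -> Result<Vec<u8>, String>;
}

/// Blanket implementation: any closure or function with a matching
/// signature automatically implements `Compute`.
impl<F> Compute for F
where
    F: Fn(Vec<u8>) -> Result<Vec<u8>, String> + Send + Sync,
{
    fn compute(&self, payload: Vec<u8>) -> Result<Vec<u8>, String> {
        self(payload)
    }
}

/// Hypothetical helper that accepts any `Compute`, including a bare closure.
pub fn run_compute(handler: &dyn Compute, payload: Vec<u8>) -> Result<Vec<u8>, String> {
    handler.compute(payload)
}
```

With this shape, a caller can pass a closure such as `|p: Vec<u8>| Ok(p.to_ascii_uppercase())` directly, without defining a named type.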

Cargo.toml (3)

2-2: LGTM! Package renamed consistently.

The package name change from hot_queue to mq-bridge aligns with the repository-wide rename.


87-92: The criterion version and async configuration are correct.

Version 0.8.1 is the current release of criterion, and it includes full async support. Tokio is explicitly supported through the async executor module, making the async_tokio feature suitable for your benchmark configuration.


23-23: rustls-pemfile 2.2 is compatible with rustls 0.23.

rustls-pemfile 2.2 does not directly depend on a specific rustls version; it only depends on rustls-pki-types 1.9 and works as a standalone PEM parser compatible with any rustls version. UUID is a reasonable addition to the nats feature for generating message IDs in JetStream deduplication, though it's an implementation choice rather than a strict requirement.

tests/memory_test.rs (1)

1-1: LGTM! Import paths and comments updated for package rename.

The changes correctly update the import path and command reference to reflect the package rename from hot_queue to mq-bridge.

Also applies to: 9-9

tests/integration/docker-compose/mongodb.yml (2)

4-4: LGTM! Container name updated consistently.

The container name change from mongodb-itest-hot_queue to mongodb-itest-mq-bridge aligns with the repository-wide rename.


3-3: No compatibility issues found between MongoDB Rust driver 3.4.0 and MongoDB 8.2.

MongoDB Rust driver v3.1.0 provides compatibility with MongoDB 8.0, and since 3.4.0 is a later minor release with minimum server version 4.2, MongoDB 8.2 is fully supported. If a driver is compatible with a major version of MongoDB Server, it is also compatible with the minor and patch releases within that major version unless otherwise specified. Standard healthcheck patterns remain functional with MongoDB 8.2.

src/endpoints/memory.rs (1)

109-117: LGTM - Test helper method with appropriate documentation.

The documentation correctly warns about the broadcast publisher edge case. The implementation is consistent with MemoryConsumer::channel() at lines 162-167.

tests/integration/mqtt.rs (1)

7-7: LGTM - Namespace updates consistent with project rename.

Import paths correctly updated from hot_queue to mq_bridge.

Also applies to: 53-53

tests/integration/mongodb.rs (1)

17-17: Database names differ between pipeline and direct tests.

The CONFIG_YAML uses mq_bridge_test (line 17) while the direct performance test uses mq_bridge_test_db (line 56). This appears intentional to isolate test data, but worth confirming this is the desired behavior.

Also applies to: 56-56

src/endpoints/http.rs (1)

128-128: LGTM - Correctly adapted to new CanonicalMessage constructor signature.

Both call sites correctly pass None for the optional message_id parameter, consistent with the expanded API.

Also applies to: 249-249

tests/integration/amqp.rs (1)

55-55: Verify delayed_ack: false is intentional.

Changed from true to false. With delayed_ack: false, messages are acknowledged immediately rather than after processing. This affects message delivery guarantees in failure scenarios. If this is for performance testing purposes, this change is reasonable.

src/endpoints/mod.rs (2)

198-232: LGTM - Well-structured integration test for fanout functionality.

The test correctly validates that a single message sent to a fanout publisher is received by both target memory channels. Good coverage of the basic fanout use case.


8-8: LGTM - Public fanout module addition.

The new fanout module is correctly exposed and integrates cleanly with the existing endpoint modules.

src/middleware/mod.rs (3)

11-27: LGTM on module declarations and imports.

The new middleware modules (compute, random_panic, retry) are properly declared and their public types are correctly imported following the existing pattern for conditional feature-gated modules.


48-53: Middleware wiring looks correct.

  • Compute wraps the consumer with ComputeConsumer and clones the handler (appropriate since ComputeHandler wraps Arc).
  • Retry is correctly marked as a no-op for consumers (publisher-only middleware).
  • RandomPanic is applied to consumers as expected.

82-89: Publisher middleware wiring is consistent.

The publisher path correctly applies Compute, Retry, and RandomPanic middlewares. The configuration cloning for Retry and reference passing for RandomPanic align with their respective struct definitions.

src/canonical_message.rs (3)

71-78: Good additions for struct serialization/deserialization.

The from_struct and get_struct methods provide a clean API for working with typed message payloads. These complement the existing from_json method well.


29-40: UUID-based ID generation is well-implemented.

The builder pattern with with_gen_id provides a fluent API, and using Uuid::new_v4().as_u128() is appropriate for generating unique message IDs.


21-27: Constructor signature change verified—all call sites updated.

All 12 calls to CanonicalMessage::new throughout the codebase correctly use the new two-argument signature (payload, message_id).

src/middleware/dlq.rs (3)

43-46: Good exponential backoff implementation.

The next_backoff helper cleanly encapsulates the backoff calculation with configurable multiplier and max cap. The logic is correct.
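
For readers unfamiliar with the pattern, a capped exponential backoff step can be sketched as below. This is not the code from src/middleware/dlq.rs; the signature, the `f64` multiplier, and the parameter names are assumptions chosen for illustration.

```rust
use std::time::Duration;

/// Returns the next delay: the current delay scaled by `multiplier`,
/// but never larger than `max`.
/// Note: `Duration::mul_f64` panics on negative or non-finite factors,
/// so a real implementation would validate `multiplier` first.
pub fn next_backoff(current: Duration, multiplier: f64, max: Duration) -> Duration {
    let scaled = current.mul_f64(multiplier);
    // `Duration` is totally ordered, so `min` applies the cap.
    scaled.min(max)
}
```

Starting from 1s with a multiplier of 2 and a 5s cap, successive calls would yield 2s, 4s, and then stay at 5s.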


102-106: Returning Ok(None) for successful DLQ handling is correct.

This semantic change appropriately signals that the message was "handled" (by the DLQ) and should be committed upstream, rather than propagating an error that would prevent commit.


149-151: Batch DLQ success returns empty failed list to enable commit.

Returning Ok((responses, Vec::new())) after successfully sending failed messages to DLQ is the right approach—it signals the upstream consumer that all messages in the batch are accounted for.

src/models.rs (4)

58-82: Well-designed ComputeHandler wrapper.

The design appropriately:

  • Uses Arc<dyn Compute> for shared ownership of the compute implementation
  • Provides a custom Debug impl to avoid trait bound requirements
  • Fails deserialization explicitly since Compute middleware is programmatic-only

281-291: Probability validation is correct.

The deserialize_probability function properly validates that the value is in the [0.0, 1.0] range and provides a clear error message when it's not.
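
The range check described above might look roughly like the following. This sketch omits the serde plumbing so it stays dependency-free; the function name `validate_probability` and the error text are hypothetical and not taken from src/models.rs.

```rust
/// Accepts only values in the closed interval [0.0, 1.0].
/// NaN fails the range check and is rejected as well.
pub fn validate_probability(value: f64) -> Result<f64, String> {
    if (0.0..=1.0).contains(&value) {
        Ok(value)
    } else {
        Err(format!("probability must be between 0.0 and 1.0, got {value}"))
    }
}
```

In a serde `deserialize_with` function, the same check would typically run after deserializing the raw `f64`, mapping the error through `serde::de::Error::custom`.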


261-272: RetryMiddleware config with sensible defaults.

The retry configuration provides reasonable defaults (3 attempts, 100ms initial, 5s max, 2x multiplier) that align with the DLQ middleware defaults, ensuring consistent retry behavior across the system.


680-713: Fanout deserialization test validates the new endpoint type.

The test correctly verifies that a fanout configuration with multiple child endpoints is properly deserialized, including nested Memory endpoint types.

benches/performance_bench.rs (4)

10-18: Pragmatic reuse of integration test infrastructure.

Using #[path = "..."] to include the integration module allows reusing established test utilities (DockerCompose, measurement functions) without duplication. This is a reasonable approach for benchmarks.


398-424: Results aggregation and printing is well-implemented.

Using blocking_lock() here is correct since this code runs after group.finish() returns and is not in an async context. The formatted output provides a clear comparison table across backends.


165-201: MQTT helper uses unique client IDs to avoid conflicts.

Good practice to generate unique publisher/consumer IDs with Uuid::new_v4() to prevent client ID collisions that could cause connection issues in MQTT.


214-365: Remove redundant explicit _docker.down() calls; rely on Drop implementation for cleanup.

The DockerCompose struct implements Drop and automatically calls down() in its destructor. The explicit _docker.down() at line 362 is redundant since the value will be cleaned up automatically. Additionally, the pattern at lines 219-220 (_docker.down(); _docker.up();) is unnecessary—just call up() directly. Remove both explicit down() calls and let Drop handle cleanup automatically, which also provides panic safety without extra code.

Likely an incorrect or invalid review comment.

tests/integration/common.rs (5)

4-6: LGTM!

The import changes correctly reflect the refactored module structure with the mq_bridge namespace.

Also applies to: 22-22


25-25: LGTM!

Adding Default derive improves ergonomics for test utilities.


328-370: LGTM!

The refactoring to use Duration return types is more idiomatic, and the message-per-second calculations are correct.


381-472: LGTM!

The atomic counter tracking ensures all messages are successfully sent, and the batch retry logic correctly accounts for partial failures. The return type change to Duration is more idiomatic.


501-528: LGTM!

The format_pretty function is a useful addition for human-readable test output. The implementation correctly handles both integers and floating-point numbers with proper thousand separators.
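
A minimal sketch of the integer part of such formatting is shown below. It is not the `format_pretty` implementation from tests/integration/common.rs; the function name `group_thousands`, the `u64` input, and the comma separator are assumptions for illustration, and fractional values are not handled.

```rust
/// Inserts a comma between every group of three digits, counting from the right.
pub fn group_thousands(n: u64) -> String {
    let digits = n.to_string();
    let mut out = String::with_capacity(digits.len() + digits.len() / 3);
    for (i, ch) in digits.chars().enumerate() {
        // A separator goes before a digit when the number of remaining
        // digits (including this one) is a multiple of three.
        if i > 0 && (digits.len() - i) % 3 == 0 {
            out.push(',');
        }
        out.push(ch);
    }
    out
}
```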

Comment thread README.md
Comment on lines +9 to +11
* **Retries**: Exponential backoff for transient failures.
* **Dead-Letter Queues (DLQ)**: Redirect failed messages.
* **Deduplication**: Message deduplication using `sled`.

⚠️ Potential issue | 🟡 Minor

Fix list indentation to match Markdown standards.

The nested list items use 4-space indentation, but the markdown linter expects 2-space indentation for consistency.

🔎 Proposed fix
 *   **Middleware**:
-    *   **Retries**: Exponential backoff for transient failures.
-    *   **Dead-Letter Queues (DLQ)**: Redirect failed messages.
-    *   **Deduplication**: Message deduplication using `sled`.
+  *   **Retries**: Exponential backoff for transient failures.
+  *   **Dead-Letter Queues (DLQ)**: Redirect failed messages.
+  *   **Deduplication**: Message deduplication using `sled`.
🧰 Tools
🪛 markdownlint-cli2 (0.18.1)

9-9: Unordered list indentation
Expected: 2; Actual: 4

(MD007, ul-indent)


10-10: Unordered list indentation
Expected: 2; Actual: 4

(MD007, ul-indent)


11-11: Unordered list indentation
Expected: 2; Actual: 4

(MD007, ul-indent)

🤖 Prompt for AI Agents
In README.md around lines 9 to 11, the nested list items under the bullet list
are indented with 4 spaces but the repo markdown style requires 2-space
indentation; update each nested list line to use 2-space indentation (replace
the 4 leading spaces with 2) so the list conforms to the linter and renders
consistently.

Comment thread src/canonical_message.rs
Comment on lines +57 to +60
} else if let Some(n) = val.as_i64() {
message_id = Some(n as u128);
} else if let Some(n) = val.as_u64() {
message_id = Some(n as u128);

⚠️ Potential issue | 🟡 Minor

Negative i64 values will wrap to large u128 values.

Casting a negative i64 to u128 produces an unexpected large number due to two's complement representation. If negative IDs are invalid, consider rejecting them explicitly.

🔎 Proposed fix to reject negative values
-            } else if let Some(n) = val.as_i64() {
-                message_id = Some(n as u128);
+            } else if let Some(n) = val.as_i64() {
+                if n >= 0 {
+                    message_id = Some(n as u128);
+                }
             } else if let Some(n) = val.as_u64() {
🤖 Prompt for AI Agents
In src/canonical_message.rs around lines 57 to 60, the code casts an i64 to u128
which will wrap negative values into huge u128s; update the logic to explicitly
reject negative i64s instead of casting: when you get Some(n) from val.as_i64(),
check n >= 0 and only then set message_id = Some(n as u128), otherwise return or
propagate a parsing/validation error (or set message_id to None and log/handle
the invalid ID) so negative IDs are not silently converted.
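
The wrap-around can be reproduced in isolation. The sketch below contrasts the `as` cast with a checked conversion; the helper names `wrapping_cast` and `id_from_i64` are hypothetical and exist only for this example.

```rust
/// What the flagged code does: `as` sign-extends, so negative inputs
/// become very large unsigned values.
pub fn wrapping_cast(n: i64) -> u128 {
    n as u128
}

/// A checked alternative: `u128::try_from` fails for negative inputs,
/// so they map to `None` instead of a bogus ID.
pub fn id_from_i64(n: i64) -> Option<u128> {
    u128::try_from(n).ok()
}
```

Whether a rejected ID should become `None` or a parse error is a design decision for the caller; the checked conversion only guarantees that a negative value is never silently reinterpreted.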

}
None => formatted_integer,
}
debug_assert_eq!(final_count, num_messages);

⚠️ Potential issue | 🟠 Major

Replace debug_assert_eq! with assert_eq! for reliable testing.

Using debug_assert_eq! means this assertion is compiled out in release builds, which could mask test failures when running benchmarks or performance tests in release mode. Since this is a correctness check ensuring all messages were received, it should always be validated.

🔎 Proposed fix
-    debug_assert_eq!(final_count, num_messages);
+    assert_eq!(final_count, num_messages, "Expected {} messages but received {}", num_messages, final_count);
🤖 Prompt for AI Agents
In tests/integration/common.rs around line 567, the test uses
debug_assert_eq!(final_count, num_messages) which can be compiled out in release
builds; replace it with assert_eq!(final_count, num_messages) so the equality is
always checked during tests and benchmarks. Update the assertion macro
accordingly and run the test suite to verify behavior remains correct.
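
To make the difference concrete, the sketch below records whether the condition inside `debug_assert!` is evaluated at all. The function name is hypothetical; the behavior relies only on the standard library's documented rule that `debug_assert!` is active when `debug_assertions` is enabled (the default for dev builds, off by default for release and bench profiles).

```rust
use std::cell::Cell;

/// Returns `true` if the expression inside `debug_assert!` actually ran.
pub fn debug_assert_evaluates_condition() -> bool {
    let evaluated = Cell::new(false);
    debug_assert!({
        // Side effect used purely to observe evaluation.
        evaluated.set(true);
        true
    });
    evaluated.get()
}
```

In a default release or bench build this returns `false`, which is why a count mismatch would pass unnoticed there, whereas `assert_eq!` is checked in every profile.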

Comment thread tests/integration/common.rs Outdated
Comment on lines 619 to 622
debug_assert_eq!(
final_count.load(std::sync::atomic::Ordering::Relaxed),
num_messages
);

⚠️ Potential issue | 🟠 Major

Replace debug_assert_eq! with assert_eq! for reliable testing.

Using debug_assert_eq! means this assertion is compiled out in release builds. Since this validates that all messages were successfully sent, it should always be checked to ensure test correctness.

🔎 Proposed fix
-    debug_assert_eq!(
+    assert_eq!(
         final_count.load(std::sync::atomic::Ordering::Relaxed),
-        num_messages
+        num_messages,
+        "Expected {} messages to be sent but only {} were successful",
+        num_messages,
+        final_count.load(std::sync::atomic::Ordering::Relaxed)
     );
🤖 Prompt for AI Agents
In tests/integration/common.rs around lines 619 to 622, the test currently uses
debug_assert_eq! to compare final_count.load(..., Ordering::Relaxed) and
num_messages which is compiled out in release builds; replace debug_assert_eq!
with assert_eq! so the equality is always checked, keeping the same left and
right expressions and Ordering::Relaxed, and ensure the assertion remains
descriptive (you can add a context string if desired) so test failures report
the actual values.

format_pretty(msgs_per_sec)
);
msgs_per_sec
debug_assert_eq!(final_count, num_messages);

⚠️ Potential issue | 🟠 Major

Replace debug_assert_eq! with assert_eq! for reliable testing.

Using debug_assert_eq! means this assertion is compiled out in release builds. Since this validates that all messages were successfully received, it should always be checked.

🔎 Proposed fix
-    debug_assert_eq!(final_count, num_messages);
+    assert_eq!(final_count, num_messages, "Expected {} messages but received {}", num_messages, final_count);
🤖 Prompt for AI Agents
In tests/integration/common.rs around line 650, replace the debug-only assertion
with a real test assertion: change the debug_assert_eq!(final_count,
num_messages) call to assert_eq!(final_count, num_messages) so the check runs in
all build profiles; update any surrounding test messages if needed to preserve
context.

 kafka:
 image: confluentinc/cp-kafka:7.9.5
-container_name: kafka-itest-mq-multi-bridge
+container_name: kafka-itest-mq-bridge

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for old container name and cluster ID references

echo "=== Searching for 'mq-multi-bridge' references ==="
rg 'mq-multi-bridge' . 2>/dev/null || echo "No matches found"

echo ""
echo "=== Searching for 'ci-cluster-id-for-mq-multi-bridge' references ==="
rg 'ci-cluster-id-for-mq-multi-bridge' . 2>/dev/null || echo "No matches found"

echo ""
echo "=== Checking current state of kafka.yml ==="
cat tests/integration/docker-compose/kafka.yml 2>/dev/null || echo "File not found"

Repository: marcomq/hot_queue

Length of output: 1929


Update all.yml with the same container name and cluster ID changes.

The rename in kafka.yml is correct, but the corresponding changes must also be applied to tests/integration/docker-compose/all.yml. The following services still reference the old naming convention and need to be updated:

  • kafka-itest-mq-multi-bridge → kafka-itest-mq-bridge
  • rabbitmq-itest-mq-multi-bridge → rabbitmq-itest-mq-bridge
  • nats-itest-mq-multi-bridge → nats-itest-mq-bridge
  • mosquitto-itest-mq-multi-bridge → mosquitto-itest-mq-bridge
  • ci-cluster-id-for-mq-multi-bridge-12345 → ci-cluster-id-for-mq-bridge-12345

Without these changes, the integration tests will fail due to container name conflicts.

@marcomq marcomq merged commit 4544fe7 into main Dec 21, 2025
2 checks passed

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (4)
src/endpoints/mod.rs (1)

134-138: Empty stream_name may cause NATS JetStream issues.

Using unwrap_or_default() for stream_name will result in an empty string if no stream is configured. This differs from the consumer path (lines 52-57) which requires a stream and returns an error if not provided. An empty stream name could cause NATS JetStream operations to fail or behave unexpectedly.

Consider requiring a stream for publishers as well and returning an error if not provided, consistent with the consumer validation.

tests/integration/common.rs (3)

568-571: Replace debug_assert_eq! with assert_eq! for reliable testing.

Using debug_assert_eq! means this assertion is compiled out in release builds, which could mask test failures when running benchmarks or performance tests in release mode. Since this is a correctness check ensuring all messages were received, it should always be validated.

🔎 Proposed fix
     if final_count != num_messages {
         eprintln!("measure_read_performance: Expected {} messages, but got {}", num_messages, final_count);
     }
-    debug_assert_eq!(final_count, num_messages);
+    assert_eq!(final_count, num_messages, "Expected {} messages but received {}", num_messages, final_count);

623-627: Replace debug_assert_eq! with assert_eq! for reliable testing.

Using debug_assert_eq! means this assertion is compiled out in release builds. Since this validates that all messages were successfully sent, it should always be checked to ensure test correctness.

🔎 Proposed fix
     let count = final_count.load(std::sync::atomic::Ordering::Relaxed);
     if count != num_messages {
         eprintln!("measure_single_write_performance: Expected {} messages, but got {}", num_messages, count);
     }
-    debug_assert_eq!(count, num_messages);
+    assert_eq!(count, num_messages, "Expected {} messages to be sent but only {} were successful", num_messages, count);

655-658: Replace debug_assert_eq! with assert_eq! for reliable testing.

Using debug_assert_eq! means this assertion is compiled out in release builds. Since this validates that all messages were successfully received, it should always be checked.

🔎 Proposed fix
     if final_count != num_messages {
         eprintln!("measure_single_read_performance: Expected {} messages, but got {}", num_messages, final_count);
     }
-    debug_assert_eq!(final_count, num_messages);
+    assert_eq!(final_count, num_messages, "Expected {} messages but received {}", num_messages, final_count);
🧹 Nitpick comments (2)
src/endpoints/mongodb.rs (1)

75-90: Consider decoupling _id from message_id to eliminate collision risk.

While the full message_id is correctly preserved in its own field, deriving the MongoDB _id from the last 12 bytes of message_id introduces an unnecessary collision risk. If two messages have message_id values differing only in the high 4 bytes, insertion will fail with a duplicate key error.

A cleaner approach: always generate a fresh ObjectId for _id, independent of message_id. This eliminates the collision risk entirely while still preserving the full 128-bit message_id in its dedicated field.

🔎 Proposed refactor to decouple _id from message_id
-        let (object_id, message_id_bin) = if let Some(message_id) = &message.message_id {
-            // An ObjectId is 12 bytes. A u128 is 16 bytes. We use the last 12 bytes
-            // of the message_id to construct the ObjectId, as they are more likely to be unique.
-            // NOTE: This discards the high 4 bytes of the message_id. If two message_ids differ
-            // only in the high 4 bytes, they will result in the same ObjectId, potentially causing
-            // a duplicate key error on insert if _id uniqueness is enforced.
-            let bin_id = message_id.to_be_bytes();
-            let id_bytes: [u8; 12] = bin_id[4..].try_into()?;
-            let oid = mongodb::bson::oid::ObjectId::from(id_bytes);
-            (oid, bin_id)
-        } else {
-            let oid = mongodb::bson::oid::ObjectId::new();
-            let mut id_bytes = [0u8; 16];
-            id_bytes[4..16].copy_from_slice(&oid.bytes());
-            (oid, id_bytes)
-        };
+        // Always generate a fresh ObjectId for MongoDB's _id field to avoid collisions
+        let object_id = mongodb::bson::oid::ObjectId::new();
+        
+        // Generate or use the provided message_id as a full 16-byte value
+        let message_id_bin = if let Some(message_id) = &message.message_id {
+            message_id.to_be_bytes()
+        } else {
+            // If no message_id provided, generate one from the ObjectId (with zero-padding)
+            let mut id_bytes = [0u8; 16];
+            id_bytes[4..16].copy_from_slice(&object_id.bytes());
+            id_bytes
+        };
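
The collision described in this nitpick can be demonstrated without MongoDB. The sketch below mirrors only the byte slicing from the existing code (`bin_id[4..]`); the function name `object_id_bytes` is hypothetical, and a plain `[u8; 12]` stands in for `ObjectId`.

```rust
/// Derives the 12 bytes that would back the ObjectId: the big-endian
/// representation of the ID with its high 4 bytes discarded.
pub fn object_id_bytes(message_id: u128) -> [u8; 12] {
    let bytes = message_id.to_be_bytes();
    let mut out = [0u8; 12];
    out.copy_from_slice(&bytes[4..]);
    out
}
```

Two IDs such as `1` and `(1 << 96) | 1` differ only in the discarded high bytes, so they map to the same 12-byte value and would collide on `_id`.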
benches/performance_bench.rs (1)

223-374: Consider optimizing Docker lifecycle management.

The benchmark macro calls _docker.down() and _docker.up() for each backend, which ensures a clean environment but may be time-consuming. The down() call at line 233 followed by up() at line 234 suggests the environment is reset before each backend's benchmarks.

However, the explicit down() at line 371 is followed by a second call when _docker (created at line 232) is dropped at the end of its scope, so down() runs twice. Consider whether the explicit down() at line 371 is necessary given that Drop will be called automatically.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b4fd5d8 and ec6288d.

📒 Files selected for processing (8)
  • benches/performance_bench.rs (1 hunks)
  • src/endpoints/kafka.rs (3 hunks)
  • src/endpoints/mod.rs (6 hunks)
  • src/endpoints/mongodb.rs (4 hunks)
  • src/endpoints/nats.rs (10 hunks)
  • src/middleware/retry.rs (1 hunks)
  • tests/integration/common.rs (12 hunks)
  • tests/integration/nats.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/middleware/retry.rs
  • tests/integration/nats.rs
🧰 Additional context used
🧬 Code graph analysis (5)
src/endpoints/mod.rs (1)
src/endpoints/memory.rs (4)
  • new (34-37)
  • new (102-108)
  • new (154-161)
  • get_or_create_channel (83-92)
src/endpoints/nats.rs (1)
src/canonical_message.rs (2)
  • new (21-27)
  • s (54-54)
src/endpoints/kafka.rs (1)
src/canonical_message.rs (1)
  • new (21-27)
src/endpoints/mongodb.rs (1)
src/canonical_message.rs (1)
  • new (21-27)
benches/performance_bench.rs (1)
tests/integration/common.rs (7)
  • format_pretty (504-529)
  • measure_read_performance (531-573)
  • measure_single_read_performance (631-660)
  • measure_single_write_performance (575-629)
  • measure_write_performance (381-473)
  • down (77-91)
  • up (57-75)
🪛 GitHub Actions: Continuous Integration
tests/integration/common.rs

[error] 89-89: Failed to stop docker-compose: Os { code: 2, kind: NotFound, message: "No such file or directory" } during teardown.

🔇 Additional comments (22)
src/endpoints/mongodb.rs (6)

26-26: LGTM!

The change from i64 to Option<Binary> aligns with the broader refactor to support 128-bit message IDs and makes the field properly optional.


41-47: LGTM!

The Binary-to-u128 conversion logic correctly right-aligns bytes and zero-pads, handling edge cases gracefully. This is consistent with the publisher's big-endian encoding.
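
A right-aligning, zero-padding conversion of this kind can be sketched as follows. The function name `bytes_to_u128` is hypothetical, and keeping only the trailing 16 bytes of an over-long input is an assumption; the actual handling of that edge case in src/endpoints/mongodb.rs may differ.

```rust
/// Interprets `bytes` as a big-endian integer, left-padding with zeros
/// when fewer than 16 bytes are supplied.
pub fn bytes_to_u128(bytes: &[u8]) -> u128 {
    let mut buf = [0u8; 16];
    // Assumption: inputs longer than 16 bytes keep only their trailing 16 bytes.
    let src = if bytes.len() > 16 {
        &bytes[bytes.len() - 16..]
    } else {
        bytes
    };
    // Right-align the source bytes inside the 16-byte buffer.
    buf[16 - src.len()..].copy_from_slice(src);
    u128::from_be_bytes(buf)
}
```

Because the publisher writes `message_id.to_be_bytes()`, a full 16-byte value round-trips unchanged through this conversion.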


91-94: LGTM!

BSON Binary construction for message_id is correct, using the Generic subtype appropriately for binary data.


95-107: LGTM!

Document construction is correct. The use of unwrap_or_default() for metadata handles the optional case properly, and all BSON types are constructed consistently with the consumer's expectations.


110-110: LGTM!

Returning Ok(None) correctly implements the fire-and-forget pattern consistent with other persistence publishers (AMQP, NATS, Kafka, File). This addresses the previous review concern.


175-175: LGTM!

Making the polling interval configurable via config.polling_interval_ms with a reasonable 100ms default is a good improvement for operational flexibility.

src/endpoints/mod.rs (4)

45-47: LGTM!

The async constructor pattern for KafkaConsumer::new is correctly implemented with proper error propagation.


101-115: Good recursion protection added!

The depth-limited publisher creation with MAX_DEPTH = 16 effectively prevents infinite recursion in fanout configurations. This addresses the previous concern about circular fanout references.
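
The general shape of such a guard is sketched below on a toy endpoint tree. The `Endpoint` enum and the `count_publishers` function are hypothetical stand-ins; only the `MAX_DEPTH = 16` limit is taken from the comment above, and the real code builds publishers asynchronously rather than counting them.

```rust
pub const MAX_DEPTH: usize = 16;

/// Toy endpoint model: a leaf or a fanout over child endpoints.
pub enum Endpoint {
    Memory(String),
    Fanout(Vec<Endpoint>),
}

/// Walks the tree, failing once nesting exceeds `MAX_DEPTH` instead of
/// recursing without bound.
pub fn count_publishers(endpoint: &Endpoint, depth: usize) -> Result<usize, String> {
    if depth > MAX_DEPTH {
        return Err(format!("fanout nesting exceeds the maximum depth of {MAX_DEPTH}"));
    }
    match endpoint {
        Endpoint::Memory(_) => Ok(1),
        Endpoint::Fanout(children) => {
            let mut total = 0;
            for child in children {
                // Each nested fanout level increases the tracked depth by one.
                total += count_publishers(child, depth + 1)?;
            }
            Ok(total)
        }
    }
}
```

A depth limit does not detect cycles as such; it bounds the recursion, so a self-referencing configuration fails with an error rather than overflowing the stack.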


181-188: LGTM!

The fanout publisher creation correctly handles recursive depth tracking and uses Box::pin for async recursion. The depth-based protection prevents stack overflow from circular references.


198-240: LGTM!

The integration test correctly validates fanout publisher behavior by verifying that a single message is delivered to multiple endpoints. The test structure is clean and uses the updated CanonicalMessage API appropriately.

src/endpoints/kafka.rs (3)

146-150: LGTM!

The key generation logic appropriately uses the message ID when available (converted to big-endian bytes for consistent ordering) or falls back to a generated UUID, ensuring every message has a unique key.
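A rough sketch of that key selection, using only the standard library. The function name `message_key` is hypothetical, and the fallback is passed in as a closure here as a stand-in for the UUID generation in the PR.

```rust
/// Hypothetical helper: derive a 16-byte key from the message ID,
/// or from a caller-supplied fallback when no ID is present.
fn message_key(id: Option<u128>, fallback: impl FnOnce() -> u128) -> [u8; 16] {
    // Big-endian encoding keeps byte-wise ordering equal to numeric ordering.
    id.unwrap_or_else(fallback).to_be_bytes()
}
```

With big-endian bytes, comparing two keys lexicographically gives the same result as comparing the underlying IDs numerically, which is the "consistent ordering" property mentioned above.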


199-199: LGTM!

The async constructor signature change is appropriate for supporting asynchronous Kafka consumer initialization.


354-358: The partition cast is safe. The partition() method returns i32, but Kafka maps messages to partitions using a hash function and modulo operation with the number of partitions, which guarantees partition values are always in the range 0 to num_partitions-1. The special -1 value (RD_KAFKA_PARTITION_UA) is only returned by partitioner callbacks if partitioning fails, not by the partition() method on consumed messages. No defensive checks or assertions are needed for messages obtained from a consumer.

Likely an incorrect or invalid review comment.

src/endpoints/nats.rs (3)

31-63: LGTM!

The updated NatsPublisher::new signature now requires stream_name as a parameter, ensuring consistent stream creation and preventing empty stream name issues. The use of APP_NAME provides proper application-level namespacing.


148-172: LGTM!

The consumer configuration correctly uses APP_NAME for durable consumer names and queue groups, ensuring proper namespacing. The max_ack_pending configuration is appropriately derived from prefetch_count.


181-219: LGTM!

The sequence-aware create_canonical_message function implements a robust fallback strategy: it prioritizes the JetStream sequence number, then attempts to parse the Nats-Msg-Id header as either a UUID or u128, with appropriate warning logs on parse failures. This handles both JetStream and Core NATS paths correctly.
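The fallback order can be sketched without any NATS types. The names `resolve_message_id` and `parse_uuid_like` are hypothetical, and the hyphenated-hex parser below is a standard-library stand-in for whatever UUID parsing the PR uses; the warning logs are omitted.

```rust
/// Hypothetical parser for the 8-4-4-4-12 hyphenated hex form of a UUID.
fn parse_uuid_like(s: &str) -> Option<u128> {
    let parts: Vec<&str> = s.split('-').collect();
    let lens = [8usize, 4, 4, 4, 12];
    let well_formed = parts.len() == 5
        && parts
            .iter()
            .zip(lens)
            .all(|(p, l)| p.len() == l && p.chars().all(|c| c.is_ascii_hexdigit()));
    if !well_formed {
        return None;
    }
    u128::from_str_radix(&parts.concat(), 16).ok()
}

/// Prefer the stream sequence, then a UUID-style header, then a decimal header.
fn resolve_message_id(sequence: Option<u64>, header: Option<&str>) -> Option<u128> {
    if let Some(seq) = sequence {
        return Some(u128::from(seq));
    }
    let raw = header?;
    parse_uuid_like(raw).or_else(|| raw.parse::<u128>().ok())
}
```

Returning `None` when nothing parses leaves it to the caller to decide whether to generate a fresh ID.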

benches/performance_bench.rs (3)

26-36: LGTM!

The should_run filter allows selective backend execution via command-line arguments, which is useful for running specific benchmarks without executing the full suite.


74-115: Good proactive cleanup in MongoDB helper.

The mongodb_helper::create_publisher drops the collection before testing to ensure a clean state. This is appropriate for benchmarks, but concurrent benchmark runs against the same MongoDB instance could interfere with one another.


236-370: LGTM!

The benchmark implementations correctly include cleanup steps after write benchmarks and setup steps before read benchmarks, ensuring consistent queue state between iterations. The throughput calculations and result collection are properly implemented.

tests/integration/common.rs (3)

25-32: LGTM!

Adding Default derive to PerformanceResult enables the convenient .or_default() pattern when inserting into the results map.


381-473: Good refactoring to duration-based API.

The refactored measure_write_performance correctly tracks message counts with an atomic counter and returns Duration, allowing callers to calculate throughput metrics. The retry logic with backoff and batch size recalculation on partial success are well-implemented.
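A caller might turn the returned Duration and the atomic count into a throughput figure along these lines. The function name `messages_per_second` is hypothetical, and returning 0.0 for a zero-length measurement is an assumption.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

/// Hypothetical helper: messages per second from a shared counter
/// and the elapsed wall-clock time of the measurement.
fn messages_per_second(sent: &AtomicUsize, elapsed: Duration) -> f64 {
    let secs = elapsed.as_secs_f64();
    // Assumption: report 0.0 rather than dividing by zero.
    if secs == 0.0 {
        return 0.0;
    }
    sent.load(Ordering::Relaxed) as f64 / secs
}
```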


502-529: LGTM!

The format_pretty function provides readable number formatting with thousand separators, appropriately handling both integers and floating-point numbers with fractional truncation.
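A minimal sketch of thousand-separator formatting in the same spirit. The names `group_thousands` and `group_thousands_f64` are hypothetical, and the comma as separator is an assumption; the actual `format_pretty` signature is not shown in this review.

```rust
/// Hypothetical helper: insert a comma between each group of three digits.
fn group_thousands(n: u64) -> String {
    let digits = n.to_string();
    let mut out = String::with_capacity(digits.len() + digits.len() / 3);
    for (i, ch) in digits.chars().enumerate() {
        // A separator goes before any digit that starts a full group of three.
        if i > 0 && (digits.len() - i) % 3 == 0 {
            out.push(',');
        }
        out.push(ch);
    }
    out
}

/// Floating-point variant: drop the fractional part, then group.
fn group_thousands_f64(x: f64) -> String {
    group_thousands(x.trunc() as u64)
}
```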
