
Clippy, batch bugfixes and test and performance improvements #6

Merged
marcomq merged 12 commits into main from dev
Dec 31, 2025

Conversation

@marcomq marcomq commented Dec 30, 2025

Owner

Summary by CodeRabbit

  • New Features

    • Improved tracing/observability across transports; README expanded with philosophy, usage examples, and Into-based message examples; readiness signaling for route startup; new MQTT consumer/subscriber wrappers; performance test scaffolding.
  • Breaking Changes

    • Message send API now accepts Into-convertible values; route run_until_err gained an extra parameter; some transport constructors require explicit topic/collection; YAML keys renamed in/out → input/output.
  • Improvements

    • Concurrent batch dispatch, per-message error aggregation, richer ack/commit handling, and enhanced logging.
  • Bug Fixes

    • Tests and docs updated to match new APIs and configs.


coderabbitai Bot commented Dec 30, 2025

Walkthrough

Bumps version and README; replaces CanonicalMessage::from_str with From<&str>; adds hidden LazyMessageIds tracing helper; broad observability and concurrent batch dispatch changes across endpoints; per-endpoint context fields added; Route gained readiness signaling and run signatures changed; tests and YAML keys updated from in/out → input/output.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Version & Docs: Cargo.toml, README.md | Version bumped to 0.1.1. README updated: Into-based sending, “events” semantics, YAML/JSON/env guidance, Philosophy section, examples adjusted (Route::run_until_err signature updated). |
| Core message & tracing helper: src/canonical_message.rs | Removed CanonicalMessage::from_str; impl From<&str> now constructs via Self::new(..., None); added #[doc(hidden)] pub mod tracing_support with LazyMessageIds<'a> (Debug prints 32-digit hex IDs). |
| Route & readiness signaling: src/route.rs, tests/.../route.rs, tests/memory_test.rs, tests/* | run now returns anyhow::Result<(JoinHandle<()>, Sender<()>)>; run_until_err gained ready_tx: Option<Sender<()>>; sequential/concurrent runners accept/emit readiness and callers/tests updated to pass extra None. |
| Error-return adjustments: src/command_handler.rs, src/event_handler.rs | Several error returns changed from Err(e.into()) to Err(e) to preserve original error types; tests updated to use Into conversions for inputs. |
| Endpoint observability & context fields: src/endpoints/* (src/endpoints/amqp.rs, kafka.rs, mongodb.rs, file.rs, mqtt.rs, nats.rs, memory.rs, http.rs, static_endpoint.rs) | Added tracing (many using LazyMessageIds), richer per-endpoint context fields (e.g., queue, topic, collection_name, path, subject), per-message metadata propagation, ack/commit refinements, and logging improvements. Constructors/structs updated to store context where applicable. |
| Batch concurrency & aggregation: src/endpoints/fanout.rs, src/endpoints/http.rs, src/endpoints/switch.rs, src/endpoints/nats.rs, src/endpoints/file.rs | Replaced sequential batch helpers with concurrent dispatch (futures::join_all) in several publishers; per-message failure collection and aggregation into Ack/Partial outcomes; switch groups by publisher and dispatches groups in parallel. |
| MQTT rework: src/endpoints/mqtt.rs | Background event loop spawned (JoinHandle), boxed V3/V5 eventloops, new MqttListener, public MqttConsumer/MqttSubscriber wrappers, topic tracking, JSON-first deserialization attempt for CanonicalMessage, and adjusted subscribe/publish wiring. |
| Middleware, models & utilities: src/middleware/deduplication.rs, src/models.rs, src/type_handler.rs | Deduplication logs now include hex message_id; MemoryConfig construction simplified in Endpoint::new_memory; TypeHandler: Default implemented; minor formatting tweaks. |
| Tests, perf harness & CI fixtures: tests/integration/*.rs, tests/integration/common.rs, tests/integration_test.rs, tests/performance_pipeline.rs, benches/performance_bench.rs, tests/integration/docker-compose/*.yml | Many tests updated: YAML keys in/out → input/output; new perf constants (PERF_TEST_BATCH_MESSAGE_COUNT, PERF_TEST_SINGLE_MESSAGE_COUNT, PERF_TEST_CONCURRENCY); TestHarness simplified; AMQP prefetch_count added in tests; new performance test scaffolding; docker-compose nats.yml adds JetStream store path; benches print batch/single counts. |

Sequence Diagram(s)

sequenceDiagram
    %% Participants
    participant Caller as Route.run caller
    participant Router as Route
    participant Runner as Runner task
    participant Ready as Ready channel

    Note over Caller,Router: Caller invokes Route.run (now returns Result with ready signaling)
    Caller->>Router: run(name) -> Result<(JoinHandle, Sender)>
    Router->>Runner: spawn background runner(task, ready_tx)
    Runner->>Ready: ready_tx.send(())  -- signal readiness immediately on startup
    Caller->>Ready: wait up to 5s for ready_rx
    alt ready received
        Caller->>Caller: proceed with startup success
    else timeout / closed
        Caller->>Runner: abort/return error
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

🐰 I nibble logs and tidy trails with care,

message IDs like carrots, shining there.
Batches hop together, routed in a leap,
topics, queues, and traces — memories to keep.
A rabbit cheers: observability — deep!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 76.38% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
| Title check | ❓ Inconclusive | The pull request title is vague and overly broad, using generic phrases like 'Clippy,' 'bugfixes,' and 'improvements' without specifying the main change or which aspects are most significant. | Refine the title to highlight the primary change, such as specifying the most impactful modification (e.g., tracing enhancements, API signature updates, or batch operation improvements) rather than listing general categories. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (6)
src/event_handler.rs (1)

32-35: Consider updating the comment to reflect implicit conversion.

The code change from Err(e.into()) to Err(e) works correctly (relying on the implicit From<HandlerError> for PublisherError conversion), but the comment on Line 34 stating "Converts HandlerError to PublisherError" is now potentially misleading since the conversion is implicit via the ? operator or return type coercion.

🔎 Suggested comment update
     async fn send(&self, message: CanonicalMessage) -> Result<Sent, PublisherError> {
         match self.handler.handle(message).await {
             Ok(_) => Ok(Sent::Ack),  // Ignore result (Ack or Publish), just Ack.
-            Err(e) => Err(e), // Converts HandlerError to PublisherError
+            Err(e) => Err(e), // HandlerError automatically converts to PublisherError
         }
     }
src/endpoints/fanout.rs (1)

31-52: Concurrent batch sending improves throughput but consider partial failure handling.

The switch from sequential to concurrent batch dispatch is a good performance improvement. However, the current error handling propagates the first failure and aborts, which means some publishers may have succeeded while others failed. For fanout semantics, you might want to:

  1. Collect all results and return an aggregated error with details about which publishers failed
  2. Or log failures individually while continuing to attempt all publishes

Additionally, consider adding trace logging consistent with the broader tracing enhancements in this PR (e.g., using LazyMessageIds to log batch operations).

🔎 Proposed enhancement for error aggregation
 async fn send_batch(
     &self,
     messages: Vec<CanonicalMessage>,
 ) -> Result<SentBatch, PublisherError> {
     use futures::future::join_all;
 
     if messages.is_empty() {
         return Ok(SentBatch::Ack);
     }
 
     // Send the batch to all publishers concurrently.
     let batch_sends = self.publishers.iter().map(|p| {
         // Each publisher gets a clone of the entire batch. This can be memory-intensive.
         p.send_batch(messages.clone())
     });
 
     let results = join_all(batch_sends).await;
 
-    // For fan-out, we consider the batch successful if it was successfully sent to *all* publishers.
-    // If any publisher returns a hard error, we propagate it.
-    // We don't currently aggregate partial failures from different fan-out destinations.
-    for result in results {
-        result?;
-    }
+    // Collect all errors to provide better visibility into partial failures
+    let mut errors = Vec::new();
+    for (idx, result) in results.into_iter().enumerate() {
+        if let Err(e) = result {
+            errors.push((idx, e));
+        }
+    }
+    
+    if !errors.is_empty() {
+        // Return the first error, but consider logging all failures
+        return Err(errors.into_iter().next().unwrap().1);
+    }
 
     Ok(SentBatch::Ack)
 }
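If reporting every failed destination is preferred over returning only the first error, the aggregation step could look roughly like the std-only sketch below. `FanoutError` and `aggregate` are hypothetical names used for illustration and are not part of this crate; errors are modelled as plain strings rather than `PublisherError`.

```rust
use std::fmt;

/// Hypothetical aggregated error: one entry per failed publisher.
#[derive(Debug, PartialEq)]
pub struct FanoutError {
    /// (publisher index, error message) pairs, in publisher order.
    pub failures: Vec<(usize, String)>,
}

impl fmt::Display for FanoutError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{} publisher(s) failed:", self.failures.len())?;
        for (idx, msg) in &self.failures {
            write!(f, " [#{idx}: {msg}]")?;
        }
        Ok(())
    }
}

/// Collapses per-publisher results into one outcome that keeps every failure.
pub fn aggregate(results: Vec<Result<(), String>>) -> Result<(), FanoutError> {
    let failures: Vec<(usize, String)> = results
        .into_iter()
        .enumerate()
        .filter_map(|(idx, result)| result.err().map(|e| (idx, e)))
        .collect();

    if failures.is_empty() {
        Ok(())
    } else {
        Err(FanoutError { failures })
    }
}
```

The caller can then log `err.to_string()` once, or iterate `err.failures` to retry only the destinations that failed.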
src/endpoints/http.rs (1)

346-383: Concurrent batch implementation looks solid, but consider rate limiting.

The refactor to use join_all for concurrent HTTP requests is well-implemented with proper error handling and result aggregation. However, sending all messages concurrently could overwhelm the target server or hit rate limits.

Consider adding a configurable concurrency limit to avoid overwhelming downstream services:

💡 Example implementation with semaphore-based rate limiting
use tokio::sync::Semaphore;
use std::sync::Arc;

// In HttpPublisher struct, add:
// concurrent_limit: Arc<Semaphore>,

// In send_batch:
async fn send_batch(
    &self,
    messages: Vec<CanonicalMessage>,
) -> Result<SentBatch, PublisherError> {
    use futures::future::join_all;

    if messages.is_empty() {
        return Ok(SentBatch::Ack);
    }

    // Limit concurrent requests (e.g., max 10 concurrent)
    let semaphore = Arc::new(Semaphore::new(10));

    let send_futures = messages.into_iter().map(|message| {
        let msg_for_err = message.clone();
        let sem = semaphore.clone();
        async move {
            let _permit = sem.acquire().await.unwrap();
            self.send(message).await.map_err(|e| (msg_for_err, e))
        }
    });

    let results = join_all(send_futures).await;
    // ... rest of aggregation logic
}
src/endpoints/mqtt.rs (1)

285-289: Consider deferring message_ids formatting.

The comment // could be optimized is accurate—the format! call at line 285 executes eagerly, even though the value is only used inside the async commit closure. For large batches, this could add unnecessary overhead.

💡 Defer formatting until commit is called
 let count = messages.len();
 trace!(count = count, topic = %self.topic, message_ids = ?LazyMessageIds(&messages), "Received batch of MQTT messages");
 let log_noun = self.log_noun;
-let message_ids = format!("{:?}", LazyMessageIds(&messages)); // could be optimized
+let messages_for_trace = messages.clone(); // Clone only IDs if needed, or defer formatting
 let commit = Box::new(move |_responses: Option<Vec<CanonicalMessage>>| {
     Box::pin(async move {
-        trace!(count = count, message_ids = %message_ids, "MQTT batch of {}s processed", log_noun);
+        trace!(count = count, message_ids = ?LazyMessageIds(&messages_for_trace), "MQTT batch of {}s processed", log_noun);
     }) as BoxFuture<'static, ()>
 });

Note: This adds a clone, so measure the trade-off. Alternatively, extract just the IDs into a Vec<u128> if you want to avoid cloning full messages.
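A minimal sketch of the IDs-only alternative, assuming message IDs are `u128` values. `Message`, `extract_ids`, and `make_commit` are illustrative stand-ins, not the crate's types, and the callback is synchronous here to keep the example std-only.

```rust
/// Minimal stand-in for the crate's message type (assumption: IDs are u128).
#[derive(Clone, Debug)]
pub struct Message {
    pub message_id: u128,
    pub payload: Vec<u8>,
}

/// Copies only the IDs out of a batch; payloads are not cloned.
pub fn extract_ids(messages: &[Message]) -> Vec<u128> {
    messages.iter().map(|m| m.message_id).collect()
}

/// Builds a commit callback that owns the IDs and formats them only when it is invoked.
pub fn make_commit(messages: &[Message]) -> Box<dyn FnOnce() -> String + Send> {
    let ids = extract_ids(messages);
    Box::new(move || {
        ids.iter()
            .map(|id| format!("{id:032x}"))
            .collect::<Vec<_>>()
            .join(", ")
    })
}
```

This trades one small `Vec<u128>` allocation per batch for not formatting (or cloning payloads) until the commit actually runs.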

src/endpoints/switch.rs (1)

56-91: Consider the comment accuracy: grouping is by metadata value, not by Arc pointer.

The comment on lines 63-64 mentions "raw pointer of the Arc as a key", but the actual grouping key (lines 74-81) is the metadata value, not the Arc pointer. The comment on lines 72-73 is likewise misleading, since it references pointer-based grouping that isn't implemented.

🔎 Suggested comment fix
-        // Group messages by their target publisher.
-        // We use the raw pointer of the Arc as a key to group messages for the same publisher instance.
+        // Group messages by their target publisher.
+        // We use the metadata value as a key to group messages for the same publisher instance.
         let mut grouped_messages: HashMap<
             String,
             (Arc<dyn MessagePublisher>, Vec<CanonicalMessage>),
         > = HashMap::new();

         for message in messages {
             if let Some(publisher) = self.get_publisher(&message) {
-                // Use the pointer address of the Arc as a key. This is safe as the Arcs live
-                // as long as the SwitchPublisher and we clone them into the map.
+                // Use the metadata value as a key to group messages for the same publisher.
                 grouped_messages
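For reference, grouping by metadata value can be sketched without any publisher types at all. The `Message` struct and `group_by_metadata` function below are simplified assumptions for illustration; the real code also stores the `Arc<dyn MessagePublisher>` next to each group.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the crate's message type.
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub metadata: HashMap<String, String>,
    pub payload: String,
}

/// Groups messages by the value stored under `key`.
/// Messages that lack the key are returned separately so the caller can log or drop them.
pub fn group_by_metadata(
    messages: Vec<Message>,
    key: &str,
) -> (HashMap<String, Vec<Message>>, Vec<Message>) {
    let mut groups: HashMap<String, Vec<Message>> = HashMap::new();
    let mut unrouted = Vec::new();

    for message in messages {
        // Clone the key value first so the message itself can be moved into the group.
        match message.metadata.get(key).cloned() {
            Some(value) => groups.entry(value).or_default().push(message),
            None => unrouted.push(message),
        }
    }
    (groups, unrouted)
}
```

Within each group the original message order is preserved, which matters if a publisher expects in-order delivery.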
src/endpoints/amqp.rs (1)

383-389: Consider using bounded concurrency for ack operations.

The commit closure uses for_each_concurrent(None, ...) which allows unlimited concurrent acks. While AMQP brokers can typically handle this, consider adding a concurrency limit (similar to the NATS implementation at line 509 which uses Some(100)) for consistency and to prevent overwhelming the broker under heavy load.

🔎 Suggested change
                 futures::stream::iter(ackers)
-                    .for_each_concurrent(None, |acker| async move {
+                    .for_each_concurrent(Some(100), |acker| async move {
                         if let Err(e) = acker.ack(BasicAckOptions::default()).await {
                             tracing::error!(error = %e, "Failed to ack AMQP message");
                         }
                     })
                     .await;
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 17e22f2 and 756dc8f.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (30)
  • Cargo.toml
  • README.md
  • src/canonical_message.rs
  • src/command_handler.rs
  • src/endpoints/amqp.rs
  • src/endpoints/fanout.rs
  • src/endpoints/file.rs
  • src/endpoints/http.rs
  • src/endpoints/kafka.rs
  • src/endpoints/memory.rs
  • src/endpoints/mod.rs
  • src/endpoints/mongodb.rs
  • src/endpoints/mqtt.rs
  • src/endpoints/nats.rs
  • src/endpoints/static_endpoint.rs
  • src/endpoints/switch.rs
  • src/event_handler.rs
  • src/middleware/deduplication.rs
  • src/models.rs
  • src/route.rs
  • src/type_handler.rs
  • tests/integration/amqp.rs
  • tests/integration/common.rs
  • tests/integration/kafka.rs
  • tests/integration/memory.rs
  • tests/integration/mongodb.rs
  • tests/integration/mqtt.rs
  • tests/integration/nats.rs
  • tests/integration_test.rs
  • tests/performance_pipeline.rs
🧰 Additional context used
🧬 Code graph analysis (8)
tests/integration/memory.rs (1)
tests/integration/common.rs (1)
  • run_performance_pipeline_test (152-158)
tests/performance_pipeline.rs (5)
tests/integration/kafka.rs (1)
  • test_kafka_performance_pipeline (46-56)
tests/integration/amqp.rs (1)
  • test_amqp_performance_pipeline (37-47)
tests/integration/mqtt.rs (1)
  • test_mqtt_performance_pipeline (37-47)
tests/integration/nats.rs (1)
  • test_nats_performance_pipeline (37-47)
tests/integration/mongodb.rs (1)
  • test_mongodb_performance_pipeline (38-48)
tests/integration_test.rs (2)
tests/integration/kafka.rs (1)
  • test_kafka_performance_direct (58-88)
tests/integration/amqp.rs (1)
  • test_amqp_performance_direct (49-73)
tests/integration/mqtt.rs (1)
tests/integration/common.rs (3)
  • run_test_with_docker (322-331)
  • setup_logging (262-288)
  • run_performance_pipeline_test (152-158)
src/endpoints/switch.rs (1)
src/canonical_message.rs (6)
  • new (23-29)
  • from (104-106)
  • from (110-112)
  • from (116-118)
  • from (122-124)
  • from (136-141)
src/endpoints/memory.rs (1)
src/canonical_message.rs (5)
  • from (104-106)
  • from (110-112)
  • from (116-118)
  • from (122-124)
  • from (136-141)
src/command_handler.rs (1)
src/canonical_message.rs (1)
  • new (23-29)
src/endpoints/mqtt.rs (2)
src/canonical_message.rs (1)
  • fmt (153-160)
src/endpoints/memory.rs (6)
  • len (76-78)
  • send_batch (134-149)
  • new (38-41)
  • new (106-112)
  • new (165-172)
  • new (236-241)
🔇 Additional comments (53)
Cargo.toml (1)

3-3: LGTM! Version bump is appropriate.

The version increment from 0.1.0 to 0.1.1 aligns with the bug fixes and improvements in this PR.

src/type_handler.rs (1)

65-69: LGTM! Standard Default implementation.

The Default trait implementation correctly delegates to the existing new() constructor, following Rust conventions.
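The pattern is the one Clippy's `new_without_default` lint asks for, which may be what prompted the change. A minimal sketch with a hypothetical `Registry` type standing in for `TypeHandler`:

```rust
use std::collections::HashMap;

/// Hypothetical registry standing in for `TypeHandler`.
pub struct Registry {
    handlers: HashMap<String, u32>,
}

impl Registry {
    /// The existing argument-free constructor.
    pub fn new() -> Self {
        Self { handlers: HashMap::new() }
    }

    pub fn len(&self) -> usize {
        self.handlers.len()
    }

    pub fn is_empty(&self) -> bool {
        self.handlers.is_empty()
    }
}

impl Default for Registry {
    /// Delegates to `new()` so both construction paths stay in sync.
    fn default() -> Self {
        Self::new()
    }
}
```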

src/models.rs (1)

192-195: LGTM! Correct removal of unnecessary Default.

Since both topic and capacity fields are explicitly initialized, the ..Default::default() spread is unnecessary and its removal is correct.

src/middleware/deduplication.rs (2)

51-51: Good addition of message ID formatting for observability.

The hex-formatted message ID will improve log correlation and debugging.


65-72: LGTM! Enhanced tracing for deduplication flow.

The trace log for new messages and the enhanced info log for duplicates provide valuable observability. The consistent use of message_id_hex aligns with tracing patterns across the codebase.

README.md (1)

149-149: LGTM! Documentation updated to reflect API changes.

The usage example now correctly demonstrates the Into trait conversion for sending messages, aligning with the From<&str> implementation for CanonicalMessage.

tests/integration/mongodb.rs (1)

14-23: LGTM! Consistent configuration key naming.

The rename from in/out to input/output aligns with the Route struct's field names and improves clarity across the codebase.

src/route.rs (2)

130-133: LGTM! Simplified error propagation.

Returning the error directly without conversion is correct since ConsumerError::Connection(e) already contains an anyhow::Error, which matches the function's return type.


244-247: LGTM! Consistent error handling.

The direct error return mirrors the sequential runner's error handling and correctly propagates the connection error for reconnection.

src/command_handler.rs (2)

48-48: LGTM! Simplified error conversion.

The change from Err(e.into()) to Err(e) is cleaner and relies on automatic type conversion from HandlerError to PublisherError. This should work correctly given the trait implementations.


85-95: LGTM! Test updates align with API changes.

The use of .into() for constructing CanonicalMessage instances is consistent with the broader API changes in this PR where CanonicalMessage now implements From<&str>.

src/endpoints/mod.rs (1)

189-190: Non-functional formatting change.

These are whitespace-only adjustments with no behavioral impact.

tests/integration/memory.rs (2)

12-21: LGTM! Config key renames are consistent.

The changes from in/out to input/output align with the broader configuration schema updates across all integration tests in this PR.


30-30: The broker name "internal" is correct and properly configured. The CONFIG_YAML defines both required routes: memory_to_internal (input) and internal_to_memory (output). No changes needed.

tests/integration/nats.rs (1)

13-22: LGTM! Consistent configuration updates.

The configuration key renames from in/out to input/output are consistent with the broader schema changes across all integration tests.

tests/performance_pipeline.rs (1)

1-36: LGTM! Well-structured performance test scaffolding.

The feature-gated tests with #[ignore] annotations provide good infrastructure for running performance tests on-demand without impacting regular CI runs.

tests/integration/kafka.rs (1)

13-31: LGTM! Configuration schema updated consistently.

The configuration key renames maintain the nested structure while updating to the new input/output naming convention.

src/endpoints/static_endpoint.rs (2)

35-40: LGTM! Enhanced observability with message ID tracing.

The addition of trace logging with hex-formatted message IDs aligns with the broader tracing improvements across endpoints in this PR and provides valuable observability for static responses.


80-80: LGTM! Consistent tracing for static message production.

The trace logging matches the pattern established in the publisher, providing symmetric observability for both sides of the static endpoint.

tests/integration/amqp.rs (1)

13-22: LGTM! Configuration standardization and performance tuning.

The renaming of in/out to input/output improves configuration clarity, and the addition of prefetch_count: 1000 allows for better throughput control in performance tests.

tests/integration/mqtt.rs (1)

5-5: LGTM! Test configuration updates align with batch-oriented testing.

The switch from PERF_TEST_MESSAGE_COUNT to PERF_TEST_BATCH_MESSAGE_COUNT and the input/output key renaming are consistent with the broader test infrastructure improvements across the PR.

Also applies to: 13-22, 30-30, 42-44, 56-56

tests/integration_test.rs (1)

15-24: LGTM! Test execution order adjustment.

The addition of Kafka and AMQP direct performance tests at the beginning of the test suite is a straightforward organizational change with proper feature gating.

src/endpoints/memory.rs (2)

138-143: LGTM! Excellent observability improvements.

The addition of trace logging with LazyMessageIds provides valuable debugging information without performance impact (only formats when trace is enabled). The inclusion of topic and message_ids in both send and receive paths creates a consistent tracing pattern.

Also applies to: 214-214


352-352: LGTM! API usage updated correctly.

The change from CanonicalMessage::from_str to CanonicalMessage::from aligns with the removal of the from_str method and the addition of the From<&str> trait implementation in src/canonical_message.rs.

src/endpoints/http.rs (1)

159-159: LGTM! Consistent tracing additions.

The trace logging with message_id formatting provides valuable observability for HTTP request/response flows.

Also applies to: 286-290

src/canonical_message.rs (2)

103-105: LGTM! Cleaner API design with trait implementation.

Replacing the dedicated from_str method with the From<&str> trait implementation provides a more idiomatic Rust API and enables automatic conversion in contexts that accept Into<CanonicalMessage>.
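To illustrate why the trait-based form composes better, here is a small sketch. The `Message` type and the `send` function are simplified assumptions for illustration, not the crate's actual `CanonicalMessage` or publisher API.

```rust
/// Simplified stand-in for `CanonicalMessage`.
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub payload: Vec<u8>,
}

impl From<&str> for Message {
    fn from(s: &str) -> Self {
        Message { payload: s.as_bytes().to_vec() }
    }
}

impl From<Vec<u8>> for Message {
    fn from(payload: Vec<u8>) -> Self {
        Message { payload }
    }
}

/// Hypothetical send function: accepts anything convertible into a `Message`.
pub fn send(outbox: &mut Vec<Message>, message: impl Into<Message>) {
    outbox.push(message.into());
}
```

With this shape, `send(&mut outbox, "hello")`, `send(&mut outbox, vec![1u8, 2])`, and `send(&mut outbox, Message::from("x"))` all compile, because every type is trivially `Into` itself.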


144-162: Excellent observability helper with zero-cost abstraction.

The LazyMessageIds struct is well-designed:

  • Defers formatting work until the Debug trait is invoked (only when trace is actually enabled)
  • Provides consistent hex formatting across all endpoints
  • Uses #[doc(hidden)] appropriately for internal-only API
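The deferral works because constructing the wrapper only stores a borrowed slice; the hex strings are produced inside `Debug::fmt`. A rough std-only sketch of the idea follows, where `Message`, `LazyIds`, and `HexId` are illustrative names rather than the crate's definitions:

```rust
use std::fmt;

/// Stand-in message type (assumption: IDs are u128).
pub struct Message {
    pub message_id: u128,
}

/// Borrowing wrapper: creating it does no formatting work.
pub struct LazyIds<'a>(pub &'a [Message]);

/// Helper that renders one ID as 32 zero-padded hex digits.
struct HexId(u128);

impl fmt::Debug for HexId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:032x}", self.0)
    }
}

impl fmt::Debug for LazyIds<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Runs only when something actually formats the value with {:?}.
        f.debug_list()
            .entries(self.0.iter().map(|m| HexId(m.message_id)))
            .finish()
    }
}
```

A logging macro that receives `LazyIds(&messages)` as a `?`-formatted field pays the formatting cost only if the event is actually recorded.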
src/endpoints/mqtt.rs (3)

1-1: LGTM! Comprehensive tracing improvements.

The addition of LazyMessageIds tracing across send and receive paths provides excellent observability. The inclusion of context fields like topic, message_id, and payload_size creates a consistent logging pattern across MQTT operations.

Also applies to: 131-142, 145-157, 283-283


189-189: LGTM! Topic tracking improves context.

Adding the topic field to MqttListener enables more informative logging and error messages throughout the consumer lifecycle.

Also applies to: 219-219


325-327: LGTM! EventLoop boxing improves stack efficiency.

Boxing both EventLoop variants prevents large stack allocations when one variant is significantly larger than the other. The consistent box/unbox pattern when passing to event loop runners is correctly implemented.
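The size effect is easy to check in isolation. The sketch below uses made-up payload types (`SmallLoop`, `LargeLoop`) in place of the real event loops; an enum is as large as its largest variant, so boxing shrinks it to roughly pointer size. Clippy's `large_enum_variant` lint flags the unboxed shape.

```rust
use std::mem::size_of;

/// Made-up payloads standing in for the two event loop types.
pub struct SmallLoop {
    pub id: u8,
}

pub struct LargeLoop {
    pub buffer: [u8; 4096],
}

/// Every value of this enum occupies at least 4096 bytes, even the `V3` case.
pub enum Unboxed {
    V3(SmallLoop),
    V5(LargeLoop),
}

/// Each variant holds only a heap pointer, so the enum stays small.
pub enum Boxed {
    V3(Box<SmallLoop>),
    V5(Box<LargeLoop>),
}

/// Returns (size of the unboxed enum, size of the boxed enum) in bytes.
pub fn sizes() -> (usize, usize) {
    (size_of::<Unboxed>(), size_of::<Boxed>())
}
```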

Also applies to: 354-354, 367-367, 495-495, 513-513

src/endpoints/file.rs (2)

26-26: LGTM! Path tracking improves observability.

Adding the path field to FilePublisher enables context-aware logging and debugging for file-based operations.

Also applies to: 48-48


64-64: LGTM! Consistent tracing additions.

The trace logging with LazyMessageIds and path context follows the same pattern established in other endpoints, providing excellent observability for file I/O operations.

Also applies to: 181-181

src/endpoints/switch.rs (3)

44-49: LGTM!

Good enhancement to include the message_id in the warning log. This improves observability when messages are dropped due to missing metadata keys.


93-120: Verify error handling: early return on any sub-batch error discards partial successes.

When any sub-batch fails entirely (lines 113-118), the function returns immediately with the error. This discards any successful results from other sub-batches that may have already completed. This is documented as intentional ("Propagating the error is the safest and simplest option"), but it's worth noting that partial progress is lost.

The current behavior is acceptable for retry-based error handling at the caller level, but if fine-grained partial success tracking is needed later, this would need refactoring.


175-183: LGTM!

Test updates correctly use CanonicalMessage::from("...") instead of the removed from_str, aligning with the trait implementation in src/canonical_message.rs.

src/endpoints/nats.rs (4)

73-78: LGTM!

Good addition of structured tracing with subject, message_id, and payload_size fields for improved observability.


125-187: Batch publishing logic is well-structured with proper error aggregation.

The implementation correctly:

  1. Separates delayed_ack (fire-and-forget) from synchronous ack paths
  2. Collects publish futures first, then awaits all acks
  3. Aggregates failures into SentBatch::Partial

One observation: on line 153, message.payload.clone() clones the payload for the publish call. Since the message is moved into ack_futures on success, this clone is necessary. The pattern is correct.
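The two-phase shape described above (publish everything, then resolve acknowledgements, collecting failures from both phases) can be modelled synchronously. This is only a sketch under assumptions: `BatchOutcome`, `PendingAck`, and `send_batch` are hypothetical, messages and errors are plain strings, and the real implementation awaits futures instead of calling closures.

```rust
/// Hypothetical outcome type mirroring an Ack/Partial split.
#[derive(Debug, PartialEq)]
pub enum BatchOutcome {
    Ack,
    /// (message, error) pairs for everything that failed in either phase.
    Partial { failed: Vec<(String, String)> },
}

/// A deferred acknowledgement returned by a successful publish.
pub type PendingAck = Box<dyn FnOnce() -> Result<(), String>>;

pub fn send_batch(
    messages: Vec<String>,
    mut publish: impl FnMut(&str) -> Result<PendingAck, String>,
) -> BatchOutcome {
    let mut failed = Vec::new();
    let mut pending = Vec::new();

    // Phase 1: publish everything, keeping the ack handle for each success.
    for message in messages {
        match publish(&message) {
            Ok(ack) => pending.push((message, ack)),
            Err(e) => failed.push((message, e)),
        }
    }

    // Phase 2: resolve acknowledgements; a missing ack also counts as a failure.
    for (message, ack) in pending {
        if let Err(e) = ack() {
            failed.push((message, e));
        }
    }

    if failed.is_empty() {
        BatchOutcome::Ack
    } else {
        BatchOutcome::Partial { failed }
    }
}
```

Keeping the message next to its pending ack is what makes it possible to report exactly which messages need a retry.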


229-232: LGTM!

Subject field correctly added to NatsConsumer and NatsSubscriber structs and initialized in constructors. This enables richer logging context.

Also applies to: 275-278


489-501: Minor: Match pattern could be simplified.

The try_next().now_or_never() pattern returns Option<Result<Option<T>, E>>. The current match handles the happy path correctly, but the _ => break catch-all silently handles errors.

This is consistent with similar patterns in other files (amqp.rs, kafka.rs) where errors during greedy batch filling are acceptable since at least one message was already received.

src/endpoints/amqp.rs (3)

118-185: LGTM! Batch publishing with publisher confirms is well-implemented.

The implementation correctly:

  1. Falls back to helper for delayed_ack mode
  2. Publishes all messages first, collecting confirmation futures
  3. Awaits all confirmations and aggregates failures
  4. Returns appropriate SentBatch::Ack or SentBatch::Partial

The pattern mirrors the NATS implementation, providing consistency across endpoints.


363-374: Good use of try_next().now_or_never() for greedy batch consumption.

This pattern correctly handles:

  • Some(Ok(Some(delivery))): Process message
  • Some(Ok(None)): Stream ended
  • Some(Err(e)): Log warning and break (preserving already-received messages)
  • None: No messages ready, stop greedy consumption

The warning on line 371 appropriately logs errors without failing the entire batch.
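The four cases can be modelled without an async runtime by treating each poll as a plain `Option<Result<Option<T>, E>>` value, which is the shape `try_next().now_or_never()` produces. The `fill_batch` function below is a hypothetical, synchronous sketch of the greedy loop, not the endpoint code itself.

```rust
/// Greedily extends a batch that already holds one message.
/// `poll` models a non-blocking read: the outer `None` means "nothing ready right now".
pub fn fill_batch<T, E, P>(first: T, max: usize, mut poll: P) -> Vec<T>
where
    P: FnMut() -> Option<Result<Option<T>, E>>,
{
    let mut batch = vec![first];
    while batch.len() < max {
        match poll() {
            // A message was immediately available: keep going.
            Some(Ok(Some(item))) => batch.push(item),
            // The stream ended cleanly.
            Some(Ok(None)) => break,
            // An error occurred; real code would log a warning here.
            // Messages already in the batch are kept.
            Some(Err(_)) => break,
            // Nothing ready: stop instead of waiting.
            None => break,
        }
    }
    batch
}
```

Because the batch starts with one received message, every exit path still returns a non-empty batch.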


460-498: Good enhancement: AMQP metadata extraction is now more comprehensive.

The implementation correctly:

  1. Uses delivery_tag as fallback message_id
  2. Attempts to parse AMQP message_id as UUID or u128
  3. Extracts and stores amqp_message_id and amqp_correlation_id in metadata
  4. Handles various AMQP header value types (LongString, ShortString, Boolean, LongInt)

This provides richer message context for downstream processing.
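As an illustration of the fallback chain for the ID (UUID, then plain integer, then delivery tag), here is a std-only sketch. The function names and the exact parsing rules are assumptions; the actual endpoint may use a UUID library and different precedence.

```rust
/// Derives a 128-bit ID from an optional AMQP `message_id` string,
/// falling back to the delivery tag when the string is absent or unparseable.
pub fn derive_message_id(amqp_message_id: Option<&str>, delivery_tag: u64) -> u128 {
    amqp_message_id
        .and_then(parse_id)
        .unwrap_or(delivery_tag as u128)
}

/// Accepts a hyphenated UUID (8-4-4-4-12 hex digits) or a decimal u128.
fn parse_id(raw: &str) -> Option<u128> {
    let groups: Vec<&str> = raw.split('-').collect();
    let expected_lens = [8usize, 4, 4, 4, 12];
    let looks_like_uuid = groups.len() == 5
        && groups
            .iter()
            .zip(expected_lens)
            .all(|(g, len)| g.len() == len && g.chars().all(|c| c.is_ascii_hexdigit()));

    if looks_like_uuid {
        // Join the five groups into 32 hex digits and read them as one integer.
        return u128::from_str_radix(&groups.concat(), 16).ok();
    }
    raw.parse::<u128>().ok()
}
```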

tests/integration/common.rs (4)

87-87: Good addition: -v flag ensures volumes are removed on cleanup.

This prevents stale data from previous test runs affecting subsequent tests, improving test isolation and reproducibility.


166-182: LGTM! Route extraction from YAML is cleaner.

The refactored approach:

  1. Parses YAML config
  2. Extracts routes by computed name pattern (memory_to_<broker>, <broker>_to_memory)
  3. Provides clear panic messages if routes aren't found

This is more maintainable than the previous approach.


220-226: Good improvement: proper shutdown sequencing.

Sending shutdown signals to both routes and awaiting handles ensures graceful termination. The final drain (line 226) captures any messages that arrived during shutdown, preventing lost messages in assertions.
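The ordering matters: signal, join, then drain. A thread-based sketch of that sequence is shown below; `spawn_route` and `shutdown_and_drain` are hypothetical helpers using `std::sync::mpsc` in place of the async channels and task handles the test harness actually uses.

```rust
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawns a worker that emits increasing numbers until it is told to stop.
pub fn spawn_route(output: Sender<u32>) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>();
    let handle = thread::spawn(move || {
        let mut n = 0u32;
        loop {
            match shutdown_rx.try_recv() {
                Ok(()) | Err(TryRecvError::Disconnected) => break,
                Err(TryRecvError::Empty) => {}
            }
            if output.send(n).is_err() {
                break; // receiver dropped
            }
            n += 1;
            thread::sleep(Duration::from_millis(1));
        }
    });
    (handle, shutdown_tx)
}

/// Signals every route, waits for each to finish, then drains what is left.
pub fn shutdown_and_drain(
    routes: Vec<(JoinHandle<()>, Sender<()>)>,
    output: &Receiver<u32>,
) -> Vec<u32> {
    let (handles, signals): (Vec<_>, Vec<_>) = routes.into_iter().unzip();
    for tx in &signals {
        let _ = tx.send(()); // a route that already exited is fine
    }
    for handle in handles {
        let _ = handle.join();
    }
    // Only after the joins is it certain that no further messages can arrive.
    output.try_iter().collect()
}
```

Draining before the joins complete could miss messages that a route sends between the signal and its exit.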


410-412: Constants improve test configurability.

Separating PERF_TEST_BATCH_MESSAGE_COUNT (250,000) and PERF_TEST_SINGLE_MESSAGE_COUNT (5,000) allows appropriate sizing for each test type. The alias PERF_TEST_MESSAGE_COUNT maintains backward compatibility.

src/endpoints/mongodb.rs (4)

59-70: LGTM!

Consistent metadata key naming with mq_bridge. prefix for internal metadata. The comparison logic correctly identifies "raw" format documents for round-trip preservation.


120-123: LGTM!

Collection name field correctly added to MongoDbPublisher and MongoDbConsumer structs, enabling context-aware logging throughout the publish and consume paths.

Also applies to: 255-256


543-543: Boxing ChangeStream is appropriate for the enum.

Box<ChangeStream<...>> reduces the size difference between enum variants and allows the SubscriberStream enum to have a consistent size on the stack.


130-130: Consistent tracing enhancements across MongoDB operations.

All trace statements now include collection context, and batch operations include LazyMessageIds for comprehensive observability. This aligns with tracing patterns in other endpoint implementations.

Also applies to: 148-148, 518-518, 650-650, 691-691

src/endpoints/kafka.rs (3)

111-116: LGTM!

Comprehensive tracing added to send() and send_batch() with topic, message_id/message_ids, and payload_size/count. This aligns with the tracing patterns across other endpoints.

Also applies to: 156-161


269-272: LGTM!

Topic field correctly added to KafkaConsumer and KafkaSubscriber structs and passed to receive_batch_internal for contextual logging.

Also applies to: 326-326, 375-376


492-506: Consistent greedy batch consumption pattern.

The try_next().now_or_never() pattern is applied consistently here as in NATS and AMQP endpoints. The trace log on line 506 includes topic and message IDs for comprehensive observability.

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
tests/integration/common.rs (1)

186-189: Consider proper synchronization over sleep-based timing.

The 200ms sleep delays rely on timing assumptions rather than explicit readiness signals. This can lead to flaky tests on slower systems or unnecessarily slow tests on faster ones. If the Route API provides readiness signals or health checks, use those instead.
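The walkthrough notes that `run_until_err` gained a `ready_tx: Option<Sender<()>>` parameter, so a readiness wait is likely available here. The sketch below shows the general pattern with std threads and `std::sync::mpsc`; `spawn_worker` and `wait_ready` are hypothetical helpers, and the real Route API is async and may use a different channel type.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Spawns a worker that signals readiness once its (simulated) setup is done.
pub fn spawn_worker(
    setup: Duration,
    ready_tx: Option<mpsc::Sender<()>>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        thread::sleep(setup); // stands in for connecting to a broker
        if let Some(tx) = ready_tx {
            let _ = tx.send(()); // the receiver may already be gone; ignore
        }
        // ... the message loop would run here ...
    })
}

/// Blocks until the worker reports ready, or fails once `timeout` has elapsed.
pub fn wait_ready(
    ready_rx: &mpsc::Receiver<()>,
    timeout: Duration,
) -> Result<(), &'static str> {
    match ready_rx.recv_timeout(timeout) {
        Ok(()) => Ok(()),
        Err(RecvTimeoutError::Timeout) => Err("worker did not become ready in time"),
        Err(RecvTimeoutError::Disconnected) => Err("worker exited before signalling readiness"),
    }
}
```

Compared with a fixed sleep, the wait returns as soon as the worker is ready and fails loudly, with a bounded delay, when it never is.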

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 756dc8f and e03f76b.

📒 Files selected for processing (5)
  • benches/performance_bench.rs
  • src/command_handler.rs
  • src/event_handler.rs
  • src/models.rs
  • tests/integration/common.rs
💤 Files with no reviewable changes (1)
  • src/models.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/event_handler.rs
🧰 Additional context used
🧬 Code graph analysis (2)
benches/performance_bench.rs (1)
tests/integration/common.rs (1)
  • format_pretty (570-595)
tests/integration/common.rs (1)
src/endpoints/memory.rs (1)
  • len (76-78)
🔇 Additional comments (9)
src/command_handler.rs (2)

85-85: LGTM! Cleaner test code with .into() conversions.

The test updates consistently use .into() for converting string literals to CanonicalMessage, which is more idiomatic and aligns well with the PR's objective to leverage From<&str> implementation.

Also applies to: 90-90, 106-106, 122-122, 148-148, 153-153, 195-195, 233-233


48-48: No action needed. The code change is correct.

HandlerError and PublisherError are identical type aliases (both resolve to ProcessingError in src/errors.rs), so the change from Err(e.into()) to Err(e) requires no type conversion. The code compiles and functions correctly without explicit .into() calls.
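
A small sketch of why no conversion is needed when two error names alias the same type. The definitions below are illustrative assumptions that mirror the description above, not the contents of src/errors.rs.

```rust
/// Hypothetical error type standing in for the crate's `ProcessingError`.
#[derive(Debug, PartialEq)]
struct ProcessingError(String);

// Two names for the same type (assumed shape, for illustration only).
type HandlerError = ProcessingError;
type PublisherError = ProcessingError;

fn handle(input: &str) -> Result<(), HandlerError> {
    if input.is_empty() {
        Err(ProcessingError("empty input".to_string()))
    } else {
        Ok(())
    }
}

fn publish(input: &str) -> Result<(), PublisherError> {
    match handle(input) {
        Ok(()) => Ok(()),
        // `e` is a `HandlerError`, which is the same type as `PublisherError`,
        // so it can be returned directly without `.into()`.
        Err(e) => Err(e),
    }
}

fn main() {
    println!("{:?}", publish("ok"));
    println!("{:?}", publish(""));
}
```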

tests/integration/common.rs (7)

87-87: LGTM - Volume cleanup improves test isolation.

Adding the -v flag ensures Docker volumes are removed during cleanup, providing a clean state for subsequent test runs. This is appropriate for integration tests.


115-146: LGTM - Simplified test harness.

The removal of the temporary directory field and its initialization simplifies the TestHarness structure without affecting functionality.


196-227: LGTM - Improved timeout and observability for performance tests.

The extended timeout (70 seconds) and periodic progress logging enhance the reliability and observability of performance tests. Draining remaining messages after shutdown ensures accurate message counts.


230-246: LGTM - Accurate performance metrics based on actual received count.

Using received.len() for throughput calculation ensures accuracy, especially when combined with the assertion at line 249 that validates all messages were received.
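
For reference, the throughput figure amounts to the following arithmetic. The function name and the zero-duration guard are assumptions made for this sketch, not the test helper's actual code.

```rust
use std::time::Duration;

/// Messages per second, based on the number of messages actually received.
/// Returns 0.0 for a zero-length interval to avoid dividing by zero.
fn messages_per_second(received: usize, elapsed: Duration) -> f64 {
    let secs = elapsed.as_secs_f64();
    if secs == 0.0 {
        return 0.0;
    }
    received as f64 / secs
}

fn main() {
    let rate = messages_per_second(150_000, Duration::from_secs(3));
    println!("{rate:.0} msgs/sec");
}
```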


271-272: LGTM - Improved documentation for logging configuration.

The added comments clearly document the default log level and how to override it via environment variables.


410-413: LGTM - Clear separation of batch and single message counts.

The new constants PERF_TEST_BATCH_MESSAGE_COUNT and PERF_TEST_SINGLE_MESSAGE_COUNT provide clarity for different test scenarios, while the alias maintains backward compatibility.


362-406: LGTM - Consistent usage of batch and single message constants.

The updated function calls correctly use PERF_TEST_BATCH_MESSAGE_COUNT for batch operations and PERF_TEST_SINGLE_MESSAGE_COUNT for single operations, with corresponding performance calculations.

Comment thread benches/performance_bench.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e03f76b and a8cc0ce.

📒 Files selected for processing (11)
  • README.md
  • benches/performance_bench.rs
  • src/command_handler.rs
  • src/endpoints/mqtt.rs
  • src/route.rs
  • src/type_handler.rs
  • tests/integration/common.rs
  • tests/integration/docker-compose/nats.yml
  • tests/integration/route.rs
  • tests/integration_test.rs
  • tests/memory_test.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • benches/performance_bench.rs
🧰 Additional context used
🧬 Code graph analysis (4)
tests/memory_test.rs (1)
src/route.rs (1)
  • run_until_err (95-108)
tests/integration_test.rs (2)
tests/integration/kafka.rs (1)
  • test_kafka_performance_direct (58-88)
tests/integration/amqp.rs (1)
  • test_amqp_performance_direct (49-73)
src/route.rs (1)
src/endpoints/mod.rs (3)
  • create_publisher_from_route (163-174)
  • consumer (370-372)
  • create_consumer_from_route (32-44)
tests/integration/common.rs (1)
src/endpoints/memory.rs (1)
  • len (76-78)
🪛 markdownlint-cli2 (0.18.1)
README.md

10-10: Unordered list indentation
Expected: 2; Actual: 4

(MD007, ul-indent)


11-11: Unordered list indentation
Expected: 2; Actual: 4

(MD007, ul-indent)


12-12: Unordered list indentation
Expected: 2; Actual: 4

(MD007, ul-indent)

🔇 Additional comments (27)
src/type_handler.rs (2)

65-69: LGTM! Idiomatic Default implementation.

The Default trait implementation correctly delegates to TypeHandler::new(), providing a conventional default constructor path without altering existing behavior.
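
The pattern is the one Clippy's `new_without_default` lint asks for. A minimal sketch with a hypothetical `Registry` type (not the real `TypeHandler`):

```rust
/// Hypothetical type with an argument-free constructor.
#[derive(Debug)]
struct Registry {
    names: Vec<String>,
}

impl Registry {
    fn new() -> Self {
        Self { names: Vec::new() }
    }

    fn is_empty(&self) -> bool {
        self.names.is_empty()
    }
}

/// `Default` delegates to `new()`, so both construction paths stay in sync.
impl Default for Registry {
    fn default() -> Self {
        Self::new()
    }
}

fn main() {
    let a = Registry::new();
    let b = Registry::default();
    println!("{} {}", a.is_empty(), b.is_empty());
}
```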


399-400: Test calls updated to match extended Route API.

The run_until_err calls now pass None for both optional parameters, shutdown_rx and the newly added ready_tx, matching the extended signature.

tests/integration_test.rs (1)

20-29: LGTM! Test reordering improves organization.

Moving Kafka and AMQP direct performance tests earlier in the sequence is a sensible organizational change that doesn't affect test correctness.

tests/integration/common.rs (4)

24-27: Performance test constants centralized effectively.

The new constants (PERF_TEST_BATCH_MESSAGE_COUNT, PERF_TEST_SINGLE_MESSAGE_COUNT) provide clear separation between batch and single-message test scenarios. The alias PERF_TEST_MESSAGE_COUNT maintains backward compatibility.


101-101: Good addition: Docker volume cleanup.

Adding the -v flag ensures volumes are removed during teardown, preventing test state leakage between runs.


174-259: Enhanced pipeline test with progress tracking.

The refactored run_pipeline_test_internal now includes:

  • Progress logging every 5 seconds
  • Extended timeout for performance tests (70s)
  • Proper shutdown sequencing
  • Post-shutdown message draining

These improvements enhance observability and reliability of integration tests.
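
The wait loop described in the list above can be sketched roughly as follows. This is an illustration using only the standard library; `wait_for_count` and its parameters are hypothetical names, and the real test is async and drains a channel rather than calling a closure.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Polls `current` until it reports at least `expected` items or `timeout`
/// elapses, printing a progress line every `log_every`. Returns the last count.
fn wait_for_count(
    mut current: impl FnMut() -> usize,
    expected: usize,
    timeout: Duration,
    log_every: Duration,
) -> usize {
    let start = Instant::now();
    let mut last_log = start;
    loop {
        let seen = current();
        // Break early once everything has arrived, or give up at the deadline.
        if seen >= expected || start.elapsed() >= timeout {
            return seen;
        }
        if last_log.elapsed() >= log_every {
            println!("progress: {seen}/{expected} after {:?}", start.elapsed());
            last_log = Instant::now();
        }
        thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    let mut n = 0;
    let seen = wait_for_count(
        || {
            n += 25;
            n
        },
        100,
        Duration::from_secs(70),
        Duration::from_secs(5),
    );
    println!("received {seen} messages");
}
```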


209-209: Integration tests are not executed in the CI pipeline (only unit and binary tests via cargo test --lib --bins), so the concern about slower CI environments does not apply. The 70-second timeout is only used during local performance test runs. Additionally, the code is already defensive: it breaks early once all messages are received (line 218) and logs progress every 5 seconds, making it easy to diagnose slow runs. No evidence exists in the codebase of timeout failures or inadequate duration.

src/endpoints/mqtt.rs (9)

48-77: Improved MQTT publish API with optimization.

The refactored Client::publish method now:

  • Accepts CanonicalMessage directly (cleaner API)
  • For V3, conditionally serializes to JSON only when metadata is present
  • For V5, uses native user_properties (better performance)

This optimization reduces serialization overhead for metadata-free messages.


79-84: Subscribe capability added to Client.

The new subscribe method properly routes V3/V5 subscribe calls to the appropriate backend implementations. This unification simplifies subscription logic.


100-101: Background event loop spawned correctly.

The publisher now spawns a dedicated background task for the event loop with proper lifecycle management. The Arc<JoinHandle> allows reference counting for cleanup.


137-148: Enhanced tracing for publish operations.

The tracing now includes message_id, topic, and payload_size, providing better observability for debugging and monitoring.


151-163: Batch tracing with LazyMessageIds.

The batch send now uses LazyMessageIds for efficient logging of message IDs without eager formatting. This is a good performance optimization for tracing.
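
To illustrate the idea of lazy ID formatting, here is a sketch of a borrowing wrapper whose `Debug` impl does the work only when something actually formats it. The `Message`, `HexId`, and `LazyIds` types are hypothetical; the real `LazyMessageIds` lives in src/canonical_message.rs and may differ. The 128-bit ID is an assumption based on the "32-digit hex" description in the walkthrough.

```rust
use std::fmt;

/// Hypothetical message type; the 128-bit `id` field is an assumption.
struct Message {
    id: u128,
}

/// Formats one ID as 32 zero-padded hex digits.
struct HexId(u128);

impl fmt::Debug for HexId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:032x}", self.0)
    }
}

/// Borrows the batch; no strings are built until `Debug::fmt` is called.
struct LazyIds<'a>(&'a [Message]);

impl fmt::Debug for LazyIds<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_list()
            .entries(self.0.iter().map(|m| HexId(m.id)))
            .finish()
    }
}

fn main() {
    let batch = vec![Message { id: 1 }, Message { id: 255 }];
    // Constructing the wrapper is free; formatting happens in println!.
    let ids = LazyIds(&batch);
    println!("{:?}", ids);
}
```

If the logging layer skips disabled levels before formatting its fields, the hex strings are never produced on the hot path.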


194-202: EventWrapper and EventLoop enums unify V3/V5 handling.

The introduction of these enums with boxed event loops provides a clean abstraction for handling both MQTT protocol versions. Boxing helps manage stack size for large types.


242-278: Improved batch reception with yield strategy.

The receive_batch implementation uses a smart batching strategy:

  1. Blocks for the first message
  2. Yields to allow producer to run
  3. Greedily drains available messages

This approach balances latency and throughput effectively.
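
The three steps above can be sketched with a standard-library channel. This is a synchronous analogue under stated assumptions: `receive_batch` here is a hypothetical free function, and `try_recv()` stands in for the non-blocking poll the async code performs.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Blocks for the first message, yields once, then drains whatever is
/// already queued, up to `max` messages in total.
fn receive_batch<T>(rx: &Receiver<T>, max: usize) -> Vec<T> {
    let mut batch = Vec::new();
    if max == 0 {
        return batch;
    }
    // 1. Block until at least one message is available (or the channel closes).
    match rx.recv() {
        Ok(msg) => batch.push(msg),
        Err(_) => return batch,
    }
    // 2. Give the producer a chance to enqueue more.
    thread::yield_now();
    // 3. Greedily take what is ready without blocking again.
    while batch.len() < max {
        match rx.try_recv() {
            Ok(msg) => batch.push(msg),
            Err(_) => break,
        }
    }
    batch
}

fn main() {
    let (tx, rx) = channel();
    for i in 0..5 {
        tx.send(i).unwrap();
    }
    println!("{:?}", receive_batch(&rx, 3));
    println!("{:?}", receive_batch(&rx, 10));
}
```

Blocking only for the first message keeps latency low when traffic is sparse, while the drain step lets batches grow under load.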


335-381: Unified event loop with proper error handling.

The refactored run_eventloop provides:

  • Unified handling for V3/V5 via EventWrapper
  • Proper error logging with reconnection logic
  • Optional message channel for subscriber pattern

The reconnection delay (1 second) prevents tight reconnection loops.


398-403: Smart V3 message deserialization with fallback.

The V3 publish handler now attempts to deserialize as CanonicalMessage first, falling back to raw payload. This supports both serialized and raw message flows.
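
The publish-side optimization (lines 48-77) and this receive-side fallback form a pair: wrap only when there is metadata, and treat anything that does not parse as an envelope as a raw payload. The sketch below shows that shape using only the standard library. It is not the crate's wire format: the review describes JSON, whereas this toy uses a hypothetical `ENV1` text header so the example needs no external crates, and it does not support `=` in keys or newlines in keys or values.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
struct Message {
    payload: Vec<u8>,
    metadata: BTreeMap<String, String>,
}

/// Hypothetical marker that identifies an enveloped message.
const MAGIC: &[u8] = b"ENV1\n";

/// Sends the raw payload when there is no metadata; otherwise wraps it.
fn encode_v3(msg: &Message) -> Vec<u8> {
    if msg.metadata.is_empty() {
        return msg.payload.clone();
    }
    let mut out = MAGIC.to_vec();
    for (k, v) in &msg.metadata {
        out.extend_from_slice(format!("{k}={v}\n").as_bytes());
    }
    out.push(b'\n'); // an empty line ends the header
    out.extend_from_slice(&msg.payload);
    out
}

/// Returns `None` whenever the bytes are not a well-formed envelope.
fn try_decode_envelope(bytes: &[u8]) -> Option<Message> {
    let rest = bytes.strip_prefix(MAGIC)?;
    let mut metadata = BTreeMap::new();
    let mut pos = 0;
    loop {
        let nl = rest[pos..].iter().position(|&b| b == b'\n')?;
        let line = &rest[pos..pos + nl];
        pos += nl + 1;
        if line.is_empty() {
            break;
        }
        let (k, v) = std::str::from_utf8(line).ok()?.split_once('=')?;
        metadata.insert(k.to_string(), v.to_string());
    }
    Some(Message { payload: rest[pos..].to_vec(), metadata })
}

/// Tries the envelope first and falls back to treating the bytes as raw payload.
fn decode_v3(bytes: &[u8]) -> Message {
    try_decode_envelope(bytes).unwrap_or_else(|| Message {
        payload: bytes.to_vec(),
        metadata: BTreeMap::new(),
    })
}

fn main() {
    let plain = Message { payload: b"hi".to_vec(), metadata: BTreeMap::new() };
    println!("{:?}", decode_v3(&encode_v3(&plain)));
}
```

As with any try-then-fall-back scheme, a raw payload that happens to look like a valid envelope would be interpreted as one; that ambiguity is inherent to the approach rather than to this toy format.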

tests/memory_test.rs (1)

38-38: Test updated for extended Route API.

The run_until_err call now passes None for both optional parameters, shutdown_rx and the newly added ready_tx, matching the updated signature.

tests/integration/route.rs (1)

49-49: Test calls updated consistently.

All three test cases (test_route_with_typed_handler_success, test_route_with_typed_handler_failure_deserialization, test_route_with_typed_handler_failure_handler) now pass the additional None arguments to run_until_err, maintaining consistency with the extended API.

src/command_handler.rs (2)

48-48: Simplified error conversion.

Changed from explicit Err(e.into()) to Err(e). HandlerError and PublisherError are the same underlying type (both alias ProcessingError), so the .into() call was an identity conversion and can be dropped without changing behavior. This is cleaner and more idiomatic.


85-195: Tests updated with Into-based message construction.

The tests now use .into() for string-to-CanonicalMessage conversion (e.g., "command1".into()), which is more ergonomic than explicit constructor calls. This aligns with the new From<&str> implementation for CanonicalMessage.
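
A sketch of how a `From<&str>` impl combines with an `impl Into<...>` parameter. `Message` and `Publisher` are hypothetical stand-ins for `CanonicalMessage` and the crate's publisher types; only the conversion pattern is the point.

```rust
/// Hypothetical stand-in for `CanonicalMessage`.
#[derive(Debug, PartialEq)]
struct Message {
    payload: Vec<u8>,
}

impl From<&str> for Message {
    fn from(s: &str) -> Self {
        Message { payload: s.as_bytes().to_vec() }
    }
}

/// Hypothetical publisher that records what it was asked to send.
struct Publisher {
    sent: Vec<Message>,
}

impl Publisher {
    /// Accepts anything convertible into a `Message`, including `Message` itself.
    fn send(&mut self, msg: impl Into<Message>) {
        self.sent.push(msg.into());
    }
}

fn main() {
    let mut publisher = Publisher { sent: Vec::new() };
    publisher.send("command1"); // &str converted implicitly
    let explicit: Message = "command2".into();
    publisher.send(explicit); // already a Message
    println!("{:?}", publisher.sent);
}
```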

README.md (3)

2-2: Clearer positioning as asynchronous message library.

The updated description better clarifies that mq-bridge is an "asynchronous message library" and emphasizes its role as a "programmable integration layer" rather than a simple bridge. This helps set appropriate expectations.


15-30: Excellent addition: Philosophy & Focus section.

The new section clearly articulates:

  • When to use mq-bridge (hybrid messaging, infrastructure abstraction, resilient pipelines)
  • When NOT to use it (stateful stream processing, domain aggregate management, specialization needs)

This guidance helps developers make informed decisions about adopting the library.


166-171: Examples updated to reflect API changes.

The code examples now demonstrate:

  • Using .into() for ergonomic message construction (line 166)
  • The extended run_until_err signature with optional parameters (line 171)

These updates keep the documentation in sync with the code.

src/route.rs (4)

95-108: LGTM - Clean API extension for readiness signaling.

The ready_tx parameter is appropriately optional, and propagation to both execution paths is correct. The internal channel fallback (lines 101-102) maintains backward compatibility for callers without shutdown channels.


115-121: LGTM - Readiness signaling correctly placed after resource initialization.

Signaling after create_publisher_from_route and create_consumer_from_route ensures the route is truly ready. Ignoring the send result with let _ = is appropriate since the receiver may have dropped. The removal of .into() on line 142 is correct as e is already anyhow::Error.

Also applies to: 142-142
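
The readiness pattern discussed here can be sketched with a standard-library channel. `run_route` is a hypothetical function and the real code uses an async channel, but the shape is the same: initialize first, then signal, and do not treat a dropped receiver as an error.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical route runner. `ready_tx` is optional so existing callers can pass `None`.
fn run_route(ready_tx: Option<Sender<()>>) -> Result<(), String> {
    // Stand-in for creating the publisher and consumer; a failure here
    // would return early and readiness would never be signaled.
    let connected = true;
    if !connected {
        return Err("connection failed".to_string());
    }

    if let Some(tx) = ready_tx {
        // The receiver may have been dropped (for example after a timeout);
        // that is not a reason to stop the route, so the result is ignored.
        let _ = tx.send(());
    }

    // Message processing would start here.
    Ok(())
}

fn main() {
    let (ready_tx, ready_rx) = channel();
    run_route(Some(ready_tx)).unwrap();
    println!("ready signal received: {}", ready_rx.try_recv().is_ok());
}
```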


177-183: LGTM - Consistent readiness pattern with sequential path.

The placement mirrors run_sequentially, signaling readiness after resource creation but before starting the worker pool. The .into() removal on line 260 is correct.

Also applies to: 260-260


51-59: LGTM - Correctly wired ready signal into the spawned task.

The ready_tx_clone is properly moved into the inner task for signaling once initialization completes.

Comment thread src/route.rs
 image: nats:2.12.2-alpine3.22
 container_name: nats-itest-mq-bridge
-command: "-js" # Enable JetStream
+command: "-js -sd /data" # Enable JetStream and set store dir

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

NATS 2.12.2 JetStream -sd store directory flag default path alpine

💡 Result:

The JetStream store default is /tmp/nats/jetstream (unless you set it with -sd / --store_dir). This applies on Alpine too (i.e. /tmp exists there). (beta-docs.nats.io)


🏁 Script executed:

# Read the full nats.yml file to understand the complete configuration
cat tests/integration/docker-compose/nats.yml

Repository: marcomq/mq-bridge

Length of output: 425


Use the default JetStream store directory or remove the -sd flag for this test environment.

The JetStream store directory in NATS 2.12.2 defaults to /tmp/nats/jetstream, not /data. Using a non-standard path like -sd /data without a corresponding volume mount creates an ephemeral directory that NATS may not automatically create. Since this is an integration test with no persistence requirements, either remove the -sd /data flag entirely to use the default, or change it to -sd /tmp/nats/jetstream to explicitly use the standard location.

🤖 Prompt for AI Agents
In tests/integration/docker-compose/nats.yml around line 5 the NATS command sets
JetStream store dir to /data via "-sd /data", which is non-standard and not
volume-mounted; update the service to either remove the "-sd /data" flag so NATS
uses the default JetStream store (/tmp/nats/jetstream) for this ephemeral test
environment, or change the flag to "-sd /tmp/nats/jetstream" and ensure a
matching volume mount if you intend to keep an explicit path.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (4)
src/route.rs (1)

92-106: Consider replacing busy-wait with a blocking wait or async alternative.

The current implementation uses try_recv() in a loop with thread::sleep(50ms), which is a busy-wait pattern. While it works, it's inefficient and performs up to 100 iterations over 5 seconds.

Consider using recv_timeout() with a crossbeam or std::sync::mpsc channel for the readiness signal, or restructure to use async waiting patterns.

🔎 Alternative approach using crossbeam_channel

Replace async_channel::bounded for the ready signal with crossbeam_channel::bounded:

let (ready_tx, ready_rx) = crossbeam_channel::bounded(1);

Then replace the busy-wait loop with:

match ready_rx.recv_timeout(std::time::Duration::from_secs(5)) {
    Ok(()) => Ok((handle, shutdown_tx)),
    Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
        handle.abort();
        Err(anyhow::anyhow!("Route '{}' failed to start within 5 seconds", name_str))
    }
    Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
        handle.abort();
        Err(anyhow::anyhow!("Route '{}' failed to start (channel closed)", name_str))
    }
}

Note: This requires adding crossbeam-channel as a dependency, but eliminates the busy-wait.
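
As a variation on the suggestion above, the standard library's `std::sync::mpsc` channel also offers `recv_timeout`, so the same structure is possible without a new dependency. The sketch below is an assumption-laden illustration: `wait_until_ready` is a hypothetical helper, and like the crossbeam version it blocks the calling thread, which matters if the caller runs on an async executor.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Waits for a single readiness signal, distinguishing a timeout from a
/// sender that went away without signaling.
fn wait_until_ready(
    ready_rx: &Receiver<()>,
    name: &str,
    timeout: Duration,
) -> Result<(), String> {
    match ready_rx.recv_timeout(timeout) {
        Ok(()) => Ok(()),
        Err(RecvTimeoutError::Timeout) => {
            Err(format!("Route '{name}' failed to start within {timeout:?}"))
        }
        Err(RecvTimeoutError::Disconnected) => {
            Err(format!("Route '{name}' failed to start (channel closed)"))
        }
    }
}

fn main() {
    let (ready_tx, ready_rx) = channel();
    // Stand-in for the spawned route task.
    let worker = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        let _ = ready_tx.send(());
    });
    println!("{:?}", wait_until_ready(&ready_rx, "demo", Duration::from_secs(5)));
    worker.join().unwrap();
}
```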

tests/integration/common.rs (3)

24-27: Consider removing the redundant alias constant.

PERF_TEST_MESSAGE_COUNT is simply an alias to PERF_TEST_BATCH_MESSAGE_COUNT. Consider using PERF_TEST_BATCH_MESSAGE_COUNT directly throughout the codebase to reduce redundancy and improve clarity about which test mode is being referenced.


232-236: Consider logging shutdown errors instead of silently ignoring them.

Shutdown operations use let _ = to ignore potential errors, which could hide issues during test cleanup. While test assertions will still catch message processing problems, logging shutdown errors would improve debuggability.

🔎 Proposed fix to log shutdown errors
-    let _ = in_shutdown.send(()).await;
-    let _ = out_shutdown.send(()).await;
-    let _ = in_handle.await;
-    let _ = out_handle.await;
+    if let Err(e) = in_shutdown.send(()).await {
+        eprintln!("Warning: Failed to send shutdown signal to in_route: {:?}", e);
+    }
+    if let Err(e) = out_shutdown.send(()).await {
+        eprintln!("Warning: Failed to send shutdown signal to out_route: {:?}", e);
+    }
+    if let Err(e) = in_handle.await {
+        eprintln!("Warning: in_route join handle error: {:?}", e);
+    }
+    if let Err(e) = out_handle.await {
+        eprintln!("Warning: out_route join handle error: {:?}", e);
+    }

252-258: Document why single-message metrics are zero for pipeline tests.

Pipeline performance results intentionally set single_write_performance and single_read_performance to 0.0 because pipeline tests only measure batch throughput. However, this produces potentially confusing output in the consolidated results table where these columns show zero. Consider either:

  1. Adding a comment explaining this design decision
  2. Using Option<f64> to distinguish "not measured" from "zero performance"
  3. Creating separate result types for pipeline vs. direct tests
💡 Example: Add clarifying comment
     add_performance_result(PerformanceResult {
         test_name: format!("{} Pipeline", broker_name),
         write_performance: messages_per_second,
         read_performance: messages_per_second,
+        // Pipeline tests don't measure single-message performance
         single_write_performance: 0.0,
         single_read_performance: 0.0,
     });
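
Option 2 from the list above could look roughly like this. The struct mirrors the field names shown in the snippet, but the `Option<f64>` fields and the `format_metric` helper are assumptions made for illustration, not code from the PR.

```rust
/// Variant of the result struct in which "not measured" is distinct from 0.0.
#[allow(dead_code)]
struct PerformanceResult {
    test_name: String,
    write_performance: f64,
    read_performance: f64,
    single_write_performance: Option<f64>,
    single_read_performance: Option<f64>,
}

/// Renders a table cell: a rounded number, or "n/a" when nothing was measured.
fn format_metric(value: Option<f64>) -> String {
    match value {
        Some(v) => format!("{v:.0}"),
        None => "n/a".to_string(),
    }
}

fn main() {
    let result = PerformanceResult {
        test_name: "Example Pipeline".to_string(),
        write_performance: 1234.4,
        read_performance: 1234.4,
        // Pipeline tests don't measure single-message performance.
        single_write_performance: None,
        single_read_performance: None,
    };
    println!(
        "{}: batch write {}, single write {}",
        result.test_name,
        format_metric(Some(result.write_performance)),
        format_metric(result.single_write_performance),
    );
}
```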
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a8cc0ce and ffd2b7f.

📒 Files selected for processing (2)
  • src/route.rs
  • tests/integration/common.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/route.rs (3)
src/type_handler.rs (1)
  • handle (134-149)
src/traits.rs (2)
  • handle (21-21)
  • handle (36-38)
src/endpoints/mod.rs (3)
  • create_publisher_from_route (163-174)
  • consumer (370-372)
  • create_consumer_from_route (32-44)
🪛 GitHub Actions: Continuous Integration
src/route.rs

[error] 102-102: Rust compile error E0382: borrow of moved value: name.

🪛 GitHub Check: Cargo Check
src/route.rs

[failure] 102-102:
borrow of moved value: name


[failure] 102-102:
borrow of moved value: name

🪛 GitHub Check: Cargo Test
src/route.rs

[failure] 102-102:
borrow of moved value: name

🔇 Additional comments (13)
src/route.rs (7)

33-35: API change to return Result improves startup error handling.

The addition of the readiness channel and the Result return type allows callers to detect startup failures within a timeout period. This is a breaking API change but provides better error handling semantics.


53-57: Readiness signaling properly threaded through retry loop.

The code correctly passes ready_tx_clone to each iteration of run_until_err, allowing each startup attempt to signal readiness independently.


114-122: Clean delegation of readiness signal to runners.

The updated signature and dispatch logic properly threads the readiness signal to both sequential and concurrent execution paths.


130-136: Readiness signaled at the right time in sequential path.

The code correctly signals readiness after consumer and publisher initialization completes, ensuring the route is truly ready before the outer loop proceeds.


192-198: Readiness signaling consistent with sequential path.

The concurrent runner follows the same pattern as the sequential runner, signaling readiness after initialization but before message processing begins.


157-157: Error type preservation improves error context.

The change from Err(e.into()) to Err(e) directly returns the anyhow::Error from ConsumerError::Connection(e) without conversion, preserving the original error context.


275-275: Consistent error handling with sequential path.

This matches the error handling improvement in the sequential runner at line 157, preserving the original error type.

tests/integration/common.rs (6)

44-54: LGTM!

The performance result collection is properly synchronized using a Mutex, and the formatting provides clear feedback during test execution.


283-285: LGTM!

The comments clearly document the logging configuration and how to override it via environment variables.


310-315: LGTM!

The formatted banner clearly communicates the batch and single message counts used in performance tests, making the results table easier to interpret.


359-425: LGTM!

The direct performance test correctly uses the new batch and single message constants, and the performance calculations properly convert elapsed time to messages per second. The test ordering and cleanup logic are appropriate.


101-101: The -v flag is appropriate for test cleanup.

The docker-compose files in this test suite do not define persistent data volumes—services store state internally in ephemeral container storage. Removing volumes with -v ensures clean test isolation without affecting debugging, since there is no test data persisted to volumes. This cleanup behavior is correct and beneficial.


186-196: Route naming convention is already enforced consistently across all tests.

Routes are defined inline in Rust test files (as CONFIG_YAML constants), not in separate YAML configuration files. All current test implementations correctly follow the memory_to_{broker} and {broker}_to_memory naming pattern: kafka, mqtt, mongodb, nats, and amqp tests all match their broker names to the route names. The code at lines 186-187 correctly derives and looks up these names by lowercasing the broker_name parameter, and mismatches will naturally panic with clear error messages. No YAML configuration files exist to verify as the route definitions are embedded in the test files themselves.

Likely an incorrect or invalid review comment.

Comment thread src/route.rs Outdated
@marcomq marcomq merged commit c467164 into main Dec 31, 2025
2 of 3 checks passed

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (5)
tests/integration/common.rs (4)

24-27: Add documentation for public performance testing constants.

These public constants lack documentation explaining their purpose and usage. Consider adding doc comments to clarify when and how they should be used.

📝 Suggested documentation
+/// Number of messages to use for batch performance tests.
 pub const PERF_TEST_BATCH_MESSAGE_COUNT: usize = 150_000;
+/// Number of messages to use for single-message performance tests.
 pub const PERF_TEST_SINGLE_MESSAGE_COUNT: usize = 15_000;
+/// Alias for PERF_TEST_BATCH_MESSAGE_COUNT for backward compatibility.
 pub const PERF_TEST_MESSAGE_COUNT: usize = PERF_TEST_BATCH_MESSAGE_COUNT;
+/// Concurrency level for performance tests.
 pub const PERF_TEST_CONCURRENCY: usize = 100;

180-196: Consider more defensive YAML parsing.

The current implementation panics if the YAML structure is unexpected. While this is acceptable for test code, adding more descriptive error messages could aid debugging when tests fail.

💡 Suggested improvement
 let yaml_val: serde_yaml_ng::Value =
-    serde_yaml_ng::from_str(config_yaml).expect("Failed to parse YAML config");
+    serde_yaml_ng::from_str(config_yaml)
+        .expect(&format!("Failed to parse YAML config for broker: {}", broker_name));
 let routes_val = yaml_val.get("routes").expect("YAML must have 'routes' key");
 let routes: std::collections::HashMap<String, Route> =
-    serde_yaml_ng::from_value(routes_val.clone()).expect("Failed to parse routes");
+    serde_yaml_ng::from_value(routes_val.clone())
+        .expect(&format!("Failed to parse routes for broker: {}", broker_name));
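
One caveat on the suggestion above, given that this PR targets Clippy findings: `.expect(&format!(...))` builds the message string even on success, and Clippy's `expect_fun_call` lint flags it. A sketch of the equivalent `unwrap_or_else` form, using a hypothetical `parse_count` helper and `str::parse` in place of the YAML parsing:

```rust
/// Parses a count, panicking with broker context only if parsing fails.
/// The message is formatted lazily, inside the closure.
fn parse_count(raw: &str, broker_name: &str) -> usize {
    raw.trim().parse().unwrap_or_else(|e| {
        panic!("Failed to parse count for broker {broker_name}: {e}")
    })
}

fn main() {
    println!("{}", parse_count(" 42 ", "nats"));
}
```

This form also includes the underlying error in the panic message, which a plain string argument to `expect` does not add on its own.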

209-209: Consider documenting the 70-second timeout rationale.

The timeout increase from 30s to 70s for performance tests seems reasonable but lacks justification. Consider adding a comment explaining why this specific value was chosen.

📝 Suggested comment
 let timeout = if is_performance_test {
+    // Longer timeout for performance tests handling 150k messages
     Duration::from_secs(70)
 } else {
     Duration::from_secs(30)
 };

252-258: Consider clarifying pipeline performance metrics semantics.

The pipeline test sets write_performance and read_performance to the same messages_per_second value, which might be misleading since pipeline tests measure end-to-end throughput, not separate write/read operations. Additionally, the single metrics are set to 0.0.

Consider either:

  1. Using a single "throughput" metric for pipeline tests
  2. Adding a comment explaining that write/read are identical for pipeline tests
  3. Renaming the fields to better reflect what's being measured
📝 Suggested comment
 add_performance_result(PerformanceResult {
     test_name: format!("{} Pipeline", broker_name),
+    // Pipeline tests measure end-to-end throughput, not separate write/read
     write_performance: messages_per_second,
     read_performance: messages_per_second,
+    // Pipeline tests don't measure single-message performance
     single_write_performance: 0.0,
     single_read_performance: 0.0,
 });
src/route.rs (1)

110-122: Public API change - ensure documentation is updated.

The run_until_err method signature now includes an optional ready_tx parameter. Since this is a public method, consider adding documentation explaining when callers should provide this parameter versus passing None.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ffd2b7f and 3af4819.

📒 Files selected for processing (2)
  • src/route.rs
  • tests/integration/common.rs
🧰 Additional context used
🧬 Code graph analysis (1)
tests/integration/common.rs (1)
src/endpoints/memory.rs (1)
  • len (76-78)
🔇 Additional comments (15)
tests/integration/common.rs (9)

45-52: LGTM! Enhanced logging improves observability.

The immediate printing of performance results provides valuable feedback during test execution.


101-101: LGTM! Volume cleanup prevents test pollution.

Adding the -v flag ensures volumes are removed during teardown, preventing state leakage between test runs.


174-178: LGTM! Descriptive parameter names improve readability.

The renamed parameters make the function signature more self-documenting.


198-201: LGTM! Explicit route names improve traceability.

Starting routes with explicit names enhances logging and debugging capabilities.


213-230: LGTM! Progress logging aids debugging of long-running tests.

The progress logging every 5 seconds provides valuable feedback during performance test execution.


232-238: Verify message loss during concurrent shutdown.

The code drains messages after shutdown, which is good practice. However, there's a potential race condition: messages might be lost if they arrive between the final drain_messages() in the wait loop (line 217) and the shutdown signals (lines 232-233).

Consider adding a small delay after shutdown before the final drain to ensure all in-flight messages are processed:

💡 Suggested improvement
 let _ = in_shutdown.send(()).await;
 let _ = out_shutdown.send(()).await;
 let _ = in_handle.await;
 let _ = out_handle.await;
+
+// Brief delay to allow in-flight messages to complete
+tokio::time::sleep(Duration::from_millis(100)).await;

 // Drain any remaining messages that arrived during shutdown
 received.extend(harness.out_channel.drain_messages());

283-284: LGTM! Enhanced comments improve test usability.

The added documentation clearly explains how to control log levels during test execution.


310-314: LGTM! Clearer performance summary output.

The updated banner clearly distinguishes between batch and single message test configurations.


379-423: LGTM! Consistent use of performance constants.

The refactoring to use named constants improves maintainability and makes the performance calculations easier to understand.

src/route.rs (6)

33-38: LGTM!

The signature change to return Result properly reflects the fallible startup behavior. The bounded(1) channel is appropriate for one-shot readiness signaling, and name_str is correctly preserved for error messages before moving name into the Arc.
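
The `name_str` detail relates to the E0382 "borrow of moved value: name" failure reported earlier in CI. A minimal sketch of the pattern, with a hypothetical `start` function and a thread standing in for the spawned task:

```rust
use std::sync::Arc;
use std::thread;

/// Keeps a copy of the name for error messages before the original is moved.
fn start(name: String) -> Result<(), String> {
    let name_str = name.clone();
    // `name` is moved into the Arc here; using `name` after this line
    // would fail to compile with E0382.
    let shared = Arc::new(name);

    let worker = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || shared.len())
    };

    match worker.join() {
        Ok(0) => Err(format!("Route '{name_str}' has an empty name")),
        Ok(_) => Ok(()),
        Err(_) => Err(format!("Route '{name_str}' panicked")),
    }
}

fn main() {
    println!("{:?}", start("memory_to_nats".to_string()));
    println!("{:?}", start(String::new()));
}
```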


48-59: LGTM!

The ready_tx is correctly cloned per iteration and passed to run_until_err. After the first successful startup, subsequent reconnect attempts will have their send ignored (channel already consumed), which is the intended behavior.


134-136: LGTM!

Readiness is signaled after both publisher and consumer are successfully created, which correctly indicates the route is operational. Ignoring the send result is appropriate since the receiver may have already timed out.


157-157: LGTM!

Removing the redundant .into() is a clean improvement since ConsumerError::Connection already wraps an anyhow::Error.


196-198: LGTM!

Consistent with run_sequentially - readiness is signaled after successful connection setup.


91-95: The close() call on a cloned receiver affects all clones globally.

The web search confirms that async_channel::Receiver::close() closes the underlying channel for all Receiver and Sender clones, since they share the same channel state. The first concern in the original comment is incorrect—calling close() on ready_rx_clone does close the channel that ready_rx.recv_blocking() is waiting on.

However, there is a real issue with the error handling flow. If run_until_err fails before signaling readiness, the error path (lines 76-83) sleeps for 5 seconds. During this time, the startup thread blocks on recv_blocking(). After those 5 seconds, the timeout task fires and closes the channel, causing recv_blocking() to fail with a channel-closed error (line 104). This fails the startup even if a retry of run_until_err on the next loop iteration might have succeeded, since the channel is now permanently closed. The timeout is a hard deadline for the entire startup, not a per-attempt deadline.

Comment thread src/route.rs
Comment on lines +91 to +106
    let ready_rx_clone = ready_rx.clone();
    let timeout = tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        ready_rx_clone.close();
    });

    match ready_rx.recv_blocking() {
        Ok(_) => {
            timeout.abort();
            Ok((handle, shutdown_tx))
        },
        Err(_) => {
            handle.abort();
            Err(anyhow::anyhow!("Route '{}' failed to start within 5 seconds or encountered an error", name_str))
        }
    }

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, find the run() function definition to understand its signature
rg -n "fn run\(" src/route.rs -A 2

Repository: marcomq/mq-bridge

Length of output: 257


🏁 Script executed:

#!/bin/bash
# Search for all calls to .run( to see calling patterns
rg -n "\.run\(" --type rust -g '!target/*' -B 2 -A 3

Repository: marcomq/mq-bridge

Length of output: 616


🏁 Script executed:

#!/bin/bash
# Check for any #[tokio::main] or runtime setup in main/tests
rg -n "#\[tokio" --type rust -g '!target/*'

Repository: marcomq/mq-bridge

Length of output: 2267


🏁 Script executed:

#!/bin/bash
# Look at the broader context of the file to understand execution flow
head -100 src/route.rs

Repository: marcomq/mq-bridge

Length of output: 4415


🏁 Script executed:

#!/bin/bash
# Check if there's a main.rs or binary targets
fd -n "main\.rs" --type f

Repository: marcomq/mq-bridge

Length of output: 291


🏁 Script executed:

#!/bin/bash
# Check for examples that might show usage
ls -la examples/ 2>/dev/null || echo "No examples directory"

Repository: marcomq/mq-bridge

Length of output: 82


🏁 Script executed:

#!/bin/bash
# Check Cargo.toml to see if this is a library or binary
head -30 Cargo.toml

Repository: marcomq/mq-bridge

Length of output: 867


🏁 Script executed:

#!/bin/bash
# Search for any documentation comments about runtime requirements
rg -n "runtime\|Runtime\|tokio::spawn" src/route.rs -B 2 -A 2

Repository: marcomq/mq-bridge

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Search for main.rs in the src directory
find src -name "main.rs" -type f

Repository: marcomq/mq-bridge

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Check the full Cargo.toml to see if there are binary targets defined
cat Cargo.toml | grep -A 5 "\[\[bin\]\]"

Repository: marcomq/mq-bridge

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Search for tokio::spawn usage and any warnings/docs about runtime
rg "tokio::spawn" src/route.rs -B 1 -A 1

Repository: marcomq/mq-bridge

Length of output: 457


🏁 Script executed:

#!/bin/bash
# Check the actual context of where run() is called to understand if runtime is guaranteed
rg "fn run\(" src/route.rs -B 5 -A 15

Repository: marcomq/mq-bridge

Length of output: 1286


🏁 Script executed:

#!/bin/bash
# Check if there's documentation about using the library
ls -la | grep -i readme

Repository: marcomq/mq-bridge

Length of output: 121


🏁 Script executed:

#!/bin/bash
# Check if there's any initialization guard or docs at crate root
head -50 src/lib.rs

Repository: marcomq/mq-bridge

Length of output: 690


🏁 Script executed:

#!/bin/bash
# Verify: Can recv_blocking actually be called without panic from tokio::spawn?
# Check if tokio::spawn on line 43 would panic if no runtime exists
rg -n "pub fn run\|tokio::spawn" src/route.rs | head -20

Repository: marcomq/mq-bridge

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Check README for runtime requirements or usage instructions
cat README.md | head -100

Repository: marcomq/mq-bridge

Length of output: 6030


🏁 Script executed:

#!/bin/bash
# Check if run() is documented with any caveats about Tokio runtime
rg -n "run\(" src/route.rs -B 10 | grep -A 10 "//\|///"

Repository: marcomq/mq-bridge

Length of output: 495


🏁 Script executed:

#!/bin/bash
# Let's verify: does tokio::spawn actually require a runtime to be active?
# Check what happens when spawn is called - it either panics or returns a handle
# The key is: the first spawn on line 43 happens BEFORE recv_blocking on line 99
# So if there's no runtime, panic happens at line 43, not 99
# But the real question: is the review comment claiming this is a problem?
cat src/route.rs | sed -n '33,110p'

Repository: marcomq/mq-bridge

Length of output: 3577


🏁 Script executed:

#!/bin/bash
# Verify if README mentions tokio runtime requirement for run()
grep -i "tokio\|runtime\|#\[tokio::main\]" README.md

Repository: marcomq/mq-bridge

Length of output: 592


🏁 Script executed:

#!/bin/bash
# Check if there's any guard or assertion about runtime in the code
rg -n "Handle::current\|Runtime\|try_current" src/route.rs

Repository: marcomq/mq-bridge

Length of output: 43


Document that run() requires an active Tokio runtime.

This function calls tokio::spawn() at line 43, which will panic if called outside a Tokio runtime context (e.g., from a non-async main or outside #[tokio::main]). The requirement is not documented in the function's doc comment or README, making it easy for users to encounter an unexpected panic. Add a note clarifying that run() must be invoked from within a Tokio runtime.

🤖 Prompt for AI Agents
In src/route.rs around lines 91 to 106, the run() function calls tokio::spawn
and therefore will panic if invoked outside an active Tokio runtime; update the
function's doc comment to clearly state that run() must be called from within a
Tokio runtime (e.g., from an async context or #[tokio::main]) and add an entry
to the README or API docs noting this requirement. Optionally, add a runtime
check at the start of run() (e.g., using tokio::runtime::Handle::try_current())
to return a descriptive Err instead of allowing tokio::spawn to panic, but at
minimum document the requirement in comments/docs.
