📝 Walkthrough
Adds feature-gated ZeroMQ endpoints and tests; introduces Delay and CommitConcurrency middleware; refactors deduplication to a TTL-state CAS; switches HTTP implementation to actix-web; enforces semaphore+JoinSet commit concurrency in routes; updates Cargo features, CI matrix, release workflow, schema, benches, and many tests.
Sequence Diagram(s)
sequenceDiagram
participant App as Application
participant Route as Route
participant Dedup as Deduplication
participant Publisher as ZeroMqPublisher
participant Zmq as ZeroMQ Socket
App->>Route: submit batch
Route->>Dedup: check message IDs (pending/processed/TTL)
alt not duplicate
Route->>Publisher: send_batch(serialized batch)
Publisher->>Zmq: send (PUSH/PUB/REQ)
alt REQ expects reply
Zmq-->>Publisher: reply (batch payload)
Publisher->>Route: return SentBatch/results
else no-reply
Publisher->>Route: return SentBatch (Ack)
end
Route->>Dedup: mark processed and invoke wrapped commit
else duplicate
Dedup-->>Route: skip publish and call original commit(None)
end
Route-->>App: processing complete / results
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~65 minutes
🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @mq-bridge.schema.json:
- Around line 922-928: Update the JSON schema's "batch_size" property so its
default and description match the code (default should be 1); specifically
change the "default" value from 128 to 1 and update the description text to
state "Defaults to 1." Ensure the "batch_size" property name remains unchanged
so it aligns with the defaults applied in src/models.rs.
In @src/middleware/deduplication.rs:
- Around line 57-82: The deduplication has a race: you call
self.db.contains_key(&key) then only call db.insert(&key_clone, ...) inside the
commit closure, so concurrent consumers can both pass contains_key and process
the same message; change to an atomic insert-at-receive then update-at-commit
pattern: on receipt, attempt an insert-if-absent (use the same db instance used
in the closure) to write a "pending" marker for key (treat insert failure as
duplicate and skip), proceed with processing, and in the commit closure
replace/update that pending marker to a final "processed" value (or remove it on
failure); update references to self.db.contains_key, db.insert, original_commit,
key/key_clone and now to implement this insert-at-receive/update-at-commit
behavior.
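The insert-at-receive / update-at-commit pattern described above can be sketched as follows. This is only an illustration under stated assumptions: a `Mutex<HashMap>` stands in for the real store, and `DedupStore`, `EntryState`, `try_claim`, `mark_processed`, `release`, and `state` are hypothetical names, not part of mq-bridge. It assumes the real database offers an equivalent atomic insert-if-absent operation.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Mutex;

/// State of a deduplication key (hypothetical marker values).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EntryState {
    Pending,
    Processed,
}

/// In-memory stand-in for the dedup database (assumption, not the real store).
#[derive(Default)]
struct DedupStore {
    entries: Mutex<HashMap<Vec<u8>, EntryState>>,
}

impl DedupStore {
    /// Atomically claims `key` at receive time. Returns false if any marker
    /// already exists, in which case the message is treated as a duplicate.
    fn try_claim(&self, key: &[u8]) -> bool {
        let mut map = self.entries.lock().unwrap();
        match map.entry(key.to_vec()) {
            Entry::Occupied(_) => false,
            Entry::Vacant(slot) => {
                slot.insert(EntryState::Pending);
                true
            }
        }
    }

    /// Called from the commit wrapper once processing has succeeded.
    fn mark_processed(&self, key: &[u8]) {
        let mut map = self.entries.lock().unwrap();
        map.insert(key.to_vec(), EntryState::Processed);
    }

    /// Called when processing fails, so a redelivered message can be retried.
    fn release(&self, key: &[u8]) {
        self.entries.lock().unwrap().remove(key);
    }

    /// Returns the current marker for `key`, if any.
    fn state(&self, key: &[u8]) -> Option<EntryState> {
        self.entries.lock().unwrap().get(key).copied()
    }
}
```

Because the check and the insert happen under one lock, two concurrent consumers cannot both pass the duplicate check for the same key, which is the race the comment describes.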
🧹 Nitpick comments (5)
benches/performance_bench.rs (1)
298-331: Port/path coordination between publisher and consumer is fragile.
The current logic relies on implicit ordering: `create_consumer` must be called before `create_publisher` for them to use the same IPC path. If the call order changes, they'll connect to different sockets.
- `create_consumer`: increments PORT via `fetch_add`, then uses `PORT + 1`
- `create_publisher`: only loads PORT without incrementing
While the `run_benchmarks!` macro does call consumer first (line 378 before 380), this coupling is non-obvious and error-prone.
♻️ Suggested refactor: Make the pairing explicit
 #[cfg(feature = "zeromq")]
 mod zeromq_helper {
     use super::*;
     use mq_bridge::endpoints::zeromq::{ZeroMqConsumer, ZeroMqPublisher};
     use mq_bridge::models::{ZeroMqConfig, ZeroMqEndpoint, ZeroMqSocketType};
     use rand::Rng;
     use std::sync::atomic::{AtomicU16, Ordering};
     static PORT: Lazy<AtomicU16> = Lazy::new(|| {
         let mut rng = rand::rng();
         AtomicU16::new(rng.random_range(10000..60000))
     });
+    fn next_ipc_path() -> String {
+        let port = PORT.fetch_add(1, Ordering::SeqCst);
+        format!("/tmp/mq-bridge-{}.sock", port)
+    }
+
+    static CURRENT_PATH: Lazy<Mutex<String>> = Lazy::new(|| Mutex::new(String::new()));
+
     pub async fn create_publisher() -> Arc<dyn MessagePublisher> {
-        let port = PORT.load(Ordering::SeqCst);
+        let path = CURRENT_PATH.lock().await.clone();
         let config = ZeroMqEndpoint {
             topic: None,
             config: ZeroMqConfig {
-                url: format!("ipc:///tmp/mq-bridge-{}.sock", port),
+                url: format!("ipc://{}", path),
                 socket_type: Some(ZeroMqSocketType::Push),
                 bind: false,
                 internal_buffer_size: Some(PERF_TEST_MESSAGE_COUNT + 1),
             },
         };
         Arc::new(ZeroMqPublisher::new(&config).await.unwrap())
     }
     pub async fn create_consumer() -> Arc<Mutex<dyn MessageConsumer>> {
-        let port = PORT.fetch_add(1, Ordering::SeqCst) + 1;
-        let path = format!("/tmp/mq-bridge-{}.sock", port);
+        let path = next_ipc_path();
         let _ = std::fs::remove_file(&path);
+        *CURRENT_PATH.lock().await = path.clone();
         let config = ZeroMqEndpoint {
             topic: None,
             config: ZeroMqConfig {
                 url: format!("ipc://{}", path),
tests/integration/ibm_mq.rs (1)
79-81: Inconsistent configuration approach and batch size.
This test sets `concurrency` and `batch_size` programmatically, while other integration tests (NATS, Kafka, Memory, AWS) use YAML configuration. Additionally, `batch_size: 50` differs from other tests (128 for NATS/Kafka/Memory, 10 for AWS).
Consider:
- Standardizing configuration approach: Align with the YAML pattern used in other tests for consistency.
- Documenting batch size rationale: Clarify why IBM MQ uses 50 vs 128 in other tests, or standardize if no specific reason exists.
- PR objective alignment: The PR states the default batch size is now 1 to reduce message loss, but this test uses 50. Add tests validating the default behavior.
tests/integration/memory.rs (1)
5-5: Unused constant: `PERF_TEST_CONCURRENCY`.
The constant `PERF_TEST_CONCURRENCY` is defined but never used in this file. Meanwhile, the route YAML configurations hard-code `concurrency: 4`.
Consider either removing the unused constant or refactoring the YAML config to reference it for consistency and maintainability.
tests/integration/aws.rs (1)
13-14: Document batch size choice and verify PR objective alignment.
AWS routes use `batch_size: 10`, which differs from other backends (128 for NATS/Kafka/Memory, 50 for IBM MQ). While this may align with AWS SQS API constraints (ReceiveMessage supports up to 10 messages), the rationale should be documented.
Additionally, the PR summary states the default batch size is now 1 to reduce message loss, but this test uses 10. Ensure default behavior is tested elsewhere.
Also applies to: 26-27
src/endpoints/zeromq.rs (1)
245-279: Consider simplifying Rep socket receive logic.
The current Rep socket handling (lines 245-278) is complex with a dummy return and follow-up conditional. The comment on line 271 acknowledges this awkwardness. While functionally correct, the flow could be clearer.
The pattern of sending the item inside the match, waiting for reply, then returning a dummy and checking `if let ReceiverSocket::Rep(_)` afterward works but is hard to follow.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- `Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (24)
- .github/workflows/ci.yml
- .github/workflows/release.yml
- Cargo.toml
- README.md
- benches/performance_bench.rs
- mq-bridge.schema.json
- src/canonical_message.rs
- src/endpoints/mod.rs
- src/endpoints/zeromq.rs
- src/middleware/deduplication.rs
- src/models.rs
- src/route.rs
- tests/README.md
- tests/integration/amqp.rs
- tests/integration/aws.rs
- tests/integration/common.rs
- tests/integration/ibm_mq.rs
- tests/integration/kafka.rs
- tests/integration/memory.rs
- tests/integration/mongodb.rs
- tests/integration/mqtt.rs
- tests/integration/nats.rs
- tests/integration/route.rs
- tests/integration_test.rs
💤 Files with no reviewable changes (1)
- .github/workflows/release.yml
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2026-01-07T16:21:12.817Z
Learnt from: marcomq
Repo: marcomq/mq-bridge PR: 9
File: .github/workflows/test-matrix.yml:44-48
Timestamp: 2026-01-07T16:21:12.817Z
Learning: In the mq-bridge repo, CI should exclude IBM MQ client dependencies from test runs but include them for clippy checks. Configure tests to run with --features=full (which excludes ibm-mq), while clippy should run with --all-features (including ibm-mq) so code is verified without requiring IBM MQ client libraries on all platforms. Apply this pattern to workflow files under .github/workflows to ensure consistent behavior across test matrices.
Applied to files:
.github/workflows/ci.yml
📚 Learning: 2026-01-07T16:21:12.817Z
Learnt from: marcomq
Repo: marcomq/mq-bridge PR: 9
File: .github/workflows/test-matrix.yml:44-48
Timestamp: 2026-01-07T16:21:12.817Z
Learning: In the mq-bridge project, IBM MQ (ibm-mq feature) should be excluded from test runs but included in clippy checks. Tests use --features=full (excluding ibm-mq), while clippy uses --all-features (including ibm-mq) to verify code without requiring IBM MQ client libraries on all platforms.
Applied to files:
tests/README.md
🧬 Code graph analysis (4)
src/route.rs (3)
- src/endpoints/memory.rs (7): is_empty (81-83), len (76-78), send_batch (134-149), new (38-41), new (106-112), new (165-172), new (235-240)
- src/endpoints/mongodb.rs (1): send_batch (140-199)
- src/traits.rs (5): send_batch (114-117), send_batch (150-155), send_batch (172-177), send_batch (322-327), send_batch (383-395)
tests/integration/route.rs (3)
- src/route.rs (1): new (25-32)
- src/canonical_message.rs (2): new (23-29), from_type (39-42)
- src/middleware/retry.rs (1): new (15-17)
src/endpoints/zeromq.rs (5)
- src/models.rs (2): new (185-192), channel (208-213)
- tests/integration/common.rs (3): new (62-66), new (134-148), as_any (611-613)
- src/canonical_message.rs (7): new (23-29), s (64-64), from (112-114), from (118-120), from (124-126), from (130-132), from (144-149)
- src/type_handler.rs (1): default (66-68)
- src/endpoints/mod.rs (1): consumer (429-431)
tests/integration/common.rs (2)
- src/endpoints/mongodb.rs (1): serde_json (71-71)
- src/endpoints/file.rs (1): serde_json (168-168)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Build (http)
- GitHub Check: Build (aws)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (full)
- GitHub Check: Build (nats)
- GitHub Check: Build (amqp)
- GitHub Check: Test on ubuntu-latest with Rust stable
- GitHub Check: Test on ubuntu-latest with Rust beta
- GitHub Check: Test
- GitHub Check: Documentation
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Test on macos-latest with Rust stable
- GitHub Check: Performance Benchmarks
- GitHub Check: Integration Tests
🔇 Additional comments (43)
tests/integration_test.rs (1)
6-6: LGTM!
The `#[allow(dead_code)]` attribute is appropriate here since `should_run` is only called within feature-gated blocks and would otherwise trigger a warning when certain features are disabled.
.github/workflows/ci.yml (1)
60-60: LGTM!
Adding `zeromq` to the build matrix ensures CI validates the new ZeroMQ feature. The existing pattern for IBM MQ exclusion from tests (using `--features=full`) while including it in clippy checks (`--all-features`) is correctly preserved. Based on learnings, this aligns with the project's CI strategy.
tests/README.md (1)
1-1: LGTM!
The test command documentation correctly shows the proper flag ordering and spacing for integration and memory tests.
Also applies to: 11-11
README.md (1)
20-20: LGTM!
The README updates comprehensively document ZeroMQ support:
- Backend listed in features
- Endpoint behavior table shows ephemeral nature for both Consumer (PULL/REP) and Subscriber (SUB) modes
- Route 7 example demonstrates a valid PUSH/PULL pipeline configuration with `bind` options
Also applies to: 89-90, 410-423
benches/performance_bench.rs (2)
334-359: LGTM!
The `memory_helper` module correctly uses an atomic counter to generate unique topic names, avoiding collisions between benchmark iterations.
372-538: LGTM!
The `run_benchmarks!` macro consolidation and extended `bench_backend!` variants significantly reduce code duplication. The new backends are properly integrated with appropriate feature gating (zeromq) or unconditional availability (memory).
Also applies to: 556-570, 610-611
mq-bridge.schema.json (1)
1025-1077: LGTM!
The ZeroMQ schema definitions are comprehensive:
- `ZeroMqEndpoint` includes url (required), socket_type, bind, topic, and internal_buffer_size
- `ZeroMqSocketType` enum covers all standard ZMQ patterns (push/pull, pub/sub, req/rep)
- Proper defaults and constraints are applied
tests/integration/mongodb.rs (1)
14-15: LGTM!
Explicitly setting `concurrency: 4` and `batch_size: 128` in the MongoDB route configurations improves test clarity and ensures consistent behavior across test runs rather than relying on defaults.
Also applies to: 22-23
tests/integration/nats.rs (1)
12-13: Verify alignment with PR objective: default batch size of 1.
The PR summary states the default batch size is being changed to 1 to reduce message loss, yet these test routes use `batch_size: 128`. While this may be intentional for performance testing, it raises questions about whether the new default behavior is being adequately tested.
Consider adding test cases that validate the default `batch_size: 1` behavior to ensure message loss reduction is properly tested. Additionally, document why performance tests use 128 if the production default is now 1.
Also applies to: 20-21
tests/integration/memory.rs (1)
12-13: Verify alignment with PR objective: default batch size of 1.
The PR summary indicates the default batch size is being changed to 1 to reduce message loss, but these routes specify `batch_size: 128`. While this may be appropriate for performance testing, ensure that default behavior (`batch_size: 1`) is tested elsewhere.
Also applies to: 20-21
tests/integration/kafka.rs (1)
13-14: Verify alignment with PR objective: default batch size of 1.
The PR summary states the default batch size is being changed to 1 to reduce message loss, yet these test routes use `batch_size: 128`. While this may be intentional for performance benchmarking, consider adding test coverage for the new default behavior to ensure message loss reduction is properly validated.
Also applies to: 27-28
tests/integration/amqp.rs (1)
13-14: LGTM! Consistent test configuration updates.
The addition of `concurrency: 4` and `batch_size: 128` to both AMQP routes enables testing of parallelism and batching features. These values are consistent with similar updates across other integration tests (MQTT, Kafka, etc.) as noted in the PR summary.
Also applies to: 21-22
tests/integration/mqtt.rs (1)
13-14: LGTM! Consistent test configuration updates.
The addition of `concurrency: 4` and `batch_size: 128` mirrors the AMQP test updates and enables testing of parallel processing and message batching for MQTT routes.
Also applies to: 21-22
src/endpoints/mod.rs (3)
29-30: LGTM! Proper feature gating for ZeroMQ module.
The ZeroMQ module is correctly exposed behind the `zeromq` feature flag, consistent with other optional endpoint modules.
135-142: LGTM! ZeroMQ consumer creation follows established patterns.
The consumer creation logic properly handles both `Subscribe` and `Consume` modes, mirroring the approach used for Kafka, NATS, AMQP, and MQTT endpoints. The feature gating is correct.
287-290: LGTM! ZeroMQ publisher creation is consistent.
The publisher creation follows the same pattern as other endpoint types with appropriate feature gating.
Cargo.toml (3)
3-3: LGTM! Appropriate version bump.
The version increment from 0.1.5 to 0.1.6 is suitable for adding a new feature (ZeroMQ support) and stability fixes as described in the PR.
87-88: Nice addition! Schema generation support.
The new `schema` feature enabling the `schemars` dependency will be useful for generating JSON schemas from the data models.
69-70: No action required; dependency is properly configured.
The ZeroMQ dependency using `version = "0.4"` is correctly specified. The semver constraint resolves to the latest 0.4.x version (0.4.1), which is not yanked and has no known security vulnerabilities across RustSec, GitHub advisories, or NVD databases. The optional feature gating is properly implemented. No updates or security patches are required.
src/canonical_message.rs (1)
31-37: Remove unused `new_bytes` method or add usage.
The `new_bytes` constructor is not currently used anywhere in the codebase, including the ZeroMQ implementation. If this method was added as an optimization for future use, it should either be removed now to avoid dead code, or the implementation that uses it should be added to justify its inclusion.
Likely an incorrect or invalid review comment.
tests/integration/route.rs (5)
7-10: LGTM!
Import updates correctly include `Middleware` and `RetryMiddleware` types needed for the new middleware-based tests.
84-89: LGTM!
The updated expectation correctly reflects the new behavior where non-retryable deserialization errors are logged and dropped, allowing the route to continue rather than crash.
91-121: LGTM!
Good test coverage for the scenario where a retryable error occurs without middleware, verifying the route crashes as expected. The test properly closes the input channel to trigger the route to process the message.
123-174: LGTM!
Excellent test for retry middleware behavior. The test correctly:
- Configures `RetryMiddleware` with `max_attempts: 3`
- Uses an atomic counter to track attempts
- Fails with retryable errors for the first 2 attempts, then succeeds on the 3rd
- Verifies the route completes successfully and the message is published
208-209: LGTM!
Updated expectation correctly reflects that non-retryable handler errors are now logged and dropped, allowing the route to continue.
src/models.rs (3)
47-49: Verify the batch size change impact on throughput.
Changing `default_batch_size` from 128 to 1 significantly reduces batching, which may improve message delivery reliability but could impact throughput performance. The PR title mentions this is to "reduce message loss"; ensure this tradeoff is acceptable for your use case.
567-574: LGTM!
`MqttProtocol` enum is well-defined with proper serde attributes and V5 as the sensible default.
604-638: LGTM!
ZeroMQ configuration structures follow the established patterns in the codebase. The `ZeroMqSocketType` enum covers the common ZeroMQ socket patterns (Push/Pull, Pub/Sub, Req/Rep).
One consideration: `socket_type` is `Option<ZeroMqSocketType>` with no `#[serde(default)]` on it, but it does have `#[serde(default)]` at the struct level which will make it `None` by default. The code in `zeromq.rs` handles this by defaulting to `Push` or `Pull` depending on context, which works correctly.
tests/integration/common.rs (4)
118-120: LGTM!
Simplified message generation using `format!` and direct byte construction is cleaner and avoids unnecessary JSON parsing overhead.
428-434: LGTM!
Using a static payload for performance tests is a good optimization: it eliminates per-message JSON serialization overhead and ensures consistent payload sizes across test runs.
471-485: LGTM!
The multi-generator pattern effectively parallelizes message creation to prevent the producer from becoming a bottleneck. The message distribution logic correctly handles the remainder using `if i < num_messages % generator_count { 1 } else { 0 }`.
Dropping `tx` after spawning ensures the channel closes when all generators complete.
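As an illustration of the remainder handling noted in this comment, the per-generator message counts could be computed as in the sketch below. `messages_per_generator` is a hypothetical helper written for this example, not a function from `tests/integration/common.rs`; only the `i < num_messages % generator_count` rule is taken from the review text.

```rust
/// Splits `num_messages` across `generator_count` generators so that the
/// counts differ by at most one and always sum to `num_messages`.
/// (Hypothetical helper for illustration only.)
fn messages_per_generator(num_messages: usize, generator_count: usize) -> Vec<usize> {
    assert!(generator_count > 0, "need at least one generator");
    let base = num_messages / generator_count;
    let remainder = num_messages % generator_count;
    (0..generator_count)
        // The first `remainder` generators each take one extra message.
        .map(|i| base + if i < remainder { 1 } else { 0 })
        .collect()
}
```

With 10 messages and 4 generators, the first two generators get 3 messages and the last two get 2, so nothing is lost to integer division.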
702-715: LGTM!
Consistent application of the multi-generator pattern in `measure_single_write_performance`, matching the approach in `measure_write_performance`.
src/route.rs (6)
9-9: LGTM!
Import updates correctly add `PublisherError` for enhanced error handling and `JoinSet` for improved task management.
Also applies to: 15-15
148-168: LGTM!
Good refactor to use `received_batch` directly instead of destructuring into a separate tuple. The code is cleaner and more readable.
173-189: LGTM!
Improved error handling for partial batch failures:
- Correctly identifies if any retryable errors exist
- Propagates the first retryable error to trigger reconnection
- Logs and drops non-retryable failures with message IDs for debugging
- Commits successful messages when only non-retryable failures exist
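The decision rule in the list above can be sketched as a small pure function. This is a simplified model, not the code in `src/route.rs`: `SendError`, `BatchAction`, and `resolve_partial_failure` are hypothetical names, and message IDs are assumed to be plain `u64` values.

```rust
/// Outcome of one failed send (hypothetical stand-in for the crate's error type).
#[derive(Debug, Clone, PartialEq)]
enum SendError {
    Retryable(String),
    NonRetryable(String),
}

/// What the route does after a batch that contained failures.
#[derive(Debug, PartialEq)]
enum BatchAction {
    /// Propagate this error so the route reconnects; nothing is committed.
    Reconnect(SendError),
    /// Commit the batch; the listed message IDs were logged and dropped.
    CommitDropping(Vec<u64>),
}

fn resolve_partial_failure(failed: Vec<(u64, SendError)>) -> BatchAction {
    // The first retryable error takes priority over everything else.
    if let Some((_, err)) = failed
        .iter()
        .find(|(_, e)| matches!(e, SendError::Retryable(_)))
    {
        return BatchAction::Reconnect(err.clone());
    }
    // Only non-retryable failures remain: log each ID, drop it, then commit.
    let dropped: Vec<u64> = failed
        .into_iter()
        .map(|(id, err)| {
            eprintln!("dropping message {id}: {err:?}");
            id
        })
        .collect();
    BatchAction::CommitDropping(dropped)
}
```

Keeping the rule in one place makes it easy to check that a single retryable error prevents the commit, while a batch with only non-retryable failures still acknowledges the messages that were sent.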
218-264: LGTM!
Good migration to `JoinSet`-based worker management. The worker logic correctly:
- Reports retryable errors via error channel and breaks
- Logs and drops non-retryable failures individually
- Commits successful responses after handling failures
276-287: LGTM!
Good defensive check: detecting unexpected worker completion or panics and shutting down the route. This prevents silent failures where workers die without processing the remaining queue.
324-324: LGTM!
Clean shutdown using `join_set.join_next()` loop to wait for all workers.
src/endpoints/zeromq.rs (5)
1-14: LGTM!
Clean module setup with appropriate imports for ZeroMQ integration, async channels, and trait implementations.
32-113: LGTM!
Well-structured publisher implementation:
- Background task handles actual socket operations, keeping the publisher non-blocking
- Proper socket type dispatch for Push/Pub/Req patterns
- Bounded channel prevents unbounded memory growth
- `expects_reply` flag correctly derived from socket type
116-154: LGTM!
`MessagePublisher` implementation correctly handles:
- Batch serialization to JSON
- Request-reply pattern with oneshot channels for Req sockets
- Fire-and-forget pattern for Push/Pub sockets
- Appropriate error classification (retryable for channel issues, non-retryable for serialization)
358-381: Verify `std::sync::Mutex` usage in async context.
Using `std::sync::Mutex` (line 366) inside an async closure is generally safe as long as the lock is not held across `.await` points. Here, the lock is acquired, modified, and released before any async operation, so this is correct.
However, if this code evolves to include `.await` while holding the lock, it could cause issues. Consider adding a comment or using `tokio::sync::Mutex` if you want to be defensive against future changes.
423-459: LGTM!
Good integration test for Push/Pull pattern. The test:
- Sets up consumer (bind) and publisher (connect) correctly
- Uses a fixed port which could cause issues in parallel tests, but acceptable for a single test
- Properly uses timeout to prevent hanging
- Verifies message content matches
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In @Cargo.toml:
- Around line 69-71: The Cargo.toml currently depends on the Rust-native zeromq
crate (dependency name "zeromq" version "0.4") which is WIP and may lack
stability/feature parity for production; replace or make optional use of it by
switching to the C-backed wrapper "zeromq-src" (or document a clear opt-in) to
ensure production stability. Update the dependency declaration for "zeromq" to
either remove it or mark it as strictly optional with a clear feature, and add
"zeromq-src" as the recommended dependency (or add a feature that selects
"zeromq-src" instead), and update any related build/feature flags and README
notes so consumers use "zeromq-src" for production. Ensure references to the
dependency name "zeromq" in Cargo.toml features and code are adjusted to use the
new feature name or "zeromq-src" so builds remain consistent.
In @src/route.rs:
- Around line 245-248: The spawned commit task using tokio::spawn (wrapping
commit(None).await and drop(permit)) can be orphaned on shutdown; instead accept
and use a shared tokio::task::JoinSet passed into the workers and replace
tokio::spawn with join_set.spawn(async move { commit(None).await; drop(permit);
}); ensure the worker shutdown path awaits join_set.join_next() (or
aborts/awaits remaining handles) so in-flight commits are tracked, and keep the
permit drop after commit to preserve backpressure semantics.
🧹 Nitpick comments (1)
src/route.rs (1)
244-244: Inconsistent error handling: `expect` vs `map_err` for semaphore acquisition.
The concurrent path uses `.expect("Semaphore closed")` while the sequential path (line 175) uses `.map_err()`. A closed semaphore would cause a worker panic, which is caught by `join_next()`, but it's inconsistent and could mask the root cause.
Consider using the same error propagation pattern, or at minimum, logging before the worker exits.
♻️ Consistent approach
-let permit = commit_semaphore.clone().acquire_owned().await.expect("Semaphore closed");
+let permit = match commit_semaphore.clone().acquire_owned().await {
+    Ok(p) => p,
+    Err(_) => {
+        warn!("Semaphore closed, worker exiting");
+        break;
+    }
+};
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
- Cargo.toml
- mq-bridge.schema.json
- src/command_handler.rs
- src/endpoints/kafka.rs
- src/middleware/deduplication.rs
- src/models.rs
- src/route.rs
- tests/integration/common.rs
- tests/integration/ibm_mq.rs
- tests/integration/mod.rs
- tests/integration/zeromq.rs
- tests/performance_pipeline.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/integration/ibm_mq.rs
🧰 Additional context used
🧬 Code graph analysis (3)
tests/integration/common.rs (3)
- src/route.rs (1): msg (380-380)
- src/type_handler.rs (1): msg (119-119)
- src/endpoints/file.rs (1): serde_json (168-168)
src/route.rs (3)
- src/models.rs (2): default_max_parallel_commits (54-56), new (192-199)
- src/endpoints/mongodb.rs (1): send_batch (140-199)
- src/traits.rs (5): send_batch (114-117), send_batch (150-155), send_batch (172-177), send_batch (322-327), send_batch (383-395)
tests/integration/zeromq.rs (2)
- tests/integration/common.rs (2): run_performance_pipeline_test (165-171), setup_logging (277-303)
- tests/performance_pipeline.rs (1): test_zeromq_performance_pipeline (66-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (nats)
- GitHub Check: Build (aws)
- GitHub Check: Build (http)
- GitHub Check: Build (full)
- GitHub Check: Build (amqp)
- GitHub Check: Documentation
- GitHub Check: Test
- GitHub Check: Check
- GitHub Check: Integration Tests
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Test on macos-latest with Rust stable
- GitHub Check: Performance Benchmarks
- GitHub Check: Test on ubuntu-latest with Rust stable
🔇 Additional comments (29)
src/command_handler.rs (1)
185-189: LGTM!
The migration to the `Route::new()` constructor is clean and aligns with the new API. The test continues to properly chain `.with_handler(handler)` and validates the route execution flow.
tests/integration/mod.rs (1)
15-19: LGTM!
The ZeroMQ module addition follows the established pattern for feature-gated integration test modules. The `#[cfg(feature = "zeromq")]` guard is consistent with other optional backends.
src/middleware/deduplication.rs (1)
57-83: LGTM - At-Least-Once semantics correctly implemented.
The refactored deduplication flow properly:
- Pre-checks for duplicates using `contains_key` to skip already-processed messages
- Defers DB insertion to the commit wrapper, ensuring the dedup key is recorded only after successful processing but before broker ack
The comment on line 71-72 correctly explains the rationale for inserting before the broker commit to prevent duplicates on crash scenarios.
tests/integration/common.rs (4)
tests/integration/common.rs (4)
120-121: LGTM!The message generation now uses a formatted string that produces valid JSON. This is cleaner and more predictable for test payloads.
430-434: Good optimization with static payload.Using
LazyforSTATIC_PAYLOADavoids repeated serialization overhead in performance tests, ensuring consistent and efficient message generation.
472-491: LGTM - Multi-producer pattern for write performance.The multi-generator approach correctly:
- Clamps generator count to a reasonable range (1-8)
- Distributes messages evenly with proper remainder handling
- Drops the original sender to allow channel closure when all generators complete
676-684: Semaphore-based commit throttling is appropriate.The
MAX_PARALLEL_COMMITSlimit of 4096 prevents resource exhaustion while still allowing high concurrency. The spawned tasks correctly acquire permits before committing and release them afterward.Note: The test completes without awaiting all commits, which is acceptable for throughput measurement but means some commits may still be in-flight when the duration is recorded.
tests/performance_pipeline.rs (2)
3-7: LGTM!The memory performance pipeline test follows the established pattern. Using
#[ignore]is appropriate for performance tests that require explicit invocation.
63-68: LGTM!The ZeroMQ performance test is correctly gated with
#[cfg(feature = "zeromq")]and follows the same pattern as other backend performance tests.tests/integration/zeromq.rs (1)
1-37: LGTM - Well-structured ZeroMQ integration test.
The test configuration correctly sets up a PUSH/PULL socket pair with:
- Publisher (PUSH) binding on port 5558
- Consumer (PULL) connecting to the same port
- Appropriate concurrency (4) and batch size (128) settings
The test follows the established pattern from other backend integration tests.
src/endpoints/kafka.rs (1)
172-210: LGTM - Abort-on-error preserves message ordering.
The refactored batch processing correctly aborts the entire batch when an enqueue failure occurs. This ensures that:
- Failed messages and all subsequent messages are returned for retry
- Message ordering is preserved, preventing out-of-order delivery
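A minimal sketch of this abort-on-error flow is shown below. It is not the code from `src/endpoints/kafka.rs`: `enqueue_in_order` and its closure-based `enqueue` parameter are hypothetical, and a boolean return stands in for whatever error type the real producer reports.

```rust
/// Enqueues messages in order. On the first failure, the failed message and
/// every message after it are returned untouched so the caller can retry them
/// in their original order. (Hypothetical helper for illustration only.)
fn enqueue_in_order<T, F>(batch: Vec<T>, mut enqueue: F) -> (usize, Vec<T>)
where
    F: FnMut(&T) -> bool,
{
    let mut sent = 0;
    let mut unsent = Vec::new();
    let mut iter = batch.into_iter();
    while let Some(msg) = iter.next() {
        if enqueue(&msg) {
            sent += 1;
        } else {
            // Abort: keep the failed message, then drain the rest unsent.
            unsent.push(msg);
            for rest in iter.by_ref() {
                unsent.push(rest);
            }
            break;
        }
    }
    (sent, unsent)
}
```

Because nothing after the failed message is attempted, a later retry cannot deliver message 4 ahead of message 3.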
The iterator pattern (`while let` + consuming remaining via `for`) cleanly handles the abort flow.
src/route.rs (5)
9-11: LGTM! Import additions for new concurrency primitives.
The imports for `BatchCommitFunc`, `Semaphore`, and `JoinSet` are correctly added to support the new parallel commit and worker pool management features.
Also applies to: 17-18
34-34: LGTM! Route initialization extended with max_parallel_commits.
Correctly wires the new `max_parallel_commits` field using the default from `models.rs`.
183-204: Partial failure handling looks correct.
The logic correctly distinguishes between retryable errors (which trigger reconnection) and non-retryable errors (which are logged and dropped). Committing after handling non-retryable failures ensures successfully sent messages are acknowledged.
300-311: Good supervision pattern for worker health monitoring.
Using `join_next()` in the select loop to detect unexpected worker completion or panics is a solid pattern for route resilience. The biased select ensures errors and worker failures are prioritized over new work.
344-356: Shutdown logic for workers is sound.
Dropping the work channel and draining the JoinSet ensures workers complete their current work before the route exits. The final error check is a good defensive measure.
Cargo.toml (3)
3-3: LGTM! Version bump for new features.
Appropriate minor version increment for adding ZeroMQ support and configuration changes.
81-82: LGTM! Feature organization is clear.
The `fast` feature for common backends without optional heavy dependencies (mongodb, aws, zeromq) and `full` for everything is a sensible split.
88-89: LGTM! Schema and zeromq features correctly wired.
Both features are properly defined with their respective dependencies.
Also applies to: 102-102
src/models.rs (6)
33-38: LGTM! New max_parallel_commits field with serde wiring.
The field is properly documented, has a serde default, and follows the existing pattern for route configuration.
50-52: Verify the batch_size default change from 128 to 1.
Changing the default batch size from 128 to 1 is a significant behavioral change that affects throughput vs. latency tradeoffs. Ensure this is intentional and consider:
- Documenting this change in release notes
- Users relying on the previous default will see different behavior on upgrade
54-56: LGTM! Default max_parallel_commits value. A default of 4096 parallel commit tasks provides good headroom for high-throughput scenarios while the semaphore prevents unbounded growth.
264-264: LGTM! ZeroMq variant added to EndpointType. Follows the same pattern as other endpoint variants.
574-581: LGTM! MqttProtocol enum. Clean enum for MQTT protocol version selection with V5 as the default.
611-645: LGTM! ZeroMQ endpoint configuration types. The ZeroMqEndpoint, ZeroMqConfig, and ZeroMqSocketType follow the established patterns for other endpoint types. The socket type enum covers the common ZeroMQ patterns (push/pull, pub/sub, req/rep). The implementation already provides sensible context-appropriate defaults when socket_type is not specified: Push for publisher, Pull for consumer, and Sub for subscriber. Validation also ensures unsupported socket types for each endpoint are rejected with clear errors.
mq-bridge.schema.json (4)
918-961: LGTM! Route schema updated with new fields. The schema correctly reflects the Rust model:
- batch_size defaults to 1
- max_parallel_commits defaults to 4096
- Both have proper type constraints (uint, minimum 0)
267-277: LGTM! ZeroMQ endpoint integrated into Endpoint oneOf. The ZeroMQ endpoint is properly added to the union of endpoint types, following the same pattern as other endpoints.
1032-1084: LGTM! ZeroMQ schema definitions. The ZeroMqEndpoint and ZeroMqSocketType definitions correctly mirror the Rust types:
- url is required
- bind defaults to false
- socket_type is nullable with valid enum values
- internal_buffer_size has proper uint constraints
325-326: Good schema hygiene with unevaluatedProperties. Using unevaluatedProperties: false on the Endpoint definition ensures strict validation against the oneOf variants, catching typos in endpoint type names.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In @src/middleware/deduplication.rs:
- Around line 73-127: The compare-and-swap calls misuse sled's API: pass slices
instead of &Vec and extract the current value from CompareAndSwapError. Replace
uses of Some(&pending_val) with Some(pending_val.as_slice()) (or own an IVec)
when calling compare_and_swap, and in matches like Ok(Err(current_val)) treat
current_val as a CompareAndSwapError and access its current via
current_val.current.as_deref() (an Option<&[u8]>) to inspect bytes/length;
likewise for the second compare_and_swap use the current Option<&[u8]> as the
expected argument. Keep the existing logic around STATE_PENDING, PENDING_TTL,
self.ttl_seconds, now and is_duplicate but operate on the dereferenced byte
slice from CompareAndSwapError.
In @src/models.rs:
- Around line 442-444: Add a missing documentation comment for the
NatsConfig.request_reply field explaining that when true the publisher uses the
request-reply pattern (sender waits for a response using
core_client.request_with_headers() with timeout handling) and clarify default
behavior, and move/ensure the existing comment for delayed_ack remains above
delayed_ack; also update the user README or configuration examples to mention
request_reply support and its effect on publisher behavior.
🧹 Nitpick comments (6)
src/route.rs (4)
183-204: Clarify the treatment of partial batch failures. The logic now treats any retryable error in a partial batch as fatal (line 190-194), while non-retryable errors result in dropped messages with logged errors (line 196-198). This asymmetry means:
- Retryable errors → route teardown and reconnect (entire batch may be redelivered)
- Non-retryable errors → messages dropped, batch committed
This could lead to duplicate processing of successful messages when a retryable failure occurs. Consider whether this trade-off aligns with your durability vs. exactly-once semantics requirements.
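The asymmetry described above can be sketched in isolation. This is a minimal, std-only illustration, not the route's actual code: `PublisherError` here is a hypothetical stand-in with only the retryable/non-retryable split taken from the review, and `handle_partial` is an invented helper name.

```rust
use std::fmt;

// Hypothetical stand-in for the crate's PublisherError (assumption).
#[derive(Debug)]
enum PublisherError {
    Retryable(String),
    NonRetryable(String),
}

impl fmt::Display for PublisherError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PublisherError::Retryable(m) => write!(f, "retryable: {}", m),
            PublisherError::NonRetryable(m) => write!(f, "non-retryable: {}", m),
        }
    }
}

/// Ok(n): the batch may be committed; n failed messages were logged and dropped.
/// Err(msg): a retryable failure was found; the caller should reconnect
/// without committing, so the whole batch may be redelivered.
fn handle_partial(failed: Vec<(u64, PublisherError)>) -> Result<usize, String> {
    let failed_count = failed.len();
    // Any retryable error makes the whole batch fatal for this connection.
    if let Some((_, first)) = failed
        .iter()
        .find(|(_, e)| matches!(e, PublisherError::Retryable(_)))
    {
        return Err(format!(
            "Failed to send {} messages in batch. First retryable error: {}",
            failed_count, first
        ));
    }
    // Only non-retryable errors remain: log each one and drop the message.
    for (id, err) in &failed {
        eprintln!("dropping message {}: {}", id, err);
    }
    Ok(failed_count)
}
```

In this sketch a batch with one retryable and one non-retryable failure returns `Err`, which is the case where already-delivered messages can be processed twice after redelivery.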
244-254: Simplify the semaphore acquisition error handling. The Err(_) case on line 246 logs a warning and breaks, treating semaphore closure as a signal to exit. However, Semaphore::acquire_owned() only returns Err when the semaphore is closed, which never happens in this code (the semaphore is never explicitly closed). Since the semaphore won't be closed during normal operation, this error path is unreachable. Consider simplifying to .unwrap() or .expect() with a clear message, or document why semaphore closure is a valid shutdown path.
♻️ Simplified version
-let permit = match commit_semaphore.clone().acquire_owned().await {
-    Ok(p) => p,
-    Err(_) => {
-        warn!("Semaphore closed, worker exiting");
-        break;
-    }
-};
+let permit = commit_semaphore
+    .clone()
+    .acquire_owned()
+    .await
+    .expect("Semaphore should never be closed");
257-288: Duplicate semaphore error handling. Lines 278-283 duplicate the same error handling pattern as lines 244-250. Consider extracting this into a helper function or macro to reduce code duplication.
175-180: Clarify the design intent for fire-and-forget spawned commit tasks. The spawned commit tasks acquire semaphore permits and run in the background without explicit tracking. While the Arc<Semaphore> ensures no resource leak (permits are released when tasks complete), the sequential runner spawns these tasks without joining them, leaving them to run independently after shutdown. The concurrent runner is more robust: it joins worker tasks, but their spawned commit tasks are also fire-and-forget. Confirm whether this design is intentional and whether unbounded lingering commit tasks during shutdown/reconnection are acceptable, or if these tasks should be explicitly coordinated (e.g., via JoinSet, cancellation token, or timeout).
src/middleware/deduplication.rs (2)
59-63: Consider making PENDING_TTL configurable. The PENDING_TTL is hardcoded to 60 seconds. Consider adding this to DeduplicationMiddleware config to allow tuning based on crash recovery requirements.
245-286: Consider expanding test coverage for TTL and state transitions. The basic deduplication test is correct. Consider adding tests for:
- TTL expiration (verify expired entries can be reprocessed)
- Pending state timeout recovery
- Processed state persistence across consumer restarts
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- Cargo.toml
- src/middleware/deduplication.rs
- src/models.rs
- src/route.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- Cargo.toml
🧰 Additional context used
🧬 Code graph analysis (1)
src/route.rs (4)
src/models.rs (2)
default_max_parallel_commits (54-56), new (192-199)
src/endpoints/memory.rs (7)
new (38-41), new (106-112), new (165-172), new (235-240), is_empty (81-83), len (76-78), send_batch (134-149)
src/endpoints/mongodb.rs (1)
send_batch (140-199)
src/traits.rs (5)
send_batch (114-117), send_batch (150-155), send_batch (172-177), send_batch (322-327), send_batch (383-395)
🪛 GitHub Actions: Test Matrix
src/middleware/deduplication.rs
[error] 78-78: the trait bound IVec: From<&Vec<u8>> is not satisfied for the call to Tree::compare_and_swap. The code uses .compare_and_swap(&key, None::<&[u8]>, Some(&pending_val)) and requires NV: Into<IVec>; &Vec<u8> does not implement Into<IVec>.
[error] 83-83: no method named len found for struct CompareAndSwapError in the current scope.
[error] 84-84: cannot index into a value of type CompareAndSwapError.
[error] 85-85: cannot index into a value of type CompareAndSwapError.
[error] 94-94: the trait bound CompareAndSwapError: AsRef<[u8]> is not satisfied for the required NV/current value in compare_and_swap.
[error] 95-95: the method as_ref is not available on CompareAndSwapError; attempted usage implies a mismatch in expected types for the swap operation.
[error] 108-108: the trait bound IVec: From<&Vec<u8>> is not satisfied for the compare_and_swap call with Some(&current_val); &Vec<u8> cannot be converted into IVec via From.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Build (mqtt)
- GitHub Check: Build (http)
- GitHub Check: Build (aws)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (amqp)
- GitHub Check: Build (full)
- GitHub Check: Build (nats)
- GitHub Check: Test
- GitHub Check: Integration Tests
- GitHub Check: Check
- GitHub Check: Documentation
- GitHub Check: Performance Benchmarks
🔇 Additional comments (17)
src/route.rs (7)
9-11: LGTM! The import additions correctly bring in PublisherError and SentBatch needed for the enhanced error handling logic introduced in this PR.
17-18: LGTM! Semaphore for parallel commit control and JoinSet for worker supervision are appropriate concurrency primitives for the new worker management approach.
34-34: LGTM! The new max_parallel_commits field is correctly initialized using the default helper function, allowing configurable control over commit concurrency.
142-142: LGTM! The semaphore is correctly initialized with the configured limit and wrapped in Arc for sharing across spawned commit tasks.
153-169: LGTM! The refactoring to use the received_batch structure improves readability and correctly handles the three error cases: empty batch (continue), end-of-stream (graceful shutdown), and connection errors (propagate for reconnect).
312-323: Excellent worker supervision with JoinSet. The addition of join_next() monitoring in the select loop ensures that worker panics or unexpected exits trigger immediate route shutdown, improving reliability and observability.
360-360: LGTM! The graceful shutdown correctly waits for all workers to complete using join_set.join_next(), ensuring clean termination.
src/models.rs (6)
36-38: LGTM! The new max_parallel_commits field provides configurable control over commit concurrency, with a sensible default of 4096. The documentation clearly indicates it's optional.
264-264: LGTM! The new ZeroMq(ZeroMqEndpoint) variant correctly extends the EndpointType enum to support ZeroMQ endpoints, aligning with the PR's objective of adding ZeroMQ support.
408-408: LGTM! The documentation clarification that delayed_ack is publisher-only improves API clarity and helps prevent misuse.
574-581: LGTM! The MqttProtocol enum provides a clear, type-safe way to specify MQTT protocol versions (V5 or V3), with V5 as the sensible default.
611-645: LGTM! Well-structured ZeroMQ configuration types. The ZeroMQ configuration types are well-designed:
- ZeroMqEndpoint combines topic and config, consistent with other endpoint types
- ZeroMqConfig includes essential fields: url, socket_type, bind flag, and internal_buffer_size
- ZeroMqSocketType enum covers the common ZeroMQ socket types (Push/Pull, Pub/Sub, Req/Rep)
- Proper use of Option for optional fields and #[serde(default)] for defaults
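How an optional socket_type might fall back to a role-specific default and be validated can be sketched as follows. The `Role` enum and `resolve_socket_type` function are hypothetical names for illustration; the defaults (Push, Pull, Sub) and the allowed sets (PUSH/PUB/REQ for publishers, PULL/REP for consumers, SUB for subscribers) are taken from statements elsewhere in this review, not from the crate's source.

```rust
// Mirrors the socket types named in the review; the real enum may differ.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ZeroMqSocketType {
    Push,
    Pull,
    Pub,
    Sub,
    Req,
    Rep,
}

// Hypothetical endpoint role, introduced only for this sketch.
#[derive(Clone, Copy, Debug)]
enum Role {
    Publisher,
    Consumer,
    Subscriber,
}

/// Pick the configured socket type, or the role's default when none is set,
/// and reject combinations the role does not support.
fn resolve_socket_type(
    role: Role,
    configured: Option<ZeroMqSocketType>,
) -> Result<ZeroMqSocketType, String> {
    use ZeroMqSocketType::{Pub, Pull, Push, Rep, Req, Sub};
    let (default, allowed): (ZeroMqSocketType, &[ZeroMqSocketType]) = match role {
        Role::Publisher => (Push, &[Push, Pub, Req][..]),
        Role::Consumer => (Pull, &[Pull, Rep][..]),
        Role::Subscriber => (Sub, &[Sub][..]),
    };
    let chosen = configured.unwrap_or(default);
    if allowed.contains(&chosen) {
        Ok(chosen)
    } else {
        Err(format!(
            "socket type {:?} is not supported for {:?}",
            chosen, role
        ))
    }
}
```

Keeping the default and the allowed set in one match arm means a new role cannot be added without deciding both.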
33-33: This review comment cannot be verified with the available information. The current code and documentation are consistent (both state default is 1), and all integration tests explicitly configure batch_size: 128 rather than relying on the default. Without git history access confirming a recent change from 128 to 1, I cannot determine if this is an actual breaking change or if the default has always been 1. If this is intended to flag a breaking change from a recent commit, please provide the commit hash or PR details for verification.
src/middleware/deduplication.rs (4)
129-133: Duplicate handling logic is sound. The duplicate detection correctly acknowledges the message via original_commit(None) and continues to the next message. This works correctly once the compare-and-swap logic is fixed.
154-212: Cleanup logic correctly handles both legacy and new formats. The cleanup task properly handles backward compatibility by supporting both 8-byte (legacy) and 9-byte (new stateful) timestamp encodings. Error handling is comprehensive with appropriate logging at each failure point.
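A std-only sketch of the two value layouts mentioned above may help when reading the cleanup code. Only the lengths (8 and 9 bytes) and the state constants come from this review; the big-endian byte order, the position of the state byte, and treating legacy entries as processed are assumptions made for the example.

```rust
use std::convert::TryFrom;

const STATE_PENDING: u8 = 0;
const STATE_PROCESSED: u8 = 1;

/// Encode a state byte followed by a u64 timestamp in seconds (9 bytes).
/// Big-endian order is an assumption, not confirmed by the review.
fn encode(state: u8, ts_secs: u64) -> [u8; 9] {
    let mut out = [0u8; 9];
    out[0] = state;
    out[1..].copy_from_slice(&ts_secs.to_be_bytes());
    out
}

/// Decode either the 9-byte stateful layout or the 8-byte legacy layout.
/// Legacy entries carry no state byte and are treated as processed here.
fn decode(bytes: &[u8]) -> Option<(u8, u64)> {
    match bytes.len() {
        9 => {
            let ts = <[u8; 8]>::try_from(&bytes[1..9]).ok()?;
            Some((bytes[0], u64::from_be_bytes(ts)))
        }
        8 => {
            let ts = <[u8; 8]>::try_from(bytes).ok()?;
            Some((STATE_PROCESSED, u64::from_be_bytes(ts)))
        }
        _ => None, // unknown layout: let the caller log and skip it
    }
}

/// True when an entry written at `ts_secs` is older than `ttl_secs`.
fn is_expired(ts_secs: u64, ttl_secs: u64, now_secs: u64) -> bool {
    now_secs.saturating_sub(ts_secs) > ttl_secs
}
```

Returning `None` for any other length keeps the cleanup task from panicking on corrupt entries.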
218-230: Batch implementation correctly prioritizes deduplication correctness. The decision to process one message at a time, despite the _max_messages parameter, is the right choice to ensure correct deduplication logic. The comment clearly explains this design decision.
135-152: The commit wrapper implementation is correct and does not have the described issue. The wrapped commit function only marks messages as processed when they've actually been successfully processed. Failed messages are never passed to the commit function because:
- Only successful responses (from Sent::Response) are collected and passed to commit
- Retryable errors trigger an immediate Err() return, preventing commit from being called
- Non-retryable errors are logged/dropped but don't result in error responses being passed to commit
The dedup DB state will remain in pending for failed messages, allowing them to be retried after PENDING_TTL expires if not recovered from upstream backups. No further action is needed.
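The pending/processed lifecycle described here can be modelled with a small in-memory store. This is a single-threaded sketch with invented names (`DedupStore`, `check_and_claim`, `mark_processed`); the middleware itself uses sled with compare-and-swap, which this example does not attempt to reproduce.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
enum State {
    Pending,
    Processed,
}

// Hypothetical in-memory model of the dedup table: id -> (state, written_at secs).
struct DedupStore {
    entries: HashMap<String, (State, u64)>,
    pending_ttl: u64,
    processed_ttl: u64,
}

impl DedupStore {
    fn new(pending_ttl: u64, processed_ttl: u64) -> Self {
        DedupStore { entries: HashMap::new(), pending_ttl, processed_ttl }
    }

    /// Returns true when the message is a duplicate and should be skipped.
    /// Otherwise claims the id as pending and returns false.
    fn check_and_claim(&mut self, id: &str, now: u64) -> bool {
        if let Some(&(state, ts)) = self.entries.get(id) {
            let ttl = match state {
                State::Pending => self.pending_ttl,
                State::Processed => self.processed_ttl,
            };
            if now.saturating_sub(ts) <= ttl {
                return true; // still within its TTL: treat as duplicate
            }
        }
        // Unknown or expired entry: (re)claim it as pending.
        self.entries.insert(id.to_string(), (State::Pending, now));
        false
    }

    /// Called from the wrapped commit after the message was handled successfully.
    fn mark_processed(&mut self, id: &str, now: u64) {
        self.entries.insert(id.to_string(), (State::Processed, now));
    }
}
```

A message whose handler fails never reaches `mark_processed`, so its entry stays pending and becomes claimable again once `pending_ttl` has elapsed.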
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @src/route.rs:
- Around line 175-180: The spawned commit tasks in the sequential path use
tokio::spawn and are never tracked, so shutdown can return before commits
finish; change the code to register these spawned tasks with the same JoinSet
used by the concurrent path (the commit_tasks JoinSet) instead of calling
tokio::spawn directly: move the acquired permit and the commit(None).await into
a closure passed to commit_tasks.spawn (or commit_tasks.spawn(async move { ...
})) so the JoinSet owns the JoinHandle and you can await all commits on
shutdown, and apply the same change to the other spawn site around
commit_semaphore (the block at the other mentioned location).
🧹 Nitpick comments (2)
src/route.rs (2)
196-198: Consider adding observability for dropped messages. Messages are correctly dropped when non-retryable errors occur, and each is logged. However, for operational monitoring, consider adding a counter or metric to track the rate of dropped messages over time.
183-189: Consider removing expect() to avoid panic potential. While the expect("has_retryable is true") is currently safe (guarded by if has_retryable), using expect() in production code introduces a potential panic point if the logic ever changes. Consider using unwrap_or_else() with a fallback or restructuring to avoid expect().
♻️ Alternative pattern without expect()
-let has_retryable = failed.iter().any(|(_, e)| matches!(e, PublisherError::Retryable(_)));
-if has_retryable {
-    let failed_count = failed.len();
-    let (_, first_error) = failed
-        .into_iter()
-        .find(|(_, e)| matches!(e, PublisherError::Retryable(_)))
-        .expect("has_retryable is true");
+if let Some((_, first_error)) = failed
+    .iter()
+    .find(|(_, e)| matches!(e, PublisherError::Retryable(_)))
+{
+    let failed_count = failed.len();
     return Err(anyhow::anyhow!(
         "Failed to send {} messages in batch. First retryable error: {}",
         failed_count,
         first_error
     ));
 }
Also applies to: 258-264
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/route.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/route.rs (2)
src/models.rs (1)
default_max_parallel_commits (54-56)
src/traits.rs (5)
send_batch (114-117), send_batch (150-155), send_batch (172-177), send_batch (322-327), send_batch (383-395)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (aws)
- GitHub Check: Build (http)
- GitHub Check: Build (nats)
- GitHub Check: Build (full)
- GitHub Check: Build (amqp)
- GitHub Check: Check
- GitHub Check: Documentation
- GitHub Check: Test
- GitHub Check: Integration Tests
- GitHub Check: Test on macos-latest with Rust stable
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Performance Benchmarks
🔇 Additional comments (9)
src/route.rs (9)
9-11: LGTM! Import additions support the new concurrency control. The additions of PublisherError, Semaphore, and JoinSet are necessary for the new commit throttling and worker management features.
Also applies to: 17-18
34-34: LGTM! Field initialization follows established patterns. The max_parallel_commits field is correctly initialized using the default value from the models module.
142-142: LGTM! Semaphore initialization is correct. The semaphore correctly limits concurrent commit operations using the configured max_parallel_commits value.
238-239: LGTM! Per-worker commit tracking ensures graceful shutdown. Each worker correctly maintains its own commit_tasks JoinSet and waits for all in-flight commits to complete before exiting (line 302). This prevents data loss during shutdown.
Also applies to: 302-303
245-251: LGTM! Semaphore closure is handled gracefully. Workers correctly detect when the semaphore is closed and exit cleanly without panicking. This supports proper shutdown coordination.
Also applies to: 279-285
315-326: LGTM! JoinSet monitoring detects worker failures. The route correctly monitors the worker JoinSet for both panics and unexpected exits, triggering a route shutdown and reconnect. This ensures that worker failures don't go unnoticed.
Note: This monitoring will also detect workers that exit due to retryable errors (flagged separately), which triggers the route to restart.
359-363: LGTM! Graceful shutdown sequence is well-coordinated. The concurrent path correctly:
- Closes the work channel (line 361) to signal workers to stop accepting new batches
- Waits for all workers to complete (line 363), which in turn wait for their in-flight commits (line 302)
This ensures no data loss during shutdown.
153-159: LGTM! Batch handling refactored for clarity. The introduction of the received_batch variable improves readability by making the destructuring and usage of batch components more explicit.
Also applies to: 169-169, 172-173
258-274: Review comment is incorrect regarding worker behavior. The current implementation does not suffer from progressive concurrency degradation. When a worker encounters a retryable error (lines 258-274) or publisher error (lines 291-298), it sends an error via err_tx, which causes the entire route to restart (line 303-305 returns Err(), triggering the outer reconnect loop at lines 76-78). On reconnection, all workers in the pool are respawned fresh via a new run_concurrently() call. This behavior matches the sequential path: both paths return an error that causes the outer loop to reconnect and rebuild the full worker pool, not exit independently. The route will maintain full concurrency across reconnections.
 let has_retryable = failed.iter().any(|(_, e)| matches!(e, PublisherError::Retryable(_)));
 if has_retryable {
     let failed_count = failed.len();
     let (_, first_error) = failed
         .into_iter()
-        .next()
-        .expect("failed_count > 0 implies at least one failed message");
+        .find(|(_, e)| matches!(e, PublisherError::Retryable(_)))
+        .expect("has_retryable is true");
     return Err(anyhow::anyhow!(
-        "Failed to send {} messages in batch. First error: {}",
+        "Failed to send {} messages in batch. First retryable error: {}",
         failed_count,
         first_error
     ));
 }
🧩 Analysis chain
🏁 Script executed:
# First, let's understand the repository structure
git ls-files | head -20
Repository: marcomq/mq-bridge
Length of output: 524
🏁 Script executed:
# Check if src/route.rs exists and get basic info
wc -l src/route.rs
Repository: marcomq/mq-bridge
Length of output: 77
🏁 Script executed:
# Read the specific section mentioned (lines 183-195) with surrounding context
sed -n '170,210p' src/route.rs
Repository: marcomq/mq-bridge
Length of output: 2462
🏁 Script executed:
# Find the function containing this code by looking at broader context
sed -n '140,220p' src/route.rs
Repository: marcomq/mq-bridge
Length of output: 4372
🏁 Script executed:
# Search for the reconnect logic and backoff configuration
rg -n "reconnect|5.?second|backoff" src/route.rs -A 2 -B 2
Repository: marcomq/mq-bridge
Length of output: 2399
🏁 Script executed:
# Search for where errors from the route function are handled
rg -n "spawn|tokio::spawn" src/route.rs -A 3 -B 1 | head -50
Repository: marcomq/mq-bridge
Length of output: 2628
🏁 Script executed:
# Search for retry-related code
rg -n "retry|Retry" src/route.rs -A 2 -B 2
Repository: marcomq/mq-bridge
Length of output: 2948
🏁 Script executed:
# Look for any configuration or thresholds
rg -n "threshold|consecutive|attempt" src/ -l | head -10
Repository: marcomq/mq-bridge
Length of output: 232
🏁 Script executed:
# Check if there are tests documenting the expected behavior
fd test src/ -type f | xargs rg -l "Partial|retryable" 2>/dev/null | head -5
Repository: marcomq/mq-bridge
Length of output: 358
🏁 Script executed:
# Look for the publisher trait/implementation to understand SentBatch::Partial
rg -n "SentBatch|PublisherError::Retryable" src/ --type rust -B 2 -A 2 | head -60
Repository: marcomq/mq-bridge
Length of output: 3895
🏁 Script executed:
# Look at the retry middleware
cat src/middleware/retry.rs
Repository: marcomq/mq-bridge
Length of output: 8243
🏁 Script executed:
# Check the PublisherError definition and understand Retryable vs non-retryable
rg -n "enum PublisherError|Retryable|impl.*PublisherError" src/ -A 5 | head -80
Repository: marcomq/mq-bridge
Length of output: 4985
🏁 Script executed:
# Look at the concurrent runner to see if it has different logic
sed -n '220,290p' src/route.rs
Repository: marcomq/mq-bridge
Length of output: 4078
🏁 Script executed:
# See how RetryPublisher is configured/created
rg -n "RetryPublisher|RetryMiddleware" src/ -B 2 -A 2 | head -80
Repository: marcomq/mq-bridge
Length of output: 3648
🏁 Script executed:
# Check the models to see RetryMiddleware configuration
rg -n "struct RetryMiddleware|RetryMiddleware {" src/ -A 5
Repository: marcomq/mq-bridge
Length of output: 204
🏁 Script executed:
# Look for where publishers are created to understand the retry config
rg -n "create_publisher" src/route.rs -A 20 | head -50
Repository: marcomq/mq-bridge
Length of output: 2259
🏁 Script executed:
# Check the default retry configuration
rg -n "default_retry" src/ -A 3
Repository: marcomq/mq-bridge
Length of output: 458
🏁 Script executed:
# Check how RetryMiddleware is applied and if it's default
rg -n "apply_middlewares\|wrap_with_middlewares" src/ -A 10 | head -60
Repository: marcomq/mq-bridge
Length of output: 43
🏁 Script executed:
# See if there are tests showing how the retry + route error handling interact
fd test -name "*.rs" | xargs rg -l "Partial.*Retryable\|retryable.*route" | head -5
Repository: marcomq/mq-bridge
Length of output: 291
🏁 Script executed:
# Search for integration tests related to retryable errors in routes
fd test -name "*.rs" -path "*/tests/*" | head -10
Repository: marcomq/mq-bridge
Length of output: 291
🏁 Script executed:
# Look for tests that verify the error handling behavior
rg "Retryable.*route\|route.*Retryable" src/ --type rust -A 5 -B 2
Repository: marcomq/mq-bridge
Length of output: 43
🏁 Script executed:
# Check if there are any issues or TODOs about this aggressive reconnect behavior
rg "TODO\|FIXME\|XXX" src/route.rs -A 2
Repository: marcomq/mq-bridge
Length of output: 43
🏁 Script executed:
# Understand the full flow: check what happens when RetryPublisher exhausts retries
# and returns a Partial with retryable errors
cat src/middleware/retry.rs | sed -n '100,130p'
Repository: marcomq/mq-bridge
Length of output: 1527
🏁 Script executed:
# Let's see the complete flow - check how RetryMiddleware is applied
rg "apply_publisher_middlewares\|wrap_publisher" src/ -A 15 | head -80
Repository: marcomq/mq-bridge
Length of output: 43
🏁 Script executed:
# Check if RetryPublisher is always applied or conditional
rg -n "middlewares" src/endpoints.rs -A 5 -B 2 | head -50
Repository: marcomq/mq-bridge
Length of output: 117
🏁 Script executed:
# Verify: when RetryPublisher exhausts retries, what does it return?
sed -n '115,125p' src/middleware/retry.rs
Repository: marcomq/mq-bridge
Length of output: 554
🏁 Script executed:
# Check the full middleware application stack
ls -la src/endpoints/
Repository: marcomq/mq-bridge
Length of output: 1188
🏁 Script executed:
# Look at how publishers are created with middleware
rg "create_publisher_from_route" src/ -A 30 | head -80
Repository: marcomq/mq-bridge
Length of output: 4565
🏁 Script executed:
# Check the endpoints module to understand middleware application
head -100 src/endpoints/mod.rs
Repository: marcomq/mq-bridge
Length of output: 3457
🏁 Script executed:
# Check if RetryPublisher is conditionally applied or always applied
rg "apply_middlewares_to_publisher\|Middleware::Retry" src/middleware/ -A 5 -B 2
Repository: marcomq/mq-bridge
Length of output: 43
🏁 Script executed:
# Verify the complete middleware application for publishers
sed -n '60,90p' src/middleware/mod.rs
Repository: marcomq/mq-bridge
Length of output: 1535
🏁 Script executed:
# Check if there's documentation about the expected behavior
head -50 README.md | grep -i retry
Repository: marcomq/mq-bridge
Length of output: 43
🏁 Script executed:
# Final verification: understand the complete flow by checking default middleware config
rg "middlewares.*default\|fn default" src/models.rs | grep -A 5 -B 5 middleware
Repository: marcomq/mq-bridge
Length of output: 43
🏁 Script executed:
# Check if RetryMiddleware is part of default config
rg "pub struct Endpoint\|pub middlewares" src/models.rs -A 10 | head -30
Repository: marcomq/mq-bridge
Length of output: 43
🏁 Script executed:
# Confirm: does a standard route have retry middleware by default?
rg "default.*endpoint\|Endpoint::default\|new.*endpoint" src/ -A 5 | head -40
Repository: marcomq/mq-bridge
Length of output: 43
🏁 Script executed:
# Look at the Endpoint struct definition directly
rg "struct Endpoint" src/models.rs -A 20
Repository: marcomq/mq-bridge
Length of output: 1476
🏁 Script executed:
# Check if there are any examples or configuration files showing typical setup
fd "config\|example" -name "*.json" -o -name "*.yaml" -o -name "*.toml" | head -5
Repository: marcomq/mq-bridge
Length of output: 291
🏁 Script executed:
# Let me understand better: when RetryPublisher returns Partial with retryable errors,
# the route code gets them. Let's trace this one more time to be absolutely certain
sed -n '70,110p' src/middleware/retry.rs
Repository: marcomq/mq-bridge
Length of output: 1783
Acknowledge existing retry mechanism before implementing additional retries.
The codebase already provides a RetryPublisher middleware with configurable exponential backoff (default: 3 attempts, 50ms initial interval) that retries failed messages before triggering the route reconnect. When a retryable error occurs in a Partial response, it only triggers reconnect after the publisher-level retries are exhausted.
If retries at the publisher level are insufficient or you want to avoid reconnect on transient failures, ensure the RetryMiddleware is configured on the output endpoint. Alternatively, consider implementing a threshold for consecutive retryable failures before triggering a full reconnect, or adding per-message retry logic within the publisher implementation rather than tearing down the entire route.
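To make the retry budget concrete, the sketch below computes the waits an exponential backoff would insert between attempts. The 3 attempts and 50ms initial interval come from the comment above; the doubling factor, the cap, and the function name `backoff_schedule` are assumptions, and the real RetryMiddleware may compute its delays differently.

```rust
use std::time::Duration;

/// Delays to wait before each retry, doubling every time and capped at `max`.
/// `attempts` counts total send attempts, so there are `attempts - 1` waits.
fn backoff_schedule(attempts: u32, initial: Duration, max: Duration) -> Vec<Duration> {
    let mut delays = Vec::new();
    let mut next = initial;
    for _ in 1..attempts {
        delays.push(next.min(max));
        // Doubling is an assumption; saturating_mul avoids overflow panics.
        next = next.saturating_mul(2);
    }
    delays
}
```

Under these assumptions the default configuration waits 50ms and then 100ms, so a route would reconnect roughly 150ms (plus send time) after the first retryable failure.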
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @src/models.rs:
- Around line 47-48: The default_batch_size() function was changed from 128 to
1, which is a breaking change not documented; update the codebase and docs by
(1) adding a changelog entry describing the breaking change and migration
guidance, (2) updating the outdated comment in route.rs that still references
"default batch size (128)" to reflect the new default and note the breaking
change, and (3) optionally adjusting or documenting integration tests that
hardcode batch_size: 128 so they either set batch_size explicitly or are updated
to expect the new default; reference the function default_batch_size(), the
comment in src/route.rs, and the tests that set batch_size: 128 when making
these edits.
🧹 Nitpick comments (3)
tests/integration/route.rs (1)
308-323: Timing-based assertions may be flaky in CI. The assertions comparing against Duration::from_millis(200) and Duration::from_millis(250) could fail on slow CI runners or under load.
Consider relaxing timing thresholds
 // Case 1: High concurrency (Parallel commits) -> Should be fast (no blocking on semaphore)
 let duration_fast = run_test_case(10).await;
 assert!(
-    duration_fast < Duration::from_millis(200),
+    duration_fast < Duration::from_millis(500),
     "Fast route took too long: {:?}",
     duration_fast
 );
 // Case 2: Low concurrency (Sequential commits) -> Should be slow (~300ms)
-// Msg 1 & 2 sent at T=0. Msg 3 at T=100. Msg 4 at T=200. Msg 5 at T=300.
 let duration_slow = run_test_case(1).await;
 assert!(
-    duration_slow >= Duration::from_millis(250),
+    duration_slow >= Duration::from_millis(200),
     "Slow route was too fast: {:?}",
     duration_slow
 );
+// Also verify slow is significantly slower than fast
+assert!(
+    duration_slow > duration_fast,
+    "Sequential should be slower than parallel"
+);
src/middleware/deduplication.rs (2)
61-63: Consider making PENDING_TTL configurable. The hardcoded 5-second PENDING_TTL may be too short for handlers that take longer to process messages, potentially causing duplicate processing after a crash recovery.
Consider deriving PENDING_TTL from handler timeout or making it configurable
+const DEFAULT_PENDING_TTL: u64 = 5;
+
 // Use a prefix to distinguish between pending (0) and processed (1) states.
 // Pending state has a short TTL to allow recovery from crashes.
 const STATE_PENDING: u8 = 0;
 const STATE_PROCESSED: u8 = 1;
-const PENDING_TTL: u64 = 5;
+// TODO: Consider making this configurable via DeduplicationMiddleware
+const PENDING_TTL: u64 = DEFAULT_PENDING_TTL;
73-132: CAS loop implementation is correct but could benefit from backoff. The compare-and-swap loop handles contention correctly, but under extreme contention it could spin without yielding.
Consider adding tokio::task::yield_now().await after a few failed CAS attempts to prevent CPU spinning under high contention.
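The idea of yielding after repeated CAS failures can be shown with std atomics. This is an analogy only: it uses `AtomicU64` and `std::thread::yield_now` instead of the sled tree and `tokio::task::yield_now().await` that the comment refers to, and both the function name and the threshold of 3 failures are arbitrary choices for the sketch.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Apply `f` to the current value with a compare-and-swap loop.
/// After a few failed attempts the thread yields instead of spinning.
fn update_with_backoff(cell: &AtomicU64, f: impl Fn(u64) -> u64) -> u64 {
    const FAILURES_BEFORE_YIELD: u32 = 3; // arbitrary threshold for this sketch
    let mut failures = 0u32;
    let mut current = cell.load(Ordering::Acquire);
    loop {
        let new = f(current);
        match cell.compare_exchange_weak(current, new, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => return new,
            Err(observed) => {
                // Another writer won (or the weak CAS failed spuriously):
                // retry from the value we just observed.
                current = observed;
                failures += 1;
                if failures >= FAILURES_BEFORE_YIELD {
                    thread::yield_now();
                }
            }
        }
    }
}
```

In async code the equivalent yield point would be an `.await`, which also lets other tasks on the same worker make progress.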
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- Cargo.toml
- README.md
- mq-bridge.schema.json
- src/middleware/deduplication.rs
- src/middleware/mod.rs
- src/models.rs
- src/route.rs
- tests/integration/route.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- Cargo.toml
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-07T16:21:12.817Z
Learnt from: marcomq
Repo: marcomq/mq-bridge PR: 9
File: .github/workflows/test-matrix.yml:44-48
Timestamp: 2026-01-07T16:21:12.817Z
Learning: In the mq-bridge project, IBM MQ (ibm-mq feature) should be excluded from test runs but included in clippy checks. Tests use --features=full (excluding ibm-mq), while clippy uses --all-features (including ibm-mq) to verify code without requiring IBM MQ client libraries on all platforms.
Applied to files:
README.md
🧬 Code graph analysis (2)
tests/integration/route.rs (2)
src/endpoints/memory.rs (1)
len (76-78)
src/middleware/retry.rs (1)
new (15-17)
src/route.rs (2)
src/models.rs (1)
new (185-192)
src/traits.rs (5)
send_batch (114-117), send_batch (150-155), send_batch (172-177), send_batch (322-327), send_batch (383-395)
🪛 LanguageTool
README.md
[style] ~38-~38: Consider using a more formal alternative here.
Context: ...ibrary was created in 2025 and is still kind of new. There are automated unit and integ...
(KIND_OF_SORT_OF_FORMAL)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Build (full)
- GitHub Check: Build (amqp)
- GitHub Check: Build (aws)
- GitHub Check: Build (http)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (nats)
- GitHub Check: Documentation
- GitHub Check: Integration Tests
- GitHub Check: Check
- GitHub Check: Test
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Test on macos-latest with Rust stable
- GitHub Check: Performance Benchmarks
🔇 Additional comments (22)
README.md (3)
1-16: LGTM - Documentation updates for ZeroMQ support. The title change, diagram update, and description refinements appropriately reflect the library's expanded capabilities with ZeroMQ support.
89-90: ZeroMQ endpoint behavior documented correctly. The persistence table entries for ZeroMQ Consumer (PULL/REP) and Subscriber (SUB) as ephemeral are accurate for ZeroMQ's design.
410-422: Comprehensive ZeroMQ configuration example. The Route 7 example clearly demonstrates PUSH/PULL pipeline configuration with bind/connect semantics.
src/models.rs (3)
290-296: CommitConcurrencyMiddleware provides essential backpressure control. The struct is simple and focused. The `limit` field controls parallel commit tasks effectively.
745-757: TlsConfig expansion is comprehensive. The new fields (`ca_file`, `cert_file`, `key_file`, `cert_password`, `accept_invalid_certs`) provide complete TLS configuration capability across all endpoints.
656-682: ZeroMQ configuration structure is well-designed. The `socket_type` being `Option<ZeroMqSocketType>` provides appropriate flexibility with sensible context-specific defaults: `ZeroMqProducer` defaults to `Push`, `ZeroMqConsumer` to `Pull`, and `ZeroMqSubscriber` to `Sub`. Each endpoint implementation safely handles the `None` case using `unwrap_or()` with type-appropriate defaults, eliminating runtime error risk.
src/middleware/mod.rs (2)
48-48: CommitConcurrency middleware correctly handled as configuration-only. The comment clarifies that the actual concurrency limiting is implemented in Route, not as a middleware wrapper.
82-85: Appropriate warning for misconfigured CommitConcurrency. Logging a warning when CommitConcurrency is placed on the output endpoint helps users correct their configuration without failing silently.
tests/integration/route.rs (3)
84-85: Behavior change: non-retryable errors now logged and dropped. This is a significant change from the previous behavior. The route now continues instead of failing on non-retryable errors.
91-121: Good test coverage for retryable error without middleware. This test correctly verifies that retryable errors propagate up and crash the route when no retry middleware is configured.
123-174: Comprehensive retry middleware test. The test verifies that the retry middleware correctly retries and eventually succeeds, with proper counting of attempts.
src/middleware/deduplication.rs (2)
178-199: Backward compatibility for legacy 8-byte entries is well handled. The cleanup logic correctly supports both 8-byte (legacy) and 9-byte (new state+timestamp) formats, ensuring smooth migration.
144-157: Commit wrapper correctly transitions state from pending to processed. The error is logged but doesn't fail the commit, which is appropriate since the message was already processed.
src/route.rs (5)
141-151: CommitConcurrency middleware extraction is clean. The default of 4096 parallel commits is high enough to not bottleneck most workloads while still providing an upper bound.
184-189: Semaphore-based commit limiting is correctly implemented. The owned permit ensures the slot is released when the spawned task completes or is dropped.
192-212: Partial batch handling correctly distinguishes retryable from non-retryable errors. Retryable errors trigger route reconnection while non-retryable errors are logged and the message is dropped, allowing the route to continue.
334-345: JoinSet monitoring catches unexpected worker exits. Treating any unexpected worker completion as an error is appropriate since workers should only exit when the work channel is closed.
320-322: Worker waits for in-flight commits before exiting. This ensures no commits are lost when the worker exits, providing proper graceful shutdown semantics.
mq-bridge.schema.json (4)
82-96: CommitConcurrencyMiddleware schema correctly defined. The `limit` field is required with proper minimum constraint.
988-1024: Route schema reflects new defaults. The `batch_size` default of 1 and `concurrency` default of 1 match the Rust implementation.
1101-1146: ZeroMqEndpoint schema is comprehensive. All ZeroMQ configuration options are properly defined with appropriate types and defaults. The `socket_type` being nullable allows for endpoint-role-based inference.
346-347: Good addition of `unevaluatedProperties: false`. This prevents silent typos in YAML configuration files by rejecting unknown properties.
pub(crate) fn default_batch_size() -> usize {
-     128
+     1
🧩 Analysis chain
🏁 Script executed:
cat -n src/models.rs | sed -n '40,55p'
Repository: marcomq/mq-bridge
Length of output: 439
🏁 Script executed:
find . -type f -name "CHANGELOG*" -o -name "BREAKING*" -o -name "MIGRATIONS*" | head -20
Repository: marcomq/mq-bridge
Length of output: 43
🏁 Script executed:
rg -i "batch.?size|breaking" --max-count=20
Repository: marcomq/mq-bridge
Length of output: 2644
🏁 Script executed:
git log --oneline src/models.rs 2>/dev/null | head -10
Repository: marcomq/mq-bridge
Length of output: 111
Default batch_size changed from 128 to 1 — breaking change not documented.
This is a breaking change. The outdated comment in src/route.rs still references "default batch size (128)", and all integration tests hardcode batch_size: 128, indicating they are protecting against the new default. Add a changelog entry documenting this breaking change.
🤖 Prompt for AI Agents
In @src/models.rs around lines 47 - 48, The default_batch_size() function was
changed from 128 to 1, which is a breaking change not documented; update the
codebase and docs by (1) adding a changelog entry describing the breaking change
and migration guidance, (2) updating the outdated comment in route.rs that still
references "default batch size (128)" to reflect the new default and note the
breaking change, and (3) optionally adjusting or documenting integration tests
that hardcode batch_size: 128 so they either set batch_size explicitly or are
updated to expect the new default; reference the function default_batch_size(),
the comment in src/route.rs, and the tests that set batch_size: 128 when making
these edits.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In @src/middleware/deduplication.rs:
- Around line 49-72: The 5-second PENDING_TTL (const PENDING_TTL) is likely too
short and can let pending markers expire before handler+publish complete; change
PENDING_TTL from a hardcoded const to a configurable value (e.g., make it a
parameter on the deduplication function or read from app config/env and pass it
into the code that constructs pending_val), increase the default to a safer
margin (30–60 seconds) and validate it's >= 5, and update any docs/comments to
note the TTL semantics; ensure you update uses of PENDING_TTL and keep
STATE_PENDING and STATE_PROCESSED semantics unchanged.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- src/middleware/deduplication.rs
- tests/integration/route.rs
🧰 Additional context used
🧬 Code graph analysis (1)
tests/integration/route.rs (2)
src/models.rs (1)
new(185-192)
src/middleware/retry.rs (1)
new(15-17)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Build (http)
- GitHub Check: Build (aws)
- GitHub Check: Build (full)
- GitHub Check: Build (amqp)
- GitHub Check: Build (nats)
- GitHub Check: Build (mqtt)
- GitHub Check: Build (mongodb)
- GitHub Check: Performance Benchmarks
- GitHub Check: Documentation
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Integration Tests
- GitHub Check: Test
- GitHub Check: Check
🔇 Additional comments (8)
tests/integration/route.rs (4)
7-10: LGTM! Import additions appropriately support the new retry middleware tests below.
84-85: Verify this breaking behavior change is intentional. The route now treats non-retryable errors (including deserialization failures) as logged and dropped rather than causing route failure. This is a significant behavioral change that could affect users who expect routes to stop on handler errors.
Please confirm this change aligns with the intended error handling semantics and consider documenting it as a breaking change.
91-121: LGTM! Test correctly verifies that retryable errors without middleware cause route failure. This provides good coverage for the error propagation path.
123-174: LGTM! Test correctly validates retry middleware behavior with max_attempts configuration. The handler fails twice before succeeding, and the test properly verifies both the retry count and successful message publication.
src/middleware/deduplication.rs (4)
140-144: LGTM! Correctly handles duplicates by logging them and committing with `None` to acknowledge the message without reprocessing.
184-205: LGTM! Cleanup logic properly handles both 8-byte and 9-byte timestamp formats with appropriate warnings for invalid entries. The offset-based approach cleanly supports backward compatibility.
88-106: LGTM! TTL checking correctly differentiates between pending (5s) and processed (config TTL) states, maintains backward compatibility with 8-byte entries, and defensively treats invalid lengths as expired.
108-127: Duplicate detection and expiry logic is sound. Correctly identifies duplicates within TTL (lines 108-110) and attempts to claim expired entries via CAS (lines 113-126). DB errors are properly propagated as connection errors.
The unbounded retry concern was already flagged in an earlier comment for the overall loop structure.
  let original_commit = received.commit;
  let key = message.message_id.to_be_bytes().to_vec();
  let message_id_hex = format!("{:032x}", message.message_id);

  let now = SystemTime::now()
      .duration_since(UNIX_EPOCH)
      .context("System time is before UNIX EPOCH")?
      .as_secs();
- // Atomically insert only if key doesn't exist
- match self
-     .db
-     .compare_and_swap(&key, None as Option<&[u8]>, Some(&now.to_be_bytes()[..]))
-     .context("Failed to perform compare-and-swap in deduplication DB")?
- {
-     Ok(_) => {
-         // Successfully inserted - not a duplicate, proceed
-         trace!(
-             message_id = %message_id_hex,
-             "Deduplication check passed (new message)"
-         );
+ let now_bytes = now.to_be_bytes();
+
+ // Use a prefix to distinguish between pending (0) and processed (1) states.
+ // Pending state has a short TTL to allow recovery from crashes.
+ const STATE_PENDING: u8 = 0;
+ const STATE_PROCESSED: u8 = 1;
+ const PENDING_TTL: u64 = 5;
+
+ let mut pending_val = Vec::with_capacity(9);
+ pending_val.push(STATE_PENDING);
+ pending_val.extend_from_slice(&now_bytes);
+
+ let mut processed_val = Vec::with_capacity(9);
+ processed_val.push(STATE_PROCESSED);
+ processed_val.extend_from_slice(&now_bytes);
+
Consider whether 5-second PENDING_TTL is sufficient.
Line 63 sets PENDING_TTL to 5 seconds for crash recovery. If message processing (handler execution + downstream publishing) takes longer than 5 seconds, the pending entry could expire and another instance might process the same message, defeating deduplication.
Consider:
- Increasing PENDING_TTL to a safer margin (e.g., 30-60 seconds)
- Making PENDING_TTL configurable
- Or documenting this limitation
🤖 Prompt for AI Agents
In @src/middleware/deduplication.rs around lines 49 - 72, The 5-second
PENDING_TTL (const PENDING_TTL) is likely too short and can let pending markers
expire before handler+publish complete; change PENDING_TTL from a hardcoded
const to a configurable value (e.g., make it a parameter on the deduplication
function or read from app config/env and pass it into the code that constructs
pending_val), increase the default to a safer margin (30–60 seconds) and
validate it's >= 5, and update any docs/comments to note the TTL semantics;
ensure you update uses of PENDING_TTL and keep STATE_PENDING and STATE_PROCESSED
semantics unchanged.
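To make the TTL semantics concrete, here is a rough sketch of the 9-byte entry layout (state byte plus big-endian timestamp) and an expiry check with the pending TTL passed in as a parameter. The function names, the parameterized TTLs, the strict `>` comparison, and the choice to treat legacy 8-byte entries as processed are assumptions for illustration; the reviewed code hardcodes `PENDING_TTL = 5` and may differ in detail.

```rust
const STATE_PENDING: u8 = 0;
const STATE_PROCESSED: u8 = 1;

/// Encodes an entry as one state byte followed by a big-endian u64 timestamp (seconds).
pub fn encode_entry(state: u8, now_secs: u64) -> Vec<u8> {
    let mut value = Vec::with_capacity(9);
    value.push(state);
    value.extend_from_slice(&now_secs.to_be_bytes());
    value
}

/// Returns true if `entry` should be treated as expired at `now_secs`.
/// Both TTLs are in seconds and are parameters here (an assumption).
pub fn is_expired(entry: &[u8], now_secs: u64, pending_ttl: u64, processed_ttl: u64) -> bool {
    let (state, ts_bytes) = match entry.len() {
        9 => (entry[0], &entry[1..]),
        // Legacy format: timestamp only. Treated as processed (an assumption).
        8 => (STATE_PROCESSED, entry),
        // Invalid length: treat defensively as expired.
        _ => return true,
    };
    let mut buf = [0u8; 8];
    buf.copy_from_slice(ts_bytes);
    let timestamp = u64::from_be_bytes(buf);
    let ttl = if state == STATE_PENDING { pending_ttl } else { processed_ttl };
    now_secs.saturating_sub(timestamp) > ttl
}
```

With a 5-second pending TTL, a handler that takes 6 seconds leaves its pending marker claimable by another instance; raising the value to 30 keeps the same entry protected.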
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In @mq-bridge.schema.json:
- Around line 1018-1030: The JSON schema fields "batch_size" and "concurrency"
currently permit a minimum of 0; update both field definitions so their
"minimum" is 1 (ensuring "batch_size" and "concurrency" cannot be zero) while
keeping their other properties ("type", "format", "default", etc.) unchanged.
- Around line 82-96: Update the JSON schema for CommitConcurrencyMiddleware to
disallow a zero concurrency value: change the "minimum" of the "limit" property
from 0 to 1 so the CommitConcurrencyMiddleware.limit cannot be set to 0 (which
would cause Semaphore::new(0) deadlock); modify the "limit" property inside the
CommitConcurrencyMiddleware schema accordingly.
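Besides tightening the schema, the same constraint could be enforced in Rust when the configuration is loaded. The sketch below is one possible shape, not code from the repository: the `CommitLimit` wrapper and its error message are hypothetical, and it relies only on `std::num::NonZeroUsize`.

```rust
use std::num::NonZeroUsize;

/// Hypothetical validated wrapper for the commit concurrency limit.
/// A value of 0 is rejected up front so that a semaphore is never
/// created with zero permits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitLimit(NonZeroUsize);

impl CommitLimit {
    /// Returns an error for 0, otherwise wraps the limit.
    pub fn new(limit: usize) -> Result<Self, String> {
        NonZeroUsize::new(limit)
            .map(CommitLimit)
            .ok_or_else(|| "commit concurrency limit must be at least 1".to_string())
    }

    /// The validated limit as a plain usize.
    pub fn get(self) -> usize {
        self.0.get()
    }
}
```

Validating at construction means code that later builds the semaphore can take a `CommitLimit` and does not need its own zero check.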
In @src/endpoints/zeromq.rs:
- Around line 409-411: The as_any() implementation on ZeroMqSubscriber currently
delegates to the inner ZeroMqConsumer (self.0.as_any()), which breaks wrapper
type downcasts; change ZeroMqSubscriber::as_any to return self (i.e., a
reference to the ZeroMqSubscriber) instead of delegating to ZeroMqConsumer so
tests and downcast_ref::<ZeroMqSubscriber>() succeed, matching the pattern used
by MqttSubscriber and KafkaSubscriber.
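The downcast problem described for `ZeroMqSubscriber` can be reproduced with a few stand-in types. The `Endpoint` trait and the `Consumer`, `DelegatingSubscriber`, and `Subscriber` structs below are hypothetical names chosen for illustration; only `std::any::Any` and `downcast_ref` are real APIs.

```rust
use std::any::Any;

/// Stand-in for the trait that exposes `as_any` (hypothetical name).
pub trait Endpoint {
    fn as_any(&self) -> &dyn Any;
}

pub struct Consumer;
/// Wrapper that delegates `as_any` to the inner consumer (the flagged pattern).
pub struct DelegatingSubscriber(pub Consumer);
/// Wrapper that returns itself from `as_any` (the suggested pattern).
pub struct Subscriber(pub Consumer);

impl Endpoint for Consumer {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Endpoint for DelegatingSubscriber {
    fn as_any(&self) -> &dyn Any {
        // The returned value has the inner type, so the wrapper type is lost.
        self.0.as_any()
    }
}

impl Endpoint for Subscriber {
    fn as_any(&self) -> &dyn Any {
        // The returned value has the wrapper type.
        self
    }
}
```

Calling `downcast_ref::<DelegatingSubscriber>()` on the delegating wrapper yields `None`, because the `Any` it hands out is a `Consumer`; the self-returning wrapper downcasts to its own type as expected.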
🧹 Nitpick comments (4)
src/endpoints/zeromq.rs (1)
234-284: Consider refactoring Rep socket handling for clarity. The control flow for Rep socket (lines 245-277) is functional but complex. The special-case handling with early `continue` makes the logic harder to follow.
Consider refactoring to make the Rep socket handling more explicit, perhaps by extracting it into a separate async function.
♻️ Potential refactor approach
Extract Rep socket handling into a helper function:

    async fn handle_rep_socket(
        socket: &mut zeromq::RepSocket,
        tx: &Sender<Result<ConsumerItem, ConsumerError>>,
    ) -> bool {
        match socket.recv().await {
            Ok(msg) => {
                let (reply_tx, reply_rx) = oneshot::channel();
                let item = ConsumerItem { msg, reply_tx: Some(reply_tx) };
                if tx.send(Ok(item)).await.is_err() {
                    return false; // channel closed
                }
                let reply = reply_rx
                    .await
                    .unwrap_or_else(|_| ZmqMessage::from(bytes::Bytes::new()));
                socket.send(reply).await.ok(); // ignore send errors
                true
            }
            Err(e) => {
                tx.send(Err(ConsumerError::Connection(anyhow!(e)))).await.is_ok()
            }
        }
    }

Then simplify the loop to:

    loop {
        let should_continue = match &mut socket {
            ReceiverSocket::Rep(s) => handle_rep_socket(s, &tx).await,
            // ... other cases
        };
        if !should_continue {
            break;
        }
    }

mq-bridge.schema.json (3)
1117-1120: Consider renaming TlsConfig.required field. The field named `required` (lines 1117-1120) controls whether TLS is enabled, but this name is confusing as it semantically conflicts with JSON Schema's `required` keyword. Consider renaming it to `enabled` or `tls_enabled` for clarity.
Example:

    "enabled": {
      "description": "If true, enable TLS/SSL.",
      "type": "boolean"
    }

56-63: Consider using $ref for repeated TLS default values. The TLS default object is repeated verbatim across 7 different endpoints (AmqpEndpoint, HttpEndpoint, IbmMqEndpoint, KafkaEndpoint, MongoDbEndpoint, MqttEndpoint, NatsEndpoint). While valid, this creates a maintenance burden where any change to TLS defaults requires updating multiple locations.
Consider using JSON Schema's `$defs` with `$ref` for the default value, or document this as an intentional design choice if inline defaults are preferred for clarity.
Also applies to: 381-388, 439-446, 541-548, 716-723, 814-821, 935-942
1084-1088: Document security implications of accept_invalid_certs. The `accept_invalid_certs` option (lines 1084-1088) disables server certificate verification, which is a significant security risk in production. While this is useful for development/testing and defaults to `false`, consider adding a stronger warning in the description about the security implications.
Suggested description update:

    "description": "If true, disable server certificate verification (INSECURE - for development/testing only, never use in production)."
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- mq-bridge.schema.json
- src/endpoints/zeromq.rs
- src/middleware/delay.rs
- src/middleware/mod.rs
- src/models.rs
- tests/integration/route.rs
🧰 Additional context used
🧬 Code graph analysis (3)
src/endpoints/zeromq.rs (3)
src/models.rs (2)
new(185-192)
channel(208-213)
src/canonical_message.rs (8)
new(23-29)
s(64-64)
from(112-114)
from(118-120)
from(124-126)
from(130-132)
from(144-149)
get_payload_str(86-88)
src/endpoints/mod.rs (1)
consumer(429-431)
src/middleware/delay.rs (2)
src/models.rs (2)
new(185-192)
channel(208-213)
tests/integration/route.rs (2)
receive_batch(245-259)
as_any(260-262)
src/models.rs (1)
src/type_handler.rs (1)
default(66-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Build (http)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (full)
- GitHub Check: Build (amqp)
- GitHub Check: Test on macos-latest with Rust stable
- GitHub Check: Test on ubuntu-latest with Rust stable
- GitHub Check: Build (aws)
- GitHub Check: Performance Benchmarks
- GitHub Check: Documentation
- GitHub Check: Build (nats)
- GitHub Check: Check
- GitHub Check: Test
- GitHub Check: Integration Tests
🔇 Additional comments (25)
src/middleware/mod.rs (2)
13-13: LGTM! Delay module properly declared and imported. The delay middleware follows the same pattern as other middleware modules in this file.
Also applies to: 22-22
50-51: LGTM! Middleware wiring is correct and user-friendly. The warning message for `CommitConcurrency` on publishers (lines 85-87) is particularly helpful, as it clearly guides users to configure it on the input endpoint instead.
Also applies to: 85-90
tests/integration/route.rs (6)
7-10: LGTM! Import additions support the new test cases.
82-89: Verify the behavioral change for non-retryable errors. The route now continues after non-retryable errors (logging and dropping messages) instead of returning an error. This improves resilience but represents a significant behavioral change that affects error handling semantics.
Ensure that:
- Users who rely on routes failing on handler errors are aware of this change
- There's adequate logging to track dropped messages
- This behavior is documented
Also applies to: 206-213
91-121: LGTM! Test correctly validates retryable error handling without middleware. This test provides good coverage for the expected failure case when retryable errors occur without retry middleware.
123-174: LGTM! Comprehensive test for retry middleware behavior. The test correctly validates that the retry middleware attempts up to `max_attempts` times and succeeds when the handler eventually returns `Ok`.
215-328: LGTM! Excellent test for commit concurrency limiting. The custom `SlowCommitMiddleware` is a clever testing pattern that wraps the commit closure to introduce controllable delays. The timing assertions effectively validate that the concurrency limit is enforced.
330-376: LGTM! Integration test validates delay middleware timing. The test correctly verifies that the delay middleware introduces the expected latency in message processing.
src/middleware/delay.rs (3)
11-40: LGTM! DelayConsumer implementation is correct. The middleware applies the delay before each receive operation, which is appropriate for simulating slow consumers or rate limiting. Note that for `receive_batch`, the delay is applied once per batch, not per message.
42-74: LGTM! DelayPublisher mirrors DelayConsumer behavior. The implementation is consistent with `DelayConsumer`: the delay is applied once per batch operation.
76-126: LGTM! Tests validate delay timing. The unit tests appropriately verify that the configured delay is applied to both consumer and publisher operations.
src/endpoints/zeromq.rs (4)
116-154: LGTM! Publisher implementation correctly handles request-reply pattern. The distinction between request-reply (Req socket) and fire-and-forget (Push/Pub) is well-implemented, with appropriate error categorization.
340-387: LGTM! Complex but correct reply coordination in receive_batch. The commit closure (lines 356-380) correctly implements the reply coordination for Rep sockets, tracking responses for each message in the batch and sending the final reply when all are processed. The use of `Arc<Mutex<BatchReplyState>>` is appropriate for this shared state pattern.
421-458: LGTM! Test validates basic push/pull functionality. The test correctly verifies message transmission through ZeroMQ push/pull sockets with proper timeout handling.
292-300: Add clarifying comment on frame handling in `decode_batch`. The use of `frames.last()` at line 294 is correct for the current socket types (Pull, Sub, Rep) since the payload is always the final frame in a ZeroMQ message. However, add an explicit comment documenting this assumption, especially for Rep socket which receives multipart messages [identity, empty delimiter, payload] from Req. This clarifies why `last()` is the right choice rather than `first()` or direct indexing.
src/models.rs (9)
274-274: LGTM! New middleware types properly integrated. The `CommitConcurrency` and `Delay` middleware variants align with the implementations in `src/middleware/`.
Also applies to: 277-277
291-297: LGTM! Middleware configuration structs are well-defined. Both `CommitConcurrencyMiddleware` and `DelayMiddleware` have clear, simple configuration structures.
Also applies to: 337-342
413-431: LGTM! Kafka configuration enhancements improve flexibility. The addition of `producer_options` and `consumer_options` allows users to pass arbitrary librdkafka configuration, which is essential for advanced use cases. The improved documentation clearly indicates publisher-only vs consumer-only options.
453-483: LGTM! Comprehensive NATS configuration enhancements. The expanded configuration options provide much better control over authentication, request-reply patterns, and JetStream behavior. The clear documentation of publisher-only vs consumer-only options is helpful.
596-628: LGTM! MQTT protocol version support is well-implemented. The addition of `MqttProtocol` enum with V3/V5 variants (defaulting to V5) provides necessary flexibility for different MQTT broker requirements.
664-702: LGTM! ZeroMQ configuration is comprehensive and well-documented. The configuration structs provide all necessary options for ZeroMQ socket setup:
- Multiple socket types (Push/Pull, Pub/Sub, Req/Rep)
- Bind vs Connect mode
- Configurable buffer size
The documentation clearly explains each option.
753-766: LGTM! TLS configuration expanded for full certificate control. The additions of `ca_file`, `cert_file`, `key_file`, `cert_password`, and `accept_invalid_certs` provide comprehensive TLS/mTLS configuration options. The helper methods `is_mtls_client_configured()` and `is_tls_server_configured()` are useful.
257-257: Verify ZeroMQ is properly feature-gated in the enum and all related code. The `ZeroMq(ZeroMqEndpoint)` variant added to `EndpointType` should have feature-gating consistent with other conditional endpoint types. Verify:
- The `ZeroMq` variant includes `#[cfg(feature = "zeromq")]` attribute (if other endpoints like AWS use conditional compilation)
- All ZeroMQ-related code throughout the codebase is consistently gated under the same feature flag
33-33: Current batch_size default of 1 is correctly documented. The code is internally consistent: the comment states "Defaults to 1" and the `default_batch_size()` function returns 1. However, the claim of a breaking change from 10 to 1 cannot be verified from the repository history. The initial commit includes the entire codebase with this default already set to 1, with no prior commits showing a transition from 10 to 1.
If this represents a change from earlier versions, that context is not available in the current repository history.
Likely an incorrect or invalid review comment.
mq-bridge.schema.json (1)
1127-1172: Socket_type defaults mismatch and missing TLS configuration. The `socket_type` field is nullable in the schema with `default: null`, but the implementation provides context-dependent defaults: `Push` for publishers, `Pull` for consumers, and `Sub` for subscribers (lines 35, 193, 395). Update the schema to reflect these runtime defaults rather than allowing null.
Additionally, this endpoint lacks TLS configuration entirely, unlike all other endpoints (AMQP, Kafka, NATS, MQTT, HTTP). ZeroMQ does not provide built-in TLS/CURVE support in version 0.4 of the Rust crate, so clarify whether secure communication is intentionally unsupported or requires external handling.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/models.rs (1)
34-51: Batch size default changed from 128 to 1 with significant performance implications. Documentation is outdated.
The batch_size default reduction from 128 to 1 will significantly affect performance:
- Lower latency: Messages process immediately rather than waiting for batches
- Lower throughput: More frequent I/O operations and higher per-message overhead
- Higher CPU usage: More processing cycles required per message
This is a breaking change that favors low-latency over high-throughput workloads. Users migrating from previous versions may experience performance degradation in high-volume scenarios unless they explicitly set `batch_size`.
Additionally, the documentation in `src/route.rs` line 23 is outdated: the comment claims "default batch size (128)" while the actual default is now 1. Update this to reflect the current behavior.
🤖 Fix all issues with AI agents
In @src/endpoints/zeromq.rs:
- Around line 246-268: In the ReceiverSocket::Rep match arm, don't silently
substitute an empty ZmqMessage when reply_rx.await fails; instead match on
reply_rx.await, and when it Errs, emit a warning (e.g. using tracing::warn! or
the crate logger) that the consumer dropped the reply channel (include
identifying context such as the original msg or a request id if available), then
proceed to send the empty reply via s.send(reply) to preserve behavior; update
the code that constructs ConsumerItem and the s.send call accordingly so the
warning is logged before sending the fallback ZmqMessage.
- Around line 74-103: When the background task receives a mismatched job type it
only logs and returns, causing callers to hang; update the match arms handling
PublisherJob::Send and PublisherJob::Request to always respond on the provided
channels (ack_tx or reply_tx) with an error result instead of just logging.
Specifically, in the PublisherJob::Send branch where SenderSocket::Req(_) is
matched, send an Err via ack_tx (e.g., an appropriate send error or a custom
mismatch error) before continuing; similarly in the PublisherJob::Request branch
where non-Req sockets are matched, send an Err on reply_tx so the requester is
unblocked. Ensure you use the same channel types (ack_tx.send(...) and
reply_tx.send(...)) and preserve existing success paths in Request/Send
handling.
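The second point (always answering on the job's channel, even for a mismatched job type) might look roughly like the sketch below. It uses `std::sync::mpsc` in place of the async channels in the real code, and `SocketKind`, `Job`, `handle_job`, and `send_and_wait` are simplified, hypothetical stand-ins for `SenderSocket`, `PublisherJob`, and the background task.

```rust
use std::sync::mpsc::{channel, Sender};

/// Simplified stand-in for the socket variants (hypothetical).
pub enum SocketKind {
    Push,
    Req,
}

/// Simplified stand-in for the publisher jobs (hypothetical).
pub enum Job {
    Send { payload: Vec<u8>, ack_tx: Sender<Result<(), String>> },
    Request { payload: Vec<u8>, reply_tx: Sender<Result<Vec<u8>, String>> },
}

/// Every branch answers on the job's channel, so a caller never waits forever.
pub fn handle_job(socket: &SocketKind, job: Job) {
    match (socket, job) {
        (SocketKind::Push, Job::Send { ack_tx, .. }) => {
            // A real implementation would write the payload to the socket here.
            let _ = ack_tx.send(Ok(()));
        }
        (SocketKind::Req, Job::Request { payload, reply_tx }) => {
            // Echoing the payload stands in for the real reply.
            let _ = reply_tx.send(Ok(payload));
        }
        (SocketKind::Req, Job::Send { ack_tx, .. }) => {
            let _ = ack_tx.send(Err("Send job received on a Req socket".to_string()));
        }
        (SocketKind::Push, Job::Request { reply_tx, .. }) => {
            let _ = reply_tx.send(Err("Request job received on a non-Req socket".to_string()));
        }
    }
}

/// Caller side: submit a Send job and wait for the acknowledgement.
pub fn send_and_wait(socket: &SocketKind, payload: Vec<u8>) -> Result<(), String> {
    let (ack_tx, ack_rx) = channel();
    handle_job(socket, Job::Send { payload, ack_tx });
    ack_rx
        .recv()
        .map_err(|_| "worker dropped the ack channel".to_string())?
}
```

With the mismatch branches replying with `Err`, `send_and_wait` on a `Req` socket returns an error immediately instead of blocking on a reply that never comes.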
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- mq-bridge.schema.json
- src/endpoints/zeromq.rs
- src/models.rs
🧰 Additional context used
🧬 Code graph analysis (2)
src/models.rs (1)
src/type_handler.rs (1)
default(66-68)
src/endpoints/zeromq.rs (2)
src/models.rs (2)
new(188-195)
channel(211-216)
src/canonical_message.rs (8)
new(23-29)
s(64-64)
from(112-114)
from(118-120)
from(124-126)
from(130-132)
from(144-149)
get_payload_str(86-88)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Build (full)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (amqp)
- GitHub Check: Build (aws)
- GitHub Check: Build (nats)
- GitHub Check: Build (http)
- GitHub Check: Test on macos-latest with Rust stable
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Test
- GitHub Check: Integration Tests
- GitHub Check: Documentation
- GitHub Check: Performance Benchmarks
🔇 Additional comments (12)
src/endpoints/zeromq.rs (6)
1-26: LGTM: Clean module structure and type definitions. The imports and internal enum definitions are well-organized and appropriate for the ZeroMQ integration.
112-155: LGTM: Publisher implementation handles both request-reply and fire-and-forget patterns correctly. The implementation appropriately differentiates between Req sockets (which expect replies) and Push/Pub sockets (fire-and-forget), with proper error handling in both paths.
157-189: LGTM: Well-structured reply coordination types. The use of `Arc<Mutex<BatchReplyState>>` to coordinate replies across a batch is appropriate, allowing each message to independently record its response while tracking completion.
293-339: LGTM: Batch decode and buffer management logic is sound. The approach of using the last frame for the payload and coordinating replies through shared state is appropriate for ZeroMQ's multi-frame message model.
390-413: LGTM: Clean subscriber abstraction. The wrapper type appropriately defaults to Sub socket type while delegating all behavior to the underlying consumer implementation.
415-459: LGTM: Basic integration test validates core functionality. The test covers the fundamental push-pull pattern with proper timeout handling. While additional test coverage for pub/sub and req/rep patterns would be beneficial, the current test is a solid foundation.
src/models.rs (5)
273-283: LGTM: New middleware types follow established patterns. The `CommitConcurrencyMiddleware` and `DelayMiddleware` additions are well-structured and consistent with existing middleware configuration patterns. The field types and serde attributes are appropriate.
Also applies to: 294-300, 340-345
260-260: LGTM: ZeroMQ configuration types are well-structured. The ZeroMQ endpoint and configuration types follow the established pattern used by other messaging providers. Documentation is clear, and the use of `Option` types with defaults is appropriate.
Also applies to: 666-704
407-433: LGTM: Kafka configuration enhancements improve clarity and flexibility. The addition of `producer_options` and `consumer_options` provides necessary flexibility for advanced librdkafka configuration. The clarified comments (e.g., "Publisher only") improve documentation quality.
449-485: LGTM: Comprehensive NATS configuration expansion. The additions provide comprehensive control over NATS/JetStream behavior, including authentication, TLS, request-reply patterns, and stream management. Documentation clearly distinguishes publisher-only semantics.
618-630: LGTM: TLS and protocol enhancements support advanced security scenarios. The MQTT protocol selection and expanded TLS configuration (including mTLS support) provide necessary flexibility. The warning about `accept_invalid_certs` being insecure is appropriate.
Also applies to: 750-768
mq-bridge.schema.json (1)
1-1186: LGTM: Generated schema is consistent with Rust type definitions. The JSON schema properly reflects all the changes from `src/models.rs`, including:
- ZeroMQ endpoint types and socket variants (lines 1128-1184)
- New middleware types (CommitConcurrency, Delay)
- Expanded TLS configurations across endpoints
- Updated batch_size default of 1 (line 1023)
As this is a generated file (via the `schema` feature), the consistency between Rust types and the schema is as expected. The validation constraints and default values are appropriate.
ReceiverSocket::Rep(s) => {
    match s.recv().await {
        Ok(msg) => {
            let (reply_tx, reply_rx) = oneshot::channel();
            let item = ConsumerItem {
                msg,
                reply_tx: Some(reply_tx),
            };
            if tx.send(Ok(item)).await.is_err() {
                break;
            }
            // Wait for the reply from the consumer logic
            let reply = reply_rx
                .await
                .unwrap_or_else(|_| ZmqMessage::from(bytes::Bytes::new()));
            s.send(reply).await.map(|_| ConsumerItem {
                msg: ZmqMessage::from(bytes::Bytes::new()),
                reply_tx: None,
            }) // Dummy return to satisfy type, we loop anyway
        }
        Err(e) => Err(e),
    }
}
Silent error handling could mask missing replies.
Lines 258-260 fall back to an empty ZmqMessage when reply_rx fails, which happens if the consumer logic drops the reply channel without sending a response. This masks the issue and sends an empty reply on the Rep socket, potentially confusing the requester.
Consider logging a warning when the reply channel is dropped without a response to aid debugging.
📝 Proposed improvement
// Wait for the reply from the consumer logic
let reply = reply_rx
.await
- .unwrap_or_else(|_| ZmqMessage::from(bytes::Bytes::new()));
+ .unwrap_or_else(|_| {
+ tracing::warn!("Reply channel dropped without response, sending empty reply");
+ ZmqMessage::from(bytes::Bytes::new())
+ });
🤖 Prompt for AI Agents
In @src/endpoints/zeromq.rs around lines 246 - 268, In the ReceiverSocket::Rep
match arm, don't silently substitute an empty ZmqMessage when reply_rx.await
fails; instead match on reply_rx.await, and when it Errs, emit a warning (e.g.
using tracing::warn! or the crate logger) that the consumer dropped the reply
channel (include identifying context such as the original msg or a request id if
available), then proceed to send the empty reply via s.send(reply) to preserve
behavior; update the code that constructs ConsumerItem and the s.send call
accordingly so the warning is logged before sending the fallback ZmqMessage.
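A minimal sketch of the explicit match described above, kept std-only so it stands alone. It uses std::sync::mpsc in place of the tokio oneshot channel and eprintln! in place of tracing::warn!; the Reply enum and the reply_or_fallback name are hypothetical and not part of the PR.

```rust
use std::sync::mpsc::Receiver;

/// Outcome of waiting for the consumer's reply (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Reply {
    /// The consumer produced a payload.
    FromConsumer(Vec<u8>),
    /// The sender was dropped; an empty fallback payload is used instead.
    Fallback(Vec<u8>),
}

/// Waits for a reply and makes the "sender dropped" case visible
/// instead of silently substituting an empty message.
pub fn reply_or_fallback(rx: Receiver<Vec<u8>>) -> Reply {
    match rx.recv() {
        Ok(payload) => Reply::FromConsumer(payload),
        Err(err) => {
            // Stand-in for `tracing::warn!`; the real code would use the crate's logger.
            eprintln!("reply channel dropped without response ({err}), sending empty reply");
            Reply::Fallback(Vec::new())
        }
    }
}
```

The behavior is unchanged (an empty reply is still sent), but the dropped channel now leaves a trace in the logs.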
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @src/endpoints/zeromq.rs:
- Around line 260-262: The current reply handling uses
reply_rx.await.unwrap_or_else(|_| ZmqMessage::from(bytes::Bytes::new())) which
silently replaces a failed receive with an empty ZmqMessage; instead handle the
Result from reply_rx.await explicitly, log the error (use your logger) when
Err(e) occurs, and send a clear sentinel failure message to the Rep socket
(e.g., a ZmqMessage with an error flag or payload like "consumer_failed") rather
than an empty Bytes value; update the code path around reply_rx,
ZmqMessage::from, and bytes::Bytes::new() to perform the error log and construct
a failure-indicating ZmqMessage.
🧹 Nitpick comments (3)
src/endpoints/zeromq.rs (3)
237-287: Refactor Rep socket handling for clarity. The Rep socket logic (lines 248-280) is convoluted: it sends the item, waits for a reply inline, sends the reply back to the socket, then continues with special-case error handling. The dummy ConsumerItem return on lines 263-266 and the comment on line 274 indicate this structure is awkward.
Consider restructuring to separate receive logic from reply logic, or using a clearer control flow pattern.
358-362: Consider using if let for buffer draining. Line 359 uses unwrap(), which is safe due to the check on line 358, but could be more idiomatic with pattern matching.
♻️ More idiomatic alternative
 while messages.len() < max_messages && !self.buffer.is_empty() {
-    let buffered = self.buffer.pop_front().unwrap();
-    messages.push(buffered.msg);
-    contexts.push(buffered.reply_context);
+    if let Some(buffered) = self.buffer.pop_front() {
+        messages.push(buffered.msg);
+        contexts.push(buffered.reply_context);
+    }
 }
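As a variation on the alternative above, the emptiness check can be folded into the pattern match so that pop_front itself ends the loop. This is only a sketch: the Buffered struct and drain_buffer function are hypothetical stand-ins for the types in src/endpoints/zeromq.rs.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the buffered item discussed in the review.
pub struct Buffered {
    pub msg: String,
    pub reply_context: Option<u32>,
}

/// Moves up to `max_messages` items out of `buffer`, stopping early when it is empty.
pub fn drain_buffer(
    buffer: &mut VecDeque<Buffered>,
    max_messages: usize,
) -> (Vec<String>, Vec<Option<u32>>) {
    let mut messages = Vec::new();
    let mut contexts = Vec::new();
    while messages.len() < max_messages {
        // `pop_front` returns `None` on an empty buffer, which ends the loop
        // without a separate `is_empty` check or an `unwrap`.
        let Some(buffered) = buffer.pop_front() else { break };
        messages.push(buffered.msg);
        contexts.push(buffered.reply_context);
    }
    (messages, contexts)
}
```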
429-466: Consider expanding test coverage. The test covers only the Push/Pull pattern. Consider adding tests for:
- Pub/Sub pattern with topic filtering
- Req/Rep pattern with reply handling
- Batch processing with multiple messages
- Error cases (connection failures, serialization errors)
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/endpoints/zeromq.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/endpoints/zeromq.rs (4)
src/middleware/delay.rs (4)
new(17-22), new(48-53), as_any(37-39), as_any(71-73)
src/models.rs (2)
new(188-195), channel(211-216)
src/canonical_message.rs (7)
new(23-29), s(64-64), from(112-114), from(118-120), from(124-126), from(130-132), from(144-149)
tests/integration/route.rs (1)
as_any(260-262)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (aws)
- GitHub Check: Build (full)
- GitHub Check: Build (nats)
- GitHub Check: Build (amqp)
- GitHub Check: Build (http)
- GitHub Check: Test on macos-latest with Rust stable
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Integration Tests
- GitHub Check: Check
- GitHub Check: Documentation
- GitHub Check: Test
- GitHub Check: Performance Benchmarks
🔇 Additional comments (1)
src/endpoints/zeromq.rs (1)
1-14: No action needed. The zeromq crate version "0.4" in use is the latest stable minor version (0.4.1 released Oct 2024), and no security advisories exist for the pure-Rust zeromq implementation. The concern in the original review is not substantiated.
Likely an incorrect or invalid review comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @src/endpoints/http.rs:
- Around line 90-99: The code logs the server start twice for both TLS and
non-TLS branches; remove the duplicate info! calls so each branch logs a single
message including addr and workers: keep one info! invocation in the tls branch
(before calling load_rustls_config and bind_rustls_0_23) and one in the non-tls
branch (before server.bind), referencing tls_config.is_tls_server_configured(),
addr, and workers to locate the lines to change.
🧹 Nitpick comments (1)
Cargo.toml (1)
70-71: Consider updating zeromq to 0.5.0 when it reaches stable release. The zeromq crate at version 0.4 has no known security vulnerabilities and is the latest stable release. A pre-release version 0.5.0-pre is available (updated December 2024), but consider upgrading to it only once a stable 0.5.x release is available. The crate is actively maintained.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
Cargo.toml
src/endpoints/http.rs
src/endpoints/mod.rs
src/models.rs
🧰 Additional context used
🧬 Code graph analysis (3)
src/endpoints/mod.rs (1)
src/endpoints/http.rs (2)
new(47-125), new(272-295)
src/endpoints/http.rs (2)
src/endpoints/mod.rs (1)
create_publisher_from_route(202-213)
src/traits.rs (2)
handle(21-21), handle(36-38)
src/models.rs (1)
src/type_handler.rs (1)
default(66-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Integration Tests
- GitHub Check: Build (nats)
- GitHub Check: Build (aws)
- GitHub Check: Test on macos-latest with Rust stable
- GitHub Check: Build (http)
- GitHub Check: Test on ubuntu-latest with Rust stable
- GitHub Check: Build (full)
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Build (kafka)
- GitHub Check: Build (mongodb)
- GitHub Check: Test on ubuntu-latest with Rust beta
- GitHub Check: Documentation
- GitHub Check: Test
- GitHub Check: Performance Benchmarks
🔇 Additional comments (23)
src/endpoints/mod.rs (4)
29-30: LGTM: ZeroMQ module declaration follows established patterns. The module declaration is properly feature-gated and consistent with other endpoint modules.
150-157: LGTM: HTTP feature gating updated for actix-web migration. The feature gate name change from "axum" to "http-server" correctly aligns with the HTTP framework migration to actix-web, and the error message has been updated accordingly.
287-290: LGTM: ZeroMQ publisher creation follows established patterns. The publisher creation logic is consistent with other endpoints and properly feature-gated.
135-142: ZeroMQ consumer implementations are properly implemented. Both ZeroMqSubscriber::new and ZeroMqConsumer::new exist in src/endpoints/zeromq.rs with correct async signatures returning anyhow::Result<Self>. The consumer creation logic correctly branches on ConsumerMode::Subscribe, and error handling via the ? operator is appropriate. No changes needed.
Cargo.toml (5)
3-3: Version bump is appropriate, but note potential breaking change. The patch-level version bump from 0.1.5 to 0.1.6 is suitable for new features (ZeroMQ support). However, be aware that changing the default batch size (in src/models.rs) could affect existing deployments and might warrant documentation or migration notes.
19-19: LGTM: serde_json patch update. Patch version update from 1.0.148 to 1.0.149 should be safe and likely contains bug fixes.
75-76: LGTM: Feature organization is well-structured. The new features (middleware, schema, zeromq) are properly organized, and the http-server feature has been correctly updated to use actix-web. The full feature appropriately includes the new zeromq endpoint.
Also applies to: 80-81, 87-88, 98-98, 101-101
25-26: Dependency versions are current and secure. Both async-channel 2.5.0 and once_cell 1.21.3 are the latest available versions on crates.io with no active security vulnerabilities. The historical RUSTSEC-2019-0017 advisory for once_cell only affects versions < 1.0.1 and does not impact version 1.21.3.
62-62: Update actix-web to 4.12.1 (latest patch). Version 4.12 is secure with no known CVEs, but 4.12.1 is now available. Additionally, verify the migration from axum to actix-web is thoroughly tested, as this represents a significant framework change affecting the HTTP handling layer.
src/endpoints/http.rs (5)
74-81: LGTM: Worker count configuration is sensible. The logic correctly defaults to the number of available CPU cores when workers is 0, with a safe fallback to 1 if detection fails. This is a standard pattern for actix-web applications.
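For illustration, the defaulting rule described above can be sketched with the standard library alone. The resolve_workers name and the plain usize parameter are assumptions made for this sketch; the PR's actual code and config type may differ.

```rust
use std::thread::available_parallelism;

/// Resolves the effective worker count: 0 means "use all available cores".
pub fn resolve_workers(configured: usize) -> usize {
    if configured > 0 {
        return configured;
    }
    // `available_parallelism` can fail on some platforms; fall back to one worker.
    available_parallelism().map(|n| n.get()).unwrap_or(1)
}
```

An explicit non-zero value is passed through untouched, so operators can still pin the worker count when the detected core count is not what they want.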
104-117: LGTM: Server lifecycle management is correct. The server spawn and shutdown handling properly coordinate startup signaling and graceful shutdown via actix-web's server handle.
129-154: LGTM: TLS configuration helper is well-structured. The load_rustls_config helper properly extracts TLS setup logic, with appropriate error handling and standard rustls patterns.
182-242: LGTM: Request handling properly migrated to actix-web. The handle_request function signature and response handling have been correctly updated for actix-web, with appropriate error responses and content-type handling.
641-641: LGTM: Test assertions correctly updated. The test assertions properly use reqwest::StatusCode for the HTTP client responses, aligning with the framework migration.
Also applies to: 684-684
src/models.rs (9)
49-51: Default batch size change to 1 - ensure this is documented. Changing the default batch size to 1 will affect all routes that don't explicitly configure batch_size. This may:
- Reduce throughput in high-volume scenarios (more per-message overhead)
- Improve latency for low-volume scenarios (faster processing)
Ensure this change is documented in release notes and migration guides.
277-277: LGTM: CommitConcurrencyMiddleware properly defined. The new middleware for limiting concurrent commit tasks is well-defined with clear documentation and appropriate serde/schema attributes.
Also applies to: 294-300
280-280: LGTM: DelayMiddleware properly defined. The delay middleware is simple and well-documented, with appropriate type and schema annotations.
Also applies to: 340-345
415-433: LGTM: Kafka configuration enhancements are comprehensive. The expanded KafkaConfig properly adds authentication, TLS, and flexible producer/consumer options for librdkafka configuration.
455-484: LGTM: NATS configuration enhancements are well-structured. The expanded NatsConfig adds comprehensive authentication, request-reply pattern support, and JetStream configuration options with clear documentation.
619-630: LGTM: MQTT protocol version configuration added. The MqttProtocol enum properly supports MQTT v3 and v5, with v5 as the sensible default.
260-260: LGTM: ZeroMQ configuration is comprehensive. The ZeroMQ endpoint configuration supports all common socket types (Push/Pull/Pub/Sub/Req/Rep) with flexible bind/connect modes and buffer configuration. Well-documented and properly integrated into the EndpointType enum.
Also applies to: 666-704
730-730: LGTM: HTTP workers configuration added. The workers field properly supports configuring the number of actix-web worker threads.
757-770: TLS configuration enhanced - note security implications. The expanded TLS configuration properly supports various authentication scenarios. The accept_invalid_certs flag should be used carefully and ideally only for development/testing, as it disables server certificate verification.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @src/endpoints/http.rs:
- Around line 90-99: The code logs the same startup message twice for both HTTPS
and HTTP; remove the duplicate info! calls so each branch logs a single line. In
the block using tls_config.is_tls_server_configured(), keep one info!("Starting
HTTPS source on {} with {} workers", addr, workers) and remove the other
duplicate; similarly in the else branch keep one info!("Starting HTTP source on
{} with {} workers", addr, workers) and remove the duplicate. Ensure the
remaining messages stay immediately before calling load_rustls_config(...) /
server.bind_rustls_0_23(...) and server.bind(...).
🧹 Nitpick comments (1)
src/endpoints/http.rs (1)
216-216: Consider making the timeout configurable. The 30-second timeout is hardcoded. Different use cases may require different timeout values (e.g., long-running processing vs. fast responses). Consider adding a timeout field to HttpConfig with a sensible default.
💡 Example implementation
Add to HttpConfig:
pub struct HttpConfig {
    pub url: Option<String>,
    pub tls: TlsConfig,
    pub response_out: Option<Box<Endpoint>>,
    pub workers: Option<usize>,
    pub timeout_secs: Option<u64>, // Default to 30 if None
}
Then use it in handle_request:
- let timeout_duration = std::time::Duration::from_secs(30);
+ let timeout_duration = std::time::Duration::from_secs(
+     state.config.timeout_secs.unwrap_or(30)
+ );
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/endpoints/http.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: Build (full)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (aws)
- GitHub Check: Build (nats)
- GitHub Check: Build (amqp)
- GitHub Check: Build (http)
- GitHub Check: Test on ubuntu-latest with Rust beta
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Check
- GitHub Check: Test
- GitHub Check: Test on ubuntu-latest with Rust stable
- GitHub Check: Test on macos-latest with Rust stable
- GitHub Check: Documentation
- GitHub Check: Integration Tests
- GitHub Check: Performance Benchmarks
🔇 Additional comments (6)
src/endpoints/http.rs (6)
74-81: LGTM: Worker configuration logic is sound. The automatic parallelism detection with sensible fallbacks (default to available cores, fallback to 1 if unavailable) is well-implemented.
245-259: LGTM: Response construction is clean. The response handling correctly extracts content-type from metadata with a sensible default and properly distinguishes between response messages and acknowledgments.
262-412: LGTM: HttpPublisher implementation is robust. The publisher correctly handles mTLS configuration, response sinks, and batch sending with proper error collection for partial failures. The concurrent send strategy in send_batch using join_all is appropriate for HTTP requests.
468-515: LGTM: Integration test covers the happy path. The test properly validates the request/response flow between HttpConsumer and HttpPublisher, including response handling via the commit callback.
643-643: LGTM: Test assertions correctly use reqwest types. The status code checks properly use reqwest::StatusCode::OK, which is appropriate after the migration to actix-web and reqwest.
Also applies to: 687-687
129-154: The TLS configuration is correctly implemented and compatible with the library versions. The code uses rustls 0.23 with actix-web 4.12 (feature rustls-0_23), and the bind_rustls_0_23() method at line 94 properly matches this configuration. The with_no_client_auth() setting is intentional and consistent throughout the codebase (also used in MQTT and NATS endpoints), indicating a deliberate design choice for server-only TLS authentication without client certificates. The rustls-pemfile 2.2 dependency correctly supports certificate and key parsing for this setup.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @src/endpoints/zeromq.rs:
- Around line 437-439: The test test_zeromq_push_pull currently hardcodes port =
5556 which can cause flakiness; replace that with a dynamic allocation by
calling the existing get_free_port() helper (or equivalent) and assign its
result to port, then build url = format!("tcp://127.0.0.1:{}", port) as before;
ensure you import or reference get_free_port() in the test module so the test
uses a free ephemeral port instead of 5556.
- Around line 84-99: When a job type mismatches the socket (SenderSocket::Req
receiving a Send job, and Push/Pub sockets receiving a Request job) you must
reply on the job's oneshot so the caller doesn't hang; in the
SenderSocket::Req(_) branch send an Err through the Send job's ack oneshot (the
ack_tx used by PublisherJob::Send) and in the Push/Pub mismatch branch send an
Err through the Request job's reply_tx (used by PublisherJob::Request), include
a descriptive error message in the Err and keep the tracing::error log as well.
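For the port-allocation point above, one common way to implement a helper like get_free_port() is to bind to port 0 and read back the port the OS chose. The body below is an assumption about what such a helper might look like, not the project's actual implementation; test_url is a hypothetical convenience for this sketch.

```rust
use std::net::TcpListener;

/// Asks the OS for a currently unused TCP port on the loopback interface.
pub fn get_free_port() -> std::io::Result<u16> {
    // Port 0 tells the OS to pick any free ephemeral port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let port = listener.local_addr()?.port();
    // The listener is dropped on return, so another process could in principle
    // claim the port before the test binds it again.
    Ok(port)
}

/// Builds the TCP URL in the same shape the test uses.
pub fn test_url(port: u16) -> String {
    format!("tcp://127.0.0.1:{}", port)
}
```

The small window between releasing the port and rebinding it is usually acceptable in tests, but it is not a hard guarantee.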
🧹 Nitpick comments (3)
src/endpoints/http.rs (1)
193-204: Consider logging when message-id header parsing fails. When a message-id header is present but doesn't match any expected format (UUID, hex, decimal), it silently falls back to None. This could mask configuration issues where clients send malformed IDs.
💡 Optional: Add trace logging for parse failures
 if let Ok(uuid) = Uuid::parse_str(s) {
     message_id = Some(uuid.as_u128());
 } else if let Ok(n) = u128::from_str_radix(s.trim_start_matches("0x"), 16) {
     message_id = Some(n);
 } else if let Ok(n) = s.parse::<u128>() {
     message_id = Some(n);
+ } else {
+     trace!(header_value = %s, "Failed to parse message-id header, generating new ID");
 }
src/endpoints/zeromq.rs (1)
262-273: Fallback reply message on consumer failure lacks context. When the consumer logic fails to provide a reply (lines 262-268), the fallback "consumer_failed" message is sent. This is a reasonable fallback, but consider including more context for debugging.
Additionally, the dummy ConsumerItem return (lines 270-273) is an awkward pattern to satisfy the type system.
💡 Optional: Improve fallback message clarity
 let reply = match reply_rx.await {
     Ok(msg) => msg,
     Err(e) => {
         tracing::error!(
             "Failed to receive reply from consumer logic: {}",
             e
         );
-        ZmqMessage::from(bytes::Bytes::from("consumer_failed"))
+        ZmqMessage::from(bytes::Bytes::from(
+            r#"{"error":"consumer_failed","detail":"reply channel closed"}"#
+        ))
     }
 };
src/models.rs (1)
774-781: Consider differentiating mTLS client and TLS server configuration checks. Both is_mtls_client_configured() and is_tls_server_configured() perform identical checks. While this works, the semantics differ:
- mTLS client typically also needs ca_file to verify the server
- TLS server may optionally need ca_file for client certificate verification
The current implementation is functional but could be more precise.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
src/endpoints/http.rs
src/endpoints/zeromq.rs
src/models.rs
🧰 Additional context used
🧬 Code graph analysis (3)
src/endpoints/http.rs (3)
src/endpoints/mod.rs (1)
create_publisher_from_route(202-213)
src/traits.rs (2)
handle(21-21), handle(36-38)
src/canonical_message.rs (1)
s(64-64)
src/endpoints/zeromq.rs (2)
src/models.rs (2)
new(188-195), channel(211-216)
src/canonical_message.rs (7)
new(23-29), s(64-64), from(112-114), from(118-120), from(124-126), from(130-132), from(144-149)
src/models.rs (1)
src/type_handler.rs (1)
default(66-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Build (http)
- GitHub Check: Build (full)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (nats)
- GitHub Check: Build (aws)
- GitHub Check: Build (amqp)
- GitHub Check: Test
- GitHub Check: Performance Benchmarks
- GitHub Check: Integration Tests
- GitHub Check: Documentation
- GitHub Check: Check
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Test on ubuntu-latest with Rust stable
🔇 Additional comments (15)
src/endpoints/http.rs (6)
1-26: LGTM - Imports and module setup are clean. The imports are well-organized with appropriate feature gates for actix-web and reqwest. The conditional compilation markers are correctly applied.
49-131: LGTM - HttpConsumer initialization is well-structured. The server lifecycle management is properly implemented with:
- Correct worker count calculation with parallelism fallback
- Proper TLS/non-TLS binding paths
- Clean shutdown coordination via watch channel
- Server readiness signaling before awaiting
134-160: LGTM - TLS configuration loading is correct. The rustls configuration properly:
- Loads certificate chain and private key from PEM files
- Uses with_no_client_auth() for server-side TLS without requiring client certificates
- Provides appropriate error messages for missing files
263-278: LGTM - Response construction is appropriate. The make_response function correctly:
- Extracts content-type from message metadata with a sensible default
- Returns 202 Accepted for acknowledged but no-response scenarios
280-431: LGTM - HttpPublisher implementation is solid. The publisher correctly:
- Configures mTLS when certificates are provided
- Propagates message metadata as HTTP headers
- Handles batch sends in parallel with proper error aggregation
- Returns appropriate SentBatch variants based on results
433-708: LGTM - Test coverage is comprehensive. The test suite effectively covers:
- Configuration parsing from YAML
- End-to-end consumer/publisher communication
- Server lifecycle and graceful shutdown
- Various response routing scenarios
src/endpoints/zeromq.rs (3)
112-157: LGTM - send_batch implementation handles both fire-and-forget and request-reply patterns. The batch sending logic correctly:
- Serializes messages to JSON
- Distinguishes between acknowledgment-only and request-reply modes
- Uses appropriate error classifications (Retryable vs NonRetryable)
371-396: LGTM - Batch reply coordination is well-designed. The commit closure correctly:
- Tracks pending replies per batch item
- Aggregates responses when all items are processed
- Sends the combined response only once all items have been committed
The lock().unwrap() on line 379 is acceptable since mutex poisoning is rare and typically indicates an unrecoverable state.
404-427: LGTM - ZeroMqSubscriber is a clean wrapper. The subscriber correctly defaults to Sub socket type and delegates all consumer operations to the underlying ZeroMqConsumer.
src/models.rs (6)
49-51: Confirm the default batch size change is intentional. The default batch size has been changed to 1. This is a behavioral change that may affect performance for existing users who relied on a larger default. Ensure this is documented in release notes or migration guides.
294-345: LGTM - New middleware configuration types are well-defined. CommitConcurrencyMiddleware and DelayMiddleware are properly structured with:
- Appropriate serde annotations
- Schema generation support
- Strict field validation via deny_unknown_fields
666-704: LGTM - ZeroMQ configuration types are complete. The configuration properly supports:
- All common ZeroMQ socket patterns (Push/Pull, Pub/Sub, Req/Rep)
- Bind vs connect mode selection
- Optional topic for Pub/Sub filtering
- Configurable internal buffer size
717-733: LGTM - HttpConfig extensions are appropriate. The new fields provide useful configurability:
- workers allows tuning actix-web thread pool size
- message_id_header enables custom message ID extraction
Default handling is properly implemented in the consumer code.
623-630: LGTM - MqttProtocol enum enables protocol version selection. Defaulting to V5 is appropriate as it's the current standard, with V3 available for legacy broker compatibility.
869-870: LGTM - Test updates handle new middleware variants. The test correctly accounts for the new CommitConcurrency and Delay middleware variants in the match arms.
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In @src/endpoints/zeromq.rs:
- Around line 442-443: Replace the hardcoded port 5556 by binding to port 0 so
the OS chooses an available port: change the code that sets the local port
variable (currently named port) and url to use "tcp://127.0.0.1:0", then after
binding the ZeroMQ socket call the socket's method to retrieve the actual bound
endpoint (e.g. get_last_endpoint() or equivalent) and use that returned URL for
peers/clients instead of the original url string; update any references that
assumed the numeric port to use the discovered endpoint.
In @src/middleware/deduplication.rs:
- Around line 149-153: When a duplicate is detected in the receive loop
(condition using is_duplicate and logging message_id_hex) the call
original_commit(None).await blocks the loop; change this to run the commit
asynchronously by spawning a background task so the loop can continue (e.g., use
tokio::spawn to call original_commit(None).await) and ensure any required
ownership/cloning of variables is addressed for the spawned task; if
ordering/ack semantics are important, alternatively document the current
blocking behavior and the throughput trade-off instead of spawning.
- Around line 59-77: The PENDING_TTL of 5 seconds (PENDING_TTL) is too short and
allows STATE_PENDING entries to expire during long processing, letting
duplicates bypass the CAS logic; change PENDING_TTL to a safe higher default
that exceeds your max processing time or make it configurable (e.g., via
env/config) and ensure code that writes STATE_PENDING uses that value;
alternatively implement a refresh/heartbeat to extend the pending entry while
processing or switch to relying on STATE_PROCESSED writes after commit so
long-running operations don't allow duplicate processing.
- Around line 155-173: The commit wrapper around original_commit currently marks
the message as processed unconditionally; change the closure in the Box::new
move |response| block so it inspects the response parameter and only calls
db.insert(&key_clone, processed_val) when the response indicates success (e.g.,
response is None or an Ok variant), otherwise do not update the DB (or
optionally record a failed state), and adjust the error/trace logs accordingly;
keep references to original_commit, key_clone, processed_val and db.insert when
making the conditional change.
In @src/route.rs:
- Around line 214-222: When send_batch returns Err(e) inside the loop, don’t
return immediately; instead capture/store the error (e.g., into a local variable
like send_error), break out of the loop, then after the loop drain commit_tasks
with while commit_tasks.join_next().await.is_some() {} and only then return the
stored error (e.g., Err(send_error.into())); update the Err branch that
currently does `return Err(e.into())` in the function containing commit_tasks so
in-flight commit tasks are awaited before propagating the error.
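The "store the error, break, drain, then return" pattern requested for src/route.rs can be illustrated with a std-only analogy. This sketch uses OS threads and JoinHandle in place of tokio's JoinSet, and an atomic counter in place of real commits; run_batches and its parameters are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Processes batches in order. On the first send failure it stops sending,
/// but still waits for every commit task already started before returning.
pub fn run_batches(
    batches: Vec<Result<(), String>>,
    committed: Arc<AtomicUsize>,
) -> Result<(), String> {
    let mut commit_tasks: Vec<JoinHandle<()>> = Vec::new();
    let mut send_error: Option<String> = None;

    for batch in batches {
        match batch {
            // Stand-in for a successful send: spawn the commit task.
            Ok(()) => {
                let committed = Arc::clone(&committed);
                commit_tasks.push(thread::spawn(move || {
                    committed.fetch_add(1, Ordering::SeqCst);
                }));
            }
            // Remember the error instead of returning immediately.
            Err(e) => {
                send_error = Some(e);
                break;
            }
        }
    }

    // Drain: join every in-flight commit, whether or not a send failed.
    for task in commit_tasks {
        task.join().expect("commit task panicked");
    }

    match send_error {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
```

With an early return in the Err arm, the two commits started before the failure could still be running (or be abandoned) when the caller sees the error; draining first makes the error path deterministic.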
🧹 Nitpick comments (12)
src/route.rs (3)
141-152: Duplicate logic for extracting max_parallel_commits. This middleware extraction pattern is duplicated in run_concurrently (lines 242-250). Consider extracting a helper method and defining the default value as a constant.
♻️ Suggested refactor
Add a helper method to the Route impl:
impl Route {
    const DEFAULT_MAX_PARALLEL_COMMITS: usize = 4096;

    fn max_parallel_commits(&self) -> usize {
        self.input
            .middlewares
            .iter()
            .find_map(|m| match m {
                models::Middleware::CommitConcurrency(c) => Some(c.limit),
                _ => None,
            })
            .unwrap_or(Self::DEFAULT_MAX_PARALLEL_COMMITS)
    }
}
Then use self.max_parallel_commits() in both runners.
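186-191: Explicit drop(permit) can be replaced by a plain binding. The permit is dropped automatically at the end of the async block as long as the block captures it. Because async move only captures variables the block actually references, removing drop(permit) outright would leave the permit in the enclosing scope, so keep a reference such as let _permit = permit;. The explicit drop is harmless.
♻️ Optional cleanup
 commit_tasks.spawn(async move {
+    // Move the permit into the task; it is released when the task ends
+    let _permit = permit;
     commit(None).await;
-    // Permit is dropped here, releasing the slot
-    drop(permit);
 });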
197-206: Potential simplification for finding first retryable error. The pattern of checking has_retryable then using find with expect can be combined into a single operation.
♻️ Suggested simplification
-let has_retryable = failed.iter().any(|(_, e)| matches!(e, PublisherError::Retryable(_)));
-if has_retryable {
-    let failed_count = failed.len();
-    let (_, first_error) = failed
-        .into_iter()
-        .find(|(_, e)| matches!(e, PublisherError::Retryable(_)))
-        .expect("has_retryable is true");
+let failed_count = failed.len();
+if let Some((_, first_error)) = failed.iter().find(|(_, e)| matches!(e, PublisherError::Retryable(_))) {
     return Err(anyhow::anyhow!(
         "Failed to send {} messages in batch. First retryable error: {}",
         failed_count,
         first_error
     ));
 }
src/endpoints/zeromq.rs (4)
74-107: Detached background task has no explicit cleanup. The spawned task runs indefinitely until the channel rx is closed (when the publisher is dropped). While this works, there's no way to explicitly abort the task or await its completion. Consider storing the JoinHandle if graceful shutdown is needed.
This is acceptable for now if fire-and-forget semantics are intended, but may complicate debugging or resource tracking.
383-383: Using std::sync::Mutex::lock().unwrap() in async context. While the lock is held briefly and not across await points (which is correct), unwrap() will panic if the mutex is poisoned. Consider using lock().expect("...") with a descriptive message or handling the poisoned case.
♻️ Suggested improvement
-let mut state = ctx.state.lock().unwrap();
+let mut state = ctx.state.lock().expect("BatchReplyState mutex poisoned");
306-319: decode_batch silently falls back to raw bytes on deserialization failure. If the payload cannot be deserialized as Vec<CanonicalMessage> or CanonicalMessage, it falls back to wrapping raw bytes. This could mask protocol errors or corrupted data. Consider logging a warning when falling back to raw bytes.
♻️ Suggested improvement
 if let Ok(message) = serde_json::from_slice::<CanonicalMessage>(&payload) {
     return Ok(vec![message]);
 }
+tracing::debug!("ZeroMQ message could not be deserialized, treating as raw payload");
 Ok(vec![CanonicalMessage::new(payload.to_vec(), None)])
284-291: Control flow for Rep socket handling is convoluted. The comment acknowledges this. The Rep socket case sends the item inside the match, then continues the loop, while other sockets send after the match. Consider restructuring for clarity.
One approach is to use a separate helper or restructure the match to return an Option<ConsumerItem>:
let item_opt = match &mut socket {
    ReceiverSocket::Rep(s) => {
        // Handle rep inline, return None to skip outer send
        // ...
        None
    }
    ReceiverSocket::Pull(s) => Some(s.recv().await.map(|msg| ConsumerItem { msg, reply_tx: None })),
    // ...
};
if let Some(item_res) = item_opt {
    // send to tx
}
src/endpoints/http.rs (2)
193-204: Consider adding debug logging for message ID parsing failures
The multiple format parsing attempts are good, but failures are silent. Adding a debug-level log when the header is present but parsing fails would aid troubleshooting.
📝 Suggested enhancement
 let mut message_id = None;
 if let Some(header_value) = req.headers().get(state.message_id_header.as_str()) {
     if let Ok(s) = header_value.to_str() {
         if let Ok(uuid) = Uuid::parse_str(s) {
             message_id = Some(uuid.as_u128());
         } else if let Ok(n) = u128::from_str_radix(s.trim_start_matches("0x"), 16) {
             message_id = Some(n);
         } else if let Ok(n) = s.parse::<u128>() {
             message_id = Some(n);
+        } else {
+            tracing::debug!(header_value = %s, "Failed to parse message ID from header");
         }
     }
 }
251-251: Consider using 503 Service Unavailable for pipeline closure
`InternalServerError` (500) suggests a permanent problem, whereas "Pipeline closed" might be temporary. `ServiceUnavailable` (503) would better indicate that the service might recover.
♻️ Suggested improvement
- Err(_) => HttpResponse::InternalServerError().body("Pipeline closed"),
+ Err(_) => HttpResponse::ServiceUnavailable().body("Pipeline closed"),
src/middleware/deduplication.rs (3)
78-147: Refactor the busy-wait CAS loop to use exponential backoff.
The current CAS loop yields only after every 10 iterations, which can lead to excessive CPU usage and contention under high load. A tight loop of up to 1000 attempts may thrash the CPU and the database, especially if multiple threads are competing for the same keys.
♻️ Recommended refactor with exponential backoff
 let mut is_duplicate = false;
-let mut yield_counter = 0;
 let mut total_attempts = 0;
+let mut backoff_ms = 1;
 const MAX_TOTAL_ATTEMPTS: usize = 1000;
 loop {
     if total_attempts >= MAX_TOTAL_ATTEMPTS {
         return Err(ConsumerError::Connection(anyhow::anyhow!(
             "Deduplication CAS exceeded max attempts for message ID {}",
             message_id_hex
         )));
     }
-    if yield_counter > 10 {
-        tokio::task::yield_now().await;
-        yield_counter = 0;
+    if total_attempts > 0 {
+        tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await;
+        backoff_ms = (backoff_ms * 2).min(100); // Cap at 100ms
     }
-    yield_counter += 1;
     total_attempts += 1;
     // ... rest of CAS logic
This reduces contention and CPU usage while still making forward progress.
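The backoff schedule in the refactor can be examined in isolation. The sketch below only computes delays (it does not sleep or touch the database); `backoff_delay` and `total_backoff` are hypothetical helper names, and the 100 ms cap is taken from the suggestion above.

```rust
use std::time::Duration;

/// Upper bound on a single sleep, matching the 100 ms cap in the suggestion.
const MAX_BACKOFF_MS: u64 = 100;

/// Delay before retry number `attempt` (0-based):
/// 1 ms, 2 ms, 4 ms, ... doubling until capped at `MAX_BACKOFF_MS`.
pub fn backoff_delay(attempt: u32) -> Duration {
    // checked_shl returns None once the shift would exceed 63 bits
    let ms = 1u64
        .checked_shl(attempt)
        .unwrap_or(u64::MAX)
        .min(MAX_BACKOFF_MS);
    Duration::from_millis(ms)
}

/// Worst-case total time spent sleeping across `retries` retries.
pub fn total_backoff(retries: u32) -> Duration {
    (0..retries).map(backoff_delay).sum()
}
```

One consequence worth noting: with `MAX_TOTAL_ATTEMPTS` left at 1000, the loop sleeps 999 times in the worst case, and `total_backoff(999)` comes to 99,327 ms. A caller could therefore wait roughly 99 seconds before the "exceeded max attempts" error surfaces, so the attempt limit may need lowering if backoff is adopted.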
176-176: Consider deterministic cleanup triggering instead of random chance.
The cleanup task is triggered with a random probability (`rand::random::<u8>() < 5`, approximately 1.95% chance per message). This non-deterministic approach means:
- Under low message rates, cleanup may be delayed indefinitely
- Under high message rates, cleanup may run too frequently
- Behavior is unpredictable and harder to test
♻️ Alternative: Time-based or counter-based cleanup
Option 1: Time-based (periodic background task)
// In DeduplicationConsumer::new, spawn a periodic cleanup task
let db_clone = db.clone();
let ttl = config.ttl_seconds;
tokio::spawn(async move {
    let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
    loop {
        interval.tick().await;
        // ... cleanup logic ...
    }
});
Option 2: Counter-based (every N messages)
// Add counter field to DeduplicationConsumer
cleanup_counter: AtomicUsize,
// In receive loop:
if self.cleanup_counter.fetch_add(1, Ordering::Relaxed) % 50 == 0 {
    // Trigger cleanup every 50 messages
    tokio::spawn(async move { /* cleanup */ });
}
Both approaches provide predictable cleanup behavior and easier testing.
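The counter-based option is easy to test on its own. A minimal sketch follows, assuming a hypothetical `CleanupTrigger` type that would live alongside the consumer; the actual cleanup work is left out.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical helper: decides when a cleanup pass is due.
pub struct CleanupTrigger {
    counter: AtomicUsize,
    every: usize,
}

impl CleanupTrigger {
    pub fn new(every: usize) -> Self {
        assert!(every > 0, "interval must be non-zero");
        Self {
            counter: AtomicUsize::new(0),
            every,
        }
    }

    /// Returns true on the 1st call and then on every `every`-th call after it.
    /// Relaxed ordering is enough here: only the count itself matters.
    pub fn should_run(&self) -> bool {
        self.counter.fetch_add(1, Ordering::Relaxed) % self.every == 0
    }
}
```

With `CleanupTrigger::new(50)`, 200 messages trigger exactly 4 cleanups (on messages 1, 51, 101, and 151), which is straightforward to assert in a unit test, unlike the random variant.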
59-63: Optional: Move constants to module level for clarity.
The constants `STATE_PENDING`, `STATE_PROCESSED`, and `PENDING_TTL` are defined within the `receive` function. While this scopes them appropriately, moving them to module level would improve discoverability and align with typical Rust style for constants that define the protocol of the deduplication mechanism.
♻️ Move constants to module level
 use tracing::{debug, error, info, instrument, trace, warn};
+/// Deduplication state values stored in the DB entry's first byte
+const STATE_PENDING: u8 = 0;
+const STATE_PROCESSED: u8 = 1;
+
+/// TTL for pending state entries (short to allow crash recovery)
+const PENDING_TTL: u64 = 5;
+
 pub struct DeduplicationConsumer {
     inner: Box<dyn MessageConsumer>,
Then remove the definitions from inside the function (lines 59-63).
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- src/endpoints/http.rs
- src/endpoints/zeromq.rs
- src/middleware/deduplication.rs
- src/route.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/endpoints/zeromq.rs (2)
src/models.rs (1)
new (188-195)
src/canonical_message.rs (7)
new (23-29), s (64-64), from (112-114), from (118-120), from (124-126), from (130-132), from (144-149)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Build (nats)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (http)
- GitHub Check: Build (aws)
- GitHub Check: Build (full)
- GitHub Check: Build (amqp)
- GitHub Check: Performance Benchmarks
- GitHub Check: Integration Tests
- GitHub Check: Test
- GitHub Check: Check
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Documentation
- GitHub Check: Test on ubuntu-latest with Rust beta
- GitHub Check: Test on macos-latest with Rust stable
🔇 Additional comments (15)
src/route.rs (3)
337-348: Worker exit is treated as unexpected, but it may be intentional after `break`.
When a worker breaks due to a retryable error (line 296) or semaphore closure (lines 271, 305), `join_set.join_next()` will fire with `Ok(())`. The current logic at lines 339-342 treats this as an error and shuts down the route. This is likely intentional for error propagation, but the error message "finished unexpectedly" may be misleading when the worker intentionally exited after sending an error to `err_tx`.
Consider whether the error message should differentiate between intentional exits (after error reporting) and truly unexpected exits. The current behavior is safe but may produce confusing logs when debugging.
22-35: LGTM!
The `Route::new` constructor is straightforward and correctly initializes the route with default concurrency and batch size.
41-114: LGTM!
The `run` method properly sets up the reconnect loop with internal shutdown channels, timeout handling, and graceful shutdown coordination. The 5-second timeout for route startup and the reconnect backoff are reasonable defaults.
src/endpoints/zeromq.rs (4)
389-390: Filtered responses may silently drop None values.
The `filter_map` discards `None` responses, which means if a message didn't get a response, it won't appear in the final reply. If the receiver expects a response for each input message (positional correspondence), this could cause issues.
Verify that the design intent is to return only non-None responses rather than preserving positional correspondence. If positional correspondence is needed, consider using `map` with a default or returning `Option<CanonicalMessage>` in the response.
32-114: LGTM on publisher structure and socket handling.
The publisher correctly handles Push, Pub, and Req socket types with appropriate bind/connect logic. The background task dispatching between Send and Request jobs is well-structured.
359-401: LGTM on `receive_batch` implementation.
The batch receiving logic with buffer management and reply context coordination is well-designed. The commit closure properly handles the BatchReplyState for request-reply patterns.
469-469: No issue found – `send` method exists as a trait default implementation.
The `MessagePublisher` trait provides a default implementation of `send()` at src/traits.rs:119 that wraps `send_batch()`. The code at line 469 is correct and will compile without issues. `ZeroMqPublisher` only needs to implement `send_batch()`, and it correctly inherits the default `send()` method from the trait.
Likely an incorrect or invalid review comment.
src/endpoints/http.rs (8)
59-67: LGTM: Message ID header configuration
The configurable message ID header with sensible default is well-implemented.
81-88: LGTM: Worker count calculation
The worker count logic correctly uses `available_parallelism()` with appropriate fallback handling.
236-249: LGTM: Response sink handling logic
The conditional response routing (direct vs. sink-mediated) is correctly implemented with appropriate fallbacks.
262-276: LGTM: Response construction
The response mapping correctly handles both explicit responses and acknowledgments, with appropriate HTTP status codes (200 OK vs. 202 Accepted).
89-106: LGTM: Server configuration and TLS binding
The actix-web server setup correctly handles both TLS and non-TLS modes with appropriate worker configuration and manual shutdown handling.
107-132: LGTM: Server lifecycle management
The server spawning, ready signaling, and graceful shutdown handling are correctly implemented with proper async coordination.
492-493: No action needed.
HttpConfig properly implements the Default trait with sensible defaults for all fields (url, tls, response_out, workers, message_id_header). The derive macro `#[derive(Debug, Deserialize, Serialize, Clone, Default)]` ensures correct default initialization, and TlsConfig also derives Default. The pattern `..Default::default()` used throughout the tests is safe and correct.
Likely an incorrect or invalid review comment.
135-160: API usage is correct for rustls 0.23.0
The rustls API chain `ServerConfig::builder().with_no_client_auth().with_single_cert(certs, key)?` is correct and aligns with best practices for this version. Note that `with_no_client_auth()` means the server does not require client certificates; if mutual TLS (mTLS) is needed in the future, this would need adjustment.
let port = 5556;
let url = format!("tcp://127.0.0.1:{}", port);
🧩 Analysis chain
🏁 Script executed:
# First, check the file exists and examine lines around 442-443
wc -l src/endpoints/zeromq.rs
Repository: marcomq/mq-bridge
Length of output: 88
🏁 Script executed:
# Look at the specific lines in context
sed -n '430,455p' src/endpoints/zeromq.rs
Repository: marcomq/mq-bridge
Length of output: 693
🏁 Script executed:
# Check if this is test code
sed -n '1,50p' src/endpoints/zeromq.rs | head -30
Repository: marcomq/mq-bridge
Length of output: 905
🏁 Script executed:
# Run the provided search for other hardcoded ZeroMQ ports
rg -n "tcp://127.0.0.1:\d+" --type rust
Repository: marcomq/mq-bridge
Length of output: 267
🏁 Script executed:
# Look at all test functions in zeromq.rs
rg -n "#\[.*test\]" src/endpoints/zeromq.rs -A 5
Repository: marcomq/mq-bridge
Length of output: 270
🏁 Script executed:
# Check Cargo.toml for test parallelism configuration
cat Cargo.toml | grep -A 10 "\[\[test\]\]"
Repository: marcomq/mq-bridge
Length of output: 43
🏁 Script executed:
# Check if there are other test functions using the same port in this file
rg -n "5556" src/endpoints/zeromq.rs
Repository: marcomq/mq-bridge
Length of output: 89
🏁 Script executed:
# Check if there are more test functions in zeromq.rs that we might have missed
rg -n "async fn test_" src/endpoints/zeromq.rs
Repository: marcomq/mq-bridge
Length of output: 103
🏁 Script executed:
# Let's see the entire test module to understand the structure better
sed -n '440,477p' src/endpoints/zeromq.rs
Repository: marcomq/mq-bridge
Length of output: 1313
Use dynamic port assignment to prevent test flakiness.
The test uses a hardcoded port (5556) which can fail if the port is already in use. Consider using port 0 to let the OS assign an available port dynamically, a standard practice for test reliability.
🤖 Prompt for AI Agents
In @src/endpoints/zeromq.rs around lines 442 - 443, Replace the hardcoded port
5556 by binding to port 0 so the OS chooses an available port: change the code
that sets the local port variable (currently named port) and url to use
"tcp://127.0.0.1:0", then after binding the ZeroMQ socket call the socket's
method to retrieve the actual bound endpoint (e.g. get_last_endpoint() or
equivalent) and use that returned URL for peers/clients instead of the original
url string; update any references that assumed the numeric port to use the
discovered endpoint.
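The port-0 technique itself can be shown with the standard library. This sketch uses `std::net::TcpListener` rather than a ZeroMQ socket, so it only illustrates how the OS-assigned port is discovered; `bind_ephemeral` and `zmq_style_url` are hypothetical helper names, and how the ZeroMQ crate reports its bound endpoint is not covered here.

```rust
use std::net::{SocketAddr, TcpListener};

/// Binds to port 0 so the OS picks a free port, then reports the real address.
pub fn bind_ephemeral() -> std::io::Result<(TcpListener, SocketAddr)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    Ok((listener, addr))
}

/// Formats the discovered address in the `tcp://host:port` form used by the test.
pub fn zmq_style_url(addr: &SocketAddr) -> String {
    format!("tcp://{}", addr)
}
```

Note that probing a port with a throwaway listener and then rebinding it elsewhere leaves a small window in which another process can take the port. Binding the real socket to port 0 and asking that socket for its endpoint, as the prompt above describes, avoids the window.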
// Use a prefix to distinguish between pending (0) and processed (1) states.
// Pending state has a short TTL to allow recovery from crashes.
const STATE_PENDING: u8 = 0;
const STATE_PROCESSED: u8 = 1;
const PENDING_TTL: u64 = 5;

let mut pending_val = Vec::with_capacity(9);
pending_val.push(STATE_PENDING);
pending_val.extend_from_slice(&now_bytes);

let mut processed_val = Vec::with_capacity(9);
processed_val.push(STATE_PROCESSED);
processed_val.extend_from_slice(&now_bytes);

// Attempt atomic insert-if-absent to reserve the message ID
let mut is_duplicate = false;
let mut yield_counter = 0;
let mut total_attempts = 0;
const MAX_TOTAL_ATTEMPTS: usize = 1000;
Critical: PENDING_TTL is too short for typical message processing.
The PENDING_TTL of 5 seconds (line 63) creates a race condition: if processing a message takes longer than 5 seconds, the pending entry expires. When a duplicate message arrives after expiration, the CAS logic at lines 117-120 treats the pending entry as expired and overwrites it (lines 121-135), allowing the same message to be processed multiple times.
This defeats the purpose of deduplication for any message that requires more than 5 seconds to process (e.g., network calls, database operations, external API calls).
Impact: Duplicate processing, potential data corruption, idempotency violations.
🔧 Suggested fix
Increase PENDING_TTL to a value that covers the expected maximum processing time, or make it configurable:
- const PENDING_TTL: u64 = 5;
+ const PENDING_TTL: u64 = 300; // 5 minutes, or make this configurable
Alternatively, consider implementing a heartbeat mechanism to refresh the pending state during long-running processing, or redesign to rely solely on the processed state after commit.
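The expiry race described in this comment can be reproduced with a small model of the 9-byte entry (one state byte followed by an 8-byte timestamp). This is a sketch under assumptions: the timestamp is taken to be a big-endian `u64` of Unix seconds, which the quoted code does not confirm, and `encode_entry`, `decode_entry`, and `pending_expired` are hypothetical helpers.

```rust
pub const STATE_PENDING: u8 = 0;
pub const STATE_PROCESSED: u8 = 1;

/// Encodes a state byte followed by a big-endian timestamp (9 bytes total).
pub fn encode_entry(state: u8, now_secs: u64) -> Vec<u8> {
    let mut value = Vec::with_capacity(9);
    value.push(state);
    value.extend_from_slice(&now_secs.to_be_bytes());
    value
}

/// Splits an entry back into (state, timestamp); None if it is malformed.
pub fn decode_entry(entry: &[u8]) -> Option<(u8, u64)> {
    if entry.len() != 9 {
        return None;
    }
    let mut ts = [0u8; 8];
    ts.copy_from_slice(&entry[1..]);
    Some((entry[0], u64::from_be_bytes(ts)))
}

/// True when a pending reservation is older than `pending_ttl` seconds,
/// i.e. when a second delivery of the same ID would be allowed through.
pub fn pending_expired(entry: &[u8], now_secs: u64, pending_ttl: u64) -> bool {
    match decode_entry(entry) {
        Some((STATE_PENDING, ts)) => now_secs.saturating_sub(ts) > pending_ttl,
        _ => false,
    }
}
```

For an entry reserved at t = 1000, a redelivery at t = 1006 overwrites the reservation when the TTL is 5 but is still rejected when the TTL is 300, which is the behavior the suggested change relies on.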
🤖 Prompt for AI Agents
In @src/middleware/deduplication.rs around lines 59 - 77, The PENDING_TTL of 5
seconds (PENDING_TTL) is too short and allows STATE_PENDING entries to expire
during long processing, letting duplicates bypass the CAS logic; change
PENDING_TTL to a safe higher default that exceeds your max processing time or
make it configurable (e.g., via env/config) and ensure code that writes
STATE_PENDING uses that value; alternatively implement a refresh/heartbeat to
extend the pending entry while processing or switch to relying on
STATE_PROCESSED writes after commit so long-running operations don't allow
duplicate processing.
if is_duplicate {
    info!(message_id = %message_id_hex, "Duplicate message detected and skipped");
    original_commit(None).await;
    continue;
}
Blocking commit for duplicates may impact throughput.
When a duplicate is detected, original_commit(None).await is called synchronously within the receive loop (line 151). This blocks the loop from processing the next message until the commit completes. If the commit involves network I/O (e.g., ACKing to a broker), this can significantly reduce throughput, especially under high duplicate message rates.
In contrast, non-duplicate messages return the commit to the caller, who invokes it asynchronously after processing.
Options to consider:
- Spawn the commit: `tokio::spawn(async move { original_commit(None).await; });`, which trades ordering guarantees for throughput.
- Accept the trade-off: If commit ordering and immediate acknowledgment of duplicates are critical, document this behavior and its performance implications.
If ordering is required, consider documenting this trade-off so users understand the performance characteristics when duplicate rates are high.
🤖 Prompt for AI Agents
In @src/middleware/deduplication.rs around lines 149 - 153, When a duplicate is
detected in the receive loop (condition using is_duplicate and logging
message_id_hex) the call original_commit(None).await blocks the loop; change
this to run the commit asynchronously by spawning a background task so the loop
can continue (e.g., use tokio::spawn to call original_commit(None).await) and
ensure any required ownership/cloning of variables is addressed for the spawned
task; if ordering/ack semantics are important, alternatively document the
current blocking behavior and the throughput trade-off instead of spawning.
let db = self.db.clone();
let key_clone = key.clone();

// Wrap commit to update DB to "processed" state
let commit = Box::new(move |response| {
    Box::pin(async move {
        original_commit(response).await;

        // Update the pending marker to the final processed value
        if let Err(e) = db.insert(&key_clone, processed_val) {
            error!(
                "Failed to update message as processed in deduplication DB: {}",
                e
            );
        } else {
            trace!("Updated message as processed in deduplication DB");
        }
    }) as crate::traits::BoxFuture<'static, ()>
});
DB is updated to processed regardless of processing outcome.
The commit wrapper (lines 159-173) unconditionally updates the deduplication DB to STATE_PROCESSED after forwarding the response to the original commit. This occurs even if the response parameter indicates processing failure (Some(error)).
Implication: A message that fails processing is still marked as processed in the deduplication DB. If the same message_id arrives again (e.g., via broker retry), it will be treated as a duplicate and skipped (line 149-153), resulting in message loss for failed messages.
This enforces at-most-once semantics, which may conflict with user expectations of retry-on-failure (at-least-once).
💡 Potential fix for at-least-once semantics
Update the DB to processed state only if the response indicates success:
 // Wrap commit to update DB to "processed" state
 let commit = Box::new(move |response| {
     Box::pin(async move {
+        // Capture the outcome first: `response` is moved into original_commit below
+        let succeeded = response.is_none();
         original_commit(response).await;
-        // Update the pending marker to the final processed value
-        if let Err(e) = db.insert(&key_clone, processed_val) {
-            error!(
-                "Failed to update message as processed in deduplication DB: {}",
-                e
-            );
-        } else {
-            trace!("Updated message as processed in deduplication DB");
+        // Only mark as processed if commit succeeded (response is None)
+        if succeeded {
+            if let Err(e) = db.insert(&key_clone, processed_val) {
+                error!(
+                    "Failed to update message as processed in deduplication DB: {}",
+                    e
+                );
+            } else {
+                trace!("Updated message as processed in deduplication DB");
+            }
+        } else {
+            // On failure, remove pending state to allow retry
+            if let Err(e) = db.remove(&key_clone) {
+                warn!("Failed to remove pending state for failed message: {}", e);
+            }
         }
     }) as crate::traits::BoxFuture<'static, ()>
 });
Note: If strict at-most-once deduplication is desired, document this behavior clearly so users understand that failed messages with the same ID will not be retried.
🤖 Prompt for AI Agents
In @src/middleware/deduplication.rs around lines 155 - 173, The commit wrapper
around original_commit currently marks the message as processed unconditionally;
change the closure in the Box::new move |response| block so it inspects the
response parameter and only calls db.insert(&key_clone, processed_val) when the
response indicates success (e.g., response is None or an Ok variant), otherwise
do not update the DB (or optionally record a failed state), and adjust the
error/trace logs accordingly; keep references to original_commit, key_clone,
processed_val and db.insert when making the conditional change.
});
}
Err(e) => return Err(e.into()), // Propagate error to trigger reconnect
}
}
}
}
while commit_tasks.join_next().await.is_some() {}
Ok(false) // Indicate graceful shutdown due to end-of-stream
In-flight commits may be orphaned on error return.
When send_batch returns Err(e) at line 216, the function returns immediately without waiting for commit_tasks to complete. This could leave in-flight commits running after the route has exited.
🐛 Suggested fix
Err(e) => return Err(e.into()), // Propagate error to trigger reconnect
Consider draining commit_tasks before returning:
-Err(e) => return Err(e.into()), // Propagate error to trigger reconnect
+Err(e) => {
+    while commit_tasks.join_next().await.is_some() {}
+    return Err(e.into());
+}
🤖 Prompt for AI Agents
In @src/route.rs around lines 214 - 222, When send_batch returns Err(e) inside
the loop, don’t return immediately; instead capture/store the error (e.g., into
a local variable like send_error), break out of the loop, then after the loop
drain commit_tasks with while commit_tasks.join_next().await.is_some() {} and
only then return the stored error (e.g., Err(send_error.into())); update the Err
branch that currently does `return Err(e.into())` in the function containing
commit_tasks so in-flight commit tasks are awaited before propagating the error.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In @src/endpoints/http.rs:
- Around line 245-246: The send error from state.tx.send((message,
commit)).await is currently ignored; update the code to capture the Result's Err
variant, log the error before returning the HTTP 500 response (e.g., use the
application's logger or tracing::error!), and include context such as that
sending the (message, commit) to the bridge failed; keep the existing
HttpResponse::InternalServerError().body("Failed to send request to bridge") but
call the logger with the error prior to returning.
- Around line 206-218: Current behavior transforms empty GET body query
parameters into a JSON payload via url::form_urlencoded and serde_json::to_vec,
which changes semantics and is surprising; instead stop converting query params
into payload in the block around payload and req.query_string(), preserve
payload as-is, and ensure the raw query string is placed into metadata under the
existing key (e.g., "http_query") where downstream code already reads it; if you
must support conversion, gate it behind a clear feature flag (e.g.,
convert_query_to_payload) and add logging on serde_json::to_vec failures so
serialization errors are not silent.
- Around line 286-290: The default content-type value in the content_type
variable assignment is hardcoded to "application/json" in the unwrap_or
fallback, which may not accurately represent the actual payload being returned.
Replace this hardcoded default with a more appropriate generic default such as
"application/octet-stream" for binary data or "text/plain" for text data.
Alternatively, consider removing the unwrap_or entirely to make the content-type
explicitly required in the metadata, ensuring clients receive accurate
information about the response payload format.
🧹 Nitpick comments (3)
src/endpoints/http.rs (3)
59-62: Consider simplifying the message_id_header initialization.
The current implementation clones the Option before unwrapping. This can be simplified for better readability and performance.
♻️ Suggested simplification
- let message_id_header = config
-     .message_id_header
-     .clone()
-     .unwrap_or_else(|| "message-id".to_string());
+ let message_id_header = config
+     .message_id_header
+     .as_deref()
+     .unwrap_or("message-id")
+     .to_string();
193-204: Consider improving message ID parsing logic and documentation.
The current implementation tries multiple parsing strategies (UUID, hex with "0x" prefix, decimal), which is flexible but could lead to unexpected behavior:
- The hex parsing `trim_start_matches("0x")` removes all leading "0x" prefixes, which could accept malformed inputs like "0x0x123"
- The hex branch runs before the decimal branch and does not require the "0x" prefix, so a decimal string such as "123" is parsed as hex (0x123 = 291), leading to a different message ID than intended; the decimal branch is reached only when hex parsing fails
- The expected formats and parsing precedence are not documented
♻️ Suggested improvements
Option 1: Add validation and documentation
+ // Message ID can be provided in three formats:
+ // 1. UUID (e.g., "550e8400-e29b-41d4-a716-446655440000")
+ // 2. Hexadecimal with 0x prefix (e.g., "0x1234abcd")
+ // 3. Decimal number (e.g., "123456789")
 let mut message_id = None;
 if let Some(header_value) = req.headers().get(state.message_id_header.as_str()) {
     if let Ok(s) = header_value.to_str() {
         if let Ok(uuid) = Uuid::parse_str(s) {
             message_id = Some(uuid.as_u128());
-        } else if let Ok(n) = u128::from_str_radix(s.trim_start_matches("0x"), 16) {
+        } else if s.starts_with("0x") || s.starts_with("0X") {
+            if let Ok(n) = u128::from_str_radix(&s[2..], 16) {
-            message_id = Some(n);
+                message_id = Some(n);
+            }
         } else if let Ok(n) = s.parse::<u128>() {
             message_id = Some(n);
         }
     }
 }
This ensures that "0x" prefixed strings are only parsed as hex, and prevents accepting malformed inputs.
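The stricter precedence can be checked in isolation with a std-only helper. This is a sketch, not the handler's code: `parse_message_id` is a hypothetical function, and the UUID branch is left out because it needs the external `uuid` crate.

```rust
/// Parses a message ID header value as `0x`/`0X`-prefixed hex or plain decimal.
/// UUID handling is intentionally omitted from this sketch.
pub fn parse_message_id(raw: &str) -> Option<u128> {
    let s = raw.trim();
    let hex_digits = s.strip_prefix("0x").or_else(|| s.strip_prefix("0X"));
    match hex_digits {
        // Exactly one prefix is removed, so "0x0x123" is rejected here
        Some(digits) => u128::from_str_radix(digits, 16).ok(),
        // No prefix: treat the value as decimal only
        None => s.parse::<u128>().ok(),
    }
}
```

Under these rules "123" yields 123 and "0x1f" yields 31, while "0x0x123" and unprefixed "abc" are rejected. By contrast, `u128::from_str_radix("123", 16)` returns 291, which is what the unprefixed hex-first branch produces for a decimal-looking header value.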
250-250: Consider making the request timeout configurable.
The 30-second timeout is hardcoded, which may not suit all use cases. Consider adding a configurable timeout field to `HttpConfig` to allow users to adjust this based on their pipeline processing requirements.
Example configuration addition:
pub struct HttpConfig {
    // ... existing fields ...
    pub request_timeout_secs: Option<u64>,
}
Then use it in the handler:
let timeout_duration = std::time::Duration::from_secs(
    config.request_timeout_secs.unwrap_or(30)
);
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/endpoints/http.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/endpoints/http.rs (3)
src/endpoints/mod.rs (1)
create_publisher_from_route (202-213)
src/traits.rs (2)
handle (21-21), handle (36-38)
src/canonical_message.rs (2)
s (64-64), parse (81-83)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Build (full)
- GitHub Check: Build (http)
- GitHub Check: Build (nats)
- GitHub Check: Build (aws)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (amqp)
- GitHub Check: Test on macos-latest with Rust stable
- GitHub Check: Test on ubuntu-latest with Rust beta
- GitHub Check: Performance Benchmarks
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Test
- GitHub Check: Integration Tests
- GitHub Check: Check
- GitHub Check: Documentation
🔇 Additional comments (5)
src/endpoints/http.rs (5)
5-27: LGTM! Feature gates and imports are well-structured.
The feature-gated imports for actix-web and reqwest are properly organized and support the migration from Axum to Actix-Web.
81-88: LGTM! Worker calculation logic is sound.
The implementation correctly handles the default case by using available parallelism, with a sensible fallback to 1 worker if detection fails.
89-132: LGTM! Server initialization and lifecycle management are well-implemented.
The Actix-Web server setup properly handles:
- Worker configuration
- TLS binding when configured
- Manual shutdown via watch channel
- Ready signaling to ensure server is operational before returning
135-160: LGTM! TLS configuration loading follows rustls best practices.
The implementation correctly loads certificates and keys from files and constructs a rustls ServerConfig with appropriate settings for server-side TLS.
505-719: LGTM! Comprehensive test coverage for the Actix-Web migration.
The test suite properly covers:
- Consumer-publisher integration
- Request-reply patterns with response sinks
- Server shutdown behavior
- Static response handling
- Response endpoint integration
The updated tests align well with the Actix-Web implementation changes.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @Cargo.toml:
- Around line 62-77: Bump the optional dependency versions in Cargo.toml to the
latest patch releases: change actix-web from "4.12" to "4.12.1", zeromq from
"0.4" to "0.4.1", and ensure schemars is at the current 1.2 patch if needed;
locate the dependency entries (actix-web, zeromq, schemars) and update their
version strings, then regenerate/update the lockfile (cargo update) so the new
patches are picked up.
🧹 Nitpick comments (3)
src/endpoints/http.rs (3)
147-154: Consider using async file I/O for TLS certificate loading.
The function uses synchronous std::fs::File::open, which can block the async runtime. While acceptable for one-time startup operations, using tokio::fs would be more consistent with the async context.
♻️ Suggested async refactor
-fn load_rustls_config(
+async fn load_rustls_config(
     tls_config: &crate::models::TlsConfig,
 ) -> anyhow::Result<rustls::ServerConfig> {
     let cert_file = tls_config
         .cert_file
         .as_ref()
         .ok_or_else(|| anyhow!("Missing cert_file"))?;
     let key_file = tls_config
         .key_file
         .as_ref()
         .ok_or_else(|| anyhow!("Missing key_file"))?;
-    let cert_file = std::fs::File::open(cert_file)?;
-    let mut cert_reader = std::io::BufReader::new(cert_file);
-    let certs = rustls_pemfile::certs(&mut cert_reader).collect::<Result<Vec<_>, _>>()?;
+    let cert_data = tokio::fs::read(cert_file).await?;
+    let mut cert_reader = std::io::BufReader::new(cert_data.as_slice());
+    let certs = rustls_pemfile::certs(&mut cert_reader).collect::<Result<Vec<_>, _>>()?;
-    let key_file = std::fs::File::open(key_file)?;
-    let mut key_reader = std::io::BufReader::new(key_file);
+    let key_data = tokio::fs::read(key_file).await?;
+    let mut key_reader = std::io::BufReader::new(key_data.as_slice());
     let key = rustls_pemfile::private_key(&mut key_reader)?
         .ok_or_else(|| anyhow!("No private key found"))?;
Note: This would also require updating line 100 to load_rustls_config(&tls_config).await?.
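The refactor above depends on one detail: a byte slice implements std::io::Read, so a BufReader that used to wrap a File can wrap bytes already loaded into memory instead. The following is a minimal std-only sketch of just that detail. It does not parse certificates, and count_pem_blocks is a hypothetical helper, not a function from this PR.

```rust
use std::io::{BufRead, BufReader};

/// Counts `-----BEGIN ...-----` marker lines in PEM-style data held in memory.
/// Hypothetical helper for illustration only.
fn count_pem_blocks(data: &[u8]) -> std::io::Result<usize> {
    // `&[u8]` implements `Read`, so it can back a `BufReader` just like a `File`.
    let reader = BufReader::new(data);
    let mut count = 0;
    for line in reader.lines() {
        // Each item is an `io::Result<String>`; propagate read/UTF-8 errors.
        if line?.starts_with("-----BEGIN ") {
            count += 1;
        }
    }
    Ok(count)
}
```

With this pattern the only blocking call left to replace is the file read itself, which is what the suggested tokio::fs::read change does.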
193-204: Message ID parsing could be case-insensitive for hex prefix.
The hex prefix check on line 198 only handles lowercase "0x". While uppercase "0X" will gracefully fall through to decimal parsing, consider using case-insensitive matching for robustness.
♻️ Suggested improvement
-    } else if let Ok(n) = u128::from_str_radix(s.trim_start_matches("0x"), 16) {
+    } else if let Ok(n) = u128::from_str_radix(
+        s.trim_start_matches("0x").trim_start_matches("0X"), 16
+    ) {
Or use s.to_lowercase() before prefix stripping.
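As a standalone illustration of the case-insensitive prefix handling suggested above, here is a minimal sketch. The function name parse_message_id and the order of attempts (prefixed hex, then decimal, then bare hex) are assumptions, not taken from the PR. It uses strip_prefix, which removes at most one occurrence of the prefix, whereas trim_start_matches removes repeated occurrences.

```rust
/// Parses a message ID written as decimal, or as hex with an optional
/// `0x`/`0X` prefix. Hypothetical helper, not the PR's actual function.
fn parse_message_id(s: &str) -> Option<u128> {
    let s = s.trim();
    // Strip exactly one prefix, in either case, and parse the rest as hex.
    if let Some(hex) = s.strip_prefix("0x").or_else(|| s.strip_prefix("0X")) {
        return u128::from_str_radix(hex, 16).ok();
    }
    // No prefix: try decimal first, then bare hex such as "ff" (assumed order).
    s.parse::<u128>()
        .ok()
        .or_else(|| u128::from_str_radix(s, 16).ok())
}
```

Because only one prefix is stripped, an input such as "0x0x1f" is rejected rather than silently accepted.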
239-239: Consider making the request timeout configurable.
The 30-second timeout is hardcoded. For flexibility across different deployment scenarios, consider exposing this as a configuration option in HttpConfig.
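One way the timeout suggestion above might look is an optional field that falls back to the current 30-second value. This is a sketch under assumptions: HttpConfigSketch stands in for the real HttpConfig, and the field name request_timeout_secs is hypothetical.

```rust
use std::time::Duration;

/// Fallback matching the currently hardcoded 30-second timeout.
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 30;

/// Hypothetical stand-in for the PR's `HttpConfig`; the field name
/// `request_timeout_secs` is an assumption made for this sketch.
#[derive(Debug, Default)]
struct HttpConfigSketch {
    request_timeout_secs: Option<u64>,
}

impl HttpConfigSketch {
    /// Returns the configured timeout, or the 30-second default when unset.
    fn request_timeout(&self) -> Duration {
        Duration::from_secs(
            self.request_timeout_secs
                .unwrap_or(DEFAULT_REQUEST_TIMEOUT_SECS),
        )
    }
}
```

Keeping the field optional means existing configuration files continue to behave as before.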
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- Cargo.toml
- src/endpoints/http.rs
- src/endpoints/zeromq.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/endpoints/zeromq.rs
🧰 Additional context used
🧬 Code graph analysis (1)
src/endpoints/http.rs (3)
src/traits.rs (2)
handle(21-21)
handle(36-38)
src/canonical_message.rs (1)
s(64-64)
src/route.rs (1)
msg(417-417)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Build (aws)
- GitHub Check: Build (full)
- GitHub Check: Build (http)
- GitHub Check: Build (mongodb)
- GitHub Check: Build (amqp)
- GitHub Check: Build (nats)
- GitHub Check: Test
- GitHub Check: Integration Tests
- GitHub Check: Test on windows-latest with Rust stable
- GitHub Check: Check
- GitHub Check: Documentation
- GitHub Check: Test on ubuntu-latest with Rust beta
- GitHub Check: Performance Benchmarks
- GitHub Check: Test on macos-latest with Rust stable
🔇 Additional comments (9)
src/endpoints/http.rs (6)
28-45: LGTM! The type definitions are appropriately feature-gated and the HttpConsumerState correctly includes the configurable message_id_header field for flexible message ID extraction.
81-88: LGTM! The worker count logic correctly defaults to available parallelism when set to 0, with a fallback to 1 if parallelism detection fails.
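The worker-count behavior described in the 81-88 comment can be sketched with the standard library alone. The function name resolve_workers is hypothetical, and the PR's actual code may be structured differently.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Resolves a configured worker count; 0 means "use available parallelism".
/// Hypothetical helper illustrating the behavior described in the review.
fn resolve_workers(configured: usize) -> usize {
    if configured > 0 {
        return configured;
    }
    // `available_parallelism` can fail on some platforms; fall back to 1.
    thread::available_parallelism()
        .map(NonZeroUsize::get)
        .unwrap_or(1)
}
```

An explicit non-zero setting always wins, so deployments that pin the worker count are unaffected by the detection logic.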
162-184: LGTM! The MessageConsumer implementation correctly handles HTTP's request-response model where messages are processed individually. The _max_messages parameter being unused is appropriate here.
271-286: LGTM! The response construction correctly handles content-type from metadata with a sensible default, and uses appropriate HTTP status codes (200 OK for responses, 202 Accepted for acknowledgments).
288-439: LGTM! The HttpPublisher implementation using reqwest is well-structured with proper error handling, mTLS support, and efficient batch processing via join_all.
441-443: LGTM! Test module is appropriately feature-gated and provides comprehensive coverage for the HTTP consumer/publisher integration scenarios.
Cargo.toml (3)
3-3: LGTM! Version bump to 0.1.6 aligns with the new features (ZeroMQ support, actix-web migration).
78-101: LGTM! Feature definitions are well-organized:
- The middleware meta-feature conveniently bundles metrics and dedup
- The full feature correctly includes the new zeromq endpoint
- The http-server feature properly references actix-web after the migration from axum
103-119: LGTM! Dev-dependencies and platform-specific configurations remain appropriate for testing and Windows compatibility.
actix-web = { version = "4.12", features = ["rustls-0_23"], optional = true }
reqwest = { version = "0.13.1", features = ["json"], optional = true }
axum-server = { version = "0.8.0", features = ["tls-rustls"], optional = true }

# AWS
aws-config = { version = "1.8.12", optional = true, features = ["behavior-version-latest"] }
aws-sdk-sqs = { version = "1.91", optional = true }
aws-sdk-sns = { version = "1.92", optional = true }

# ZeroMQ
zeromq = { version = "0.4", optional = true }

url = { version = "2.5", optional = true }

# Memory (for testing)
async-channel = { version = "2.5.0" }
once_cell = { version = "1.21.3" }
# Generate JSON schema
schemars = { version = "1.2", optional = true }
🧩 Analysis chain
🌐 Web query:
actix-web Rust crate latest version 2025
💡 Result:
The latest actix-web release is 4.12.1 (published in December 2025). [1][2]
Sources:
- docs.rs: actix-web 4.12.1 documentation. [1]
- Fedora package announcement / RPM metadata showing 4.12.1 (Dec 11, 2025). [2]
🌐 Web query:
zeromq zmq Rust crate latest version
💡 Result:
- zmq (bindings to libzmq): latest published version 0.10.0. [1]
- zeromq (native Rust zmq.rs): latest published version 0.4.1. [2]
Sources:
[1] docs.rs — zmq crate (latest).
[2] docs.rs — zeromq crate (latest).
🌐 Web query:
schemars Rust crate latest version
💡 Result:
The latest released version of the schemars crate is 1.2.0 (as of January 9, 2026). [1][2]
Sources:
- docs.rs documentation for schemars 1.2.0. [1]
- Release-monitoring entry showing prior release history (1.0.4 etc.). [2]
Consider updating to the latest patch versions of the optional dependencies.
The specified versions are valid, but newer patch releases are available:
- actix-web: update from 4.12 to 4.12.1
- zeromq: update from 0.4 to 0.4.1
- schemars 1.2 is current as of January 2026
Description
Type of Change
Testing
Checklist
Related Issues
Fixes #
Related to #
Additional Notes
Summary by CodeRabbit