Merged
36 commits
dcf3314
changes after review
marcomq Dec 16, 2025
3f5348a
add middleware
marcomq Dec 16, 2025
0f805cb
fixes & add middlewares as features
marcomq Dec 16, 2025
fd78502
make bulk_send more robust
marcomq Dec 16, 2025
6056fa5
stabilize errors & fmt
marcomq Dec 16, 2025
108a126
compile fix
marcomq Dec 16, 2025
058328b
add dlq retry
marcomq Dec 16, 2025
82b6465
switch from vec<u8> to bytes and fix some tests
marcomq Dec 16, 2025
2942e59
disable jetstream optionally
marcomq Dec 16, 2025
3691e5d
renam await_ack to skip_ack
marcomq Dec 16, 2025
ee84573
fix kafka sasl key
marcomq Dec 16, 2025
ea9d194
fmt
marcomq Dec 16, 2025
7175143
switch to prefer bulk
marcomq Dec 17, 2025
9762d1c
fmt
marcomq Dec 17, 2025
dbbce6c
clippy
marcomq Dec 17, 2025
fb2d88a
fix mqtt error messages
marcomq Dec 17, 2025
e98be87
rewrote read direct test for bulk
marcomq Dec 17, 2025
f93b669
rename bulk to batch
marcomq Dec 17, 2025
d8745c1
add receive_batch for kafka
marcomq Dec 18, 2025
2ed6caf
add tests for separate single and batch read / write performance
marcomq Dec 18, 2025
a117227
address some coderabbit issues
marcomq Dec 18, 2025
2bed90e
add batch read for mongodb
marcomq Dec 18, 2025
59bfe8d
try to optimize nats receive_batch (wip)
marcomq Dec 19, 2025
77c7ce5
add comment
marcomq Dec 19, 2025
00619c9
change warning log
marcomq Dec 19, 2025
05629ba
add computehandler as middleware
marcomq Dec 19, 2025
9d31696
Merge remote-tracking branch 'origin/main' into dev
marcomq Dec 19, 2025
ca67038
message_id to u128 and make app_name more generic
marcomq Dec 19, 2025
1e9171f
adding proper message_ids
marcomq Dec 19, 2025
f0ae729
adding cfg opts and retry and random_panic middelware
marcomq Dec 20, 2025
c5e9a76
raneming retry to attempt, bugfixings
marcomq Dec 20, 2025
a0dcc62
misc fixes, add performance bench
marcomq Dec 21, 2025
62eedd0
rename crate and repo
marcomq Dec 21, 2025
b4fd5d8
fix test
marcomq Dec 21, 2025
ec6288d
fix minor issues, add comments
marcomq Dec 21, 2025
553dc50
fix ci
marcomq Dec 21, 2025
330 changes: 289 additions & 41 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions Cargo.toml
@@ -1,5 +1,5 @@
[package]
-name = "hot_queue"
+name = "mq-bridge"
version = "0.1.0"
edition = "2021"

@@ -17,7 +17,6 @@ tracing = "0.1"
rand = "0.9.2"
bytes = { version = "1.11.0", features = ["serde"] }


# Common TLS dependencies
rustls = { version = "0.23", features = ["ring"], optional = true }
tokio-rustls = { version = "0.26", optional = true }
@@ -72,7 +71,7 @@ metrics = ["dep:metrics"]
# Endpoint features
kafka = ["rdkafka", "async-stream"]
amqp = ["lapin", "rustls", "rustls-pemfile", "url"]
-nats = ["async-nats", "rustls", "rustls-pemfile"]
+nats = ["async-nats", "rustls", "rustls-pemfile", "uuid"]
mongodb = ["dep:mongodb", "uuid"]
mqtt = ["rumqttc", "rustls", "tokio-rustls", "rustls-pemfile", "url"]
http = ["axum", "reqwest", "axum-server", "rustls", "tokio-rustls", "rustls-pemfile"]
@@ -85,4 +84,9 @@ tempfile = "3.10"
tracing-appender = "0.2"
chrono = "0.4"
metrics-util = "0.20.1"
-tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
+tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
+criterion = { version = "0.8.1", features = ["async", "async_tokio"] }
+
+[[bench]]
+name = "performance_bench"
+harness = false
208 changes: 119 additions & 89 deletions README.md
@@ -1,89 +1,119 @@
# hot_queue
Rust library to access different stream and message queues.

TODO: I will add the actual endpoint code later. No need to already create it now.
For now, I will just create the configuration structs, so I can parse yaml and json files
correctly.

It is possible to use following seperately:
- endpoint - have most simple access, can be amqp, mongodb etc... (will be implemted later)
- route, contains 2 endpoints

Between endpoint and route, there can be multiple middleware:
- retry
- deduplication (optional via feature)
- dead letter queue
- metrics

The message that is shared is a CanonicalMessage, which has a message_id, a byte array as body -
but it will be implemented later.

Configuration Schema Overview
The application uses a hierarchical configuration system. The structure is defined by Rust structs in src/model.rs and interpreted by the serde and config crates.

The key to understanding the endpoint configuration from, out is the combination of serde attributes you've used:

YAML Structure
Here is an example config.yml file that demonstrates the full structure.

yaml
kafka_to_nats: # The top-level keys are the route names
# (Optional) Number of concurrent processing tasks for this route
concurrency: 10

# The input/source endpoint for the route
input:
# (Optional) A list of middlewares to apply to each endpoint before or after send
middlewares:
deduplication:
sled_path: "/tmp/hot_queue/dedup_db"
ttl_seconds: 3600
metrics:
# metrics doesn't have much configuration, its presence enables it.
kafka:
# --- Kafka-specific fields ---
# `topic` is from `KafkaConsumerEndpoint`
topic: "input-topic"
# `brokers` and `group_id` are from the flattened `KafkaConfig`
brokers: "localhost:9092"
group_id: "my-consumer-group"
# Other optional KafkaConfig fields like `username`, `password`, `tls`, etc.
tls:
required: true
ca_file: "/path_to_ca" # optional
cert_file: "/path_to_cert" # optional
key_file: "/path_to_key" # optional
cert_password: "password"
accept_invalid_certs: true


# The output/sink endpoint for the route
output:
nats:
# --- NATS-specific fields ---
# `subject` is from `NatsPublisherEndpoint`
subject: "output-subject"
# `url` is from the flattened `NatsConfig`
url: "nats://localhost:4222"
# Other optional NatsConfig fields like `username`, `password`, `token`, etc.

Environment Variable Structure
The config crate maps environment variables to the YAML structure. It uses a prefix (in your case, HQ) and a separator (__).

Here is how the kafka_to_nats route from the YAML example above would be defined using environment variables:
sh
# Route settings
export HQ__KAFKA_TO_NATS__CONCURRENCY=10

# Input endpoint (kafka_to_nats.input)
export HQ__KAFKA_TO_NATS__INPUT__KAFKA__TOPIC="input-topic"
export HQ__KAFKA_TO_NATS__INPUT__KAFKA__BROKERS="localhost:9092"
export HQ__KAFKA_TO_NATS__INPUT__KAFKA__GROUP_ID="my-consumer-group"

# Output endpoint (kafka_to_nats.output)
export HQ__KAFKA_TO_NATS__OUTPUT__NATS__SUBJECT="output-subject"
export HQ__KAFKA_TO_NATS__OUTPUT__NATS__URL="nats://localhost:4222"

# Dead-Letter Queue endpoint for the input (kafka_to_nats.input.middlewares.dlq)
export HQ__KAFKA_TO_NATS__INPUT__MIDDLEWARES__DLQ__NATS__SUBJECT="dlq-subject"
export HQ__KAFKA_TO_NATS__INPUT__MIDDLEWARES__DLQ__NATS__URL="nats://localhost:4222"
# mq-bridge
`mq-bridge` is an asynchronous message bridging library for Rust. It connects different messaging systems, data stores, and protocols. It is built on Tokio and supports patterns like retries, dead-letter queues, and message deduplication.

## Features

* **Supported Backends**: Kafka, NATS, AMQP (RabbitMQ), MQTT, MongoDB, HTTP, Files, and in-memory channels.
* **Configuration**: Routes can be defined via YAML or environment variables.
* **Middleware**:
    * **Retries**: Exponential backoff for transient failures.
    * **Dead-Letter Queues (DLQ)**: Redirect failed messages.
    * **Deduplication**: Message deduplication using `sled`.
Comment on lines +9 to +11

⚠️ Potential issue | 🟡 Minor

Fix list indentation to match Markdown standards.

The nested list items use 4-space indentation, but the markdown linter expects 2-space indentation for consistency.

🔎 Proposed fix
 *   **Middleware**:
-    *   **Retries**: Exponential backoff for transient failures.
-    *   **Dead-Letter Queues (DLQ)**: Redirect failed messages.
-    *   **Deduplication**: Message deduplication using `sled`.
+  *   **Retries**: Exponential backoff for transient failures.
+  *   **Dead-Letter Queues (DLQ)**: Redirect failed messages.
+  *   **Deduplication**: Message deduplication using `sled`.
🧰 Tools
🪛 markdownlint-cli2 (0.18.1)

9-9: Unordered list indentation
Expected: 2; Actual: 4

(MD007, ul-indent)


10-10: Unordered list indentation
Expected: 2; Actual: 4

(MD007, ul-indent)


11-11: Unordered list indentation
Expected: 2; Actual: 4

(MD007, ul-indent)

🤖 Prompt for AI Agents
In README.md around lines 9 to 11, the nested list items under the bullet list
are indented with 4 spaces but the repo markdown style requires 2-space
indentation; update each nested list line to use 2-space indentation (replace
the 4 leading spaces with 2) so the list conforms to the linter and renders
consistently.

* **Concurrency**: Configurable concurrency per route using Tokio.
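
The deduplication middleware listed above can be pictured with a small, self-contained sketch. This is not the crate's implementation: the README says deduplication is backed by `sled`, whereas this illustration swaps in an in-memory `HashMap` so that it runs without dependencies. The `Deduplicator` type and its `check_and_record` method are hypothetical names; the `u128` message ID follows the commit "message_id to u128".

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical in-memory stand-in for a TTL-based deduplication store.
/// (Assumption: the real middleware persists seen IDs in `sled` instead.)
struct Deduplicator {
    ttl: Duration,
    seen: HashMap<u128, Instant>,
}

impl Deduplicator {
    fn new(ttl: Duration) -> Self {
        Self { ttl, seen: HashMap::new() }
    }

    /// Returns `true` if `message_id` has not been seen within the TTL window
    /// and records it; returns `false` for a duplicate.
    fn check_and_record(&mut self, message_id: u128, now: Instant) -> bool {
        let ttl = self.ttl;
        // Forget IDs whose first sighting is older than the TTL.
        self.seen
            .retain(|_, first_seen| now.saturating_duration_since(*first_seen) < ttl);
        if self.seen.contains_key(&message_id) {
            return false;
        }
        self.seen.insert(message_id, now);
        true
    }
}

fn main() {
    let mut dedup = Deduplicator::new(Duration::from_secs(3600));
    let t0 = Instant::now();
    println!("first delivery accepted: {}", dedup.check_and_record(42, t0));
    println!(
        "redelivery accepted: {}",
        dedup.check_and_record(42, t0 + Duration::from_secs(10))
    );
}
```

Passing `now` in explicitly keeps the sketch deterministic; a real middleware would more likely read the clock itself.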

## Core Concepts

* **Route**: A named data pipeline that defines a flow from one `input` to one `output`.
* **Endpoint**: A source or sink for messages.
* **Middleware**: Components that intercept and process messages (e.g., for error handling).

## Usage

### Programmatic Usage

You can define and run routes directly in Rust code.

```rust
use mq_bridge::models::{Endpoint, EndpointType, MemoryConfig, Route};
use std::time::Duration;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    // Define a route from one in-memory channel to another
    let route = Route {
        input: Endpoint::new(EndpointType::Memory(MemoryConfig {
            topic: "mem-in".to_string(),
            capacity: Some(100),
        })),
        output: Endpoint::new(EndpointType::Memory(MemoryConfig {
            topic: "mem-out".to_string(),
            capacity: Some(100),
        })),
        concurrency: 1,
    };

    // Get handles to the memory channels for testing
    let in_channel = route.input.channel().unwrap();
    let out_channel = route.output.channel().unwrap();

    // Run the route. It will stop when the input channel is closed and empty.
    let (run_result, _) = tokio::join!(
        route.run_until_err("memory-test", None),
        async {
            // Send a message
            in_channel
                .send_message(mq_bridge::CanonicalMessage::new(b"hello".to_vec(), None))
                .await
                .unwrap();
            // Close the input channel to allow the route to terminate
            in_channel.close();
        }
    );

    // Ensure the route ran without errors
    run_result.unwrap();

    // Verify the message was received
    let received_messages = out_channel.drain_messages();
    assert_eq!(received_messages.len(), 1);
    assert_eq!(received_messages[0].payload, "hello".as_bytes());

    println!("Message successfully bridged from 'mem-in' to 'mem-out'!");
}
```

## Configuration Details

### Environment Variables
All YAML configuration can be overridden with environment variables. The mapping follows this pattern:
`MQB__{ROUTE_NAME}__{PATH_TO_SETTING}`

For example, to set the Kafka topic for the `kafka_to_nats` route:
```sh
export MQB__KAFKA_TO_NATS__INPUT__KAFKA__TOPIC="my-other-topic"
```
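
To make the naming pattern concrete, here is a minimal sketch of how such a variable name could be split into a configuration path. It only illustrates the `MQB__{ROUTE_NAME}__{PATH_TO_SETTING}` convention and is not how the crate parses its environment; `env_key_to_path` is a hypothetical helper, and lowercasing each segment is an assumption.

```rust
/// Hypothetical helper: turns `MQB__ROUTE__A__B` into `["route", "a", "b"]`.
/// Returns `None` if the key does not start with `prefix` followed by `__`.
fn env_key_to_path(key: &str, prefix: &str) -> Option<Vec<String>> {
    let rest = key.strip_prefix(prefix)?.strip_prefix("__")?;
    if rest.is_empty() {
        return None;
    }
    // A double underscore separates path segments; single underscores
    // (as in KAFKA_TO_NATS) stay inside a segment.
    Some(rest.split("__").map(|segment| segment.to_lowercase()).collect())
}

fn main() {
    let key = "MQB__KAFKA_TO_NATS__INPUT__KAFKA__TOPIC";
    if let Some(path) = env_key_to_path(key, "MQB") {
        println!("{} -> {}", key, path.join("."));
    }
}
```

Under these assumptions, the variable from the example above addresses the same setting as `kafka_to_nats.input.kafka.topic` in YAML.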

### Middleware Configuration
Middleware is defined as a list under an endpoint.

```yaml
input:
  middlewares:
    - retry:
        max_attempts: 5
        initial_interval_ms: 200
    - dlq:
        endpoint:
          nats:
            subject: "my-dlq-subject"
            url: "nats://localhost:4222"
    - deduplication:
        sled_path: "/var/data/mq-bridge/dedup_db"
        ttl_seconds: 3600 # 1 hour
  kafka:
    # ... kafka config
```
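
As a rough illustration of what the `retry` settings above imply, the following sketch computes an exponential backoff schedule from `max_attempts` and `initial_interval_ms`. The doubling factor, the absence of jitter, and the `backoff_delays` helper itself are assumptions made for this example; the README does not specify them.

```rust
use std::time::Duration;

/// Hypothetical helper: the waits between consecutive attempts, assuming the
/// interval doubles after every failure (the multiplier is an assumption).
fn backoff_delays(max_attempts: u32, initial_interval_ms: u64) -> Vec<Duration> {
    // `max_attempts` attempts leave `max_attempts - 1` gaps to wait in.
    (0..max_attempts.saturating_sub(1))
        .map(|i| {
            let factor = 2u64.saturating_pow(i);
            Duration::from_millis(initial_interval_ms.saturating_mul(factor))
        })
        .collect()
}

fn main() {
    // Values taken from the YAML example: max_attempts = 5, initial_interval_ms = 200.
    for (i, delay) in backoff_delays(5, 200).iter().enumerate() {
        println!("wait before attempt {}: {:?}", i + 2, delay);
    }
}
```

With the values from the YAML example and a doubling factor, the waits would be 200 ms, 400 ms, 800 ms, and 1600 ms before the message is given up on (or handed to the DLQ, if one is configured).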

## Running Tests
The project includes a comprehensive suite of integration and performance tests that require Docker.

To run the performance benchmarks for all supported backends:
```sh
cargo test --test integration_test --release -- --ignored --nocapture --test-threads=1
```

To run the criterion benchmarks:
```sh
cargo bench --features "full"
```

## License
`mq-bridge` is licensed under the MIT License.