Skip to content

Commit 25cd4f0

Browse files
authored
Merge pull request #22 from marcomq/dev
Add Status info
2 parents 039696c + c1ffa3f commit 25cd4f0

33 files changed

Lines changed: 2601 additions & 457 deletions

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mq-bridge"
3-
version = "0.2.9"
3+
version = "0.2.10"
44
edition = "2021"
55
authors = ["Marco Mengelkoch"]
66
rust-version = "1.75" # 2021 edition was released in 1.56
@@ -76,6 +76,7 @@ hyper = { version = "1.8", optional = true, features = ["client", "server", "htt
7676
hyper-util = { version = "0.1", optional = true, features = ["tokio", "client", "client-legacy", "server", "http1", "http2"] }
7777
hyper-rustls = { version = "0.27", optional = true, features = ["http2"] }
7878
http-body-util = { version = "0.1", optional = true }
79+
http-body = { version = "1.0", optional = true }
7980
h2 = { version = "0.4", optional = true }
8081
flate2 = { version = "1.0", optional = true }
8182
base64 = { version = "0.22", optional = true }
@@ -129,7 +130,7 @@ amqp = ["lapin", "rustls", "rustls-pemfile", "url"]
129130
nats = ["async-nats", "rustls", "rustls-pemfile"]
130131
mongodb = ["dep:mongodb"]
131132
mqtt = ["rumqttc", "rustls", "tokio-rustls", "rustls-pemfile", "url"]
132-
http = ["hyper", "hyper-util", "hyper-rustls", "http-body-util", "rustls", "tokio-rustls", "rustls-pemfile", "webpki-roots", "h2", "dep:flate2", "dep:base64"]
133+
http = ["hyper", "hyper-util", "hyper-rustls", "http-body-util", "dep:http-body", "rustls", "tokio-rustls", "rustls-pemfile", "webpki-roots", "h2", "dep:flate2", "dep:base64"]
133134
aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-sns"]
134135
ibm-mq = ["mqi"]
135136
zeromq = ["dep:zeromq"]

README.md

Lines changed: 54 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ It may still be possible that there are issues with
4444
- nats, if jetstream support is disabled
4545
- TLS integration, as this also hasn't been tested a lot and is usually non-trivial to set up
4646

47+
48+
Due to the large code base, it may still be possible that some endpoints may show
49+
issues in production; therefore, they should be tested locally first. They all worked locally
50+
for me and didn't show data loss during simple in-flight broker restarts.
51+
Kafka, MongoDB, IBM-MQ, Files, and Memory are considered production-ready.
52+
4753
### When to use mq-bridge
4854
* **Hybrid Messaging**: Connect systems speaking different protocols (e.g., MQTT to Kafka) without writing custom adapters.
4955
* **Infrastructure Abstraction**: Write business logic that consumes `CanonicalMessage`s, allowing you to swap the underlying transport (e.g., switching from RabbitMQ to NATS) via configuration.
@@ -64,34 +70,40 @@ It may still be possible that there are issues with
6470
* **Middleware**: Components that intercept and process messages (e.g., for error handling).
6571
* **Handler**: A programmatic component for business logic, such as transforming/consuming messages (`CommandHandler`) or subscribe them (`EventHandler`).
6672

67-
## Endpoint Behavior
73+
## Backend Features & Configuration
6874

69-
`mq-bridge` endpoints generally default to a **Consumer** pattern (Queue), where messages are persisted (if supported by the backend) and distributed among workers.
75+
`mq-bridge` endpoints generally default to a **Consumer** pattern (Queue), where messages are persisted and distributed among workers. To achieve **Subscriber** (Pub/Sub) behavior, specific configuration is required.
7076

71-
To achieve **Subscriber** (Pub/Sub) behavior—where messages are broadcast to all active instances—you must configure the specific backend accordingly. There is no global "subscriber mode" toggle; it is determined by the configuration of the endpoint.
77+
The table below summarizes the capabilities and configuration for each backend:
7278

73-
| Backend | Default Behavior (Queue) | Configuration for Subscriber (Pub/Sub) | Response Support |
79+
| Backend | Subscriber Config (Pub/Sub) | Request-Reply | Nack Support |
7480
| :--- | :--- | :--- | :--- |
75-
| **Kafka** | Persistent (Consumer Group) | Omit `group_id` (generates unique ID) | No |
76-
| **NATS** | Persistent (JetStream Durable) | Set `subscriber_mode: true` | Yes |
77-
| **AMQP** | Persistent (Durable Queue) | Set `subscribe_mode: true` | No |
78-
| **MQTT** | Persistent Session | Set `clean_session: true` | No |
79-
| **IBM MQ** | Persistent Queue | Set `topic` instead of `queue` | No |
80-
| **MongoDB** | Persistent (Collection) | Set `change_stream: true` | Yes |
81-
| **SQLx** | Persistent (Table) | Not supported | No |
82-
| **AWS** | Persistent (SQS) | Not supported directly (Use SNS->SQS) | No |
83-
| **Memory** | Ephemeral (Channel) | Set `subscribe_mode: true` | Yes |
84-
| **File** | Queue (Reads from start) | Set `mode: subscribe` (Tails file) or `mode: group_subscribe` (Persistent tail) | No |
85-
| **HTTP** | Ephemeral (Request) | N/A | Yes (Implicit) |
86-
| **ZeroMQ** | Ephemeral (PULL) | Set `socket_type: "sub"` | No |
87-
88-
### Response Mode
89-
The `response` output endpoint allows sending a reply back to the requester. This is useful for synchronous request-reply patterns (e.g., HTTP-to-NATS-to-HTTP).
90-
91-
* **Availability**: Only available if the **Input** endpoint supports request-reply (HTTP, NATS, Memory, MongoDB).
92-
* **Configuration**: Use `response: {}` as the output endpoint.
81+
| **AMQP** | Set `subscribe_mode: true` | Emulated (Property) | **Yes** (Basic.nack) |
82+
| **AWS** | N/A (Use SNS) | No | **Yes** (Visibility Timeout) |
83+
| **File** | Set `mode: subscribe` | No | Simulated (In-Memory) |
84+
| **gRPC** | N/A | No | No |
85+
| **HTTP** | N/A | **Native** (Implicit) | **Yes** (HTTP 500) |
86+
| **IBM MQ** | Set `topic` | No | **Yes** (Tx Rollback) |
87+
| **Kafka** | Omit `group_id` | Emulated (Header) | Eventual (Skip Offset) |
88+
| **Memory** | Set `subscribe_mode: true` | Emulated (Metadata) | **Yes** (Re-queue), by default **disabled** |
89+
| **MongoDB** | Set `change_stream: true` | Emulated (Metadata) | **Yes** (Unlock) |
90+
| **MQTT** | Set `clean_session: true` | Emulated (Property) | Eventual (Skip Ack) |
91+
| **NATS** | Set `subscriber_mode: true` | **Native** (Inbox) | **Yes** (JetStream Nak) |
92+
| **Sled** | Set `delete_after_read: false` | No | **Yes** (Tx Rollback) |
93+
| **SQLx** | Not supported | No | Eventual (Skip Delete) |
94+
| **ZeroMQ** | Set `socket_type: "sub"` | **Native** (REQ/REP) | No |
95+
96+
### Feature Details
97+
* **Request-Reply**:
98+
* **Native**: Uses protocol-level correlation (e.g., HTTP connection, NATS reply subject).
99+
* **Emulated**: Publishes a new message to a reply destination (specified by the `reply_to` metadata field) carrying a `correlation_id` metadata field.
100+
* **Nack Support**: If "Yes", the backend supports explicit negative acknowledgement triggering redelivery. "Eventual" means redelivery depends on timeout or connection drop. "Simulated" is handled in-memory by the bridge.
101+
102+
### Response Endpoint
103+
The `response` output endpoint allows sending a reply back to the requester. This is useful for synchronous request-reply patterns (e.g., HTTP-to-NATS-to-HTTP). Use `response: {}` as the output endpoint configuration.
104+
93105
* **Caveats**:
94-
* If the input does not support responses (e.g., File, Kafka), the message sent to `response` will be dropped.
106+
* If the input does not support responses (e.g., File, SQLx), the message sent to `response` will be dropped.
95107
* Ensure timeouts are configured correctly on the requester side, as the bridge processing time adds latency.
96108
* Middleware that drops metadata (like `correlation_id`) may break the response chain.
97109

@@ -339,18 +351,30 @@ The times are not stable yet, it is therefore recommended to perform the integra
339351

340352
## AI Disclaimer
341353

342-
This library has been widely written with AI assistance. I used Gemini for planning and writing,
343-
CodeRabbit for reviews and Copilot/Claude for bugfixing and other small things.
354+
This library has been widely written with AI assistance.
355+
356+
Some of the code - the core for example, was originally written by myself,
357+
but most other was generated by AI. I mostly used Gemini for
358+
planning and writing, CodeRabbit for reviews and Claude for bugfixing and
359+
more complicated tasks that Gemini couldn't solve properly.
344360
While some of the AI output was great, some other output wasn't.
345361
I am aware that in year 2026, AI is still not generating perfect code and sometimes
346-
even breaks simple stuff. I reviewed all the
347-
output code and re-specified it or changed the code manually whan insuficcient.
362+
breaks simple stuff or forgets important lines during refactorings that then cause
363+
severe issues.
364+
I reviewed all the output code, cleaned it up manually,
365+
re-specified and refactored it when insuficcient.
366+
**I do trust the current code as much as if it would be completely written by myself.**
367+
348368
I didn't change the AI code appearance, so you will sometimes still see code that just
349369
looks as it is plain from AI and also most of the readme here was actually written
350370
by AI. I don't think it is bad practice, to keep the original code and text appearance.
351-
I'm not an english native speaker, so the AI output for text is mostly just
352-
way better what I could write. For AI code, the readability is usually
353-
sufficient, even if it is sometimes much more verbose what I would write in code.
371+
I'm not an english native speaker, so the AI output for english text is just
372+
way better than my text. For AI code, the readability is usually
373+
good, even if it is more verbose than what I would write.
374+
However, especially for the different endpoints, there is already a lot of existing
375+
code and the AI could also just assist a lot there. Thats mostly the reason,
376+
why there are so many available endpoints in this library, they just could be added
377+
very easily and showed a sufficient code quality.
354378

355379
## License
356380
`mq-bridge` is licensed under the MIT License.

src/endpoints/amqp.rs

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::canonical_message::tracing_support::LazyMessageIds;
22
use crate::models::AmqpConfig;
33
use crate::traits::{
4-
BatchCommitFunc, BoxFuture, ConsumerError, MessageConsumer, MessageDisposition,
4+
BatchCommitFunc, BoxFuture, ConsumerError, EndpointStatus, MessageConsumer, MessageDisposition,
55
MessagePublisher, PublisherError, ReceivedBatch, Sent, SentBatch,
66
};
77
use crate::CanonicalMessage;
@@ -297,6 +297,33 @@ impl MessagePublisher for AmqpPublisher {
297297
}
298298
}
299299

300+
async fn status(&self) -> EndpointStatus {
301+
let state = self.state.read().await;
302+
let conn_status = state.connection.status();
303+
let chan_status = state.channel.status();
304+
let healthy = conn_status.connected() && chan_status.connected();
305+
let error = if !healthy {
306+
Some(format!(
307+
"Connection: '{:?}', Channel: '{:?}'",
308+
conn_status.state(),
309+
chan_status.state()
310+
))
311+
} else {
312+
None
313+
};
314+
EndpointStatus {
315+
healthy,
316+
error,
317+
target: if self.exchange.is_empty() {
318+
self.queue.clone()
319+
} else {
320+
self.exchange.clone()
321+
},
322+
details: serde_json::json!({ "queue": self.queue, "delayed_ack": self.delayed_ack }),
323+
..Default::default()
324+
}
325+
}
326+
300327
fn as_any(&self) -> &dyn Any {
301328
self
302329
}
@@ -308,6 +335,7 @@ pub struct AmqpConsumer {
308335
channel: Channel,
309336
queue: String,
310337
is_poisoned: Arc<AtomicBool>,
338+
prefetch: u16,
311339
}
312340

313341
impl AmqpConsumer {
@@ -409,6 +437,7 @@ impl AmqpConsumer {
409437
channel,
410438
queue: queue_name,
411439
is_poisoned: Arc::new(AtomicBool::new(false)),
440+
prefetch: prefetch_count,
412441
})
413442
}
414443
}
@@ -620,6 +649,51 @@ impl MessageConsumer for AmqpConsumer {
620649
Ok(ReceivedBatch { messages, commit })
621650
}
622651

652+
async fn status(&self) -> EndpointStatus {
653+
let conn_status = self._conn.status();
654+
let chan_status = self.channel.status();
655+
let mut healthy = conn_status.connected() && chan_status.connected();
656+
let mut pending: Option<usize> = None;
657+
let mut error: Option<String> = None;
658+
659+
if healthy {
660+
let passive_declare = self.channel.queue_declare(
661+
&self.queue,
662+
lapin::options::QueueDeclareOptions {
663+
passive: true,
664+
..Default::default()
665+
},
666+
lapin::types::FieldTable::default(),
667+
);
668+
match tokio::time::timeout(Duration::from_secs(2), passive_declare).await {
669+
Ok(Ok(q)) => pending = Some(q.message_count() as usize),
670+
Ok(Err(e)) => {
671+
healthy = false;
672+
error = Some(e.to_string());
673+
}
674+
Err(e) => {
675+
healthy = false;
676+
error = Some(e.to_string());
677+
}
678+
}
679+
} else {
680+
error = Some(format!(
681+
"Connection: '{:?}', Channel: '{:?}'",
682+
conn_status.state(),
683+
chan_status.state()
684+
));
685+
}
686+
687+
EndpointStatus {
688+
healthy,
689+
target: self.queue.clone(),
690+
pending,
691+
error,
692+
capacity: Some(self.prefetch as usize),
693+
..Default::default()
694+
}
695+
}
696+
623697
fn as_any(&self) -> &dyn Any {
624698
self
625699
}
@@ -631,7 +705,16 @@ async fn handle_replies(
631705
dispositions: &[MessageDisposition],
632706
) {
633707
for ((reply_to, correlation_id), disposition) in reply_infos.iter().zip(dispositions.iter()) {
634-
if let (Some(rt), MessageDisposition::Reply(resp)) = (reply_to, disposition) {
708+
let payload = match (disposition, reply_to) {
709+
(MessageDisposition::Reply(resp), Some(_)) => Some(resp.payload.clone()),
710+
(MessageDisposition::Reply(_), None) => {
711+
tracing::warn!("MessageDisposition::Reply received but no reply_to address found in original message");
712+
None
713+
}
714+
_ => None,
715+
};
716+
717+
if let (Some(rt), Some(body)) = (reply_to, payload) {
635718
let mut props = BasicProperties::default();
636719
if let Some(cid) = correlation_id {
637720
props = props.with_correlation_id(cid.clone().into());
@@ -643,7 +726,7 @@ async fn handle_replies(
643726
"", // Default exchange
644727
rt,
645728
BasicPublishOptions::default(),
646-
&resp.payload,
729+
&body,
647730
props,
648731
)
649732
.await

0 commit comments

Comments
 (0)