Skip to content

Commit dbabfc7

Browse files
authored
feat(sink): add RabbitMQ sink for AMQP messaging
Add a RabbitMQ sink that publishes events to AMQP exchanges. Features: - Configurable exchange and routing key - Dynamic routing via metadata - Publisher confirms for delivery guarantees - Single-pass processing with borrowing for efficiency
1 parent 321b4c2 commit dbabfc7

8 files changed

Lines changed: 1297 additions & 10 deletions

File tree

Cargo.lock

Lines changed: 692 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ version = "0.1.0"
66
[features]
77
default = []
88
sink-nats = ["dep:async-nats"]
9+
sink-rabbitmq = ["dep:lapin"]
910
sink-redis-streams = ["dep:redis"]
1011
sink-redis-strings = ["dep:redis"]
1112
sink-webhook = ["dep:reqwest"]
@@ -52,6 +53,7 @@ uuid = { version = "1.19.0", default-features = false, features = ["v4"]
5253

5354
# Optional sink dependencies.
5455
async-nats = { version = "0.38", optional = true }
56+
lapin = { version = "2.5", optional = true }
5557
redis = { version = "0.27", default-features = false, features = [
5658
"tokio-comp",
5759
"connection-manager",
@@ -67,9 +69,15 @@ tikv-jemallocator = { version = "0.6.1", default-features = false, features = [
6769
] }
6870

6971

70-
ctor = { version = "0.4", optional = true }
71-
testcontainers = { version = "0.23", optional = true, features = ["blocking"] }
72-
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "redis", "nats", "blocking"] }
72+
ctor = { version = "0.4", optional = true }
73+
testcontainers = { version = "0.23", optional = true, features = ["blocking"] }
74+
testcontainers-modules = { version = "0.11", optional = true, features = [
75+
"postgres",
76+
"redis",
77+
"nats",
78+
"rabbitmq",
79+
"blocking",
80+
] }
7381

7482
[dev-dependencies]
7583
temp-env = "0.3"

src/config/sink.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ use crate::sink::redis_strings::RedisStringsSinkConfig;
99
#[cfg(feature = "sink-redis-streams")]
1010
use crate::sink::redis_streams::RedisStreamsSinkConfig;
1111

12+
#[cfg(feature = "sink-rabbitmq")]
13+
use crate::sink::rabbitmq::RabbitmqSinkConfig;
14+
1215
#[cfg(feature = "sink-webhook")]
1316
use crate::sink::webhook::WebhookSinkConfig;
1417

@@ -36,6 +39,11 @@ pub enum SinkConfig {
3639
#[serde(rename = "nats")]
3740
Nats(NatsSinkConfig),
3841

42+
/// RabbitMQ sink for AMQP messaging.
43+
#[cfg(feature = "sink-rabbitmq")]
44+
#[serde(rename = "rabbitmq")]
45+
Rabbitmq(RabbitmqSinkConfig),
46+
3947
/// Webhook sink for HTTP POST delivery.
4048
#[cfg(feature = "sink-webhook")]
4149
#[serde(rename = "webhook")]

src/core.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> {
117117
AnySink::Nats(s)
118118
}
119119

120+
#[cfg(feature = "sink-rabbitmq")]
121+
SinkConfig::Rabbitmq(cfg) => {
122+
use crate::sink::rabbitmq::RabbitmqSink;
123+
let s = RabbitmqSink::new(cfg.clone()).await.map_err(|e| {
124+
etl::etl_error!(
125+
etl::error::ErrorKind::InvalidData,
126+
"Failed to create RabbitMQ sink",
127+
e.to_string()
128+
)
129+
})?;
130+
AnySink::Rabbitmq(s)
131+
}
132+
120133
#[cfg(feature = "sink-webhook")]
121134
SinkConfig::Webhook(cfg) => {
122135
use crate::sink::webhook::WebhookSink;
@@ -190,6 +203,11 @@ fn log_sink_config(config: &SinkConfig) {
190203
debug!("using nats sink");
191204
}
192205

206+
#[cfg(feature = "sink-rabbitmq")]
207+
SinkConfig::Rabbitmq(_cfg) => {
208+
debug!("using rabbitmq sink");
209+
}
210+
193211
#[cfg(feature = "sink-webhook")]
194212
SinkConfig::Webhook(_cfg) => {
195213
debug!("using webhook sink");

src/sink/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ pub mod redis_strings;
1010
#[cfg(feature = "sink-redis-streams")]
1111
pub mod redis_streams;
1212

13+
#[cfg(feature = "sink-rabbitmq")]
14+
pub mod rabbitmq;
15+
1316
#[cfg(feature = "sink-webhook")]
1417
pub mod webhook;
1518

@@ -27,6 +30,9 @@ use redis_strings::RedisStringsSink;
2730
#[cfg(feature = "sink-redis-streams")]
2831
use redis_streams::RedisStreamsSink;
2932

33+
#[cfg(feature = "sink-rabbitmq")]
34+
use rabbitmq::RabbitmqSink;
35+
3036
#[cfg(feature = "sink-webhook")]
3137
use webhook::WebhookSink;
3238

@@ -53,6 +59,10 @@ pub enum AnySink {
5359
#[cfg(feature = "sink-nats")]
5460
Nats(NatsSink),
5561

62+
/// RabbitMQ sink for AMQP messaging.
63+
#[cfg(feature = "sink-rabbitmq")]
64+
Rabbitmq(RabbitmqSink),
65+
5666
/// Webhook sink for HTTP POST delivery.
5767
#[cfg(feature = "sink-webhook")]
5868
Webhook(WebhookSink),
@@ -76,6 +86,9 @@ impl Sink for AnySink {
7686
#[cfg(feature = "sink-nats")]
7787
AnySink::Nats(sink) => sink.publish_events(events).await,
7888

89+
#[cfg(feature = "sink-rabbitmq")]
90+
AnySink::Rabbitmq(sink) => sink.publish_events(events).await,
91+
7992
#[cfg(feature = "sink-webhook")]
8093
AnySink::Webhook(sink) => sink.publish_events(events).await,
8194
}

0 commit comments

Comments
 (0)