Skip to content

Commit 48aee50

Browse files
psteinroeclaude
andcommitted
refactor: update RabbitMQ sink for dynamic routing and payload-only output
- Make exchange and routing_key config optional (can come from event metadata) - Add resolve_exchange() and resolve_routing_key() for dynamic routing - Send only event.payload instead of full event envelope - Remove info! logging - Use DestinationError for publish failures, ConfigError for missing config - Update tests to verify payload-only content and metadata-based routing 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 3db5380 commit 48aee50

2 files changed

Lines changed: 223 additions & 89 deletions

File tree

src/sink/rabbitmq.rs

Lines changed: 107 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
//! RabbitMQ sink for publishing events to a message queue.
22
//!
3-
//! Publishes each event as a JSON message to the configured exchange
4-
//! with the specified routing key.
3+
//! Publishes each event's payload as a JSON message to an exchange determined by:
4+
//! 1. `exchange` key in event metadata (from subscription's metadata/metadata_extensions)
5+
//! 2. Fallback to `exchange` in sink config
56
//!
6-
//! # Payload Extensions
7+
//! The routing key is determined by:
8+
//! 1. `routing_key` key in event metadata
9+
//! 2. Fallback to `routing_key` in sink config
710
//!
8-
//! This sink does not require any specific payload extensions. However, you can
9-
//! use payload extensions to add routing metadata that can be used with RabbitMQ's
10-
//! topic exchange routing:
11+
//! # Dynamic Routing
12+
//!
13+
//! The target exchange and routing key can be configured per-event using metadata_extensions:
1114
//!
1215
//! ```sql
13-
//! payload_extensions = '[
14-
//! {"key": "routing_key", "value": "orders.created"}
16+
//! metadata_extensions = '[
17+
//! {"json_path": "exchange", "expression": "''events''"},
18+
//! {"json_path": "routing_key", "expression": "table_name || ''.'' || operation"}
1519
//! ]'
1620
//! ```
1721
@@ -24,7 +28,6 @@ use lapin::{
2428
use serde::{Deserialize, Serialize};
2529
use std::sync::Arc;
2630
use tokio::sync::Mutex;
27-
use tracing::info;
2831

2932
use crate::sink::Sink;
3033
use crate::types::TriggeredEvent;
@@ -39,11 +42,13 @@ pub struct RabbitmqSinkConfig {
3942
/// Contains credentials and should be treated as sensitive.
4043
pub url: String,
4144

42-
/// Exchange name to publish messages to.
43-
pub exchange: String,
45+
/// Exchange name to publish messages to. Optional if provided via event metadata.
46+
#[serde(default)]
47+
pub exchange: Option<String>,
4448

45-
/// Routing key for message routing.
46-
pub routing_key: String,
49+
/// Routing key for message routing. Optional if provided via event metadata.
50+
#[serde(default)]
51+
pub routing_key: Option<String>,
4752

4853
/// Optional queue name to bind to the exchange.
4954
/// If provided, the queue will be declared and bound.
@@ -56,11 +61,11 @@ pub struct RabbitmqSinkConfig {
5661
/// Safe to serialize and log. Use this for debugging and metrics.
5762
#[derive(Clone, Debug, Serialize, Deserialize)]
5863
pub struct RabbitmqSinkConfigWithoutSecrets {
59-
/// Exchange name to publish messages to.
60-
pub exchange: String,
64+
/// Exchange name to publish messages to (if configured).
65+
pub exchange: Option<String>,
6166

62-
/// Routing key for message routing.
63-
pub routing_key: String,
67+
/// Routing key for message routing (if configured).
68+
pub routing_key: Option<String>,
6469

6570
/// Optional queue name to bind to the exchange.
6671
pub queue: Option<String>,
@@ -88,19 +93,18 @@ impl From<&RabbitmqSinkConfig> for RabbitmqSinkConfigWithoutSecrets {
8893

8994
/// Sink that publishes events to a RabbitMQ exchange.
9095
///
91-
/// Each event is serialized as JSON and published to the configured exchange
92-
/// with the specified routing key. The sink maintains a persistent connection
93-
/// and channel for publishing.
96+
/// Each event's payload is serialized as JSON and published to the exchange.
97+
/// The sink maintains a persistent connection and channel for publishing.
9498
#[derive(Clone)]
9599
pub struct RabbitmqSink {
96100
/// Shared channel for publishing messages.
97101
channel: Arc<Mutex<Channel>>,
98102

99-
/// Exchange name to publish messages to.
100-
exchange: String,
103+
/// Default exchange name. Can be overridden per-event via metadata.
104+
exchange: Option<String>,
101105

102-
/// Routing key for message routing.
103-
routing_key: String,
106+
/// Default routing key. Can be overridden per-event via metadata.
107+
routing_key: Option<String>,
104108
}
105109

106110
impl RabbitmqSink {
@@ -120,41 +124,45 @@ impl RabbitmqSink {
120124

121125
let channel = connection.create_channel().await?;
122126

123-
// Declare the exchange as a topic exchange.
124-
channel
125-
.exchange_declare(
126-
&config.exchange,
127-
lapin::ExchangeKind::Topic,
128-
ExchangeDeclareOptions {
129-
durable: true,
130-
..Default::default()
131-
},
132-
FieldTable::default(),
133-
)
134-
.await?;
135-
136-
// If a queue is specified, declare and bind it.
137-
if let Some(ref queue_name) = config.queue {
127+
// Declare the exchange as a topic exchange if provided.
128+
if let Some(ref exchange) = config.exchange {
138129
channel
139-
.queue_declare(
140-
queue_name,
141-
QueueDeclareOptions {
130+
.exchange_declare(
131+
exchange,
132+
lapin::ExchangeKind::Topic,
133+
ExchangeDeclareOptions {
142134
durable: true,
143135
..Default::default()
144136
},
145137
FieldTable::default(),
146138
)
147139
.await?;
148140

149-
channel
150-
.queue_bind(
151-
queue_name,
152-
&config.exchange,
153-
&config.routing_key,
154-
QueueBindOptions::default(),
155-
FieldTable::default(),
156-
)
157-
.await?;
141+
// If a queue is specified, declare and bind it.
142+
if let Some(ref queue_name) = config.queue {
143+
if let Some(ref routing_key) = config.routing_key {
144+
channel
145+
.queue_declare(
146+
queue_name,
147+
QueueDeclareOptions {
148+
durable: true,
149+
..Default::default()
150+
},
151+
FieldTable::default(),
152+
)
153+
.await?;
154+
155+
channel
156+
.queue_bind(
157+
queue_name,
158+
exchange,
159+
routing_key,
160+
QueueBindOptions::default(),
161+
FieldTable::default(),
162+
)
163+
.await?;
164+
}
165+
}
158166
}
159167

160168
Ok(Self {
@@ -163,6 +171,30 @@ impl RabbitmqSink {
163171
routing_key: config.routing_key,
164172
})
165173
}
174+
175+
/// Resolves the exchange name for an event from metadata or config.
176+
fn resolve_exchange<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> {
177+
// First check event metadata for dynamic exchange.
178+
if let Some(ref metadata) = event.metadata {
179+
if let Some(exchange) = metadata.get("exchange").and_then(|v| v.as_str()) {
180+
return Some(exchange);
181+
}
182+
}
183+
// Fall back to config exchange.
184+
self.exchange.as_deref()
185+
}
186+
187+
/// Resolves the routing key for an event from metadata or config.
188+
fn resolve_routing_key<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> {
189+
// First check event metadata for dynamic routing key.
190+
if let Some(ref metadata) = event.metadata {
191+
if let Some(routing_key) = metadata.get("routing_key").and_then(|v| v.as_str()) {
192+
return Some(routing_key);
193+
}
194+
}
195+
// Fall back to config routing key.
196+
self.routing_key.as_deref()
197+
}
166198
}
167199

168200
impl Sink for RabbitmqSink {
@@ -175,44 +207,41 @@ impl Sink for RabbitmqSink {
175207
return Ok(());
176208
}
177209

178-
info!(
179-
"publishing {} events to RabbitMQ exchange '{}'",
180-
events.len(),
181-
self.exchange
182-
);
183-
184210
let channel = self.channel.lock().await;
185211

186212
for event in &events {
187-
// Build JSON object manually since TriggeredEvent doesn't implement Serialize.
188-
let mut json_obj = serde_json::json!({
189-
"id": event.id.id,
190-
"created_at": event.id.created_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
191-
"payload": event.payload,
192-
"stream_id": format!("{:?}", event.stream_id),
193-
});
194-
195-
// Add optional fields.
196-
if let Some(ref metadata) = event.metadata {
197-
json_obj["metadata"] = metadata.clone();
198-
}
199-
if let Some(lsn) = event.lsn {
200-
json_obj["lsn"] = serde_json::json!(lsn.to_string());
201-
}
213+
// Resolve exchange from event metadata or config.
214+
let exchange = self.resolve_exchange(event).ok_or_else(|| {
215+
etl::etl_error!(
216+
etl::error::ErrorKind::ConfigError,
217+
"No exchange configured",
218+
"Exchange must be provided in sink config or event metadata"
219+
)
220+
})?;
221+
222+
// Resolve routing key from event metadata or config.
223+
let routing_key = self.resolve_routing_key(event).ok_or_else(|| {
224+
etl::etl_error!(
225+
etl::error::ErrorKind::ConfigError,
226+
"No routing_key configured",
227+
"Routing key must be provided in sink config or event metadata"
228+
)
229+
})?;
202230

203-
let payload = serde_json::to_vec(&json_obj).map_err(|e| {
231+
// Serialize payload to JSON.
232+
let payload = serde_json::to_vec(&event.payload).map_err(|e| {
204233
etl::etl_error!(
205234
etl::error::ErrorKind::InvalidData,
206-
"Failed to serialize event to JSON",
235+
"Failed to serialize payload to JSON",
207236
e.to_string()
208237
)
209238
})?;
210239

211240
// Publish to the exchange with routing key.
212241
channel
213242
.basic_publish(
214-
&self.exchange,
215-
&self.routing_key,
243+
exchange,
244+
routing_key,
216245
BasicPublishOptions::default(),
217246
&payload,
218247
BasicProperties::default()
@@ -222,15 +251,15 @@ impl Sink for RabbitmqSink {
222251
.await
223252
.map_err(|e| {
224253
etl::etl_error!(
225-
etl::error::ErrorKind::InvalidData,
254+
etl::error::ErrorKind::DestinationError,
226255
"Failed to publish event to RabbitMQ",
227256
e.to_string()
228257
)
229258
})?
230259
.await
231260
.map_err(|e| {
232261
etl::etl_error!(
233-
etl::error::ErrorKind::InvalidData,
262+
etl::error::ErrorKind::DestinationError,
234263
"Failed to confirm RabbitMQ publish",
235264
e.to_string()
236265
)

0 commit comments

Comments
 (0)