Skip to content

Commit f6163e1

Browse files
psteinroeclaude
andcommitted
refactor: update SNS sink per FEEDBACK.md
- Send only event.payload (not full event structure) - Make topic_arn optional in config (can come from event.metadata) - Add dynamic topic ARN resolution from event.metadata - Use ErrorKind::DestinationError for publish failures - Use ErrorKind::ConfigError for missing topic ARN - Remove verbose info! logging on every publish - Update tests to verify payload-only output 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 9e96edb commit f6163e1

2 files changed

Lines changed: 63 additions & 59 deletions

File tree

src/sink/sns.rs

Lines changed: 41 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,23 @@
11
//! AWS SNS sink for publishing events to an Amazon SNS topic.
22
//!
3-
//! Publishes each event as a JSON message to the configured topic ARN.
4-
//! The sink uses the AWS SDK for async message delivery.
3+
//! Publishes each event's payload as JSON to a topic ARN determined by:
4+
//! 1. `topic` key in event metadata (from subscription's metadata/metadata_extensions)
5+
//! 2. Fallback to `topic_arn` in sink config
56
//!
6-
//! # Payload Extensions
7+
//! # Dynamic Routing
78
//!
8-
//! This sink does not require any specific payload extensions. However, you can
9-
//! use payload extensions to add SNS message attributes:
9+
//! The target topic ARN can be configured per-event using metadata_extensions:
1010
//!
1111
//! ```sql
12-
//! payload_extensions = '[
13-
//! {"key": "message_group_id", "value": "orders"},
14-
//! {"key": "subject", "value": "New Order"}
12+
//! metadata_extensions = '[
13+
//! {"json_path": "topic", "expression": "''arn:aws:sns:us-east-1:123456789:'' || topic_name"}
1514
//! ]'
1615
//! ```
1716
1817
use aws_sdk_sns::Client;
1918
use etl::error::EtlResult;
2019
use serde::{Deserialize, Serialize};
2120
use std::sync::Arc;
22-
use tracing::info;
2321

2422
use crate::sink::Sink;
2523
use crate::types::TriggeredEvent;
@@ -30,8 +28,8 @@ use crate::types::TriggeredEvent;
3028
/// leaking secrets (AWS credentials, endpoint URLs) in serialized forms.
3129
#[derive(Clone, Debug, Deserialize)]
3230
pub struct SnsSinkConfig {
33-
/// SNS topic ARN to publish messages to.
34-
pub topic_arn: String,
31+
/// SNS topic ARN to publish messages to. Optional if provided via event metadata.
32+
pub topic_arn: Option<String>,
3533

3634
/// AWS region (e.g., "us-east-1").
3735
pub region: String,
@@ -54,8 +52,8 @@ pub struct SnsSinkConfig {
5452
/// Safe to serialize and log. Use this for debugging and metrics.
5553
#[derive(Clone, Debug, Serialize, Deserialize)]
5654
pub struct SnsSinkConfigWithoutSecrets {
57-
/// SNS topic ARN to publish messages to.
58-
pub topic_arn: String,
55+
/// SNS topic ARN to publish messages to (if configured).
56+
pub topic_arn: Option<String>,
5957

6058
/// AWS region.
6159
pub region: String,
@@ -91,15 +89,15 @@ impl From<&SnsSinkConfig> for SnsSinkConfigWithoutSecrets {
9189

9290
/// Sink that publishes events to an AWS SNS topic.
9391
///
94-
/// Each event is serialized as JSON and published to the configured topic.
92+
/// Each event's payload is serialized as JSON and published to the configured topic.
9593
/// The sink uses the AWS SDK with automatic retry handling.
9694
#[derive(Clone)]
9795
pub struct SnsSink {
9896
/// AWS SNS client.
9997
client: Arc<Client>,
10098

101-
/// Topic ARN to publish messages to.
102-
topic_arn: String,
99+
/// Default topic ARN. Can be overridden per-event via metadata.
100+
topic_arn: Option<String>,
103101
}
104102

105103
impl SnsSink {
@@ -144,6 +142,18 @@ impl SnsSink {
144142
topic_arn: config.topic_arn,
145143
})
146144
}
145+
146+
/// Resolves the topic ARN for an event from metadata or config.
147+
fn resolve_topic_arn<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> {
148+
// First check event metadata for dynamic topic
149+
if let Some(ref metadata) = event.metadata {
150+
if let Some(topic) = metadata.get("topic").and_then(|v| v.as_str()) {
151+
return Some(topic);
152+
}
153+
}
154+
// Fall back to config topic ARN
155+
self.topic_arn.as_deref()
156+
}
147157
}
148158

149159
impl Sink for SnsSink {
@@ -156,47 +166,35 @@ impl Sink for SnsSink {
156166
return Ok(());
157167
}
158168

159-
info!(
160-
"publishing {} events to SNS topic '{}'",
161-
events.len(),
162-
self.topic_arn
163-
);
164-
165169
for event in &events {
166-
// Build JSON object manually since TriggeredEvent doesn't implement Serialize.
167-
let mut json_obj = serde_json::json!({
168-
"id": event.id.id,
169-
"created_at": event.id.created_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
170-
"payload": event.payload,
171-
"stream_id": format!("{:?}", event.stream_id),
172-
});
173-
174-
// Add optional fields.
175-
if let Some(ref metadata) = event.metadata {
176-
json_obj["metadata"] = metadata.clone();
177-
}
178-
if let Some(lsn) = event.lsn {
179-
json_obj["lsn"] = serde_json::json!(lsn.to_string());
180-
}
170+
// Resolve topic ARN from event metadata or config.
171+
let topic_arn = self.resolve_topic_arn(event).ok_or_else(|| {
172+
etl::etl_error!(
173+
etl::error::ErrorKind::ConfigError,
174+
"No topic ARN configured",
175+
"Topic ARN must be provided in sink config or event metadata"
176+
)
177+
})?;
181178

182-
let message = serde_json::to_string(&json_obj).map_err(|e| {
179+
// Serialize payload to JSON.
180+
let message = serde_json::to_string(&event.payload).map_err(|e| {
183181
etl::etl_error!(
184182
etl::error::ErrorKind::InvalidData,
185-
"Failed to serialize event to JSON",
183+
"Failed to serialize payload to JSON",
186184
e.to_string()
187185
)
188186
})?;
189187

190188
// Publish message to SNS topic.
191189
self.client
192190
.publish()
193-
.topic_arn(&self.topic_arn)
191+
.topic_arn(topic_arn)
194192
.message(&message)
195193
.send()
196194
.await
197195
.map_err(|e| {
198196
etl::etl_error!(
199-
etl::error::ErrorKind::InvalidData,
197+
etl::error::ErrorKind::DestinationError,
200198
"Failed to publish message to SNS",
201199
e.to_string()
202200
)
@@ -219,7 +217,7 @@ mod tests {
219217
#[test]
220218
fn test_config_without_secrets() {
221219
let config = SnsSinkConfig {
222-
topic_arn: "arn:aws:sns:us-east-1:123456789:my-topic".to_string(),
220+
topic_arn: Some("arn:aws:sns:us-east-1:123456789:my-topic".to_string()),
223221
region: "us-east-1".to_string(),
224222
endpoint_url: Some("http://localhost:4566".to_string()),
225223
access_key_id: Some("AKIAIOSFODNN7EXAMPLE".to_string()),
@@ -230,7 +228,7 @@ mod tests {
230228

231229
assert_eq!(
232230
without_secrets.topic_arn,
233-
"arn:aws:sns:us-east-1:123456789:my-topic"
231+
Some("arn:aws:sns:us-east-1:123456789:my-topic".to_string())
234232
);
235233
assert_eq!(without_secrets.region, "us-east-1");
236234
assert!(without_secrets.has_custom_endpoint);

tests/sns_sink_tests.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ async fn test_sns_sink_publishes_events() {
203203

204204
// Create sink.
205205
let config = SnsSinkConfig {
206-
topic_arn: topic_arn.clone(),
206+
topic_arn: Some(topic_arn.clone()),
207207
region: "us-east-1".to_string(),
208208
endpoint_url: Some(endpoint.clone()),
209209
access_key_id: Some("test".to_string()),
@@ -221,14 +221,16 @@ async fn test_sns_sink_publishes_events() {
221221
assert_eq!(messages.len(), 2, "Expected 2 messages");
222222

223223
// SNS wraps messages in an envelope, so we need to parse the outer JSON.
224+
// Only payload is sent now.
224225
for msg in &messages {
225226
let envelope: serde_json::Value =
226227
serde_json::from_str(msg).expect("Failed to parse envelope");
227228
let inner_msg = envelope["Message"].as_str().expect("Missing Message field");
228-
let event: serde_json::Value =
229-
serde_json::from_str(inner_msg).expect("Failed to parse event");
230-
assert!(event["id"].is_string());
231-
assert!(event["payload"]["key"].is_string());
229+
let payload: serde_json::Value =
230+
serde_json::from_str(inner_msg).expect("Failed to parse payload");
231+
// Payload fields from make_test_event
232+
assert!(payload["key"].is_string());
233+
assert!(payload["value"].is_string());
232234
}
233235
}
234236

@@ -240,7 +242,7 @@ async fn test_sns_sink_handles_empty_batch() {
240242
let topic_arn = create_sns_topic(&endpoint).await;
241243

242244
let config = SnsSinkConfig {
243-
topic_arn,
245+
topic_arn: Some(topic_arn),
244246
region: "us-east-1".to_string(),
245247
endpoint_url: Some(endpoint),
246248
access_key_id: Some("test".to_string()),
@@ -256,7 +258,7 @@ async fn test_sns_sink_handles_empty_batch() {
256258
}
257259

258260
#[tokio::test(flavor = "multi_thread")]
259-
async fn test_sns_sink_includes_metadata() {
261+
async fn test_sns_sink_sends_only_payload() {
260262
let port = ensure_localstack().await;
261263
let endpoint = format!("http://127.0.0.1:{}", port);
262264

@@ -266,7 +268,7 @@ async fn test_sns_sink_includes_metadata() {
266268
subscribe_sqs_to_sns(&endpoint, &topic_arn, &queue_arn).await;
267269

268270
let config = SnsSinkConfig {
269-
topic_arn,
271+
topic_arn: Some(topic_arn),
270272
region: "us-east-1".to_string(),
271273
endpoint_url: Some(endpoint.clone()),
272274
access_key_id: Some("test".to_string()),
@@ -275,11 +277,11 @@ async fn test_sns_sink_includes_metadata() {
275277

276278
let sink = SnsSink::new(config).await.expect("Failed to create sink");
277279

278-
// Create event with metadata.
280+
// Create event with metadata (metadata is used for routing, not sent).
279281
let event = TriggeredEvent {
280282
id: EventIdentifier::new(Uuid::new_v4().to_string(), Utc::now()),
281283
stream_id: StreamId::default(),
282-
payload: serde_json::json!({ "action": "created" }),
284+
payload: serde_json::json!({ "action": "created", "data": "test" }),
283285
metadata: Some(serde_json::json!({ "user_id": 123, "source": "api" })),
284286
lsn: Some(PgLsn::from(99999u64)),
285287
};
@@ -294,12 +296,16 @@ async fn test_sns_sink_includes_metadata() {
294296
let envelope: serde_json::Value =
295297
serde_json::from_str(&messages[0]).expect("Failed to parse envelope");
296298
let inner_msg = envelope["Message"].as_str().expect("Missing Message field");
297-
let published: serde_json::Value =
298-
serde_json::from_str(inner_msg).expect("Failed to parse event");
299-
300-
assert_eq!(published["metadata"]["user_id"], 123);
301-
assert_eq!(published["metadata"]["source"], "api");
302-
assert!(published["lsn"].is_string());
299+
let payload: serde_json::Value =
300+
serde_json::from_str(inner_msg).expect("Failed to parse payload");
301+
302+
// Only payload is sent - verify payload fields, not id/metadata/lsn.
303+
assert_eq!(payload["action"], "created");
304+
assert_eq!(payload["data"], "test");
305+
// These should NOT be in the message since we only send payload.
306+
assert!(payload.get("id").is_none());
307+
assert!(payload.get("metadata").is_none());
308+
assert!(payload.get("lsn").is_none());
303309
}
304310

305311
#[test]

0 commit comments

Comments
 (0)