Skip to content

Commit 9e96edb

Browse files
psteinroeclaude
andcommitted
feat(sink): add AWS SNS sink for topic publishing
Adds a new SNS sink that publishes events to an Amazon SNS topic. Features: - Publishes events as JSON messages to configured topic ARN - Supports custom endpoint URL for LocalStack testing - Optional explicit credentials (falls back to default chain) - WithoutSecrets config pattern for safe logging - LocalStack container helper for integration tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 80a1ebf commit 9e96edb

8 files changed

Lines changed: 1211 additions & 31 deletions

File tree

Cargo.lock

Lines changed: 569 additions & 28 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ version = "0.1.0"
77
default = []
88
test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"]
99

10+
# Sink features.
11+
sink-sns = ["dep:aws-sdk-sns", "dep:aws-config"]
12+
1013
[dependencies]
1114
anyhow = { version = "1.0.98", default-features = false, features = ["std"] }
1215
chrono = { version = "0.4.41", default-features = false }
@@ -54,13 +57,18 @@ tikv-jemallocator = { version = "0.6.1", default-features = false, features = [
5457
] }
5558

5659

60+
# Optional sink dependencies.
61+
aws-config = { version = "1.6", optional = true, default-features = false, features = ["behavior-version-latest", "rt-tokio", "rustls"] }
62+
aws-sdk-sns = { version = "1.74", optional = true, default-features = false, features = ["behavior-version-latest", "rt-tokio", "rustls"] }
63+
5764
ctor = { version = "0.4", optional = true }
5865
testcontainers = { version = "0.23", optional = true, features = ["blocking"] }
59-
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "blocking"] }
66+
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "localstack", "blocking"] }
6067

6168
[dev-dependencies]
62-
temp-env = "0.3"
63-
tempfile = "3.13"
69+
aws-sdk-sqs = { version = "1.74", default-features = false, features = ["behavior-version-latest", "rt-tokio", "rustls"] }
70+
temp-env = "0.3"
71+
tempfile = "3.13"
6472

6573
[lints.clippy]
6674
fallible_impl_from = "deny"

src/config/sink.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use serde::Deserialize;
22

3+
#[cfg(feature = "sink-sns")]
4+
use crate::sink::sns::SnsSinkConfig;
5+
36
/// Sink destination configuration.
47
///
58
/// Determines where replicated events are sent.
@@ -8,4 +11,8 @@ use serde::Deserialize;
811
pub enum SinkConfig {
912
/// In-memory sink for testing and development.
1013
Memory,
14+
15+
/// AWS SNS sink for topic publishing.
16+
#[cfg(feature = "sink-sns")]
17+
Sns(SnsSinkConfig),
1118
}

src/core.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> {
7777
// Create sink based on configuration.
7878
let sink = match &config.sink {
7979
SinkConfig::Memory => AnySink::Memory(MemorySink::new()),
80+
81+
#[cfg(feature = "sink-sns")]
82+
SinkConfig::Sns(cfg) => {
83+
use crate::sink::sns::SnsSink;
84+
let s = SnsSink::new(cfg.clone()).await.map_err(|e| {
85+
etl::etl_error!(
86+
etl::error::ErrorKind::InvalidData,
87+
"Failed to create SNS sink",
88+
e.to_string()
89+
)
90+
})?;
91+
AnySink::Sns(s)
92+
}
8093
};
8194

8295
// Create PgStream as an ETL destination
@@ -122,6 +135,13 @@ fn log_sink_config(config: &SinkConfig) {
122135
SinkConfig::Memory => {
123136
debug!("using memory sink");
124137
}
138+
139+
#[cfg(feature = "sink-sns")]
140+
SinkConfig::Sns(cfg) => {
141+
use crate::sink::sns::SnsSinkConfigWithoutSecrets;
142+
let safe_cfg: SnsSinkConfigWithoutSecrets = cfg.into();
143+
debug!(config = ?safe_cfg, "using sns sink");
144+
}
125145
}
126146
}
127147

src/sink/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
mod base;
22
pub mod memory;
33

4+
#[cfg(feature = "sink-sns")]
5+
pub mod sns;
6+
47
pub use base::Sink;
58

69
use etl::error::EtlResult;
710
use memory::MemorySink;
811

12+
#[cfg(feature = "sink-sns")]
13+
use sns::SnsSink;
14+
915
use crate::types::TriggeredEvent;
1016

1117
/// Wrapper enum for all supported sink types.
@@ -16,6 +22,10 @@ use crate::types::TriggeredEvent;
1622
pub enum AnySink {
1723
/// In-memory sink for testing and development.
1824
Memory(MemorySink),
25+
26+
#[cfg(feature = "sink-sns")]
27+
/// AWS SNS sink for topic publishing.
28+
Sns(SnsSink),
1929
}
2030

2131
impl Sink for AnySink {
@@ -26,6 +36,9 @@ impl Sink for AnySink {
2636
async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
2737
match self {
2838
AnySink::Memory(sink) => sink.publish_events(events).await,
39+
40+
#[cfg(feature = "sink-sns")]
41+
AnySink::Sns(sink) => sink.publish_events(events).await,
2942
}
3043
}
3144
}

src/sink/sns.rs

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
//! AWS SNS sink for publishing events to an Amazon SNS topic.
2+
//!
3+
//! Publishes each event as a JSON message to the configured topic ARN.
4+
//! The sink uses the AWS SDK for async message delivery.
5+
//!
6+
//! # Payload Extensions
7+
//!
8+
//! This sink does not require any specific payload extensions. However, you can
9+
//! use payload extensions to add SNS message attributes:
10+
//!
11+
//! ```sql
12+
//! payload_extensions = '[
13+
//! {"key": "message_group_id", "value": "orders"},
14+
//! {"key": "subject", "value": "New Order"}
15+
//! ]'
16+
//! ```
17+
18+
use aws_sdk_sns::Client;
19+
use etl::error::EtlResult;
20+
use serde::{Deserialize, Serialize};
21+
use std::sync::Arc;
22+
use tracing::info;
23+
24+
use crate::sink::Sink;
25+
use crate::types::TriggeredEvent;
26+
27+
/// Configuration for the AWS SNS sink.
28+
///
29+
/// This intentionally does not implement [`Serialize`] to avoid accidentally
30+
/// leaking secrets (AWS credentials, endpoint URLs) in serialized forms.
31+
#[derive(Clone, Debug, Deserialize)]
32+
pub struct SnsSinkConfig {
33+
/// SNS topic ARN to publish messages to.
34+
pub topic_arn: String,
35+
36+
/// AWS region (e.g., "us-east-1").
37+
pub region: String,
38+
39+
/// Optional custom endpoint URL for LocalStack testing.
40+
#[serde(default)]
41+
pub endpoint_url: Option<String>,
42+
43+
/// Optional AWS access key ID (uses default credentials chain if not set).
44+
#[serde(default)]
45+
pub access_key_id: Option<String>,
46+
47+
/// Optional AWS secret access key (uses default credentials chain if not set).
48+
#[serde(default)]
49+
pub secret_access_key: Option<String>,
50+
}
51+
52+
/// Configuration for the AWS SNS sink without sensitive data.
53+
///
54+
/// Safe to serialize and log. Use this for debugging and metrics.
55+
#[derive(Clone, Debug, Serialize, Deserialize)]
56+
pub struct SnsSinkConfigWithoutSecrets {
57+
/// SNS topic ARN to publish messages to.
58+
pub topic_arn: String,
59+
60+
/// AWS region.
61+
pub region: String,
62+
63+
/// Whether a custom endpoint is configured.
64+
pub has_custom_endpoint: bool,
65+
66+
/// Whether explicit credentials are configured.
67+
pub has_explicit_credentials: bool,
68+
}
69+
70+
impl From<SnsSinkConfig> for SnsSinkConfigWithoutSecrets {
71+
fn from(config: SnsSinkConfig) -> Self {
72+
Self {
73+
topic_arn: config.topic_arn,
74+
region: config.region,
75+
has_custom_endpoint: config.endpoint_url.is_some(),
76+
has_explicit_credentials: config.access_key_id.is_some(),
77+
}
78+
}
79+
}
80+
81+
impl From<&SnsSinkConfig> for SnsSinkConfigWithoutSecrets {
82+
fn from(config: &SnsSinkConfig) -> Self {
83+
Self {
84+
topic_arn: config.topic_arn.clone(),
85+
region: config.region.clone(),
86+
has_custom_endpoint: config.endpoint_url.is_some(),
87+
has_explicit_credentials: config.access_key_id.is_some(),
88+
}
89+
}
90+
}
91+
92+
/// Sink that publishes events to an AWS SNS topic.
93+
///
94+
/// Each event is serialized as JSON and published to the configured topic.
95+
/// The sink uses the AWS SDK with automatic retry handling.
96+
#[derive(Clone)]
97+
pub struct SnsSink {
98+
/// AWS SNS client.
99+
client: Arc<Client>,
100+
101+
/// Topic ARN to publish messages to.
102+
topic_arn: String,
103+
}
104+
105+
impl SnsSink {
106+
/// Creates a new SNS sink from configuration.
107+
///
108+
/// # Errors
109+
///
110+
/// Returns an error if the AWS client cannot be configured.
111+
pub async fn new(
112+
config: SnsSinkConfig,
113+
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
114+
let region = aws_sdk_sns::config::Region::new(config.region);
115+
116+
let mut sdk_config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
117+
.region(region);
118+
119+
// Configure custom endpoint if provided (for LocalStack).
120+
if let Some(endpoint) = &config.endpoint_url {
121+
sdk_config_loader = sdk_config_loader.endpoint_url(endpoint);
122+
}
123+
124+
// Configure explicit credentials if provided.
125+
if let (Some(access_key), Some(secret_key)) =
126+
(&config.access_key_id, &config.secret_access_key)
127+
{
128+
sdk_config_loader = sdk_config_loader.credentials_provider(
129+
aws_sdk_sns::config::Credentials::new(
130+
access_key.clone(),
131+
secret_key.clone(),
132+
None,
133+
None,
134+
"pgstream",
135+
),
136+
);
137+
}
138+
139+
let sdk_config = sdk_config_loader.load().await;
140+
let client = Client::new(&sdk_config);
141+
142+
Ok(Self {
143+
client: Arc::new(client),
144+
topic_arn: config.topic_arn,
145+
})
146+
}
147+
}
148+
149+
impl Sink for SnsSink {
150+
fn name() -> &'static str {
151+
"sns"
152+
}
153+
154+
async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
155+
if events.is_empty() {
156+
return Ok(());
157+
}
158+
159+
info!(
160+
"publishing {} events to SNS topic '{}'",
161+
events.len(),
162+
self.topic_arn
163+
);
164+
165+
for event in &events {
166+
// Build JSON object manually since TriggeredEvent doesn't implement Serialize.
167+
let mut json_obj = serde_json::json!({
168+
"id": event.id.id,
169+
"created_at": event.id.created_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
170+
"payload": event.payload,
171+
"stream_id": format!("{:?}", event.stream_id),
172+
});
173+
174+
// Add optional fields.
175+
if let Some(ref metadata) = event.metadata {
176+
json_obj["metadata"] = metadata.clone();
177+
}
178+
if let Some(lsn) = event.lsn {
179+
json_obj["lsn"] = serde_json::json!(lsn.to_string());
180+
}
181+
182+
let message = serde_json::to_string(&json_obj).map_err(|e| {
183+
etl::etl_error!(
184+
etl::error::ErrorKind::InvalidData,
185+
"Failed to serialize event to JSON",
186+
e.to_string()
187+
)
188+
})?;
189+
190+
// Publish message to SNS topic.
191+
self.client
192+
.publish()
193+
.topic_arn(&self.topic_arn)
194+
.message(&message)
195+
.send()
196+
.await
197+
.map_err(|e| {
198+
etl::etl_error!(
199+
etl::error::ErrorKind::InvalidData,
200+
"Failed to publish message to SNS",
201+
e.to_string()
202+
)
203+
})?;
204+
}
205+
206+
Ok(())
207+
}
208+
}
209+
210+
#[cfg(test)]
211+
mod tests {
212+
use super::*;
213+
214+
#[test]
215+
fn test_sink_name() {
216+
assert_eq!(SnsSink::name(), "sns");
217+
}
218+
219+
#[test]
220+
fn test_config_without_secrets() {
221+
let config = SnsSinkConfig {
222+
topic_arn: "arn:aws:sns:us-east-1:123456789:my-topic".to_string(),
223+
region: "us-east-1".to_string(),
224+
endpoint_url: Some("http://localhost:4566".to_string()),
225+
access_key_id: Some("AKIAIOSFODNN7EXAMPLE".to_string()),
226+
secret_access_key: Some("secret".to_string()),
227+
};
228+
229+
let without_secrets: SnsSinkConfigWithoutSecrets = (&config).into();
230+
231+
assert_eq!(
232+
without_secrets.topic_arn,
233+
"arn:aws:sns:us-east-1:123456789:my-topic"
234+
);
235+
assert_eq!(without_secrets.region, "us-east-1");
236+
assert!(without_secrets.has_custom_endpoint);
237+
assert!(without_secrets.has_explicit_credentials);
238+
}
239+
}

0 commit comments

Comments
 (0)