Skip to content

Commit 248dc6e

Browse files
committed
feat(sink): add AWS SNS sink for topic publishing
1 parent 3505dc7 commit 248dc6e

8 files changed

Lines changed: 691 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ sink-nats = ["dep:async-nats"]
1010
sink-rabbitmq = ["dep:lapin"]
1111
sink-redis-streams = ["dep:redis"]
1212
sink-redis-strings = ["dep:redis"]
13+
sink-sns = ["dep:aws-sdk-sns", "dep:aws-config"]
1314
sink-sqs = ["dep:aws-sdk-sqs", "dep:aws-config"]
1415
sink-webhook = ["dep:reqwest"]
1516
test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"]
@@ -56,6 +57,7 @@ uuid = { version = "1.19.0", default-features = false, features = ["v4"]
5657
# Optional sink dependencies.
5758
async-nats = { version = "0.38", optional = true }
5859
aws-config = { version = "1.5", optional = true }
60+
aws-sdk-sns = { version = "1.60", optional = true }
5961
aws-sdk-sqs = { version = "1.60", optional = true }
6062
lapin = { version = "2.5", optional = true }
6163
rdkafka = { version = "0.36", optional = true, default-features = false, features = ["tokio", "cmake-build"] }
@@ -83,6 +85,7 @@ testcontainers-modules = { version = "0.11", optional = true, features = [
8385
"rabbitmq",
8486
"kafka",
8587
"elasticmq",
88+
"localstack",
8689
"blocking",
8790
] }
8891

src/config/sink.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ use crate::sink::webhook::WebhookSinkConfig;
2121
#[cfg(feature = "sink-sqs")]
2222
use crate::sink::sqs::SqsSinkConfig;
2323

24+
#[cfg(feature = "sink-sns")]
25+
use crate::sink::sns::SnsSinkConfig;
26+
2427
/// Sink destination configuration.
2528
///
2629
/// Determines where replicated events are sent.
@@ -64,4 +67,9 @@ pub enum SinkConfig {
6467
#[cfg(feature = "sink-sqs")]
6568
#[serde(rename = "sqs")]
6669
Sqs(SqsSinkConfig),
70+
71+
/// AWS SNS sink for topic publishing.
72+
#[cfg(feature = "sink-sns")]
73+
#[serde(rename = "sns")]
74+
Sns(SnsSinkConfig),
6775
}

src/core.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> {
168168
})?;
169169
AnySink::Sqs(s)
170170
}
171+
172+
#[cfg(feature = "sink-sns")]
173+
SinkConfig::Sns(cfg) => {
174+
use crate::sink::sns::SnsSink;
175+
let s = SnsSink::new(cfg.clone()).await.map_err(|e| {
176+
etl::etl_error!(
177+
etl::error::ErrorKind::InvalidData,
178+
"Failed to create SNS sink",
179+
e.to_string()
180+
)
181+
})?;
182+
AnySink::Sns(s)
183+
}
171184
};
172185

173186
// Create PgStream as an ETL destination
@@ -248,6 +261,11 @@ fn log_sink_config(config: &SinkConfig) {
248261
SinkConfig::Sqs(_cfg) => {
249262
debug!("using sqs sink");
250263
}
264+
265+
#[cfg(feature = "sink-sns")]
266+
SinkConfig::Sns(_cfg) => {
267+
debug!("using sns sink");
268+
}
251269
}
252270
}
253271

src/sink/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ pub mod kafka;
2222
#[cfg(feature = "sink-sqs")]
2323
pub mod sqs;
2424

25+
#[cfg(feature = "sink-sns")]
26+
pub mod sns;
27+
2528
pub use base::Sink;
2629

2730
use etl::error::EtlResult;
@@ -48,6 +51,9 @@ use kafka::KafkaSink;
4851
#[cfg(feature = "sink-sqs")]
4952
use sqs::SqsSink;
5053

54+
#[cfg(feature = "sink-sns")]
55+
use sns::SnsSink;
56+
5157
use crate::types::TriggeredEvent;
5258

5359
/// Wrapper enum for all supported sink types.
@@ -86,6 +92,10 @@ pub enum AnySink {
8692
/// AWS SQS sink for queue messaging.
8793
#[cfg(feature = "sink-sqs")]
8894
Sqs(SqsSink),
95+
96+
/// AWS SNS sink for topic publishing.
97+
#[cfg(feature = "sink-sns")]
98+
Sns(SnsSink),
8999
}
90100

91101
impl Sink for AnySink {
@@ -117,6 +127,9 @@ impl Sink for AnySink {
117127

118128
#[cfg(feature = "sink-sqs")]
119129
AnySink::Sqs(sink) => sink.publish_events(events).await,
130+
131+
#[cfg(feature = "sink-sns")]
132+
AnySink::Sns(sink) => sink.publish_events(events).await,
120133
}
121134
}
122135
}

0 commit comments

Comments
 (0)