Skip to content

Commit cf3a3c8

Browse files
authored
feat(sink): add AWS Kinesis data stream sink
## Summary - Add `KinesisSink` that publishes events to AWS Kinesis streams - Sends `event.payload` as JSON data blob - Supports dynamic stream name and partition key via event metadata ## Configuration ```yaml sink: type: kinesis region: us-east-1 stream_name: events # optional default ``` ## Routing Stream and partition key are resolved from: 1. `stream` and `partition_key` fields in event metadata 2. Fallback to config values
1 parent 63a0af0 commit cf3a3c8

7 files changed

Lines changed: 650 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 47 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ version = "0.1.0"
66
[features]
77
default = []
88
sink-kafka = ["dep:rdkafka"]
9+
sink-kinesis = ["dep:aws-sdk-kinesis", "dep:aws-config"]
910
sink-nats = ["dep:async-nats"]
1011
sink-rabbitmq = ["dep:lapin"]
1112
sink-redis-streams = ["dep:redis"]
@@ -57,6 +58,7 @@ uuid = { version = "1.19.0", default-features = false, features = ["v4"]
5758
# Optional sink dependencies.
5859
async-nats = { version = "0.38", optional = true }
5960
aws-config = { version = "1.5", optional = true }
61+
aws-sdk-kinesis = { version = "1.60", optional = true }
6062
aws-sdk-sns = { version = "1.60", optional = true }
6163
aws-sdk-sqs = { version = "1.60", optional = true }
6264
lapin = { version = "2.5", optional = true }

src/config/sink.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ use crate::sink::sqs::SqsSinkConfig;
2424
#[cfg(feature = "sink-sns")]
2525
use crate::sink::sns::SnsSinkConfig;
2626

27+
#[cfg(feature = "sink-kinesis")]
28+
use crate::sink::kinesis::KinesisSinkConfig;
29+
2730
/// Sink destination configuration.
2831
///
2932
/// Determines where replicated events are sent.
@@ -72,4 +75,9 @@ pub enum SinkConfig {
7275
#[cfg(feature = "sink-sns")]
7376
#[serde(rename = "sns")]
7477
Sns(SnsSinkConfig),
78+
79+
/// AWS Kinesis sink for data stream publishing.
80+
#[cfg(feature = "sink-kinesis")]
81+
#[serde(rename = "kinesis")]
82+
Kinesis(KinesisSinkConfig),
7583
}

src/core.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> {
181181
})?;
182182
AnySink::Sns(s)
183183
}
184+
185+
#[cfg(feature = "sink-kinesis")]
186+
SinkConfig::Kinesis(cfg) => {
187+
use crate::sink::kinesis::KinesisSink;
188+
let s = KinesisSink::new(cfg.clone()).await.map_err(|e| {
189+
etl::etl_error!(
190+
etl::error::ErrorKind::InvalidData,
191+
"Failed to create Kinesis sink",
192+
e.to_string()
193+
)
194+
})?;
195+
AnySink::Kinesis(s)
196+
}
184197
};
185198

186199
// Create PgStream as an ETL destination
@@ -266,6 +279,11 @@ fn log_sink_config(config: &SinkConfig) {
266279
SinkConfig::Sns(_cfg) => {
267280
debug!("using sns sink");
268281
}
282+
283+
#[cfg(feature = "sink-kinesis")]
284+
SinkConfig::Kinesis(_cfg) => {
285+
debug!("using kinesis sink");
286+
}
269287
}
270288
}
271289

0 commit comments

Comments
 (0)