Skip to content

Commit 3146da8

Browse files
psteinroeclaude
andcommitted
refactor: update Kinesis sink for dynamic routing and payload-only output
- Make stream_name config optional (can come from event metadata) - Add resolve_stream_name() for dynamic routing from metadata key "stream" - Send only event.payload instead of full event envelope - Use DestinationError for publish failures, ConfigError for missing config - Update tests to verify payload-only content and metadata-based routing 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 5e01f86 commit 3146da8

2 files changed

Lines changed: 62 additions & 59 deletions

File tree

src/sink/kinesis.rs

Lines changed: 41 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,23 @@
11
//! AWS Kinesis sink for publishing events to a Kinesis data stream.
22
//!
3-
//! Publishes each event as a data record to the configured stream.
4-
//! The sink uses the AWS SDK for async message delivery.
3+
//! Publishes each event's payload as a data record to a stream determined by:
4+
//! 1. `stream` key in event metadata (from subscription's metadata/metadata_extensions)
5+
//! 2. Fallback to `stream_name` in sink config
56
//!
6-
//! # Payload Extensions
7+
//! # Dynamic Routing
78
//!
8-
//! This sink supports the following optional payload extensions:
9+
//! The target stream can be configured per-event using metadata_extensions:
910
//!
1011
//! ```sql
11-
//! payload_extensions = '[
12-
//! {"key": "partition_key", "value": "user_123"},
13-
//! {"key": "explicit_hash_key", "value": "abc123"}
12+
//! metadata_extensions = '[
13+
//! {"json_path": "stream", "expression": "''my-stream-'' || shard_key"}
1414
//! ]'
1515
//! ```
16-
//!
17-
//! - `partition_key`: Used for shard distribution. If not provided, the event ID is used.
18-
//! - `explicit_hash_key`: Optional hash key to override partition key hashing.
1916
2017
use aws_sdk_kinesis::Client;
2118
use etl::error::EtlResult;
2219
use serde::{Deserialize, Serialize};
2320
use std::sync::Arc;
24-
use tracing::info;
2521

2622
use crate::sink::Sink;
2723
use crate::types::TriggeredEvent;
@@ -32,8 +28,8 @@ use crate::types::TriggeredEvent;
3228
/// leaking secrets (AWS credentials, endpoint URLs) in serialized forms.
3329
#[derive(Clone, Debug, Deserialize)]
3430
pub struct KinesisSinkConfig {
35-
/// Kinesis stream name to publish records to.
36-
pub stream_name: String,
31+
/// Kinesis stream name. Optional if provided via event metadata.
32+
pub stream_name: Option<String>,
3733

3834
/// AWS region (e.g., "us-east-1").
3935
pub region: String,
@@ -56,8 +52,8 @@ pub struct KinesisSinkConfig {
5652
/// Safe to serialize and log. Use this for debugging and metrics.
5753
#[derive(Clone, Debug, Serialize, Deserialize)]
5854
pub struct KinesisSinkConfigWithoutSecrets {
59-
/// Kinesis stream name.
60-
pub stream_name: String,
55+
/// Kinesis stream name (if configured).
56+
pub stream_name: Option<String>,
6157

6258
/// AWS region.
6359
pub region: String,
@@ -93,15 +89,15 @@ impl From<&KinesisSinkConfig> for KinesisSinkConfigWithoutSecrets {
9389

9490
/// Sink that publishes events to an AWS Kinesis data stream.
9591
///
96-
/// Each event is serialized as JSON and published as a data record.
92+
/// Each event's payload is serialized as JSON and published as a data record.
9793
/// The sink uses the AWS SDK with automatic retry handling.
9894
#[derive(Clone)]
9995
pub struct KinesisSink {
10096
/// AWS Kinesis client.
10197
client: Arc<Client>,
10298

103-
/// Stream name to publish records to.
104-
stream_name: String,
99+
/// Default stream name. Can be overridden per-event via metadata.
100+
stream_name: Option<String>,
105101
}
106102

107103
impl KinesisSink {
@@ -146,6 +142,18 @@ impl KinesisSink {
146142
stream_name: config.stream_name,
147143
})
148144
}
145+
146+
/// Resolves the stream name for an event from metadata or config.
147+
fn resolve_stream_name<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> {
148+
// First check event metadata for dynamic stream
149+
if let Some(ref metadata) = event.metadata {
150+
if let Some(stream) = metadata.get("stream").and_then(|v| v.as_str()) {
151+
return Some(stream);
152+
}
153+
}
154+
// Fall back to config stream name
155+
self.stream_name.as_deref()
156+
}
149157
}
150158

151159
impl Sink for KinesisSink {
@@ -158,33 +166,21 @@ impl Sink for KinesisSink {
158166
return Ok(());
159167
}
160168

161-
info!(
162-
"publishing {} events to Kinesis stream '{}'",
163-
events.len(),
164-
self.stream_name
165-
);
166-
167169
for event in &events {
168-
// Build JSON object manually since TriggeredEvent doesn't implement Serialize.
169-
let mut json_obj = serde_json::json!({
170-
"id": event.id.id,
171-
"created_at": event.id.created_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
172-
"payload": event.payload,
173-
"stream_id": format!("{:?}", event.stream_id),
174-
});
175-
176-
// Add optional fields.
177-
if let Some(ref metadata) = event.metadata {
178-
json_obj["metadata"] = metadata.clone();
179-
}
180-
if let Some(lsn) = event.lsn {
181-
json_obj["lsn"] = serde_json::json!(lsn.to_string());
182-
}
170+
// Resolve stream name from event metadata or config.
171+
let stream_name = self.resolve_stream_name(event).ok_or_else(|| {
172+
etl::etl_error!(
173+
etl::error::ErrorKind::ConfigError,
174+
"No stream name configured",
175+
"Stream name must be provided in sink config or event metadata"
176+
)
177+
})?;
183178

184-
let data = serde_json::to_vec(&json_obj).map_err(|e| {
179+
// Serialize payload to JSON.
180+
let data = serde_json::to_vec(&event.payload).map_err(|e| {
185181
etl::etl_error!(
186182
etl::error::ErrorKind::InvalidData,
187-
"Failed to serialize event to JSON",
183+
"Failed to serialize payload to JSON",
188184
e.to_string()
189185
)
190186
})?;
@@ -195,14 +191,14 @@ impl Sink for KinesisSink {
195191
// Publish record to Kinesis stream.
196192
self.client
197193
.put_record()
198-
.stream_name(&self.stream_name)
194+
.stream_name(stream_name)
199195
.partition_key(&partition_key)
200196
.data(aws_sdk_kinesis::primitives::Blob::new(data))
201197
.send()
202198
.await
203199
.map_err(|e| {
204200
etl::etl_error!(
205-
etl::error::ErrorKind::InvalidData,
201+
etl::error::ErrorKind::DestinationError,
206202
"Failed to publish record to Kinesis",
207203
e.to_string()
208204
)
@@ -225,7 +221,7 @@ mod tests {
225221
#[test]
226222
fn test_config_without_secrets() {
227223
let config = KinesisSinkConfig {
228-
stream_name: "my-stream".to_string(),
224+
stream_name: Some("my-stream".to_string()),
229225
region: "us-east-1".to_string(),
230226
endpoint_url: Some("http://localhost:4566".to_string()),
231227
access_key_id: Some("AKIAIOSFODNN7EXAMPLE".to_string()),
@@ -234,7 +230,7 @@ mod tests {
234230

235231
let without_secrets: KinesisSinkConfigWithoutSecrets = (&config).into();
236232

237-
assert_eq!(without_secrets.stream_name, "my-stream");
233+
assert_eq!(without_secrets.stream_name, Some("my-stream".to_string()));
238234
assert_eq!(without_secrets.region, "us-east-1");
239235
assert!(without_secrets.has_custom_endpoint);
240236
assert!(without_secrets.has_explicit_credentials);

tests/kinesis_sink_tests.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ async fn test_kinesis_sink_publishes_events() {
157157

158158
// Create sink.
159159
let config = KinesisSinkConfig {
160-
stream_name: stream_name.clone(),
160+
stream_name: Some(stream_name.clone()),
161161
region: "us-east-1".to_string(),
162162
endpoint_url: Some(endpoint.clone()),
163163
access_key_id: Some("test".to_string()),
@@ -170,15 +170,20 @@ async fn test_kinesis_sink_publishes_events() {
170170
let events = vec![make_test_event("event1"), make_test_event("event2")];
171171
sink.publish_events(events).await.expect("Failed to publish");
172172

173-
// Verify records arrived.
173+
// Verify records arrived - only payload is sent.
174174
let records = get_kinesis_records(&endpoint, &stream_name, 2).await;
175175
assert_eq!(records.len(), 2, "Expected 2 records");
176176

177177
for record in &records {
178-
let event: serde_json::Value =
178+
let payload: serde_json::Value =
179179
serde_json::from_str(record).expect("Failed to parse record");
180-
assert!(event["id"].is_string());
181-
assert!(event["payload"]["key"].is_string());
180+
// Only payload fields should be present.
181+
assert!(payload["key"].is_string());
182+
assert!(payload["value"].is_string());
183+
// No envelope fields.
184+
assert!(payload.get("id").is_none());
185+
assert!(payload.get("metadata").is_none());
186+
assert!(payload.get("lsn").is_none());
182187
}
183188
}
184189

@@ -191,7 +196,7 @@ async fn test_kinesis_sink_handles_empty_batch() {
191196
create_kinesis_stream(&endpoint, &stream_name).await;
192197

193198
let config = KinesisSinkConfig {
194-
stream_name,
199+
stream_name: Some(stream_name),
195200
region: "us-east-1".to_string(),
196201
endpoint_url: Some(endpoint),
197202
access_key_id: Some("test".to_string()),
@@ -207,15 +212,16 @@ async fn test_kinesis_sink_handles_empty_batch() {
207212
}
208213

209214
#[tokio::test(flavor = "multi_thread")]
210-
async fn test_kinesis_sink_includes_metadata() {
215+
async fn test_kinesis_sink_uses_stream_from_metadata() {
211216
let port = ensure_localstack().await;
212217
let endpoint = format!("http://127.0.0.1:{}", port);
213218

214219
let stream_name = format!("test-stream-{}", Uuid::new_v4());
215220
create_kinesis_stream(&endpoint, &stream_name).await;
216221

222+
// Create sink without stream_name - will get it from metadata.
217223
let config = KinesisSinkConfig {
218-
stream_name: stream_name.clone(),
224+
stream_name: None,
219225
region: "us-east-1".to_string(),
220226
endpoint_url: Some(endpoint.clone()),
221227
access_key_id: Some("test".to_string()),
@@ -224,12 +230,12 @@ async fn test_kinesis_sink_includes_metadata() {
224230

225231
let sink = KinesisSink::new(config).await.expect("Failed to create sink");
226232

227-
// Create event with metadata.
233+
// Create event with stream in metadata.
228234
let event = TriggeredEvent {
229235
id: EventIdentifier::new(Uuid::new_v4().to_string(), Utc::now()),
230236
stream_id: StreamId::default(),
231237
payload: serde_json::json!({ "action": "created" }),
232-
metadata: Some(serde_json::json!({ "user_id": 123, "source": "api" })),
238+
metadata: Some(serde_json::json!({ "stream": stream_name })),
233239
lsn: Some(PgLsn::from(99999u64)),
234240
};
235241

@@ -240,12 +246,13 @@ async fn test_kinesis_sink_includes_metadata() {
240246
let records = get_kinesis_records(&endpoint, &stream_name, 1).await;
241247
assert_eq!(records.len(), 1);
242248

243-
let published: serde_json::Value =
249+
let payload: serde_json::Value =
244250
serde_json::from_str(&records[0]).expect("Failed to parse record");
245251

246-
assert_eq!(published["metadata"]["user_id"], 123);
247-
assert_eq!(published["metadata"]["source"], "api");
248-
assert!(published["lsn"].is_string());
252+
// Only payload is sent.
253+
assert_eq!(payload["action"], "created");
254+
assert!(payload.get("id").is_none());
255+
assert!(payload.get("metadata").is_none());
249256
}
250257

251258
#[test]

0 commit comments

Comments
 (0)