Skip to content

Commit 2ac3fe8

Browse files
psteinroe and claude committed
refactor: build records upfront with fail-fast, use iterators
- Move events instead of borrowing
- Build PutRecordsRequestEntry upfront with fail-fast on errors
- Use flat_map for cleaner stream/chunk handling

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 23d656b commit 2ac3fe8

1 file changed

Lines changed: 56 additions & 52 deletions

File tree

src/sink/kinesis.rs

Lines changed: 56 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -171,62 +171,67 @@ impl Sink for KinesisSink {
171171
return Ok(());
172172
}
173173

174-
// Group events by stream name for batch sending.
175-
let mut events_by_stream: HashMap<String, Vec<&TriggeredEvent>> = HashMap::new();
176-
177-
for event in &events {
178-
let stream_name = self.resolve_stream_name(event).ok_or_else(|| {
179-
etl::etl_error!(
180-
etl::error::ErrorKind::ConfigError,
181-
"No stream name configured",
182-
"Stream name must be provided in sink config or event metadata"
183-
)
184-
})?;
185-
186-
events_by_stream
187-
.entry(stream_name.to_string())
188-
.or_default()
189-
.push(event);
190-
}
191-
192-
// Prepare all batch requests.
193-
let mut batch_futures = Vec::new();
194-
195-
for (stream_name, stream_events) in events_by_stream {
196-
// Process in chunks of KINESIS_MAX_BATCH_SIZE (Kinesis limit is 500 per batch).
197-
for chunk in stream_events.chunks(KINESIS_MAX_BATCH_SIZE) {
198-
let mut records = Vec::with_capacity(chunk.len());
199-
let chunk_len = chunk.len();
200-
201-
for event in chunk {
202-
let data = serde_json::to_vec(&event.payload).map_err(|e| {
174+
// Build records upfront (fail fast on config/serialization errors).
175+
let records_with_stream: Vec<_> = events
176+
.into_iter()
177+
.map(|event| {
178+
let stream_name = self.resolve_stream_name(&event).ok_or_else(|| {
179+
etl::etl_error!(
180+
etl::error::ErrorKind::ConfigError,
181+
"No stream name configured",
182+
"Stream name must be provided in sink config or event metadata"
183+
)
184+
})?;
185+
186+
let data = serde_json::to_vec(&event.payload).map_err(|e| {
187+
etl::etl_error!(
188+
etl::error::ErrorKind::InvalidData,
189+
"Failed to serialize payload to JSON",
190+
e.to_string()
191+
)
192+
})?;
193+
194+
let record = PutRecordsRequestEntry::builder()
195+
.partition_key(event.id.id)
196+
.data(aws_sdk_kinesis::primitives::Blob::new(data))
197+
.build()
198+
.map_err(|e| {
203199
etl::etl_error!(
204200
etl::error::ErrorKind::InvalidData,
205-
"Failed to serialize payload to JSON",
201+
"Failed to build record entry",
206202
e.to_string()
207203
)
208204
})?;
209205

210-
// Use event ID as partition key.
211-
let record = PutRecordsRequestEntry::builder()
212-
.partition_key(&event.id.id)
213-
.data(aws_sdk_kinesis::primitives::Blob::new(data))
214-
.build()
215-
.map_err(|e| {
216-
etl::etl_error!(
217-
etl::error::ErrorKind::InvalidData,
218-
"Failed to build record entry",
219-
e.to_string()
220-
)
221-
})?;
206+
Ok((stream_name.to_string(), record))
207+
})
208+
.collect::<EtlResult<Vec<_>>>()?;
222209

223-
records.push(record);
224-
}
210+
// Group records by stream name.
211+
let mut records_by_stream: HashMap<String, Vec<PutRecordsRequestEntry>> = HashMap::new();
212+
for (stream_name, record) in records_with_stream {
213+
records_by_stream
214+
.entry(stream_name)
215+
.or_default()
216+
.push(record);
217+
}
225218

226-
// Create future for this batch.
227-
let client = self.client.clone();
228-
let stream_name = stream_name.clone();
229-
batch_futures.push(async move {
219+
// Create batch futures for all streams and chunks.
220+
let batch_futures: Vec<_> = records_by_stream
221+
.into_iter()
222+
.flat_map(|(stream_name, records)| {
223+
records
224+
.into_iter()
225+
.collect::<Vec<_>>()
226+
.chunks(KINESIS_MAX_BATCH_SIZE)
227+
.map(|chunk| chunk.to_vec())
228+
.map(|chunk| (stream_name.clone(), chunk))
229+
.collect::<Vec<_>>()
230+
})
231+
.map(|(stream_name, records)| {
232+
let client = &self.client;
233+
let chunk_len = records.len();
234+
async move {
230235
let result = client
231236
.put_records()
232237
.stream_name(&stream_name)
@@ -244,7 +249,6 @@ impl Sink for KinesisSink {
244249
// Check for partial failures.
245250
let failed_count = result.failed_record_count.unwrap_or(0);
246251
if failed_count > 0 {
247-
// Find first failed record for error details.
248252
let first_error = result
249253
.records
250254
.iter()
@@ -269,9 +273,9 @@ impl Sink for KinesisSink {
269273
}
270274

271275
Ok(())
272-
});
273-
}
274-
}
276+
}
277+
})
278+
.collect();
275279

276280
// Send all batches concurrently.
277281
try_join_all(batch_futures).await?;

0 commit comments

Comments (0)