Skip to content

Commit 1848d3c

Browse files
psteinroe and claude
committed
refactor: move events by value instead of borrowing
Use into_iter() and drain() to consume events by value rather than borrowing. Events are now moved into the grouped structure and drained into batch chunks, freeing memory as they're processed. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent ca5bdff commit 1848d3c

1 file changed

Lines changed: 8 additions & 5 deletions

File tree

src/sink/sqs.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,10 @@ impl Sink for SqsSink {
172172
}
173173

174174
// Group events by queue URL for batch sending.
175-
let mut events_by_queue: HashMap<String, Vec<(usize, &TriggeredEvent)>> = HashMap::new();
175+
let mut events_by_queue: HashMap<String, Vec<(usize, TriggeredEvent)>> = HashMap::new();
176176

177-
for (idx, event) in events.iter().enumerate() {
178-
let queue_url = self.resolve_queue_url(event).ok_or_else(|| {
177+
for (idx, event) in events.into_iter().enumerate() {
178+
let queue_url = self.resolve_queue_url(&event).ok_or_else(|| {
179179
etl::etl_error!(
180180
etl::error::ErrorKind::ConfigError,
181181
"No queue URL configured",
@@ -192,9 +192,12 @@ impl Sink for SqsSink {
192192
// Prepare all batch requests.
193193
let mut batch_futures = Vec::new();
194194

195-
for (queue_url, queue_events) in events_by_queue {
195+
for (queue_url, mut queue_events) in events_by_queue {
196196
// Process in chunks of SQS_MAX_BATCH_SIZE (SQS limit is 10 per batch).
197-
for chunk in queue_events.chunks(SQS_MAX_BATCH_SIZE) {
197+
while !queue_events.is_empty() {
198+
let chunk_size = queue_events.len().min(SQS_MAX_BATCH_SIZE);
199+
let chunk: Vec<_> = queue_events.drain(..chunk_size).collect();
200+
198201
let mut entries = Vec::with_capacity(chunk.len());
199202

200203
for (idx, event) in chunk {

0 commit comments

Comments
 (0)