Skip to content

Commit 8226749

Browse files
psteinroeclaude
andcommitted
fix: use iter() for Kafka since FutureRecord borrows data
FutureRecord borrows from the tuple values (topic, key, payload). Using into_iter() would move these into the closure, but the async block can't outlive these local variables. Using iter() keeps the prepared Vec alive while futures execute. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 5c9bec4 commit 8226749

1 file changed

Lines changed: 4 additions & 2 deletions

File tree

src/sink/kafka.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,10 +209,12 @@ impl Sink for KafkaSink {
209209
.collect::<EtlResult<Vec<_>>>()?;
210210

211211
// Send all messages concurrently.
212+
// Note: FutureRecord borrows from the tuple values, so we must use iter()
213+
// to keep prepared alive while the futures execute.
212214
let futures: Vec<_> = prepared
213-
.into_iter()
215+
.iter()
214216
.map(|(topic, key, payload)| {
215-
let record = FutureRecord::to(&topic)
217+
let record = FutureRecord::to(topic)
216218
.key(key.as_str())
217219
.payload(payload.as_slice());
218220
let producer = &self.producer;

0 commit comments

Comments
 (0)