Skip to content

Commit e77b3ac

Browse files
committed
Ensure we always poll even if we get a timeout from flush.
1 parent ab7b4af commit e77b3ac

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

src/producer/base_producer.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -503,13 +503,15 @@ where
503503
let ret = unsafe {
504504
rdsys::rd_kafka_flush(self.client().native_ptr(), deadline.remaining_millis_i32())
505505
};
506-
if ret.is_error() {
507-
return Err(KafkaError::Flush(ret.into()));
508-
} else if let Deadline::Never = &deadline {
506+
if let Deadline::Never = &deadline {
509507
self.poll(Timeout::After(Duration::ZERO));
510508
} else {
511509
self.poll(&deadline);
512510
}
511+
if ret.is_error() {
512+
return Err(KafkaError::Flush(ret.into()));
513+
};
514+
513515
}
514516
Ok(())
515517
}

0 commit comments

Comments
 (0)