Commit c8e48e3

Various BaseProducer::flush fixes (#748)
* Refactor `BaseProducer::poll` to not return early, but instead continue processing events until the passed timeout elapses. Refactor `BaseProducer::flush` to spend most of its time in `librdkafka`, and whatever is left in `BaseProducer::poll`.
* Simplify and rely on cast truncation.
* Fix logic error so that we always poll at least once even when the timeout is `Duration::ZERO`.
* Introduce a `Deadline` type to simplify `BaseProducer::flush` and `BaseProducer::poll`.
* Add a `From<Timeout>` impl for `Deadline`.
* Ensure we always call `poll_event` at least once, even if we have `Timeout::After(Duration::ZERO)` for a non-blocking call.
* Allow `Deadline` to express `Timeout::Never` losslessly.
* Refactor the poll loop to be more idiomatic.
* Centralize clamping logic in `Deadline`.
* Remove extraneous `From` impl.
* Simplify `BaseProducer::poll` to rely on the `From` impl.
* Don't block forever in poll when flushing.
* Remove this clamp, in favor of relying on `remaining_millis_i32`.
* Ensure we always poll even if we get a timeout from flush.
* Update changelog reflecting the behavior change in `BaseProducer::poll`.
1 parent bd70a6e commit c8e48e3
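
The first bullet above is the caller-visible change. Below is a minimal caller-side sketch (not part of the diff) of how a `BaseProducer` is driven after this commit; the broker address, topic name, and timeouts are illustrative. `poll` now keeps serving delivery callbacks until its timeout elapses, and `flush` spends most of its budget inside `rd_kafka_flush`, polling with whatever time is left.

```rust
use std::time::Duration;

use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use rdkafka::ClientConfig;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder broker address; any reachable cluster works.
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;

    for i in 0..10 {
        producer
            .send(
                BaseRecord::to("example-topic")
                    .key(&i.to_string())
                    .payload("hello"),
            )
            .map_err(|(err, _record)| err)?;
        // After this commit, a single call keeps processing delivery reports
        // for up to 10ms instead of returning after the first event (or none).
        producer.poll(Duration::from_millis(10));
    }

    // flush() now blocks mostly inside librdkafka for up to 5s, then drains
    // any remaining delivery callbacks through poll().
    producer.flush(Duration::from_secs(5))?;
    Ok(())
}
```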

4 files changed, +91 -34 lines changed

changelog.md

+3
@@ -9,6 +9,9 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
 * Fix test dependency on docker compose.
 * Backwards incompatible: `ClientContext::log` now accepts a single `LogRecord`
 * Add `LogRecord::context` to contain the logging context for the given message
+* Address wakeup races introduced by pivoting to the event API.
+* Update `BaseProducer::poll` to not return early, and instead continue
+  looping until the passed timeout is reached.

 ## 0.36.2 (2024-01-16)

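The last entry above is the behavior change most callers will notice. A hedged sketch of the difference, assuming a `BaseProducer` configured elsewhere; the helper name and timeouts are illustrative and not part of the commit:

```rust
use std::time::Duration;

use rdkafka::producer::BaseProducer;

// Illustrative helper; `producer` is any configured BaseProducer.
fn pump_delivery_reports(producer: &BaseProducer) {
    // Previously this returned after handling at most one event; it now keeps
    // serving delivery callbacks until the 100ms budget is spent.
    producer.poll(Duration::from_millis(100));

    // A zero timeout still performs exactly one non-blocking poll: the loop
    // body runs once before the deadline check breaks out.
    producer.poll(Duration::ZERO);
}
```
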
src/client.rs

+2 -2
@@ -297,10 +297,10 @@ impl<C: ClientContext> Client<C> {
         &self.context
     }

-    pub(crate) fn poll_event(
+    pub(crate) fn poll_event<T: Into<Timeout>>(
         &self,
         queue: &NativeQueue,
-        timeout: Timeout,
+        timeout: T,
     ) -> EventPollResult<NativeEvent> {
         let event = unsafe { NativeEvent::from_ptr(queue.poll(timeout)) };
         if let Some(ev) = event {
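
Widening `poll_event` to `T: Into<Timeout>` lets internal call sites pass a `Timeout`, a `Duration`, or (via the `From<&Deadline> for Timeout` impl added in `src/util.rs` below) a `&Deadline` without converting by hand. A standalone sketch of the same pattern with simplified stand-in types, not the crate's actual code:

```rust
use std::time::Duration;

// Stand-in for rdkafka's `Timeout`; the real type lives in src/util.rs.
#[derive(Debug)]
enum Timeout {
    After(Duration),
    Never,
}

impl From<Duration> for Timeout {
    fn from(d: Duration) -> Timeout {
        Timeout::After(d)
    }
}

// Accepting any `T: Into<Timeout>` keeps every call site free of manual
// conversions, mirroring the widened `poll_event` signature above.
fn poll_event<T: Into<Timeout>>(timeout: T) {
    let timeout: Timeout = timeout.into();
    println!("polling with {timeout:?}");
}

fn main() {
    poll_event(Timeout::Never);
    poll_event(Duration::from_millis(50));
}
```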

src/producer/base_producer.rs

+30 -31
@@ -67,7 +67,7 @@ use crate::producer::{
     DefaultProducerContext, Partitioner, Producer, ProducerContext, PurgeConfig,
 };
 use crate::topic_partition_list::TopicPartitionList;
-use crate::util::{IntoOpaque, NativePtr, Timeout};
+use crate::util::{Deadline, IntoOpaque, NativePtr, Timeout};

 pub use crate::message::DeliveryResult;

@@ -338,7 +338,6 @@
     client: Client<C>,
     queue: NativeQueue,
     _partitioner: PhantomData<Part>,
-    min_poll_interval: Timeout,
 }

 impl<C, Part> BaseProducer<C, Part>
@@ -353,7 +352,6 @@
             client,
             queue,
             _partitioner: PhantomData,
-            min_poll_interval: Timeout::After(Duration::from_millis(100)),
         }
     }

@@ -362,19 +360,25 @@
     /// Regular calls to `poll` are required to process the events and execute
     /// the message delivery callbacks.
     pub fn poll<T: Into<Timeout>>(&self, timeout: T) {
-        let event = self.client().poll_event(&self.queue, timeout.into());
-        if let EventPollResult::Event(ev) = event {
-            let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) };
-            match evtype {
-                rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev),
-                _ => {
-                    let evname = unsafe {
-                        let evname = rdsys::rd_kafka_event_name(ev.ptr());
-                        CStr::from_ptr(evname).to_string_lossy()
-                    };
-                    warn!("Ignored event '{}' on base producer poll", evname);
+        let deadline: Deadline = timeout.into().into();
+        loop {
+            let event = self.client().poll_event(&self.queue, &deadline);
+            if let EventPollResult::Event(ev) = event {
+                let evtype = unsafe { rdsys::rd_kafka_event_type(ev.ptr()) };
+                match evtype {
+                    rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev),
+                    _ => {
+                        let evname = unsafe {
+                            let evname = rdsys::rd_kafka_event_name(ev.ptr());
+                            CStr::from_ptr(evname).to_string_lossy()
+                        };
+                        warn!("Ignored event '{}' on base producer poll", evname);
+                    }
                 }
             }
+            if deadline.elapsed() {
+                break;
+            }
         }
     }

@@ -494,26 +498,21 @@
     // As this library uses the rdkafka Event API, flush will not call rd_kafka_poll() but instead wait for
     // the librdkafka-handled message count to reach zero. Runs until value reaches zero or timeout.
     fn flush<T: Into<Timeout>>(&self, timeout: T) -> KafkaResult<()> {
-        let mut timeout = timeout.into();
-        loop {
-            let op_timeout = std::cmp::min(timeout, self.min_poll_interval);
-            if self.in_flight_count() > 0 {
-                unsafe { rdsys::rd_kafka_flush(self.native_ptr(), 0) };
-                self.poll(op_timeout);
+        let deadline: Deadline = timeout.into().into();
+        while self.in_flight_count() > 0 && !deadline.elapsed() {
+            let ret = unsafe {
+                rdsys::rd_kafka_flush(self.client().native_ptr(), deadline.remaining_millis_i32())
+            };
+            if let Deadline::Never = &deadline {
+                self.poll(Timeout::After(Duration::ZERO));
             } else {
-                return Ok(());
+                self.poll(&deadline);
             }
-
-            if op_timeout >= timeout {
-                let ret = unsafe { rdsys::rd_kafka_flush(self.native_ptr(), 0) };
-                if ret.is_error() {
-                    return Err(KafkaError::Flush(ret.into()));
-                } else {
-                    return Ok(());
-                }
-            }
-            timeout -= op_timeout;
+            if ret.is_error() {
+                return Err(KafkaError::Flush(ret.into()));
+            };
         }
+        Ok(())
     }

     fn purge(&self, flags: PurgeConfig) {

src/util.rs

+56 -1
@@ -1,5 +1,6 @@
 //! Utility functions and types.

+use std::cmp;
 use std::ffi::CStr;
 use std::fmt;
 use std::future::Future;
@@ -12,7 +13,7 @@ use std::slice;
 use std::sync::Arc;
 #[cfg(feature = "naive-runtime")]
 use std::thread;
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

 #[cfg(feature = "naive-runtime")]
 use futures_channel::oneshot;
@@ -31,6 +32,40 @@ pub fn get_rdkafka_version() -> (i32, String) {
     (version_number, c_str.to_string_lossy().into_owned())
 }

+pub(crate) enum Deadline {
+    At(Instant),
+    Never,
+}
+
+impl Deadline {
+    // librdkafka's flush api requires an i32 millisecond timeout
+    const MAX_FLUSH_DURATION: Duration = Duration::from_millis(i32::MAX as u64);
+
+    pub(crate) fn new(duration: Option<Duration>) -> Self {
+        if let Some(d) = duration {
+            Self::At(Instant::now() + d)
+        } else {
+            Self::Never
+        }
+    }
+
+    pub(crate) fn remaining(&self) -> Duration {
+        if let Deadline::At(i) = self {
+            *i - Instant::now()
+        } else {
+            Duration::MAX
+        }
+    }
+
+    pub(crate) fn remaining_millis_i32(&self) -> i32 {
+        cmp::min(Deadline::MAX_FLUSH_DURATION, self.remaining()).as_millis() as i32
+    }
+
+    pub(crate) fn elapsed(&self) -> bool {
+        self.remaining() <= Duration::ZERO
+    }
+}
+
 /// Specifies a timeout for a Kafka operation.
 #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
 pub enum Timeout {
@@ -76,6 +111,26 @@ impl std::ops::SubAssign for Timeout {
     }
 }

+impl From<Timeout> for Deadline {
+    fn from(t: Timeout) -> Deadline {
+        if let Timeout::After(dur) = t {
+            Deadline::new(Some(dur))
+        } else {
+            Deadline::new(None)
+        }
+    }
+}
+
+impl From<&Deadline> for Timeout {
+    fn from(d: &Deadline) -> Timeout {
+        if let Deadline::Never = d {
+            Timeout::Never
+        } else {
+            Timeout::After(d.remaining())
+        }
+    }
+}
+
 impl From<Duration> for Timeout {
     fn from(d: Duration) -> Timeout {
         Timeout::After(d)
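
Because `Deadline` is `pub(crate)`, it cannot be exercised from outside the crate. As a hedged illustration, here is a hypothetical unit test as it could be written inside `src/util.rs` (no such test ships with this commit), showing how the conversions above compose:

```rust
#[cfg(test)]
mod deadline_conversion_tests {
    use super::{Deadline, Timeout};
    use std::time::Duration;

    #[test]
    fn timeout_round_trips_through_deadline() {
        // `Timeout::Never` is represented losslessly as `Deadline::Never`.
        let never: Deadline = Timeout::Never.into();
        assert!(matches!(never, Deadline::Never));
        assert_eq!(Timeout::from(&never), Timeout::Never);

        // A finite timeout becomes an absolute instant; the remaining budget
        // only shrinks, and it is reported as i32 milliseconds because that
        // is what rd_kafka_flush expects.
        let finite: Deadline = Timeout::After(Duration::from_secs(60)).into();
        assert!(!finite.elapsed());
        assert!(finite.remaining_millis_i32() <= 60_000);

        // A zero timeout is already elapsed, which `poll` treats as
        // "run the loop body once, non-blocking".
        let zero: Deadline = Timeout::After(Duration::ZERO).into();
        assert!(zero.elapsed());
    }
}
```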
