BaseProducer Poll and Flush no longer clearing queue after Move to Event-based API commit #676

Open
@brentmjohnson

After upgrading to 0.36.2, rdkafka::error::RDKafkaErrorCode::QueueFull is returned continually after each send, even when BaseProducer.poll() is called explicitly.

Additionally, explicit calls to BaseProducer.flush() leave unpublished events in the queue. These can be observed by flushing, ending the thread, and then starting a new thread that reuses the same thread-safe BaseProducer.

Related code:

loop {
    // BaseProducer::send is synchronous and returns a Result, not a future.
    let send_result = kafka_producer.lock().unwrap().send(
        BaseRecord::to(topic_name.as_ref().unwrap())
            .key(&())
            .payload(&interval_subscription),
    );
    match send_result {
        Ok(_) => break,
        // Queue is full: poll so the producer can make room, then retry the send.
        Err((KafkaError::MessageProduction(rdkafka::error::RDKafkaErrorCode::QueueFull), _)) => {
            let poll_amt = kafka_producer.lock().unwrap().poll(Timeout::Never);
            println!("poll: {:?}", poll_amt);
        },
        // Any other error is reported and the send is abandoned.
        Err((e, _)) => {
            println!("Error {:?}", e);
            break;
        }
    }
}
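
For reference, the behavior the loop above relies on can be sketched without a broker. This is only a stand-in model under stated assumptions: `MockProducer`, `SendError`, and `send_with_retry` are hypothetical names invented for illustration and are not part of rdkafka. The sketch only models the queue accounting: a poll frees a slot, so a retry after QueueFull succeeds, and a flush leaves nothing behind.

```rust
use std::collections::VecDeque;

/// Hypothetical error type for the stand-in producer (not rdkafka's).
#[derive(Debug, PartialEq)]
enum SendError {
    QueueFull,
}

/// Hypothetical stand-in for a producer with a bounded local queue.
/// It models queue accounting only and does no networking.
struct MockProducer {
    queue: VecDeque<String>,
    capacity: usize,
}

impl MockProducer {
    fn new(capacity: usize) -> Self {
        MockProducer { queue: VecDeque::new(), capacity }
    }

    /// Enqueue a message, or hand it back with QueueFull if no room is left.
    fn send(&mut self, msg: String) -> Result<(), (SendError, String)> {
        if self.queue.len() >= self.capacity {
            return Err((SendError::QueueFull, msg));
        }
        self.queue.push_back(msg);
        Ok(())
    }

    /// Serve up to `max_events` simulated delivery events, freeing their slots.
    /// Returns the number of events served.
    fn poll(&mut self, max_events: usize) -> usize {
        let n = max_events.min(self.queue.len());
        for _ in 0..n {
            self.queue.pop_front();
        }
        n
    }

    /// Drain every queued message and return how many were drained.
    fn flush(&mut self) -> usize {
        let n = self.queue.len();
        self.queue.clear();
        n
    }

    /// Number of messages still waiting in the queue.
    fn in_flight(&self) -> usize {
        self.queue.len()
    }
}

/// Retry a send on QueueFull, polling once between attempts.
/// Gives up after `max_attempts` and returns the attempt number on success.
fn send_with_retry(
    producer: &mut MockProducer,
    msg: String,
    max_attempts: usize,
) -> Result<usize, SendError> {
    let mut msg = msg;
    for attempt in 1..=max_attempts {
        match producer.send(msg) {
            Ok(()) => return Ok(attempt),
            Err((SendError::QueueFull, returned)) => {
                // Free one slot, then retry with the message handed back.
                producer.poll(1);
                msg = returned;
            }
        }
    }
    Err(SendError::QueueFull)
}
```

In this model, a send into a full queue of capacity 2 succeeds on the second attempt because the intervening poll frees a slot. Unlike the loop above, the retry count is bounded, so a queue that never drains surfaces as an error instead of spinning forever.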

Downgrading to 0.35.0, which avoids commit 19f32bf, resolves this.

I have seen a few other issues that may be related, such as #638.

Let me know if more information would be helpful, but I believe this can be observed with some minor changes to the kafka-benchmark project: https://github.com/fede1024/kafka-benchmark/blob/c04d8cee98b0aa47e1580a27d4db5ba90e6318b6/src/producer/mod.rs#L67-L87
