Skip to content

Create a new Delivery struct that includes Timestamp #555

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 3, 2025
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
* Address wakeup races introduced by pivoting to the event API.
* Update `BaseProducer::poll` to not return early, and instead continue
looping until the passed timeout is reached.
* **Breaking change.** Change signature for `OwnedDeliveryResult`. The
`Ok` variant is now a `Delivery` struct, rather than a tuple. This allows
or including `Timestamp` as a result field. It means that adding values
in the future will not require a breaking change.

## 0.36.2 (2024-01-16)

Expand Down
2 changes: 1 addition & 1 deletion src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::error::{IsError, KafkaError, KafkaResult};
use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr};

/// Timestamp of a Kafka message.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Clone, Copy)]
pub enum Timestamp {
/// Timestamp not available.
NotAvailable,
Expand Down
29 changes: 21 additions & 8 deletions src/producer/future_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,23 @@ pub struct FutureProducerContext<C: ClientContext + 'static> {
wrapped_context: C,
}

/// Represents the result of message production as performed from the
/// `FutureProducer`.
/// Contains information about a successfully delivered message
#[derive(Debug, PartialEq, Eq)]
pub struct Delivery {
/// The partition the message was delivered to
pub partition: i32,
/// The offset within the partition
pub offset: i64,
/// The timestamp associated with the message
pub timestamp: Timestamp,
}

/// Represents the result of message production as performed from the FutureProducer.
///
/// If message delivery was successful, `OwnedDeliveryResult` will return the
/// partition and offset of the message. If the message failed to be delivered
/// an error will be returned, together with an owned copy of the original
/// message.
pub type OwnedDeliveryResult = Result<(i32, i64), (KafkaError, OwnedMessage)>;
/// If message delivery was successful, returns `DeliveredMessage` containing the partition,
/// offset and timestamp. If the message failed to be delivered, returns the error and
/// an owned copy of the original message.
pub type OwnedDeliveryResult = Result<Delivery, (KafkaError, OwnedMessage)>;

// Delegates all the methods calls to the wrapped context.
impl<C: ClientContext + 'static> ClientContext for FutureProducerContext<C> {
Expand Down Expand Up @@ -183,7 +192,11 @@ where
tx: Box<oneshot::Sender<OwnedDeliveryResult>>,
) {
let owned_delivery_result = match *delivery_result {
Ok(ref message) => Ok((message.partition(), message.offset())),
Ok(ref message) => Ok(Delivery {
partition: message.partition(),
offset: message.offset(),
timestamp: message.timestamp(),
}),
Err((ref error, ref message)) => Err((error.clone(), message.detach())),
};
let _ = tx.send(owned_delivery_result); // TODO: handle error
Expand Down
8 changes: 5 additions & 3 deletions tests/test_high_producers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::message::{Header, Headers, Message, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use rdkafka::util::Timeout;
use rdkafka::Timestamp;

use crate::utils::*;

Expand Down Expand Up @@ -44,9 +45,10 @@ async fn test_future_producer_send() {
let results: Vec<_> = results.collect().await;
assert!(results.len() == 10);
for (i, result) in results.into_iter().enumerate() {
let (partition, offset) = result.unwrap();
assert_eq!(partition, 1);
assert_eq!(offset, i as i64);
let delivered = result.unwrap();
assert_eq!(delivered.partition, 1);
assert_eq!(delivered.offset, i as i64);
assert!(delivered.timestamp < Timestamp::now());
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ where
let mut message_map = HashMap::new();
for (id, future) in futures {
match future.await {
Ok((partition, offset)) => message_map.insert((partition, offset), id),
Ok(delivered) => message_map.insert((delivered.partition, delivered.offset), id),
Err((kafka_error, _message)) => panic!("Delivery failed: {}", kafka_error),
};
}
Expand Down
Loading