This repository was archived by the owner on Nov 8, 2023. It is now read-only.

feat: Errors consumer polls producer #52

Open
wants to merge 29 commits into main from poll-producer
Changes from 19 commits
Commits
29 commits
b122950
feat: Errors consumer polls producer
lynnagara Jun 23, 2022
808840c
ignore clippy
lynnagara Jun 23, 2022
09585e6
Merge branch 'main' into poll-producer
lynnagara Jun 23, 2022
0dceabe
Merge branch 'main' into poll-producer
lynnagara Jun 23, 2022
8b46b4e
fix clippy
lynnagara Jun 23, 2022
fb58453
only poll every 100ms
lynnagara Jun 23, 2022
98452e7
switch to threaded producer
lynnagara Jun 23, 2022
5017ae8
switch to threaded producer
lynnagara Jun 23, 2022
377e4a5
print stuff
lynnagara Jun 23, 2022
c212505
commit offsets
lynnagara Jun 23, 2022
6ecb657
get timestamp from message
lynnagara Jun 23, 2022
678ebfa
linting
lynnagara Jun 23, 2022
8c2b18e
Merge branch 'main' into poll-producer
lynnagara Jun 23, 2022
09241f3
wait for producer callback to commit offsets
lynnagara Jun 24, 2022
8def002
Merge remote-tracking branch 'origin/main' into poll-producer
lynnagara Jun 24, 2022
b492729
turn off logging
lynnagara Jun 24, 2022
f605b91
commit the right offset
lynnagara Jun 24, 2022
47f6a02
update last commit time
lynnagara Jun 24, 2022
3ef068d
remove condition to debug
lynnagara Jun 24, 2022
782fc69
Revert "remove condition to debug"
lynnagara Jun 24, 2022
8ff5c82
Merge branch 'main' into poll-producer
lynnagara Jun 24, 2022
fad5c1a
fix merge issues
lynnagara Jun 24, 2022
0a5e5e6
rewrite committing
lynnagara Jun 24, 2022
570b3c6
Merge remote-tracking branch 'origin/main' into poll-producer
lynnagara Jun 27, 2022
fee1448
fix test
lynnagara Jun 27, 2022
9a5c5dc
unwrap_or_else
lynnagara Jun 27, 2022
48b659e
don't attempt to commit if there is nothing to commit
lynnagara Jun 28, 2022
6dd7e7f
fix bug in committing
lynnagara Jun 28, 2022
7eac755
Merge remote-tracking branch 'origin/main' into poll-producer
lynnagara Jul 12, 2022
4 changes: 2 additions & 2 deletions examples/base_processor.rs
@@ -4,7 +4,7 @@ use rust_arroyo::backends::kafka::config::KafkaConfig;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use rust_arroyo::backends::kafka::KafkaConsumer;
use rust_arroyo::processing::strategies::{
CommitRequest, MessageRejected, ProcessingStrategy, ProcessingStrategyFactory,
CommitRequest, ProcessingError, ProcessingStrategy, ProcessingStrategyFactory,
};
use rust_arroyo::processing::StreamProcessor;
use rust_arroyo::types::{Message, Partition, Position, Topic};
@@ -30,7 +30,7 @@ impl ProcessingStrategy<KafkaPayload> for TestStrategy {
}
}

fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), MessageRejected> {
fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), ProcessingError> {
println!("SUBMIT {}", message);
self.partitions.insert(
message.partition,
11 changes: 10 additions & 1 deletion src/backends/kafka/errors.rs
@@ -1,6 +1,6 @@
use rdkafka::error::{KafkaError, RDKafkaErrorCode};

use crate::backends::ConsumerError;
use crate::backends::{ConsumerError, ProducerError};

impl From<KafkaError> for ConsumerError {
fn from(err: KafkaError) -> Self {
@@ -14,3 +14,12 @@ impl From<KafkaError> for ConsumerError {
}
}
}

impl From<KafkaError> for ProducerError {
fn from(err: KafkaError) -> Self {
match err {
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => ProducerError::QueueFull,
other => ProducerError::Other(Box::new(other)),
}
}
}
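
The `From<KafkaError>` impl above maps librdkafka's queue-full condition onto `ProducerError::QueueFull` so callers can treat it as backpressure rather than a fatal error. A minimal sketch of that caller-side handling, assuming only the `Producer` trait and `ProducerError` introduced in this PR (the retry loop and the backoff duration are illustrative, not part of the change):

```rust
use std::thread::sleep;
use std::time::Duration;

use rust_arroyo::backends::{Producer, ProducerError};
use rust_arroyo::types::TopicOrPartition;

// Illustrative helper, not part of this PR: retry on QueueFull with a short
// backoff, and return any other outcome (success or error) to the caller.
fn produce_with_retry<TPayload, T: Clone>(
    producer: &impl Producer<TPayload, T>,
    destination: &TopicOrPartition,
    payload: &TPayload,
    msg_id: T,
) -> Result<(), ProducerError> {
    loop {
        match producer.produce(destination, payload, msg_id.clone()) {
            // The producer's local queue is full: back off briefly and retry.
            Err(ProducerError::QueueFull) => sleep(Duration::from_millis(10)),
            // Success or any other error goes straight back to the caller.
            other => return other,
        }
    }
}
```

The `src/bin/consumer/errors.rs` change further down takes a different route and surfaces `QueueFull` as `ProcessingError::MessageRejected` instead of retrying in place.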
102 changes: 87 additions & 15 deletions src/backends/kafka/producer.rs
@@ -1,27 +1,75 @@
use crate::backends::kafka::config::KafkaConfig;
use crate::backends::kafka::types::KafkaPayload;
use crate::backends::Producer;
use crate::backends::Producer as ArroyoProducer;
use crate::backends::ProducerError;
use crate::types::TopicOrPartition;
use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseProducer, BaseRecord};
use rdkafka::config::FromClientConfigAndContext;
use rdkafka::producer::{BaseRecord, Producer, ThreadedProducer};
use rdkafka::producer::{DeliveryResult, ProducerContext};
use rdkafka::util::IntoOpaque;
use std::sync::Mutex;
use std::time::Duration;

pub struct KafkaProducer {
producer: Option<BaseProducer>,
struct CustomContext<T> {
callbacks: Mutex<Box<dyn DeliveryCallbacks<T>>>,
}

impl KafkaProducer {
pub fn new(config: KafkaConfig) -> Self {
impl<T> ClientContext for CustomContext<T> {}

impl<T: IntoOpaque> ProducerContext for CustomContext<T> {
type DeliveryOpaque = T;

fn delivery(
&self,
delivery_result: &DeliveryResult<'_>,
delivery_opaque: Self::DeliveryOpaque,
) {
match delivery_result.as_ref() {
Ok(_) => {
self.callbacks.lock().unwrap().on_delivery(delivery_opaque);
}
Err(_) => println!("Failed to deliver message"),
}
}
}

// TODO: We should probably use async functions instead of these complicated callbacks
// Keeping it this way in line with the Kafka consumer implementation
pub trait DeliveryCallbacks<T>: Send + Sync {
fn on_delivery(&mut self, msg_id: T);
}

pub struct KafkaProducer<T: 'static + IntoOpaque> {
producer: Option<ThreadedProducer<CustomContext<T>>>,
}

impl<T: 'static + IntoOpaque> KafkaProducer<T> {
pub fn new(config: KafkaConfig, delivery_callbacks: Box<dyn DeliveryCallbacks<T>>) -> Self {
let config_obj: ClientConfig = config.into();
let base_producer: BaseProducer<_> = config_obj.create().unwrap();

let producer = ThreadedProducer::from_config_and_context(
&config_obj,
CustomContext {
callbacks: Mutex::new(delivery_callbacks),
},
)
.unwrap();

Self {
producer: Some(base_producer),
producer: Some(producer),
}
}
}

impl Producer<KafkaPayload> for KafkaProducer {
fn produce(&self, destination: &TopicOrPartition, payload: &KafkaPayload) {
impl<T: IntoOpaque> ArroyoProducer<KafkaPayload, T> for KafkaProducer<T> {
fn produce(
&self,
destination: &TopicOrPartition,
payload: &KafkaPayload,
msg_id: T,
) -> Result<(), ProducerError> {
let topic = match destination {
TopicOrPartition::Topic(topic) => topic.name.as_ref(),
TopicOrPartition::Partition(partition) => partition.topic.name.as_ref(),
@@ -32,7 +80,9 @@ impl Producer<KafkaPayload> for KafkaProducer {
let msg_key = payload_copy.key.unwrap_or_default();
let msg_payload = payload_copy.payload.unwrap_or_default();

let mut base_record = BaseRecord::to(topic).payload(&msg_payload).key(&msg_key);
let mut base_record = BaseRecord::with_opaque_to(topic, msg_id)
.payload(&msg_payload)
.key(&msg_key);

let partition = match destination {
TopicOrPartition::Topic(_) => None,
@@ -45,16 +95,29 @@ impl Producer<KafkaPayload> for KafkaProducer {

let producer = self.producer.as_ref().expect("Not closed");

producer.send(base_record).expect("Something went wrong");
let res = producer.send(base_record);

if let Err(err) = res {
let t = err.0;
return Err(t.into());
}

Ok(())
}

fn flush(&self) {
let producer = self.producer.as_ref().unwrap();
producer.flush(Duration::from_millis(10_000));
}

fn close(&mut self) {
self.producer = None;
}
}

#[cfg(test)]
mod tests {
use super::KafkaProducer;
use super::{DeliveryCallbacks, KafkaProducer};
use crate::backends::kafka::config::KafkaConfig;
use crate::backends::kafka::types::KafkaPayload;
use crate::backends::Producer;
@@ -68,14 +131,23 @@ mod tests {
let configuration =
KafkaConfig::new_producer_config(vec!["localhost:9092".to_string()], None);

let mut producer = KafkaProducer::new(configuration);
struct MyCallbacks {}

impl DeliveryCallbacks<usize> for MyCallbacks {
fn on_delivery(&mut self, msg_id: usize) {
println!("Message ID {}", msg_id);
}
}

let mut producer: KafkaProducer<usize> =
KafkaProducer::new(configuration, Box::new(MyCallbacks {}));

let payload = KafkaPayload {
key: None,
headers: None,
payload: Some("asdf".as_bytes().to_vec()),
};
producer.produce(&destination, &payload);
producer.produce(&destination, &payload, 1).unwrap();
producer.close();
}
}
21 changes: 19 additions & 2 deletions src/backends/mod.rs
@@ -32,6 +32,16 @@ pub enum ConsumerError {
BrokerError(#[from] Box<dyn std::error::Error>),
}

#[non_exhaustive]
#[derive(Error, Debug)]
pub enum ProducerError {
#[error("Too many in flight requests")]
QueueFull,

#[error(transparent)]
Other(#[from] Box<dyn std::error::Error>),
}

/// This is basically an observer pattern to receive the callbacks from
/// the consumer when partitions are assigned/revoked.
pub trait AssignmentCallbacks: Send + Sync {
@@ -155,9 +165,16 @@ pub trait Consumer<'a, TPayload: Clone> {
fn closed(&self) -> bool;
}

pub trait Producer<TPayload> {
pub trait Producer<TPayload, T> {
/// Produce to a topic or partition.
fn produce(&self, destination: &TopicOrPartition, payload: &TPayload);
fn produce(
&self,
destination: &TopicOrPartition,
payload: &TPayload,
msg_id: T,
) -> Result<(), ProducerError>;

fn flush(&self);

fn close(&mut self);
}
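
The trait change above threads an opaque `msg_id` through `produce` and makes the result fallible. As a rough illustration of how the generic parameter flows (not part of this PR; the type and its behavior are invented for the example), an in-memory implementation could look like this:

```rust
use std::sync::Mutex;

use rust_arroyo::backends::{Producer, ProducerError};
use rust_arroyo::types::TopicOrPartition;

// Illustrative test double: records (payload, msg_id) pairs so a test can
// assert what was produced.
struct InMemoryProducer<TPayload, T> {
    sent: Mutex<Vec<(TPayload, T)>>,
    closed: bool,
}

impl<TPayload: Clone, T> Producer<TPayload, T> for InMemoryProducer<TPayload, T> {
    fn produce(
        &self,
        _destination: &TopicOrPartition,
        payload: &TPayload,
        msg_id: T,
    ) -> Result<(), ProducerError> {
        // Keep the payload together with its opaque id, mirroring how the
        // Kafka backend hands msg_id to BaseRecord::with_opaque_to.
        self.sent.lock().unwrap().push((payload.clone(), msg_id));
        Ok(())
    }

    fn flush(&self) {
        // Nothing is buffered asynchronously here, so flush is a no-op.
    }

    fn close(&mut self) {
        self.closed = true;
    }
}
```

In the Kafka backend, the same `msg_id` is what the delivery callback gets back as `delivery_opaque` once the broker acknowledges the message.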
118 changes: 96 additions & 22 deletions src/bin/consumer/errors.rs
@@ -2,38 +2,120 @@ extern crate rust_arroyo;

use crate::rust_arroyo::backends::Producer;
use clap::{App, Arg};
use log::debug;
// use log::debug;
use rust_arroyo::backends::kafka::config::KafkaConfig;
use rust_arroyo::backends::kafka::producer::DeliveryCallbacks;
use rust_arroyo::backends::kafka::producer::KafkaProducer;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use rust_arroyo::backends::kafka::KafkaConsumer;
use rust_arroyo::backends::AssignmentCallbacks;
use rust_arroyo::backends::ProducerError;
use rust_arroyo::processing::strategies::ProcessingStrategyFactory;
use rust_arroyo::processing::strategies::{CommitRequest, MessageRejected, ProcessingStrategy};
use rust_arroyo::processing::strategies::{CommitRequest, ProcessingError, ProcessingStrategy};
use rust_arroyo::processing::StreamProcessor;
use rust_arroyo::types::Message;
use rust_arroyo::types::{Partition, Topic, TopicOrPartition};
use rust_arroyo::types::{Message, Partition, Position, Topic, TopicOrPartition};
use std::collections::HashMap;
use std::time::Duration;
use std::mem;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

const COMMIT_INTERVAL: Duration = Duration::from_millis(500);

#[derive(Debug)]
struct CommitData {
partition: Partition,
position: Position,
}

struct EmptyCallbacks {}
impl AssignmentCallbacks for EmptyCallbacks {
fn on_assign(&mut self, _: HashMap<Partition, u64>) {}
fn on_revoke(&mut self, _: Vec<Partition>) {}
}

// TODO: We probably want to replace this all with async
struct ProducerCallbacks {
offsets: Arc<Mutex<HashMap<Partition, Position>>>,
}

impl DeliveryCallbacks<Box<CommitData>> for ProducerCallbacks {
fn on_delivery(&mut self, msg_data: Box<CommitData>) {
let mut offsets = self.offsets.lock().unwrap();
offsets.insert(msg_data.partition, msg_data.position);
}
}

struct Next {
destination: TopicOrPartition,
producer: KafkaProducer,
producer: KafkaProducer<Box<CommitData>>,
last_commit: SystemTime,
offsets: Arc<Mutex<HashMap<Partition, Position>>>,
}

impl Next {
pub fn new(destination: TopicOrPartition, broker: String) -> Self {
let config = KafkaConfig::new_producer_config(vec![broker], None);

let offsets = Arc::new(Mutex::new(HashMap::new()));
let producer = KafkaProducer::new(
config,
Box::new(ProducerCallbacks {
offsets: offsets.clone(),
}),
);

Self {
destination,
producer,
last_commit: SystemTime::now(),
offsets,
}
}
}

impl ProcessingStrategy<KafkaPayload> for Next {
fn poll(&mut self) -> Option<CommitRequest> {
let now = SystemTime::now();
let diff = now.duration_since(self.last_commit).unwrap();
if diff > COMMIT_INTERVAL {
println!("Committing");
let prev = mem::take(&mut self.offsets);

let mut positions_to_commit = HashMap::new();
for (k, v) in prev.lock().unwrap().iter() {
positions_to_commit.insert(k.clone(), v.clone());
}

self.last_commit = now;

return Some(CommitRequest {
positions: positions_to_commit,
});
}

None
}

fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), MessageRejected> {
self.producer.produce(&self.destination, &message.payload);
debug!("Produced message offset {}", message.offset);
fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), ProcessingError> {
let offset_to_commit = message.next_offset();
let res = self.producer.produce(
&self.destination,
&message.payload,
Box::new(CommitData {
partition: message.partition,
position: Position {
offset: offset_to_commit,
timestamp: message.timestamp,
},
}),
);

// TODO: MessageRejected should be handled by the StreamProcessor but
// is not currently.
if let Err(ProducerError::QueueFull) = res {
return Err(ProcessingError::MessageRejected);
}
Comment on lines +114 to +116

Collaborator:

Do you plan to start using futures instead? The issue with this approach, I think, is that it puts you in a CPU-tight loop while you wait for the message to be produced, whereas waiting on a future would leave the CPU free.

Member Author:

I think using futures might make more sense. But this is not actually a synchronous function currently, and we are not waiting: it fires a delivery callback when it's done.


Ok(())
}

@@ -52,16 +134,14 @@ struct StrategyFactory {
}
impl ProcessingStrategyFactory<KafkaPayload> for StrategyFactory {
fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
let config = KafkaConfig::new_producer_config(vec![self.broker.clone()], None);
let producer = KafkaProducer::new(config);
Box::new(Next {
destination: TopicOrPartition::Topic({
Box::new(Next::new(
TopicOrPartition::Topic({
Topic {
name: self.destination_topic.clone(),
}
}),
producer,
})
self.broker.clone(),
))
}
}

@@ -126,13 +206,7 @@ fn main() {
let brokers = matches.value_of("brokers").unwrap();
let group_id = matches.value_of("group-id").unwrap();
let dest_topic = matches.value_of("dest-topic").unwrap();
// TODO: implement this
/* let _batch_size = matches
.value_of("batch_size")
.unwrap()
.parse::<usize>()
.unwrap();*/
env_logger::init();
// env_logger::init();
let config = KafkaConfig::new_consumer_config(
vec![brokers.to_string()],
group_id.to_string(),
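
On the review question about futures above: a rough sketch of what the callback-free alternative could look like with rdkafka's `FutureProducer`. This is not what the PR implements, and the topic name, key, and return-tuple handling assume the rdkafka API as of roughly the 0.28 line.

```rust
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;

// Illustrative only: produce one message and await its delivery report.
async fn produce_async(producer: &FutureProducer, payload: &[u8]) {
    let record = FutureRecord::to("dest-topic").payload(payload).key("key");

    // Awaiting the send leaves the CPU free until the delivery report arrives,
    // instead of relying on a callback from the producer's poll thread.
    match producer.send(record, Timeout::Never).await {
        Ok((partition, offset)) => {
            // The delivery report carries partition and offset; consumer offset
            // commits could be driven from here.
            println!("delivered to partition {} at offset {}", partition, offset);
        }
        Err((err, _unsent_message)) => println!("delivery failed: {}", err),
    }
}
```

Whether that is preferable mostly depends on whether the surrounding pipeline is already async; per the TODO in `producer.rs`, this PR keeps the callback style to stay in line with the existing Kafka consumer implementation.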