diff --git a/examples/base_processor.rs b/examples/base_processor.rs
index da9b5ba..aa49ae3 100644
--- a/examples/base_processor.rs
+++ b/examples/base_processor.rs
@@ -4,7 +4,7 @@ use rust_arroyo::backends::kafka::config::KafkaConfig;
 use rust_arroyo::backends::kafka::types::KafkaPayload;
 use rust_arroyo::backends::kafka::KafkaConsumer;
 use rust_arroyo::processing::strategies::{
-    CommitRequest, MessageRejected, ProcessingStrategy, ProcessingStrategyFactory,
+    CommitRequest, ProcessingError, ProcessingStrategy, ProcessingStrategyFactory,
 };
 use rust_arroyo::processing::StreamProcessor;
 use rust_arroyo::types::{Message, Partition, Position, Topic};
@@ -30,7 +30,7 @@ impl ProcessingStrategy<KafkaPayload> for TestStrategy {
         }
     }
 
-    fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), MessageRejected> {
+    fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), ProcessingError> {
         println!("SUBMIT {}", message);
         self.partitions.insert(
             message.partition,
diff --git a/src/backends/kafka/errors.rs b/src/backends/kafka/errors.rs
index 6d12b65..5768c91 100644
--- a/src/backends/kafka/errors.rs
+++ b/src/backends/kafka/errors.rs
@@ -1,6 +1,6 @@
 use rdkafka::error::{KafkaError, RDKafkaErrorCode};
 
-use crate::backends::ConsumerError;
+use crate::backends::{ConsumerError, ProducerError};
 
 impl From<KafkaError> for ConsumerError {
     fn from(err: KafkaError) -> Self {
@@ -14,3 +14,12 @@ impl From<KafkaError> for ConsumerError {
         }
     }
 }
+
+impl From<KafkaError> for ProducerError {
+    fn from(err: KafkaError) -> Self {
+        match err {
+            KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => ProducerError::QueueFull,
+            other => ProducerError::Other(Box::new(other)),
+        }
+    }
+}
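A quick sketch (not part of this patch) of how the conversion above is expected to behave, assuming the rdkafka and rust_arroyo crates as imported in this diff; the second error code is only illustrative.

use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rust_arroyo::backends::ProducerError;

fn main() {
    // A full local produce queue maps to the dedicated, retryable variant...
    let full = KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull);
    assert!(matches!(ProducerError::from(full), ProducerError::QueueFull));

    // ...while every other broker error is boxed into `Other`.
    let timeout = KafkaError::MessageProduction(RDKafkaErrorCode::MessageTimedOut);
    assert!(matches!(ProducerError::from(timeout), ProducerError::Other(_)));
}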
diff --git a/src/backends/kafka/mod.rs b/src/backends/kafka/mod.rs
index a4e37bf..6990fc7 100644
--- a/src/backends/kafka/mod.rs
+++ b/src/backends/kafka/mod.rs
@@ -287,6 +287,7 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer {
         let consumer = self.consumer.as_mut().unwrap();
 
         let partitions = TopicPartitionList::from_topic_map(&topic_map).unwrap();
+
         consumer.commit(&partitions, CommitMode::Sync).unwrap();
 
         // Clear staged offsets
diff --git a/src/backends/kafka/producer.rs b/src/backends/kafka/producer.rs
index 50155ea..a836ec5 100644
--- a/src/backends/kafka/producer.rs
+++ b/src/backends/kafka/producer.rs
@@ -1,27 +1,79 @@
 use crate::backends::kafka::config::KafkaConfig;
 use crate::backends::kafka::types::KafkaPayload;
 use crate::backends::Producer as ArroyoProducer;
+use crate::backends::ProducerError;
 use crate::types::TopicOrPartition;
+use rdkafka::client::ClientContext;
 use rdkafka::config::ClientConfig;
-use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
+use rdkafka::config::FromClientConfigAndContext;
+use rdkafka::producer::{BaseRecord, DeliveryResult, Producer, ProducerContext, ThreadedProducer};
+use rdkafka::util::IntoOpaque;
+use std::sync::Mutex;
 use std::time::Duration;
 
-pub struct KafkaProducer {
-    producer: Option<BaseProducer>,
+struct CustomContext<T: IntoOpaque> {
+    callbacks: Mutex<Box<dyn DeliveryCallbacks<T>>>,
 }
 
-impl KafkaProducer {
-    pub fn new(config: KafkaConfig) -> Self {
+struct EmptyCallbacks {}
+
+impl<T: IntoOpaque> DeliveryCallbacks<T> for EmptyCallbacks {
+    fn on_delivery(&mut self, _: T) {}
+}
+
+impl<T: IntoOpaque> ClientContext for CustomContext<T> {}
+
+impl<T: IntoOpaque> ProducerContext for CustomContext<T> {
+    type DeliveryOpaque = T;
+
+    fn delivery(
+        &self,
+        delivery_result: &DeliveryResult<'_>,
+        delivery_opaque: Self::DeliveryOpaque,
+    ) {
+        match delivery_result.as_ref() {
+            Ok(_) => {
+                self.callbacks.lock().unwrap().on_delivery(delivery_opaque);
+            }
+            Err(_) => println!("Failed to deliver message"),
+        }
+    }
+}
+
+// TODO: We should probably use async functions instead of these complicated callbacks
+// Keeping it this way in line with the Kafka consumer implementation
+pub trait DeliveryCallbacks<T>: Send + Sync {
+    fn on_delivery(&mut self, msg_id: T);
+}
+
+pub struct KafkaProducer<T: IntoOpaque> {
+    producer: Option<ThreadedProducer<CustomContext<T>>>,
+}
+
+impl<T: IntoOpaque> KafkaProducer<T> {
+    pub fn new(
+        config: KafkaConfig,
+        delivery_callbacks: Option<Box<dyn DeliveryCallbacks<T>>>,
+    ) -> Self {
         let config_obj: ClientConfig = config.into();
-        let base_producer: BaseProducer<_> = config_obj.create().unwrap();
+
+        let callbacks = delivery_callbacks.unwrap_or_else(|| Box::new(EmptyCallbacks {}));
+
+        let producer = ThreadedProducer::from_config_and_context(
+            &config_obj,
+            CustomContext {
+                callbacks: Mutex::new(callbacks),
+            },
+        )
+        .unwrap();
         Self {
-            producer: Some(base_producer),
+            producer: Some(producer),
         }
     }
 }
 
-impl KafkaProducer {
+impl<T: IntoOpaque> KafkaProducer<T> {
     pub fn poll(&self) {
         let producer = self.producer.as_ref().unwrap();
         producer.poll(Duration::ZERO);
@@ -33,8 +85,13 @@
     }
 }
 
-impl ArroyoProducer<KafkaPayload> for KafkaProducer {
-    fn produce(&self, destination: &TopicOrPartition, payload: &KafkaPayload) {
+impl<T: IntoOpaque> ArroyoProducer<KafkaPayload, T> for KafkaProducer<T> {
+    fn produce(
+        &self,
+        destination: &TopicOrPartition,
+        payload: &KafkaPayload,
+        msg_id: T,
+    ) -> Result<(), ProducerError> {
         let topic = match destination {
             TopicOrPartition::Topic(topic) => topic.name.as_ref(),
             TopicOrPartition::Partition(partition) => partition.topic.name.as_ref(),
@@ -45,7 +102,9 @@
         let msg_key = payload_copy.key.unwrap_or_default();
         let msg_payload = payload_copy.payload.unwrap_or_default();
 
-        let mut base_record = BaseRecord::to(topic).payload(&msg_payload).key(&msg_key);
+        let mut base_record = BaseRecord::with_opaque_to(topic, msg_id)
+            .payload(&msg_payload)
+            .key(&msg_key);
 
         let partition = match destination {
             TopicOrPartition::Topic(_) => None,
@@ -58,8 +117,16 @@
 
         let producer = self.producer.as_ref().expect("Not closed");
 
-        producer.send(base_record).expect("Something went wrong");
+        let res = producer.send(base_record);
+
+        if let Err(err) = res {
+            let t = err.0;
+            return Err(t.into());
+        }
+
+        Ok(())
     }
+
     fn close(&mut self) {
         self.producer = None;
     }
@@ -67,7 +134,7 @@
 
 #[cfg(test)]
 mod tests {
-    use super::KafkaProducer;
+    use super::{DeliveryCallbacks, KafkaProducer};
     use crate::backends::kafka::config::KafkaConfig;
     use crate::backends::kafka::types::KafkaPayload;
     use crate::backends::Producer;
@@ -81,14 +148,23 @@
         let configuration =
             KafkaConfig::new_producer_config(vec!["localhost:9092".to_string()], None);
 
-        let mut producer = KafkaProducer::new(configuration);
+        struct MyCallbacks {}
+
+        impl DeliveryCallbacks<usize> for MyCallbacks {
+            fn on_delivery(&mut self, msg_id: usize) {
+                println!("Message ID {}", msg_id);
+            }
+        }
+
+        let mut producer: KafkaProducer<usize> =
+            KafkaProducer::new(configuration, Some(Box::new(MyCallbacks {})));
 
         let payload = KafkaPayload {
             key: None,
             headers: None,
             payload: Some("asdf".as_bytes().to_vec()),
         };
-        producer.produce(&destination, &payload);
+        producer.produce(&destination, &payload, 1).unwrap();
 
         producer.close();
     }
 }
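Because `CustomContext::delivery` fires on the ThreadedProducer's background poll thread, `DeliveryCallbacks` implementations must be Send + Sync. A sketch (not part of this patch) of a callback that shares delivery state with the producing thread; the broker address is a placeholder.

use std::sync::{Arc, Mutex};

use rust_arroyo::backends::kafka::config::KafkaConfig;
use rust_arroyo::backends::kafka::producer::{DeliveryCallbacks, KafkaProducer};

// Records acknowledged message IDs so the caller can observe deliveries.
struct Recorder {
    delivered: Arc<Mutex<Vec<usize>>>,
}

impl DeliveryCallbacks<usize> for Recorder {
    fn on_delivery(&mut self, msg_id: usize) {
        self.delivered.lock().unwrap().push(msg_id);
    }
}

fn main() {
    let delivered = Arc::new(Mutex::new(Vec::new()));
    let config = KafkaConfig::new_producer_config(vec!["localhost:9092".to_string()], None);
    let _producer: KafkaProducer<usize> = KafkaProducer::new(
        config,
        Some(Box::new(Recorder {
            delivered: delivered.clone(),
        })),
    );
    // `delivered` fills up as produce(..., msg_id) calls are acknowledged.
}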
diff --git a/src/backends/mod.rs b/src/backends/mod.rs
index 4fe3cb6..ef4a23e 100755
--- a/src/backends/mod.rs
+++ b/src/backends/mod.rs
@@ -32,6 +32,16 @@ pub enum ConsumerError {
     BrokerError(#[from] Box<dyn std::error::Error>),
 }
 
+#[non_exhaustive]
+#[derive(Error, Debug)]
+pub enum ProducerError {
+    #[error("Too many in flight requests")]
+    QueueFull,
+
+    #[error(transparent)]
+    Other(#[from] Box<dyn std::error::Error>),
+}
+
 /// This is basically an observer pattern to receive the callbacks from
 /// the consumer when partitions are assigned/revoked.
 pub trait AssignmentCallbacks: Send + Sync {
@@ -155,9 +165,14 @@
     fn closed(&self) -> bool;
 }
 
-pub trait Producer<TPayload: Clone> {
+pub trait Producer<TPayload: Clone, T> {
     /// Produce to a topic or partition.
-    fn produce(&self, destination: &TopicOrPartition, payload: &TPayload);
+    fn produce(
+        &self,
+        destination: &TopicOrPartition,
+        payload: &TPayload,
+        msg_id: T,
+    ) -> Result<(), ProducerError>;
 
     fn close(&mut self);
 }
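The trait now carries two type parameters, the payload and an opaque msg_id threaded through to the delivery callback, and `produce` is fallible. A hypothetical in-memory double (`VecProducer` is illustrative, not part of this change) showing the new shape:

use std::sync::Mutex;

use rust_arroyo::backends::{Producer, ProducerError};
use rust_arroyo::types::TopicOrPartition;

struct VecProducer {
    sent: Mutex<Vec<(String, u64)>>,
}

impl Producer<String, u64> for VecProducer {
    fn produce(
        &self,
        _destination: &TopicOrPartition,
        payload: &String,
        msg_id: u64,
    ) -> Result<(), ProducerError> {
        // A bounded implementation would return Err(ProducerError::QueueFull) here.
        self.sent.lock().unwrap().push((payload.clone(), msg_id));
        Ok(())
    }

    fn close(&mut self) {}
}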
diff --git a/src/bin/consumer/errors.rs b/src/bin/consumer/errors.rs
index 472aeca..9cd7a8c 100644
--- a/src/bin/consumer/errors.rs
+++ b/src/bin/consumer/errors.rs
@@ -2,19 +2,29 @@ extern crate rust_arroyo;
 
 use crate::rust_arroyo::backends::Producer;
 use clap::{App, Arg};
-use log::debug;
+// use log::debug;
 use rust_arroyo::backends::kafka::config::KafkaConfig;
+use rust_arroyo::backends::kafka::producer::DeliveryCallbacks;
 use rust_arroyo::backends::kafka::producer::KafkaProducer;
 use rust_arroyo::backends::kafka::types::KafkaPayload;
 use rust_arroyo::backends::kafka::KafkaConsumer;
 use rust_arroyo::backends::AssignmentCallbacks;
+use rust_arroyo::backends::ProducerError;
 use rust_arroyo::processing::strategies::ProcessingStrategyFactory;
-use rust_arroyo::processing::strategies::{CommitRequest, MessageRejected, ProcessingStrategy};
+use rust_arroyo::processing::strategies::{CommitRequest, ProcessingError, ProcessingStrategy};
 use rust_arroyo::processing::StreamProcessor;
-use rust_arroyo::types::Message;
-use rust_arroyo::types::{Partition, Topic, TopicOrPartition};
+use rust_arroyo::types::{Message, Partition, Position, Topic, TopicOrPartition};
 use std::collections::HashMap;
-use std::time::Duration;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, SystemTime};
+
+const COMMIT_INTERVAL: Duration = Duration::from_millis(500);
+
+#[derive(Debug)]
+struct CommitData {
+    partition: Partition,
+    position: Position,
+}
 
 struct EmptyCallbacks {}
 impl AssignmentCallbacks for EmptyCallbacks {
@@ -22,18 +32,89 @@
     fn on_revoke(&mut self, _: Vec<Partition>) {}
 }
 
+// TODO: We probably want to replace this all with async
+struct ProducerCallbacks {
+    offsets: Arc<Mutex<HashMap<Partition, Position>>>,
+}
+
+impl DeliveryCallbacks<Box<CommitData>> for ProducerCallbacks {
+    fn on_delivery(&mut self, msg_data: Box<CommitData>) {
+        let mut offsets = self.offsets.lock().unwrap();
+        offsets.insert(msg_data.partition, msg_data.position);
+    }
+}
+
 struct Next {
     destination: TopicOrPartition,
-    producer: KafkaProducer,
+    producer: KafkaProducer<Box<CommitData>>,
+    last_commit: SystemTime,
+    offsets: Arc<Mutex<HashMap<Partition, Position>>>,
+}
+
+impl Next {
+    pub fn new(destination: TopicOrPartition, broker: String) -> Self {
+        let config = KafkaConfig::new_producer_config(vec![broker], None);
+
+        let offsets = Arc::new(Mutex::new(HashMap::new()));
+        let producer = KafkaProducer::new(
+            config,
+            Some(Box::new(ProducerCallbacks {
+                offsets: offsets.clone(),
+            })),
+        );
+
+        Self {
+            destination,
+            producer,
+            last_commit: SystemTime::now(),
+            offsets,
+        }
+    }
 }
+
 impl ProcessingStrategy<KafkaPayload> for Next {
     fn poll(&mut self) -> Option<CommitRequest> {
+        let now = SystemTime::now();
+        let diff = now.duration_since(self.last_commit).unwrap();
+
+        if diff > COMMIT_INTERVAL {
+            let mut offsets = self.offsets.lock().unwrap();
+            if !offsets.is_empty() {
+                let mut positions_to_commit = HashMap::new();
+                for (k, v) in offsets.iter() {
+                    positions_to_commit.insert(k.clone(), v.clone());
+                }
+
+                offsets.clear();
+                self.last_commit = now;
+                return Some(CommitRequest {
+                    positions: positions_to_commit,
+                });
+            }
+        }
+
         None
     }
 
-    fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), MessageRejected> {
-        self.producer.produce(&self.destination, &message.payload);
-        debug!("Produced message offset {}", message.offset);
+    fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), ProcessingError> {
+        let offset_to_commit = message.next_offset();
+        let res = self.producer.produce(
+            &self.destination,
+            &message.payload,
+            Box::new(CommitData {
+                partition: message.partition,
+                position: Position {
+                    offset: offset_to_commit,
+                    timestamp: message.timestamp,
+                },
+            }),
+        );
+
+        // TODO: MessageRejected should be handled by the StreamProcessor but
+        // is not currently.
+        if let Err(ProducerError::QueueFull) = res {
+            return Err(ProcessingError::MessageRejected);
+        }
+
         Ok(())
     }
@@ -52,16 +133,14 @@ struct StrategyFactory {
 }
 impl ProcessingStrategyFactory<KafkaPayload> for StrategyFactory {
     fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
-        let config = KafkaConfig::new_producer_config(vec![self.broker.clone()], None);
-        let producer = KafkaProducer::new(config);
-        Box::new(Next {
-            destination: TopicOrPartition::Topic({
+        Box::new(Next::new(
+            TopicOrPartition::Topic({
                 Topic {
                     name: self.destination_topic.clone(),
                 }
             }),
-            producer,
-        })
+            self.broker.clone(),
+        ))
     }
 }
@@ -126,13 +205,7 @@
     let brokers = matches.value_of("brokers").unwrap();
     let group_id = matches.value_of("group-id").unwrap();
     let dest_topic = matches.value_of("dest-topic").unwrap();
-    // TODO: implement this
-    /* let _batch_size = matches
-    .value_of("batch_size")
-    .unwrap()
-    .parse::<usize>()
-    .unwrap();*/
-    env_logger::init();
+    // env_logger::init();
     let config = KafkaConfig::new_consumer_config(
         vec![brokers.to_string()],
         group_id.to_string(),
diff --git a/src/bin/mocks/generate_data.rs b/src/bin/mocks/generate_data.rs
index 1f5c854..e18a144 100644
--- a/src/bin/mocks/generate_data.rs
+++ b/src/bin/mocks/generate_data.rs
@@ -74,7 +74,7 @@ fn main() {
     let destination = TopicOrPartition::Topic(topic);
     let config = KafkaConfig::new_producer_config(vec!["localhost:9092".to_string()], None);
 
-    let mut producer = KafkaProducer::new(config);
+    let mut producer = KafkaProducer::new(config, None);
 
     for _ in 0..number {
         let payload = KafkaPayload {
@@ -82,7 +82,7 @@
             headers: None,
             payload: Some(generate_metric().as_bytes().to_vec()),
         };
-        producer.produce(&destination, &payload);
+        producer.produce(&destination, &payload, 1).unwrap();
 
         producer.poll();
     }
diff --git a/src/processing/mod.rs b/src/processing/mod.rs
index 5f1e9f2..8e6456b 100644
--- a/src/processing/mod.rs
+++ b/src/processing/mod.rs
@@ -208,7 +208,7 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> {
 #[cfg(test)]
 mod tests {
     use super::strategies::{
-        CommitRequest, MessageRejected, ProcessingStrategy, ProcessingStrategyFactory,
+        CommitRequest, ProcessingError, ProcessingStrategy, ProcessingStrategyFactory,
     };
     use super::StreamProcessor;
     use crate::backends::local::broker::LocalBroker;
@@ -240,7 +240,7 @@
             }
         }
 
-        fn submit(&mut self, message: Message<String>) -> Result<(), MessageRejected> {
+        fn submit(&mut self, message: Message<String>) -> Result<(), ProcessingError> {
            self.message = Some(message);
            Ok(())
        }
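With MessageRejected and InvalidMessage folded into one enum (defined in the next file), callers can distinguish backpressure from bad data in a single match. A sketch (not part of this patch), assuming the trait bounds used throughout:

use rust_arroyo::processing::strategies::{ProcessingError, ProcessingStrategy};
use rust_arroyo::types::Message;

// Returns true if the strategy accepted the message.
fn try_submit<T: Clone>(
    strategy: &mut dyn ProcessingStrategy<T>,
    message: Message<T>,
) -> bool {
    match strategy.submit(message) {
        Ok(()) => true,
        // Backpressure: the caller should keep a copy and resubmit later.
        Err(ProcessingError::MessageRejected) => false,
        // Bad payload: drop it or route it to a dead-letter topic.
        Err(ProcessingError::InvalidMessage) => false,
    }
}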
diff --git a/src/processing/strategies/mod.rs b/src/processing/strategies/mod.rs
index 18954db..00efecc 100644
--- a/src/processing/strategies/mod.rs
+++ b/src/processing/strategies/mod.rs
@@ -1,15 +1,18 @@
 use crate::types::{Message, Partition, Position};
 use std::collections::HashMap;
 use std::time::Duration;
+use thiserror::Error;
 
 pub mod noop;
 pub mod transform;
 
-#[derive(Debug, Clone)]
-pub struct MessageRejected;
-
-#[derive(Debug, Clone)]
-pub struct InvalidMessage;
+#[derive(Error, Debug)]
+pub enum ProcessingError {
+    #[error("Retryable error")]
+    MessageRejected,
+    #[error("Invalid message")]
+    InvalidMessage,
+}
 
 /// Signals that we need to commit offsets
 #[derive(Debug, Clone, PartialEq)]
@@ -47,7 +50,7 @@ pub trait ProcessingStrategy<TPayload: Clone>: Send + Sync {
     /// If the processing strategy is unable to accept a message (due to it
     /// being at or over capacity, for example), this method will raise a
     /// ``MessageRejected`` exception.
-    fn submit(&mut self, message: Message<TPayload>) -> Result<(), MessageRejected>;
+    fn submit(&mut self, message: Message<TPayload>) -> Result<(), ProcessingError>;
 
     /// Close this instance. No more messages should be accepted by the
     /// instance after this method has been called.
diff --git a/src/processing/strategies/noop.rs b/src/processing/strategies/noop.rs
index 7b0893f..d07e300 100644
--- a/src/processing/strategies/noop.rs
+++ b/src/processing/strategies/noop.rs
@@ -1,5 +1,5 @@
 use crate::backends::kafka::types::KafkaPayload;
-use crate::processing::strategies::{CommitRequest, MessageRejected, ProcessingStrategy};
+use crate::processing::strategies::{CommitRequest, ProcessingError, ProcessingStrategy};
 use crate::types::{Message, Partition, Position};
 use log::info;
 use std::collections::HashMap;
@@ -15,7 +15,7 @@ impl ProcessingStrategy<KafkaPayload> for NoopCommit {
         self.commit(false)
     }
 
-    fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), MessageRejected> {
+    fn submit(&mut self, message: Message<KafkaPayload>) -> Result<(), ProcessingError> {
         let next_offset = message.next_offset();
         self.partitions.insert(
             message.partition,
diff --git a/src/processing/strategies/transform.rs b/src/processing/strategies/transform.rs
index 5a9389a..3fca144 100644
--- a/src/processing/strategies/transform.rs
+++ b/src/processing/strategies/transform.rs
@@ -1,11 +1,9 @@
-use crate::processing::strategies::{
-    CommitRequest, InvalidMessage, MessageRejected, ProcessingStrategy,
-};
+use crate::processing::strategies::{CommitRequest, ProcessingError, ProcessingStrategy};
 use crate::types::Message;
 use std::time::Duration;
 
 pub struct Transform<TPayload: Clone, TTransformed: Clone> {
-    pub function: fn(TPayload) -> Result<TTransformed, InvalidMessage>,
+    pub function: fn(TPayload) -> Result<TTransformed, ProcessingError>,
     pub next_step: Box<dyn ProcessingStrategy<TTransformed>>,
 }
 
@@ -16,7 +14,7 @@ impl<TPayload: Clone, TTransformed: Clone> Processin
         self.next_step.poll()
     }
 
-    fn submit(&mut self, message: Message<TPayload>) -> Result<(), MessageRejected> {
+    fn submit(&mut self, message: Message<TPayload>) -> Result<(), ProcessingError> {
         // TODO: Handle InvalidMessage
         let transformed = (self.function)(message.payload).unwrap();
 
@@ -44,16 +42,14 @@ impl<TPayload: Clone, TTransformed: Clone> Processin
 #[cfg(test)]
 mod tests {
     use super::Transform;
-    use crate::processing::strategies::{
-        CommitRequest, InvalidMessage, MessageRejected, ProcessingStrategy,
-    };
+    use crate::processing::strategies::{CommitRequest, ProcessingError, ProcessingStrategy};
     use crate::types::{Message, Partition, Topic};
     use chrono::Utc;
     use std::time::Duration;
 
     #[test]
     fn test_transform() {
-        fn identity(value: String) -> Result<String, InvalidMessage> {
+        fn identity(value: String) -> Result<String, ProcessingError> {
             Ok(value)
         }
 
@@ -62,7 +58,7 @@ mod tests {
             fn poll(&mut self) -> Option<CommitRequest> {
                 None
             }
-            fn submit(&mut self, _message: Message<String>) -> Result<(), MessageRejected> {
+            fn submit(&mut self, _message: Message<String>) -> Result<(), ProcessingError> {
                 Ok(())
             }
             fn close(&mut self) {}
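With this change, a fallible transform is written against ProcessingError rather than the removed InvalidMessage struct. A self-contained sketch of such a function (`parse_utf8` is illustrative only, not part of the patch):

use rust_arroyo::processing::strategies::ProcessingError;

// The kind of function the reworked `Transform.function` field expects:
// fn(TPayload) -> Result<TTransformed, ProcessingError>.
fn parse_utf8(payload: Vec<u8>) -> Result<String, ProcessingError> {
    String::from_utf8(payload).map_err(|_| ProcessingError::InvalidMessage)
}

fn main() {
    assert_eq!(parse_utf8(b"ok".to_vec()).unwrap(), "ok");
    assert!(matches!(
        parse_utf8(vec![0xff]),
        Err(ProcessingError::InvalidMessage)
    ));
}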