Skip to content
This repository was archived by the owner on Nov 8, 2023. It is now read-only.

Commit 15120e6

Browse files
committed
Add processor
1 parent 7a29c4c commit 15120e6

File tree

2 files changed

+76
-35
lines changed

2 files changed

+76
-35
lines changed

src/bin/consumer/asyncnoop.rs

+12-35
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,17 @@
11
use clap::{App, Arg};
2-
use log::{debug, error, info};
2+
use log::{debug, info};
33
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
44
use rdkafka::consumer::stream_consumer::StreamConsumer;
5-
use rdkafka::consumer::CommitMode;
65
use rdkafka::consumer::Consumer;
76
use rdkafka::producer::FutureProducer;
87
use rdkafka::util::get_rdkafka_version;
9-
use rust_arroyo::backends::kafka::create_kafka_message;
108
use rust_arroyo::backends::AssignmentCallbacks;
11-
use rust_arroyo::processing::strategies::async_noop::build_topic_partitions;
129
use rust_arroyo::processing::strategies::async_noop::AsyncNoopCommit;
1310
use rust_arroyo::processing::strategies::async_noop::CustomContext;
11+
use rust_arroyo::processing::StreamingStreamProcessor;
1412
use rust_arroyo::types::{Partition, Topic};
1513
use std::collections::HashMap;
16-
use std::time::Duration;
1714
use std::time::SystemTime;
18-
use tokio::time::timeout;
1915

2016
// A type alias with your custom consumer can be created for convenience.
2117
type LoggingConsumer = StreamConsumer<CustomContext>;
@@ -71,7 +67,7 @@ async fn consume_and_produce(
7167
);
7268
let batch = Vec::new();
7369

74-
let mut strategy = AsyncNoopCommit {
70+
let strategy = AsyncNoopCommit {
7571
topic: topic_clone,
7672
producer,
7773
batch,
@@ -80,35 +76,16 @@ async fn consume_and_produce(
8076
dest_topic: dest_topic.to_string(),
8177
source_topic: source_topic.to_string(),
8278
};
83-
loop {
84-
match timeout(Duration::from_secs(2), consumer.recv()).await {
85-
Ok(result) => match result {
86-
Err(e) => panic!("Kafka error: {}", e),
87-
Ok(m) => {
88-
match strategy.poll().await {
89-
Some(partition_list) => {
90-
let part_list = build_topic_partitions(partition_list);
91-
consumer.commit(&part_list, CommitMode::Sync).unwrap();
92-
info!("Committed: {:?}", part_list);
93-
}
94-
None => {}
95-
}
9679

97-
strategy.submit(create_kafka_message(m)).await;
98-
}
99-
},
100-
Err(_) => {
101-
error!("timeoout, flushing batch");
102-
match strategy.poll().await {
103-
Some(partition_list) => {
104-
let part_list = build_topic_partitions(partition_list);
105-
consumer.commit(&part_list, CommitMode::Sync).unwrap();
106-
info!("Committed: {:?}", part_list);
107-
}
108-
None => {}
109-
}
110-
}
111-
}
80+
let mut processor = StreamingStreamProcessor {
81+
consumer,
82+
strategy,
83+
shutdown_requested: false,
84+
};
85+
86+
match processor.run().await {
87+
Ok(_) => {}
88+
Err(_) => panic!("Kafka error"),
11289
}
11390
}
11491

src/processing/mod.rs

+64
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
11
pub mod strategies;
22

3+
use crate::backends::kafka::create_kafka_message;
34
use crate::backends::{AssignmentCallbacks, Consumer};
5+
use crate::processing::strategies::async_noop::build_topic_partitions;
6+
use crate::processing::strategies::async_noop::AsyncNoopCommit;
7+
use crate::processing::strategies::async_noop::CustomContext;
48
use crate::types::{Message, Partition, Topic};
59
use async_mutex::Mutex;
610
use futures::executor::block_on;
11+
use log::{error, info};
12+
use rdkafka::consumer::stream_consumer::StreamConsumer;
13+
use rdkafka::consumer::CommitMode;
14+
use rdkafka::consumer::Consumer as RdConsumer;
715
use std::collections::HashMap;
816
use std::mem::replace;
917
use std::sync::Arc;
1018
use std::time::Duration;
1119
use strategies::{ProcessingStrategy, ProcessingStrategyFactory};
20+
use tokio::time::timeout;
1221

1322
#[derive(Debug, Clone)]
1423
pub struct InvalidState;
@@ -210,6 +219,61 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> {
210219
}
211220
}
212221

222+
pub struct StreamingStreamProcessor {
223+
pub consumer: StreamConsumer<CustomContext>,
224+
pub strategy: AsyncNoopCommit,
225+
//message: Option<Message<TPayload>>,
226+
pub shutdown_requested: bool,
227+
}
228+
229+
impl StreamingStreamProcessor {
230+
pub async fn run_once(&mut self) -> Result<(), RunError> {
231+
match timeout(Duration::from_secs(2), self.consumer.recv()).await {
232+
Ok(result) => match result {
233+
Err(e) => panic!("Kafka error: {}", e),
234+
Ok(m) => {
235+
match self.strategy.poll().await {
236+
Some(partition_list) => {
237+
let part_list = build_topic_partitions(partition_list);
238+
self.consumer.commit(&part_list, CommitMode::Sync).unwrap();
239+
info!("Committed: {:?}", part_list);
240+
}
241+
None => {}
242+
}
243+
244+
self.strategy.submit(create_kafka_message(m)).await;
245+
}
246+
},
247+
Err(_) => {
248+
error!("timeoout, flushing batch");
249+
match self.strategy.poll().await {
250+
Some(partition_list) => {
251+
let part_list = build_topic_partitions(partition_list);
252+
self.consumer.commit(&part_list, CommitMode::Sync).unwrap();
253+
info!("Committed: {:?}", part_list);
254+
}
255+
None => {}
256+
}
257+
}
258+
}
259+
Ok(())
260+
}
261+
262+
/// The main run loop, see class docstring for more information.
263+
pub async fn run(&mut self) -> Result<(), RunError> {
264+
loop {
265+
match self.run_once().await {
266+
Ok(()) => {}
267+
Err(e) => return Err(e),
268+
}
269+
}
270+
}
271+
272+
pub fn signal_shutdown(&mut self) {
273+
self.shutdown_requested = true;
274+
}
275+
}
276+
213277
#[cfg(test)]
214278
mod tests {
215279
use super::strategies::{

0 commit comments

Comments
 (0)