Skip to content
This repository was archived by the owner on Nov 8, 2023. It is now read-only.

Commit 16194f6

Browse files
committed
Add recv to Arroyo Kafka consumer
1 parent 78cc14a commit 16194f6

File tree

3 files changed

+26
-20
lines changed

3 files changed

+26
-20
lines changed

src/backends/kafka/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,14 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer {
209209
}
210210
}
211211

212+
async fn recv(&mut self) -> Result<ArroyoMessage<KafkaPayload>, ConsumerError> {
213+
let consumer = self.consumer.as_mut().unwrap();
214+
match consumer.recv().await {
215+
Ok(result) => Ok(create_kafka_message(result)),
216+
Err(e) => Err(ConsumerError::BrokerError(Box::new(e))),
217+
}
218+
}
219+
212220
fn pause(&mut self, partitions: HashSet<Partition>) -> Result<(), ConsumerError> {
213221
self.state.assert_consuming_state()?;
214222

src/backends/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ pub trait Consumer<'a, TPayload: Clone> {
9595
timeout: Option<Duration>,
9696
) -> Result<Option<Message<TPayload>>, ConsumerError>;
9797

98+
async fn recv(&mut self) -> Result<Message<TPayload>, ConsumerError>;
99+
98100
/// Pause consuming from the provided partitions.
99101
///
100102
/// A partition that is paused will be automatically resumed during

src/bin/consumer/asyncnoop.rs

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use rust_arroyo::types::{Partition, Topic};
1111
use std::collections::HashMap;
1212
use std::time::Duration;
1313
use std::time::SystemTime;
14+
use tokio::time::timeout;
1415

1516
struct EmptyCallbacks {}
1617
impl AssignmentCallbacks for EmptyCallbacks {
@@ -64,30 +65,25 @@ async fn consume_and_produce(
6465
source_topic: source_topic.to_string(),
6566
};
6667
loop {
67-
match consumer.poll(Some(Duration::from_secs(2))).await {
68-
Ok(result) => match result {
69-
None => {
70-
match strategy.poll().await {
71-
Some(request) => {
72-
consumer.stage_positions(request.positions).await.unwrap();
73-
consumer.commit_positions().await.unwrap();
74-
//info!("Committed: {:?}", request);
68+
match timeout(Duration::from_secs(2), consumer.recv()).await {
69+
Ok(result) => {
70+
match result {
71+
Ok(message) => {
72+
match strategy.poll().await {
73+
Some(request) => {
74+
consumer.stage_positions(request.positions).await.unwrap();
75+
consumer.commit_positions().await.unwrap();
76+
//info!("Committed: {:?}", request);
77+
}
78+
None => {}
7579
}
76-
None => {}
80+
strategy.submit(message).await;
7781
}
78-
}
79-
Some(m) => {
80-
match strategy.poll().await {
81-
Some(request) => {
82-
consumer.stage_positions(request.positions).await.unwrap();
83-
consumer.commit_positions().await.unwrap();
84-
//info!("Committed: {:?}", request);
85-
}
86-
None => {}
82+
Err(e) => {
83+
panic!("Kafka error: {}", e)
8784
}
88-
strategy.submit(m).await;
8985
}
90-
},
86+
}
9187
Err(_) => {
9288
error!("timeoout, flushing batch");
9389
match strategy.poll().await {

0 commit comments

Comments
 (0)