|
1 | 1 | pub mod strategies;
|
2 | 2 |
|
3 | 3 | use crate::backends::kafka::create_kafka_message;
|
| 4 | +use crate::backends::kafka::types::KafkaPayload; |
4 | 5 | use crate::backends::{AssignmentCallbacks, Consumer};
|
5 | 6 | use crate::processing::strategies::async_noop::build_topic_partitions;
|
6 | 7 | use crate::processing::strategies::async_noop::AsyncNoopCommit;
|
@@ -222,12 +223,56 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> {
|
222 | 223 | pub struct StreamingStreamProcessor {
|
223 | 224 | pub consumer: StreamConsumer<CustomContext>,
|
224 | 225 | pub strategy: AsyncNoopCommit,
|
225 |
| - //message: Option<Message<TPayload>>, |
| 226 | + pub message: Option<Message<KafkaPayload>>, |
226 | 227 | pub shutdown_requested: bool,
|
227 | 228 | }
|
228 | 229 |
|
229 | 230 | impl StreamingStreamProcessor {
|
230 | 231 | pub async fn run_once(&mut self) -> Result<(), RunError> {
|
| 232 | + let message_carried_over = self.message.is_some(); |
| 233 | + |
| 234 | + if message_carried_over { |
| 235 | + // If a message was carried over from the previous run, the consumer |
| 236 | + // should be paused and not returning any messages on ``poll``. |
| 237 | + let res = timeout(Duration::from_secs(2), self.consumer.recv()).await; |
| 238 | + match res { |
| 239 | + Err(_) => {} |
| 240 | + Ok(_) => return Err(RunError::InvalidState), |
| 241 | + } |
| 242 | + } else { |
| 243 | + // Otherwise, we need to try fetch a new message from the consumer, |
| 244 | + // even if there is no active assignment and/or processing strategy. |
| 245 | + let msg = timeout(Duration::from_secs(2), self.consumer.recv()).await; |
| 246 | + //TODO: Support errors properly |
| 247 | + match msg { |
| 248 | + Ok(m) => match m { |
| 249 | + Ok(msg) => { |
| 250 | + self.message = Some(create_kafka_message(msg)); |
| 251 | + } |
| 252 | + Err(_) => return Err(RunError::PollError), |
| 253 | + }, |
| 254 | + Err(_) => self.message = None, |
| 255 | + } |
| 256 | + } |
| 257 | + |
| 258 | + let commit_request = self.strategy.poll().await; |
| 259 | + match commit_request { |
| 260 | + None => {} |
| 261 | + Some(request) => { |
| 262 | + let part_list = build_topic_partitions(request); |
| 263 | + self.consumer.commit(&part_list, CommitMode::Sync).unwrap(); |
| 264 | + info!("Committed: {:?}", part_list); |
| 265 | + } |
| 266 | + }; |
| 267 | + |
| 268 | + let msg = replace(&mut self.message, None); |
| 269 | + if let Some(msg_s) = msg { |
| 270 | + self.strategy.submit(msg_s).await; |
| 271 | + } |
| 272 | + Ok(()) |
| 273 | + } |
| 274 | + |
| 275 | + pub async fn _run_once(&mut self) -> Result<(), RunError> { |
231 | 276 | match timeout(Duration::from_secs(2), self.consumer.recv()).await {
|
232 | 277 | Ok(result) => match result {
|
233 | 278 | Err(e) => panic!("Kafka error: {}", e),
|
|
0 commit comments