Skip to content
This repository was archived by the owner on Nov 8, 2023. It is now read-only.

Commit 9f70ed2

Browse files
committed
markus fix for kafka payload
1 parent c088997 commit 9f70ed2

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

src/backends/kafka/mod.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::Consumer as ArroyoConsumer;
2-
use super::{AssignmentCallbacks, ConsumeError, ConsumerClosed, PauseError, PollError};
2+
use super::{AssignmentCallbacks, ConsumeError, ConsumerClosed, PauseError, Payload, PollError};
33
use crate::types::Message as ArroyoMessage;
44
use crate::types::{Partition, Position, Topic};
55
use chrono::{DateTime, NaiveDateTime, Utc};
@@ -53,7 +53,7 @@ impl<'a> Clone for KafkaPayload<'a> {
5353
}
5454
}
5555

56-
fn create_kafka_message(msg: BorrowedMessage) -> ArroyoMessage<KafkaPayload> {
56+
fn create_kafka_message(msg: BorrowedMessage) -> ArroyoMessage<Payload> {
5757
let topic = Topic {
5858
name: msg.topic().to_string(),
5959
};
@@ -66,7 +66,7 @@ fn create_kafka_message(msg: BorrowedMessage) -> ArroyoMessage<KafkaPayload> {
6666
ArroyoMessage::new(
6767
partition,
6868
msg.offset() as u64,
69-
KafkaPayload::new(msg),
69+
Payload::Kafka(KafkaPayload::new(msg)),
7070
DateTime::from_utc(NaiveDateTime::from_timestamp(time_millis, 0), Utc),
7171
)
7272
}
@@ -183,7 +183,7 @@ impl ArroyoConsumer for KafkaConsumer {
183183
fn poll(
184184
&self,
185185
timeout: Option<Duration>,
186-
) -> Result<Option<ArroyoMessage<KafkaPayload<'_>>>, PollError> {
186+
) -> Result<Option<ArroyoMessage<Payload<'_>>>, PollError> {
187187
let duration = timeout.unwrap_or(Duration::from_millis(100));
188188

189189
match self.consumer.as_ref() {

0 commit comments

Comments
 (0)