Skip to content
This repository was archived by the owner on Nov 8, 2023. It is now read-only.

Commit 322a8d2

Browse files
committed
try to_owned
1 parent 9f70ed2 commit 322a8d2

File tree

2 files changed

+25
-7
lines changed

2 files changed

+25
-7
lines changed

src/backends/kafka/mod.rs

+12-2
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,18 @@ impl<'a> KafkaPayload<'a> {
4444
}
4545
}
4646
}
47-
impl<'a> Clone for KafkaPayload<'a> {
48-
fn clone(&self) -> KafkaPayload<'a> {
47+
// impl<'a> Clone for KafkaPayload<'a> {
48+
// fn clone(&self) -> KafkaPayload<'a> {
49+
// match self {
50+
// Self::Borrowed(ref msg) => Self::Owned(msg.detach()),
51+
// Self::Owned(ref msg) => Self::Owned(msg.clone()),
52+
// }
53+
// }
54+
// }
55+
56+
impl ToOwned for KafkaPayload<'static> {
57+
type Owned = KafkaPayload<'static>;
58+
fn to_owned(&self) -> KafkaPayload<'static> {
4959
match self {
5060
Self::Borrowed(ref msg) => Self::Owned(msg.detach()),
5161
Self::Owned(ref msg) => Self::Owned(msg.clone()),

src/processing/mod.rs

+13-5
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,20 @@ impl<'a> StreamProcessor<'a> {
112112
} else {
113113
// Otherwise, we need to try fetch a new message from the consumer,
114114
// even if there is no active assignment and/or processing strategy.
115-
let msg = self.consumer.poll(Some(Duration::from_secs(1)));
115+
let msg = self
116+
.consumer
117+
.poll(Some(Duration::from_secs(1)))
118+
.unwrap()
119+
.unwrap()
120+
.to_owned();
121+
122+
self.message = Some(msg)
123+
116124
//TODO: Support errors properly
117-
match msg {
118-
Ok(m) => self.message = m,
119-
Err(_) => return Err(RunError::PollError),
120-
}
125+
// match msg {
126+
// Ok(m) => self.message = m,
127+
// Err(_) => return Err(RunError::PollError),
128+
// }
121129
}
122130

123131
let mut trait_callbacks = self.strategies.lock().unwrap();

0 commit comments

Comments
 (0)