Skip to content

Commit 5397d7d

Browse files
committed
Redis: into_stream
1 parent 01f2b0d commit 5397d7d

File tree

3 files changed

+195
-4
lines changed

3 files changed

+195
-4
lines changed

sea-streamer-redis/src/consumer/mod.rs

+17-2
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ mod future;
33
mod node;
44
mod options;
55
mod shard;
6+
mod stream;
67

78
use cluster::*;
89
use future::StreamFuture;
910
pub use future::{NextFuture, StreamFuture as RedisMessageStream};
1011
use node::*;
1112
pub use options::*;
1213
use shard::*;
14+
pub use stream::*;
1315

1416
use flume::{bounded, unbounded, Receiver, Sender};
1517
use std::{fmt::Debug, future::Future, sync::Arc, time::Duration};
@@ -209,10 +211,14 @@ impl RedisConsumer {
209211
}
210212
}
211213

214+
#[inline]
212215
fn auto_ack(&self, header: &MessageHeader) -> RedisResult<()> {
216+
Self::auto_ack_static(&self.handle, header)
217+
}
218+
219+
fn auto_ack_static(handle: &Sender<CtrlMsg>, header: &MessageHeader) -> RedisResult<()> {
213220
// unbounded, so never blocks
214-
if self
215-
.handle
221+
if handle
216222
.try_send(CtrlMsg::Ack(
217223
(header.stream_key().clone(), *header.shard_id()),
218224
get_message_id(header),
@@ -268,6 +274,15 @@ impl RedisConsumer {
268274
}
269275
Ok(())
270276
}
277+
278+
pub fn into_stream<'a>(self) -> RedisMessStream<'a> {
279+
RedisMessStream {
280+
config: self.config,
281+
stream: self.receiver.into_stream(),
282+
handle: self.handle,
283+
read: false,
284+
}
285+
}
271286
}
272287

273288
pub(crate) async fn create_consumer(
+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use super::{ConsumerConfig, CtrlMsg, RedisConsumer};
2+
use crate::{RedisErr, RedisResult};
3+
use flume::{r#async::RecvStream, Sender};
4+
use sea_streamer_types::{export::futures::Stream, SharedMessage, StreamErr};
5+
use std::{fmt::Debug, pin::Pin, task::Poll};
6+
7+
pub struct RedisMessStream<'a> {
8+
pub(super) config: ConsumerConfig,
9+
pub(super) stream: RecvStream<'a, RedisResult<SharedMessage>>,
10+
pub(super) handle: Sender<CtrlMsg>,
11+
pub(super) read: bool,
12+
}
13+
14+
impl<'a> Debug for RedisMessStream<'a> {
15+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16+
f.debug_struct("RedisMessStream").finish()
17+
}
18+
}
19+
20+
// logic must mirror that of sea-streamer-redis/src/consumer/future.rs
21+
22+
impl<'a> Stream for RedisMessStream<'a> {
23+
type Item = RedisResult<SharedMessage>;
24+
25+
fn poll_next(
26+
mut self: Pin<&mut Self>,
27+
cx: &mut std::task::Context<'_>,
28+
) -> Poll<Option<Self::Item>> {
29+
use std::task::Poll::{Pending, Ready};
30+
if !self.read && !self.config.pre_fetch {
31+
self.read = true;
32+
self.handle.try_send(CtrlMsg::Read).ok();
33+
}
34+
match Pin::new(&mut self.stream).poll_next(cx) {
35+
Ready(res) => match res {
36+
Some(Ok(msg)) => {
37+
if self.config.auto_ack
38+
&& RedisConsumer::auto_ack_static(&self.handle, msg.header()).is_err()
39+
{
40+
return Ready(Some(Err(StreamErr::Backend(RedisErr::ConsumerDied))));
41+
}
42+
self.read = false;
43+
Ready(Some(Ok(msg)))
44+
}
45+
Some(Err(err)) => Ready(Some(Err(err))),
46+
None => Ready(Some(Err(StreamErr::Backend(RedisErr::ConsumerDied)))),
47+
},
48+
Pending => Pending,
49+
}
50+
}
51+
}
52+
53+
impl<'a> Drop for RedisMessStream<'a> {
54+
fn drop(&mut self) {
55+
if self.read {
56+
self.handle.try_send(CtrlMsg::Unread).ok();
57+
}
58+
}
59+
}

sea-streamer-redis/tests/realtime.rs

+119-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use util::*;
66
#[cfg(feature = "test")]
77
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
88
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
9-
async fn main() -> anyhow::Result<()> {
9+
async fn realtime_1() -> anyhow::Result<()> {
1010
use sea_streamer_redis::{
1111
AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisStreamer,
1212
};
@@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> {
1717
};
1818
use std::time::Duration;
1919

20-
const TEST: &str = "realtime";
20+
const TEST: &str = "realtime_1";
2121
env_logger::init();
2222
test(false).await?;
2323
test(true).await?;
@@ -135,3 +135,120 @@ async fn main() -> anyhow::Result<()> {
135135

136136
Ok(())
137137
}
138+
139+
#[cfg(feature = "test")]
140+
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
141+
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
142+
async fn realtime_2() -> anyhow::Result<()> {
143+
use sea_streamer_redis::{
144+
AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisResult, RedisStreamer,
145+
};
146+
use sea_streamer_runtime::sleep;
147+
use sea_streamer_types::{
148+
export::futures::{Stream, StreamExt},
149+
Buffer, ConsumerMode, ConsumerOptions, Message, Producer, ShardId, SharedMessage,
150+
StreamKey, Streamer, StreamerUri, Timestamp,
151+
};
152+
use std::time::Duration;
153+
154+
const TEST: &str = "realtime_2";
155+
env_logger::init();
156+
test(false).await?;
157+
158+
async fn test(enable_cluster: bool) -> anyhow::Result<()> {
159+
println!("Enable Cluster = {enable_cluster} ...");
160+
161+
let mut options = RedisConnectOptions::default();
162+
options.set_enable_cluster(enable_cluster);
163+
let streamer = RedisStreamer::connect(
164+
std::env::var("BROKERS_URL")
165+
.unwrap_or_else(|_| "redis://localhost".to_owned())
166+
.parse::<StreamerUri>()
167+
.unwrap(),
168+
options,
169+
)
170+
.await?;
171+
println!("Connect Streamer ... ok");
172+
173+
let now = Timestamp::now_utc();
174+
let stream_key = StreamKey::new(format!(
175+
"{}-{}a",
176+
TEST,
177+
now.unix_timestamp_nanos() / 1_000_000
178+
))?;
179+
let zero = ShardId::new(0);
180+
181+
let mut producer = streamer.create_generic_producer(Default::default()).await?;
182+
183+
println!("Producing 0..5 ...");
184+
let mut sequence = 0;
185+
for i in 0..5 {
186+
let message = format!("{i}");
187+
let receipt = producer.send_to(&stream_key, message)?.await?;
188+
assert_eq!(receipt.stream_key(), &stream_key);
189+
// should always increase
190+
assert!(receipt.sequence() > &sequence);
191+
sequence = *receipt.sequence();
192+
assert_eq!(receipt.shard_id(), &zero);
193+
}
194+
195+
let mut options = RedisConsumerOptions::new(ConsumerMode::RealTime);
196+
options.set_auto_stream_reset(AutoStreamReset::Latest);
197+
198+
let mut half = streamer
199+
.create_consumer(&[stream_key.clone()], options.clone())
200+
.await?
201+
.into_stream();
202+
203+
// Why do we have to wait? We want consumer to have started reading
204+
// before producing any messages. While after `create` returns the consumer
205+
// is ready (connection opened), there is still a small delay to send an `XREAD`
206+
// operation to the server.
207+
sleep(Duration::from_millis(5)).await;
208+
209+
println!("Producing 5..10 ...");
210+
for i in 5..10 {
211+
let message = format!("{i}");
212+
producer.send_to(&stream_key, message)?;
213+
}
214+
215+
println!("Flush producer ...");
216+
producer.flush().await?;
217+
218+
options.set_auto_stream_reset(AutoStreamReset::Earliest);
219+
let mut full = streamer
220+
.create_consumer(&[stream_key.clone()], options)
221+
.await?
222+
.into_stream();
223+
224+
let seq = stream_n(&mut half, 5).await?;
225+
assert_eq!(seq, [5, 6, 7, 8, 9]);
226+
println!("Stream latest ... ok");
227+
228+
let seq = stream_n(&mut full, 10).await?;
229+
assert_eq!(seq, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
230+
println!("Stream all ... ok");
231+
232+
println!("End test case.");
233+
Ok(())
234+
}
235+
236+
async fn stream_n<S: Stream<Item = RedisResult<SharedMessage>> + std::marker::Unpin>(
237+
stream: &mut S,
238+
num: usize,
239+
) -> anyhow::Result<Vec<usize>> {
240+
let mut numbers = Vec::new();
241+
for _ in 0..num {
242+
match stream.next().await {
243+
Some(mess) => {
244+
let mess = mess?;
245+
numbers.push(mess.message().as_str().unwrap().parse::<usize>().unwrap());
246+
}
247+
None => panic!("Stream ended?"),
248+
}
249+
}
250+
Ok(numbers)
251+
}
252+
253+
Ok(())
254+
}

0 commit comments

Comments
 (0)