Skip to content

Commit b35dd01

Browse files
committed
Redis: into_stream
1 parent a5bf5ba commit b35dd01

File tree

3 files changed

+195
-4
lines changed

3 files changed

+195
-4
lines changed

sea-streamer-redis/src/consumer/mod.rs

+17-2
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ mod future;
33
mod node;
44
mod options;
55
mod shard;
6+
mod stream;
67

78
use cluster::*;
89
use future::StreamFuture;
910
pub use future::{NextFuture, StreamFuture as RedisMessageStream};
1011
use node::*;
1112
pub use options::*;
1213
use shard::*;
14+
pub use stream::*;
1315

1416
use flume::{bounded, unbounded, Receiver, Sender};
1517
use std::{fmt::Debug, future::Future, sync::Arc, time::Duration};
@@ -209,10 +211,14 @@ impl RedisConsumer {
209211
}
210212
}
211213

214+
#[inline]
212215
fn auto_ack(&self, header: &MessageHeader) -> RedisResult<()> {
216+
Self::auto_ack_static(&self.handle, header)
217+
}
218+
219+
fn auto_ack_static(handle: &Sender<CtrlMsg>, header: &MessageHeader) -> RedisResult<()> {
213220
// unbounded, so never blocks
214-
if self
215-
.handle
221+
if handle
216222
.try_send(CtrlMsg::Ack(
217223
(header.stream_key().clone(), *header.shard_id()),
218224
get_message_id(header),
@@ -268,6 +274,15 @@ impl RedisConsumer {
268274
}
269275
Ok(())
270276
}
277+
278+
pub fn into_stream<'a>(self) -> RedisMessStream<'a> {
279+
RedisMessStream {
280+
config: self.config,
281+
stream: self.receiver.into_stream(),
282+
handle: self.handle,
283+
read: false,
284+
}
285+
}
271286
}
272287

273288
pub(crate) async fn create_consumer(
+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use super::{ConsumerConfig, CtrlMsg, RedisConsumer};
2+
use crate::{RedisErr, RedisResult};
3+
use flume::{r#async::RecvStream, Sender};
4+
use sea_streamer_types::{export::futures::Stream, SharedMessage, StreamErr};
5+
use std::{fmt::Debug, pin::Pin, task::Poll};
6+
7+
pub struct RedisMessStream<'a> {
8+
pub(super) config: ConsumerConfig,
9+
pub(super) stream: RecvStream<'a, RedisResult<SharedMessage>>,
10+
pub(super) handle: Sender<CtrlMsg>,
11+
pub(super) read: bool,
12+
}
13+
14+
impl<'a> Debug for RedisMessStream<'a> {
15+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16+
f.debug_struct("RedisMessStream").finish()
17+
}
18+
}
19+
20+
// logic must mirror that of sea-streamer-redis/src/consumer/future.rs
21+
22+
impl<'a> Stream for RedisMessStream<'a> {
23+
type Item = RedisResult<SharedMessage>;
24+
25+
fn poll_next(
26+
mut self: Pin<&mut Self>,
27+
cx: &mut std::task::Context<'_>,
28+
) -> Poll<Option<Self::Item>> {
29+
use std::task::Poll::{Pending, Ready};
30+
if !self.read && !self.config.pre_fetch {
31+
self.read = true;
32+
self.handle.try_send(CtrlMsg::Read).ok();
33+
}
34+
match Pin::new(&mut self.stream).poll_next(cx) {
35+
Ready(res) => match res {
36+
Some(Ok(msg)) => {
37+
if self.config.auto_ack
38+
&& RedisConsumer::auto_ack_static(&self.handle, msg.header()).is_err()
39+
{
40+
return Ready(Some(Err(StreamErr::Backend(RedisErr::ConsumerDied))));
41+
}
42+
self.read = false;
43+
Ready(Some(Ok(msg)))
44+
}
45+
Some(Err(err)) => Ready(Some(Err(err))),
46+
None => Ready(Some(Err(StreamErr::Backend(RedisErr::ConsumerDied)))),
47+
},
48+
Pending => Pending,
49+
}
50+
}
51+
}
52+
53+
impl<'a> Drop for RedisMessStream<'a> {
54+
fn drop(&mut self) {
55+
if self.read {
56+
self.handle.try_send(CtrlMsg::Unread).ok();
57+
}
58+
}
59+
}

sea-streamer-redis/tests/realtime.rs

+119-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use util::*;
66
#[cfg(feature = "test")]
77
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
88
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
9-
async fn main() -> anyhow::Result<()> {
9+
async fn realtime_1() -> anyhow::Result<()> {
1010
use sea_streamer_redis::{
1111
AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisStreamer,
1212
};
@@ -16,7 +16,7 @@ async fn main() -> anyhow::Result<()> {
1616
};
1717
use std::time::Duration;
1818

19-
const TEST: &str = "realtime";
19+
const TEST: &str = "realtime_1";
2020
env_logger::init();
2121
test(false).await?;
2222
test(true).await?;
@@ -134,3 +134,120 @@ async fn main() -> anyhow::Result<()> {
134134

135135
Ok(())
136136
}
137+
138+
#[cfg(feature = "test")]
139+
#[cfg_attr(feature = "runtime-tokio", tokio::test)]
140+
#[cfg_attr(feature = "runtime-async-std", async_std::test)]
141+
async fn realtime_2() -> anyhow::Result<()> {
142+
use sea_streamer_redis::{
143+
AutoStreamReset, RedisConnectOptions, RedisConsumerOptions, RedisResult, RedisStreamer,
144+
};
145+
use sea_streamer_runtime::sleep;
146+
use sea_streamer_types::{
147+
export::futures::{Stream, StreamExt},
148+
Buffer, ConsumerMode, ConsumerOptions, Message, Producer, ShardId, SharedMessage,
149+
StreamKey, Streamer, Timestamp,
150+
};
151+
use std::time::Duration;
152+
153+
const TEST: &str = "realtime_2";
154+
env_logger::init();
155+
test(false).await?;
156+
157+
async fn test(enable_cluster: bool) -> anyhow::Result<()> {
158+
println!("Enable Cluster = {enable_cluster} ...");
159+
160+
let mut options = RedisConnectOptions::default();
161+
options.set_enable_cluster(enable_cluster);
162+
let streamer = RedisStreamer::connect(
163+
std::env::var("BROKERS_URL")
164+
.unwrap_or_else(|_| "redis://localhost".to_owned())
165+
.parse()
166+
.unwrap(),
167+
options,
168+
)
169+
.await?;
170+
println!("Connect Streamer ... ok");
171+
172+
let now = Timestamp::now_utc();
173+
let stream_key = StreamKey::new(format!(
174+
"{}-{}a",
175+
TEST,
176+
now.unix_timestamp_nanos() / 1_000_000
177+
))?;
178+
let zero = ShardId::new(0);
179+
180+
let mut producer = streamer.create_generic_producer(Default::default()).await?;
181+
182+
println!("Producing 0..5 ...");
183+
let mut sequence = 0;
184+
for i in 0..5 {
185+
let message = format!("{i}");
186+
let receipt = producer.send_to(&stream_key, message)?.await?;
187+
assert_eq!(receipt.stream_key(), &stream_key);
188+
// should always increase
189+
assert!(receipt.sequence() > &sequence);
190+
sequence = *receipt.sequence();
191+
assert_eq!(receipt.shard_id(), &zero);
192+
}
193+
194+
let mut options = RedisConsumerOptions::new(ConsumerMode::RealTime);
195+
options.set_auto_stream_reset(AutoStreamReset::Latest);
196+
197+
let mut half = streamer
198+
.create_consumer(&[stream_key.clone()], options.clone())
199+
.await?
200+
.into_stream();
201+
202+
// Why do we have to wait? We want consumer to have started reading
203+
// before producing any messages. While after `create` returns the consumer
204+
// is ready (connection opened), there is still a small delay to send an `XREAD`
205+
// operation to the server.
206+
sleep(Duration::from_millis(5)).await;
207+
208+
println!("Producing 5..10 ...");
209+
for i in 5..10 {
210+
let message = format!("{i}");
211+
producer.send_to(&stream_key, message)?;
212+
}
213+
214+
println!("Flush producer ...");
215+
producer.flush().await?;
216+
217+
options.set_auto_stream_reset(AutoStreamReset::Earliest);
218+
let mut full = streamer
219+
.create_consumer(&[stream_key.clone()], options)
220+
.await?
221+
.into_stream();
222+
223+
let seq = stream_n(&mut half, 5).await?;
224+
assert_eq!(seq, [5, 6, 7, 8, 9]);
225+
println!("Stream latest ... ok");
226+
227+
let seq = stream_n(&mut full, 10).await?;
228+
assert_eq!(seq, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
229+
println!("Stream all ... ok");
230+
231+
println!("End test case.");
232+
Ok(())
233+
}
234+
235+
async fn stream_n<S: Stream<Item = RedisResult<SharedMessage>> + std::marker::Unpin>(
236+
stream: &mut S,
237+
num: usize,
238+
) -> anyhow::Result<Vec<usize>> {
239+
let mut numbers = Vec::new();
240+
for _ in 0..num {
241+
match stream.next().await {
242+
Some(mess) => {
243+
let mess = mess?;
244+
numbers.push(mess.message().as_str().unwrap().parse::<usize>().unwrap());
245+
}
246+
None => panic!("Stream ended?"),
247+
}
248+
}
249+
Ok(numbers)
250+
}
251+
252+
Ok(())
253+
}

0 commit comments

Comments
 (0)