Skip to content

Commit c010455

Browse files
Johan Rylandernihohit
Johan Rylander
authored andcommitted
Add with_config to able to add timeouts when using sentinel client
1 parent e9b8049 commit c010455

File tree

4 files changed

+267
-29
lines changed

4 files changed

+267
-29
lines changed

redis/src/client.rs

+86-26
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,42 @@ impl Client {
6666
}
6767
}
6868

69+
/// Options for creation of async connection
70+
pub struct AsyncConnectionConfig {
71+
/// Maximum time to wait for a response from the server
72+
response_timeout: Option<std::time::Duration>,
73+
/// Maximum time to wait for a connection to be established
74+
connection_timeout: Option<std::time::Duration>,
75+
}
76+
77+
impl AsyncConnectionConfig {
78+
/// Creates a new instance of the options with nothing set
79+
pub fn new() -> Self {
80+
Self {
81+
response_timeout: None,
82+
connection_timeout: None,
83+
}
84+
}
85+
86+
/// Sets the connection timeout
87+
pub fn with_connection_timeout(mut self, connection_timeout: std::time::Duration) -> Self {
88+
self.connection_timeout = Some(connection_timeout);
89+
self
90+
}
91+
92+
/// Sets the response timeout
93+
pub fn with_response_timeout(mut self, response_timeout: std::time::Duration) -> Self {
94+
self.response_timeout = Some(response_timeout);
95+
self
96+
}
97+
}
98+
99+
impl Default for AsyncConnectionConfig {
100+
fn default() -> Self {
101+
Self::new()
102+
}
103+
}
104+
69105
/// To enable async support you need to chose one of the supported runtimes and active its
70106
/// corresponding feature: `tokio-comp` or `async-std-comp`
71107
#[cfg(feature = "aio")]
@@ -135,18 +171,8 @@ impl Client {
135171
pub async fn get_multiplexed_async_connection(
136172
&self,
137173
) -> RedisResult<crate::aio::MultiplexedConnection> {
138-
match Runtime::locate() {
139-
#[cfg(feature = "tokio-comp")]
140-
Runtime::Tokio => {
141-
self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(None)
142-
.await
143-
}
144-
#[cfg(feature = "async-std-comp")]
145-
Runtime::AsyncStd => {
146-
self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(None)
147-
.await
148-
}
149-
}
174+
self.get_multiplexed_async_connection_with_config(&AsyncConnectionConfig::new())
175+
.await
150176
}
151177

152178
/// Returns an async connection from the client.
@@ -159,27 +185,61 @@ impl Client {
159185
&self,
160186
response_timeout: std::time::Duration,
161187
connection_timeout: std::time::Duration,
188+
) -> RedisResult<crate::aio::MultiplexedConnection> {
189+
self.get_multiplexed_async_connection_with_config(
190+
&AsyncConnectionConfig::new()
191+
.with_connection_timeout(connection_timeout)
192+
.with_response_timeout(response_timeout),
193+
)
194+
.await
195+
}
196+
197+
/// Returns an async connection from the client.
198+
#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
199+
#[cfg_attr(
200+
docsrs,
201+
doc(cfg(any(feature = "tokio-comp", feature = "async-std-comp")))
202+
)]
203+
pub async fn get_multiplexed_async_connection_with_config(
204+
&self,
205+
config: &AsyncConnectionConfig,
162206
) -> RedisResult<crate::aio::MultiplexedConnection> {
163207
let result = match Runtime::locate() {
164208
#[cfg(feature = "tokio-comp")]
165209
rt @ Runtime::Tokio => {
166-
rt.timeout(
167-
connection_timeout,
168-
self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(Some(
169-
response_timeout,
170-
)),
171-
)
172-
.await
210+
if let Some(connection_timeout) = config.connection_timeout {
211+
rt.timeout(
212+
connection_timeout,
213+
self.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
214+
config.response_timeout,
215+
),
216+
)
217+
.await
218+
} else {
219+
Ok(self
220+
.get_multiplexed_async_connection_inner::<crate::aio::tokio::Tokio>(
221+
config.response_timeout,
222+
)
223+
.await)
224+
}
173225
}
174226
#[cfg(feature = "async-std-comp")]
175227
rt @ Runtime::AsyncStd => {
176-
rt.timeout(
177-
connection_timeout,
178-
self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
179-
Some(response_timeout),
180-
),
181-
)
182-
.await
228+
if let Some(connection_timeout) = config.connection_timeout {
229+
rt.timeout(
230+
connection_timeout,
231+
self.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
232+
config.response_timeout,
233+
),
234+
)
235+
.await
236+
} else {
237+
Ok(self
238+
.get_multiplexed_async_connection_inner::<crate::aio::async_std::AsyncStd>(
239+
config.response_timeout,
240+
)
241+
.await)
242+
}
183243
}
184244
};
185245

redis/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
384384
#![cfg_attr(docsrs, feature(doc_cfg))]
385385

386386
// public api
387+
pub use crate::client::AsyncConnectionConfig;
387388
pub use crate::client::Client;
388389
pub use crate::cmd::{cmd, pack_command, pipe, Arg, Cmd, Iter};
389390
pub use crate::commands::{

redis/src/sentinel.rs

+16-2
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ use rand::Rng;
112112
#[cfg(feature = "aio")]
113113
use crate::aio::MultiplexedConnection as AsyncConnection;
114114

115+
use crate::client::AsyncConnectionConfig;
115116
use crate::{
116117
connection::ConnectionInfo, types::RedisResult, Client, Cmd, Connection, ErrorKind,
117118
FromRedisValue, IntoConnectionInfo, RedisConnectionInfo, TlsMode, Value,
@@ -766,7 +767,20 @@ impl SentinelClient {
766767
/// `SentinelClient::get_connection`.
767768
#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
768769
pub async fn get_async_connection(&mut self) -> RedisResult<AsyncConnection> {
769-
let client = self.async_get_client().await?;
770-
client.get_multiplexed_async_connection().await
770+
self.get_async_connection_with_config(&AsyncConnectionConfig::new())
771+
.await
772+
}
773+
774+
/// Returns an async connection from the client with options, using the same logic from
775+
/// `SentinelClient::get_connection`.
776+
#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
777+
pub async fn get_async_connection_with_config(
778+
&mut self,
779+
config: &AsyncConnectionConfig,
780+
) -> RedisResult<AsyncConnection> {
781+
self.async_get_client()
782+
.await?
783+
.get_multiplexed_async_connection_with_config(config)
784+
.await
771785
}
772786
}

redis/tests/test_sentinel.rs

+164-1
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ pub mod async_tests {
238238
use redis::{
239239
aio::MultiplexedConnection,
240240
sentinel::{Sentinel, SentinelClient, SentinelNodeConnectionInfo},
241-
Client, ConnectionAddr, RedisError,
241+
AsyncConnectionConfig, Client, ConnectionAddr, RedisError,
242242
};
243243

244244
use crate::{assert_is_master_role, assert_replica_role_and_master_addr, support::*};
@@ -486,4 +486,167 @@ pub mod async_tests {
486486
})
487487
.unwrap();
488488
}
489+
490+
#[test]
491+
fn test_sentinel_client_async_with_connection_timeout() {
492+
let master_name = "master1";
493+
let mut context = TestSentinelContext::new(2, 3, 3);
494+
let mut master_client = SentinelClient::build(
495+
context.sentinels_connection_info().clone(),
496+
String::from(master_name),
497+
Some(context.sentinel_node_connection_info()),
498+
redis::sentinel::SentinelServerType::Master,
499+
)
500+
.unwrap();
501+
502+
let mut replica_client = SentinelClient::build(
503+
context.sentinels_connection_info().clone(),
504+
String::from(master_name),
505+
Some(context.sentinel_node_connection_info()),
506+
redis::sentinel::SentinelServerType::Replica,
507+
)
508+
.unwrap();
509+
510+
let connection_options =
511+
AsyncConnectionConfig::new().with_connection_timeout(std::time::Duration::from_secs(1));
512+
513+
block_on_all(async move {
514+
let mut master_con = master_client
515+
.get_async_connection_with_config(&connection_options)
516+
.await?;
517+
518+
async_assert_is_connection_to_master(&mut master_con).await;
519+
520+
let node_conn_info = context.sentinel_node_connection_info();
521+
let sentinel = context.sentinel_mut();
522+
let master_client = sentinel
523+
.async_master_for(master_name, Some(&node_conn_info))
524+
.await?;
525+
526+
// Read commands to the replica node
527+
for _ in 0..20 {
528+
let mut replica_con = replica_client
529+
.get_async_connection_with_config(&connection_options)
530+
.await?;
531+
532+
async_assert_connection_is_replica_of_correct_master(
533+
&mut replica_con,
534+
&master_client,
535+
)
536+
.await;
537+
}
538+
539+
Ok::<(), RedisError>(())
540+
})
541+
.unwrap();
542+
}
543+
544+
#[test]
545+
fn test_sentinel_client_async_with_response_timeout() {
546+
let master_name = "master1";
547+
let mut context = TestSentinelContext::new(2, 3, 3);
548+
let mut master_client = SentinelClient::build(
549+
context.sentinels_connection_info().clone(),
550+
String::from(master_name),
551+
Some(context.sentinel_node_connection_info()),
552+
redis::sentinel::SentinelServerType::Master,
553+
)
554+
.unwrap();
555+
556+
let mut replica_client = SentinelClient::build(
557+
context.sentinels_connection_info().clone(),
558+
String::from(master_name),
559+
Some(context.sentinel_node_connection_info()),
560+
redis::sentinel::SentinelServerType::Replica,
561+
)
562+
.unwrap();
563+
564+
let connection_options =
565+
AsyncConnectionConfig::new().with_response_timeout(std::time::Duration::from_secs(1));
566+
567+
block_on_all(async move {
568+
let mut master_con = master_client
569+
.get_async_connection_with_config(&connection_options)
570+
.await?;
571+
572+
async_assert_is_connection_to_master(&mut master_con).await;
573+
574+
let node_conn_info = context.sentinel_node_connection_info();
575+
let sentinel = context.sentinel_mut();
576+
let master_client = sentinel
577+
.async_master_for(master_name, Some(&node_conn_info))
578+
.await?;
579+
580+
// Read commands to the replica node
581+
for _ in 0..20 {
582+
let mut replica_con = replica_client
583+
.get_async_connection_with_config(&connection_options)
584+
.await?;
585+
586+
async_assert_connection_is_replica_of_correct_master(
587+
&mut replica_con,
588+
&master_client,
589+
)
590+
.await;
591+
}
592+
593+
Ok::<(), RedisError>(())
594+
})
595+
.unwrap();
596+
}
597+
598+
#[test]
599+
fn test_sentinel_client_async_with_timeouts() {
600+
let master_name = "master1";
601+
let mut context = TestSentinelContext::new(2, 3, 3);
602+
let mut master_client = SentinelClient::build(
603+
context.sentinels_connection_info().clone(),
604+
String::from(master_name),
605+
Some(context.sentinel_node_connection_info()),
606+
redis::sentinel::SentinelServerType::Master,
607+
)
608+
.unwrap();
609+
610+
let mut replica_client = SentinelClient::build(
611+
context.sentinels_connection_info().clone(),
612+
String::from(master_name),
613+
Some(context.sentinel_node_connection_info()),
614+
redis::sentinel::SentinelServerType::Replica,
615+
)
616+
.unwrap();
617+
618+
let connection_options = AsyncConnectionConfig::new()
619+
.with_connection_timeout(std::time::Duration::from_secs(1))
620+
.with_response_timeout(std::time::Duration::from_secs(1));
621+
622+
block_on_all(async move {
623+
let mut master_con = master_client
624+
.get_async_connection_with_config(&connection_options)
625+
.await?;
626+
627+
async_assert_is_connection_to_master(&mut master_con).await;
628+
629+
let node_conn_info = context.sentinel_node_connection_info();
630+
let sentinel = context.sentinel_mut();
631+
let master_client = sentinel
632+
.async_master_for(master_name, Some(&node_conn_info))
633+
.await?;
634+
635+
// Read commands to the replica node
636+
for _ in 0..20 {
637+
let mut replica_con = replica_client
638+
.get_async_connection_with_config(&connection_options)
639+
.await?;
640+
641+
async_assert_connection_is_replica_of_correct_master(
642+
&mut replica_con,
643+
&master_client,
644+
)
645+
.await;
646+
}
647+
648+
Ok::<(), RedisError>(())
649+
})
650+
.unwrap();
651+
}
489652
}

0 commit comments

Comments
 (0)