
Commit 10a2a41

Move refresh slots to the background in the Data Path. (#3851)
* Move refresh slots to the background instead of blocking in poll_recover in the data path
* Remove the timeout logic
* Added a test for non-head-of-line blocking
* Added a new test for validating non-head-of-line blocking for refresh slots
* Print of logs
* The final test
* Adjusted the head-of-line blocking test to expect non-blocking behavior
* Removed the chrono dependency and removed prints
* Removed an old test that is no longer needed
* Addressed comments; removed the task handle and the retry count logic
* Moved task communication to track the task handle instead of a oneshot channel
* Changed logic for task abort and panic scenarios
* Addressed comments; renamed the arg to handle instead of channel

Signed-off-by: GilboaAWS <[email protected]>
1 parent 9763e60 commit 10a2a41
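The heart of the change is a standard tokio pattern: instead of await-ing the slot refresh inside poll_recover (which stalled every command behind it), the refresh runs in a spawned task and the driver merely peeks at its JoinHandle on each poll. A minimal, self-contained sketch of that pattern follows (assuming the tokio and futures crates; the sleeping task body is an illustrative stand-in for the real refresh logic, not code from this commit):

use futures::FutureExt; // provides now_or_never()
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Spawn the "refresh" in the background and keep only its JoinHandle,
    // mirroring spawn_refresh_slots_task in the diff below.
    let mut handle = tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(50)).await; // stand-in work
        Ok::<(), String>(())
    });

    // Poll the handle exactly once without blocking, as poll_recover now does.
    // &mut JoinHandle is itself a Future, so now_or_never() gives Some(result)
    // if the task already finished and None if it is still running.
    match (&mut handle).now_or_never() {
        Some(Ok(Ok(()))) => println!("refresh finished"),
        Some(Ok(Err(e))) => println!("refresh returned an error: {e}"),
        Some(Err(join_err)) => println!("task aborted or panicked: {join_err}"),
        None => println!("refresh still running; the caller is not blocked"),
    }

    // The caller keeps serving traffic and can check the handle again later.
    let _ = handle.await;
}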

2 files changed (+127, -144 lines)


glide-core/redis-rs/redis/src/cluster_async/mod.rs

Lines changed: 110 additions & 34 deletions
@@ -706,7 +706,8 @@ struct Message<C: Sized> {
 }
 
 enum RecoverFuture {
-    RecoverSlots(BoxFuture<'static, RedisResult<()>>),
+    RefreshingSlots(JoinHandle<RedisResult<()>>),
+    ReconnectToInitialNodes(BoxFuture<'static, ()>),
     Reconnect(BoxFuture<'static, ()>),
 }
 
@@ -1570,6 +1571,20 @@ where
         notifiers
     }
 
+    fn spawn_refresh_slots_task(
+        inner: Arc<InnerCore<C>>,
+        policy: &RefreshPolicy,
+    ) -> JoinHandle<RedisResult<()>> {
+        // Clone references for the task
+        let inner_clone = inner.clone();
+        let policy_clone = policy.clone();
+
+        // Spawn the background task and return its handle
+        tokio::spawn(async move {
+            Self::refresh_slots_and_subscriptions_with_retries(inner_clone, &policy_clone).await
+        })
+    }
+
     async fn aggregate_results(
         receivers: Vec<(Option<String>, oneshot::Receiver<RedisResult<Response>>)>,
         routing: &MultipleNodeRoutingInfo,
@@ -2713,36 +2728,93 @@ where
     }
 
     fn poll_recover(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), RedisError>> {
-        trace!("entered poll_recovere");
+        trace!("entered poll_recover");
+
         let recover_future = match &mut self.state {
             ConnectionState::PollComplete => return Poll::Ready(Ok(())),
             ConnectionState::Recover(future) => future,
         };
+
         match recover_future {
-            RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) {
-                Ok(_) => {
-                    trace!("Recovered!");
-                    self.state = ConnectionState::PollComplete;
-                    Poll::Ready(Ok(()))
-                }
-                Err(err) => {
-                    trace!("Recover slots failed!");
-                    let next_state = if err.kind() == ErrorKind::AllConnectionsUnavailable {
-                        ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
-                            ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
-                        )))
-                    } else {
-                        ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
-                            Self::refresh_slots_and_subscriptions_with_retries(
+            RecoverFuture::RefreshingSlots(handle) => {
+                // Check if the task has completed
+                match handle.now_or_never() {
+                    Some(Ok(Ok(()))) => {
+                        // Task succeeded
+                        trace!("Slot refresh completed successfully!");
+                        self.state = ConnectionState::PollComplete;
+                        return Poll::Ready(Ok(()));
+                    }
+                    Some(Ok(Err(e))) => {
+                        // Task completed but returned an engine error
+                        trace!("Slot refresh failed: {:?}", e);
+
+                        if e.kind() == ErrorKind::AllConnectionsUnavailable {
+                            // If all connections are unavailable, try reconnecting
+                            self.state =
+                                ConnectionState::Recover(RecoverFuture::ReconnectToInitialNodes(
+                                    Box::pin(ClusterConnInner::reconnect_to_initial_nodes(
+                                        self.inner.clone(),
+                                    )),
+                                ));
+                            return Poll::Ready(Err(e));
+                        } else {
+                            // Retry refresh
+                            let new_handle = Self::spawn_refresh_slots_task(
                                 self.inner.clone(),
                                 &RefreshPolicy::Throttable,
-                            ),
-                        )))
-                    };
-                    self.state = next_state;
-                    Poll::Ready(Err(err))
+                            );
+                            self.state = ConnectionState::Recover(RecoverFuture::RefreshingSlots(
+                                new_handle,
+                            ));
+                            return Poll::Ready(Ok(()));
+                        }
+                    }
+                    Some(Err(join_err)) => {
+                        if join_err.is_cancelled() {
+                            // Task was intentionally aborted - don't treat as an error
+                            trace!("Slot refresh task was aborted");
+                            self.state = ConnectionState::PollComplete;
+                            return Poll::Ready(Ok(()));
+                        } else {
+                            // Task panicked - try reconnecting to initial nodes as a recovery strategy
+                            warn!("Slot refresh task panicked: {:?} - attempting recovery by reconnecting to initial nodes", join_err);
+
+                            // TODO - consider gracefully closing the client.
+                            // Since a panic indicates a bug in the refresh logic,
+                            // it might be safer to close the client entirely.
+                            self.state =
+                                ConnectionState::Recover(RecoverFuture::ReconnectToInitialNodes(
+                                    Box::pin(ClusterConnInner::reconnect_to_initial_nodes(
+                                        self.inner.clone(),
+                                    )),
+                                ));
+
+                            // Report this critical error to clients
+                            let err = RedisError::from((
+                                ErrorKind::ClientError,
+                                "Slot refresh task panicked",
+                                format!("{:?}", join_err),
+                            ));
+                            return Poll::Ready(Err(err));
+                        }
+                    }
+                    None => {
+                        // Task is still running
+                        // Just continue and return Ok to not block poll_flush
+                    }
                 }
-            },
+
+                // Always return Ready to not block poll_flush
+                Poll::Ready(Ok(()))
+            }
+            // Other cases remain unchanged
+            RecoverFuture::ReconnectToInitialNodes(ref mut future) => {
+                ready!(future.as_mut().poll(cx));
+                trace!("Reconnected to initial nodes");
+                self.state = ConnectionState::PollComplete;
+                Poll::Ready(Ok(()))
+            }
             RecoverFuture::Reconnect(ref mut future) => {
                 ready!(future.as_mut().poll(cx));
                 trace!("Reconnected connections");
@@ -3016,12 +3088,21 @@ where
             match ready!(self.poll_complete(cx)) {
                 PollFlushAction::None => return Poll::Ready(Ok(())),
                 PollFlushAction::RebuildSlots => {
-                    self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
-                        ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
-                            self.inner.clone(),
-                            &RefreshPolicy::Throttable,
-                        ),
-                    )));
+                    // Spawn refresh task
+                    let task_handle = ClusterConnInner::spawn_refresh_slots_task(
+                        self.inner.clone(),
+                        &RefreshPolicy::Throttable,
+                    );
+
+                    // Update state
+                    self.state =
+                        ConnectionState::Recover(RecoverFuture::RefreshingSlots(task_handle));
+                }
+                PollFlushAction::ReconnectFromInitialConnections => {
+                    self.state =
+                        ConnectionState::Recover(RecoverFuture::ReconnectToInitialNodes(Box::pin(
+                            ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
+                        )));
                 }
                 PollFlushAction::Reconnect(addresses) => {
                     self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
@@ -3034,11 +3115,6 @@ where
                         .map(|_| ()), // Convert Vec<Arc<Notify>> to () as it's not needed here
                     )));
                 }
-                PollFlushAction::ReconnectFromInitialConnections => {
-                    self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
-                        ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
-                    )));
-                }
             }
         }
     }
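One subtlety in the Some(Err(join_err)) arm above: tokio reports both abortion and panics of a spawned task through the same JoinError, distinguished via is_cancelled() and is_panic(), which is why an aborted refresh is treated as benign while a panic triggers the reconnect-and-report path. A small standalone sketch of that distinction (assuming tokio; not code from this commit):

use tokio::task::JoinError;

#[tokio::main]
async fn main() {
    // A panicking task surfaces as a JoinError with is_panic() == true.
    let panicking = tokio::spawn(async {
        panic!("bug in refresh logic");
    });
    let err: JoinError = panicking.await.unwrap_err();
    assert!(err.is_panic() && !err.is_cancelled());

    // An aborted task surfaces as a JoinError with is_cancelled() == true,
    // which poll_recover deliberately does not treat as a failure.
    let aborted = tokio::spawn(async {
        tokio::time::sleep(std::time::Duration::from_secs(60)).await;
    });
    aborted.abort();
    let err = aborted.await.unwrap_err();
    assert!(err.is_cancelled());
}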

glide-core/redis-rs/redis/tests/test_cluster_async.rs

Lines changed: 17 additions & 110 deletions
@@ -1670,119 +1670,28 @@ mod cluster_async {
 
     #[test]
     #[serial_test::serial]
-    fn test_async_cluster_refresh_topology_even_with_zero_retries() {
-        let name = "test_async_cluster_refresh_topology_even_with_zero_retries";
-
-        let should_refresh = atomic::AtomicBool::new(false);
-
-        let MockEnv {
-            runtime,
-            async_connection: mut connection,
-            handler: _handler,
-            ..
-        } = MockEnv::with_client_builder(
-            ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(0)
-                // Disable the rate limiter to refresh slots immediately on the MOVED error.
-                .slots_refresh_rate_limit(Duration::from_secs(0), 0),
-            name,
-            move |cmd: &[u8], port| {
-                if !should_refresh.load(atomic::Ordering::SeqCst) {
-                    respond_startup(name, cmd)?;
-                }
-
-                if contains_slice(cmd, b"PING") || contains_slice(cmd, b"SETNAME") {
-                    return Err(Ok(Value::SimpleString("OK".into())));
-                }
-
-                if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
-                    return Err(Ok(Value::Array(vec![
-                        Value::Array(vec![
-                            Value::Int(0),
-                            Value::Int(1),
-                            Value::Array(vec![
-                                Value::BulkString(name.as_bytes().to_vec()),
-                                Value::Int(6379),
-                            ]),
-                        ]),
-                        Value::Array(vec![
-                            Value::Int(2),
-                            Value::Int(16383),
-                            Value::Array(vec![
-                                Value::BulkString(name.as_bytes().to_vec()),
-                                Value::Int(6380),
-                            ]),
-                        ]),
-                    ])));
-                }
-
-                if contains_slice(cmd, b"GET") {
-                    let get_response = Err(Ok(Value::BulkString(b"123".to_vec())));
-                    match port {
-                        6380 => get_response,
-                        // Respond that the key exists on a node that does not yet have a connection:
-                        _ => {
-                            // Should not attempt to refresh slots more than once:
-                            assert!(!should_refresh.swap(true, Ordering::SeqCst));
-                            Err(parse_redis_value(
-                                format!("-MOVED 123 {name}:6380\r\n").as_bytes(),
-                            ))
-                        }
-                    }
-                } else {
-                    panic!("unexpected command {cmd:?}")
-                }
-            },
-        );
-
-        let value = runtime.block_on(
-            cmd("GET")
-                .arg("test")
-                .query_async::<_, Option<i32>>(&mut connection),
-        );
-
-        // The user should receive an initial error, because there are no retries and the first request failed.
-        assert_eq!(
-            value,
-            Err(RedisError::from((
-                ErrorKind::Moved,
-                "An error was signalled by the server",
-                "test_async_cluster_refresh_topology_even_with_zero_retries:6380".to_string()
-            )))
-        );
-
-        let value = runtime.block_on(
-            cmd("GET")
-                .arg("test")
-                .query_async::<_, Option<i32>>(&mut connection),
-        );
-
-        assert_eq!(value, Ok(Some(123)));
-    }
-
-    #[test]
-    #[serial_test::serial]
-    fn test_async_cluster_refresh_topology_is_blocking() {
-        // Test: Head-of-Line Blocking During Slot Refresh
+    fn test_async_cluster_refresh_topology_is_not_blocking() {
+        // Test: Non-Head-of-Line Blocking During Slot Refresh
         //
         // This test verifies that during cluster topology refresh operations triggered by
-        // MOVED errors, the implementation exhibits head-of-line blocking behavior.
-        // When a client receives a MOVED error (indicating topology changes), it needs to
-        // refresh its slot mapping. This process blocks all subsequent commands until the
-        // refresh completes.
+        // MOVED errors, the implementation does not exhibit head-of-line blocking behavior.
+        // When a client receives a MOVED error (indicating topology changes), it refreshes
+        // its slot mapping in the background, allowing other commands to proceed concurrently.
         //
-        // The test employs the following strategy to verify the blocking behavior:
+        // The test employs the following strategy to verify the non-blocking behavior:
         //
         // 1. Trigger Slot Refresh: Send a blocking BLPOP command that will receive a MOVED error when
         //    slot 0 is migrated, initiating a topology refresh operation.
         //
         // 2. Atomically migrate slot and pause clients: Use SETSLOT and CLIENT PAUSE to artificially delay the node's
         //    response during the refresh operation.
         //
-        // 3. Verify Blocking Behavior: While the refresh is in progress, send a GET command
-        //    to a different node in the cluster and verify it times out due to being blocked.
+        // 3. Verify Non-Blocking Behavior: While the refresh is in progress, send a GET command
+        //    to a different node in the cluster. Unlike the blocking implementation, this command
+        //    should complete successfully without timing out.
         //
-        // This test intentionally demonstrates how topology refresh operations can block
-        // subsequent commands, even those directed to healthy nodes in the cluster.
+        // This test demonstrates that topology refresh operations no longer block
+        // subsequent commands.
 
         // Create a cluster with 3 nodes
         let cluster = TestClusterContext::new_with_cluster_client_builder(
@@ -1839,16 +1748,14 @@
         // This GET is sent while the topology refresh is still in progress
         let get_result = tokio::time::timeout(
            Duration::from_millis(1000),
-            client1.get::<_, redis::Value>(other_shard_key),
+            client1.get::<_, String>(other_shard_key),
         )
         .await;
 
-        // Assert that we got a timeout error due to head-of-line blocking
-        assert!(get_result.is_err());
-        assert!(matches!(
-            get_result.unwrap_err(),
-            tokio::time::error::Elapsed { .. }
-        ));
+        // Assert that the GET succeeded (no timeout or error)
+        assert!(get_result.is_ok());
+        let result = get_result.unwrap().unwrap();
+        assert_eq!(result, "value2");
 
         true
     });
@@ -1889,7 +1796,7 @@
 
         assert!(
             result,
-            "The test should pass, demonstrating blocking behavior"
+            "The test should pass, demonstrating non-blocking behavior"
        );
 
        Ok::<_, RedisError>(())
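The assertion pattern above generalizes: wrap the side request in tokio::time::timeout and require success, so any regression back to head-of-line blocking reappears as an Elapsed error. A minimal sketch of that pattern (assuming tokio; both operations are illustrative stand-ins for the paused refresh and the GET to a healthy shard):

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Stand-in for the slot refresh stalled behind CLIENT PAUSE.
    let refresh = tokio::spawn(async {
        tokio::time::sleep(Duration::from_secs(5)).await;
    });

    // Stand-in for the GET sent to a healthy shard while the refresh runs.
    let get_result = tokio::time::timeout(Duration::from_millis(1000), async {
        "value2".to_string()
    })
    .await;

    // Non-blocking behavior: the request completes instead of hitting Elapsed.
    assert!(get_result.is_ok(), "request must not be blocked by the refresh");
    assert_eq!(get_result.unwrap(), "value2");

    refresh.abort();
}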
