Move refresh slot to the background in the Data Path. #3851

Merged
13 commits, merged on May 27, 2025
144 changes: 110 additions & 34 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -706,7 +706,8 @@ struct Message<C: Sized> {
}

enum RecoverFuture {
RecoverSlots(BoxFuture<'static, RedisResult<()>>),
RefreshingSlots(JoinHandle<RedisResult<()>>),
ReconnectToInitialNodes(BoxFuture<'static, ()>),
Reconnect(BoxFuture<'static, ()>),
}

@@ -1570,6 +1571,20 @@ where
notifiers
}

fn spawn_refresh_slots_task(
inner: Arc<InnerCore<C>>,
policy: &RefreshPolicy,
) -> JoinHandle<RedisResult<()>> {
// Clone references for task
let inner_clone = inner.clone();
let policy_clone = policy.clone();

// Spawn the background task and return its handle
tokio::spawn(async move {
Self::refresh_slots_and_subscriptions_with_retries(inner_clone, &policy_clone).await
})
}

async fn aggregate_results(
receivers: Vec<(Option<String>, oneshot::Receiver<RedisResult<Response>>)>,
routing: &MultipleNodeRoutingInfo,
@@ -2713,36 +2728,93 @@ where
}

fn poll_recover(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), RedisError>> {
trace!("entered poll_recovere");
trace!("entered poll_recover");

let recover_future = match &mut self.state {
ConnectionState::PollComplete => return Poll::Ready(Ok(())),
ConnectionState::Recover(future) => future,
};

match recover_future {
RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) {
Ok(_) => {
trace!("Recovered!");
self.state = ConnectionState::PollComplete;
Poll::Ready(Ok(()))
}
Err(err) => {
trace!("Recover slots failed!");
let next_state = if err.kind() == ErrorKind::AllConnectionsUnavailable {
ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
)))
} else {
ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
Self::refresh_slots_and_subscriptions_with_retries(
RecoverFuture::RefreshingSlots(handle) => {
// Check if the task has completed
match handle.now_or_never() {
Some(Ok(Ok(()))) => {
// Task succeeded
trace!("Slot refresh completed successfully!");
self.state = ConnectionState::PollComplete;
return Poll::Ready(Ok(()));
}
Some(Ok(Err(e))) => {
// Task completed but returned an engine error
trace!("Slot refresh failed: {:?}", e);

if e.kind() == ErrorKind::AllConnectionsUnavailable {
// If all connections unavailable, try reconnect
self.state =
ConnectionState::Recover(RecoverFuture::ReconnectToInitialNodes(
Box::pin(ClusterConnInner::reconnect_to_initial_nodes(
self.inner.clone(),
)),
));
return Poll::Ready(Err(e));
} else {
// Retry refresh
let new_handle = Self::spawn_refresh_slots_task(
self.inner.clone(),
&RefreshPolicy::Throttable,
),
)))
};
self.state = next_state;
Poll::Ready(Err(err))
);
self.state = ConnectionState::Recover(RecoverFuture::RefreshingSlots(
new_handle,
));
return Poll::Ready(Ok(()));
}
}
Some(Err(join_err)) => {
if join_err.is_cancelled() {
// Task was intentionally aborted - don't treat as an error
trace!("Slot refresh task was aborted");
self.state = ConnectionState::PollComplete;
return Poll::Ready(Ok(()));
} else {
// Task panicked - try reconnecting to initial nodes as a recovery strategy
warn!("Slot refresh task panicked: {:?} - attempting recovery by reconnecting to initial nodes", join_err);

// TODO - consider gracefully closing the client
// Since a panic indicates a bug in the refresh logic,
// it might be safer to close the client entirely
self.state =
ConnectionState::Recover(RecoverFuture::ReconnectToInitialNodes(
Box::pin(ClusterConnInner::reconnect_to_initial_nodes(
self.inner.clone(),
)),
));

// Report this critical error to clients
let err = RedisError::from((
ErrorKind::ClientError,
"Slot refresh task panicked",
format!("{:?}", join_err),
));
return Poll::Ready(Err(err));
}
}
None => {
// Task is still running
// Just continue and return Ok to not block poll_flush
}
}
},

// Always return Ready to not block poll_flush
Poll::Ready(Ok(()))
}
// Other cases remain unchanged
RecoverFuture::ReconnectToInitialNodes(ref mut future) => {
ready!(future.as_mut().poll(cx));
trace!("Reconnected to initial nodes");
self.state = ConnectionState::PollComplete;
Poll::Ready(Ok(()))
}
RecoverFuture::Reconnect(ref mut future) => {
ready!(future.as_mut().poll(cx));
trace!("Reconnected connections");
@@ -3016,12 +3088,21 @@ where
match ready!(self.poll_complete(cx)) {
PollFlushAction::None => return Poll::Ready(Ok(())),
PollFlushAction::RebuildSlots => {
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
&RefreshPolicy::Throttable,
),
)));
// Spawn refresh task
let task_handle = ClusterConnInner::spawn_refresh_slots_task(
self.inner.clone(),
&RefreshPolicy::Throttable,
);

// Update state
self.state =
ConnectionState::Recover(RecoverFuture::RefreshingSlots(task_handle));
}
PollFlushAction::ReconnectFromInitialConnections => {
self.state =
ConnectionState::Recover(RecoverFuture::ReconnectToInitialNodes(Box::pin(
ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
)));
}
PollFlushAction::Reconnect(addresses) => {
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
@@ -3034,11 +3115,6 @@ where
.map(|_| ()), // Convert Vec<Arc<Notify>> to () as it's not needed here
)));
}
PollFlushAction::ReconnectFromInitialConnections => {
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
)));
}
}
}
}
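For readers skimming the diff above, here is a minimal, self-contained sketch (not the crate's actual types or error handling) of the pattern this change introduces in `poll_recover`: the slot refresh runs on a spawned tokio task, and the connection driver checks its `JoinHandle` with `now_or_never()` so an unfinished refresh never blocks the data path. `spawn_refresh` and `check_refresh` are hypothetical stand-ins for `spawn_refresh_slots_task` and the `RefreshingSlots` arm.

```rust
// Minimal sketch of the background-refresh pattern, assuming tokio and the
// futures crate; `spawn_refresh` and `check_refresh` are hypothetical
// stand-ins, not the crate's real API.
use std::time::Duration;

use futures::FutureExt; // for now_or_never()
use tokio::task::JoinHandle;

#[derive(Debug)]
enum RefreshOutcome {
    Done,
    Failed(String),
    StillRunning,
}

fn spawn_refresh() -> JoinHandle<Result<(), String>> {
    // Stand-in for spawn_refresh_slots_task: the refresh work runs on its own
    // task instead of being polled inline in the connection's data path.
    tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        Ok(())
    })
}

fn check_refresh(handle: &mut JoinHandle<Result<(), String>>) -> RefreshOutcome {
    // now_or_never() polls exactly once: it resolves only if the task already
    // finished; otherwise it returns None and the caller keeps serving traffic.
    match handle.now_or_never() {
        Some(Ok(Ok(()))) => RefreshOutcome::Done,
        Some(Ok(Err(e))) => RefreshOutcome::Failed(e),
        Some(Err(join_err)) => RefreshOutcome::Failed(format!("join error: {join_err}")),
        None => RefreshOutcome::StillRunning,
    }
}

#[tokio::main]
async fn main() {
    let mut handle = spawn_refresh();
    loop {
        match check_refresh(&mut handle) {
            RefreshOutcome::StillRunning => {
                // The data path keeps flowing; check again on the next cycle.
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
            outcome => {
                println!("refresh finished: {outcome:?}");
                break;
            }
        }
    }
}
```

The loop only re-polls the handle while it reports `StillRunning`, mirroring how `poll_recover` returns `Ready(Ok(()))` on `None` so `poll_flush` is never held up by an in-flight refresh.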
127 changes: 17 additions & 110 deletions glide-core/redis-rs/redis/tests/test_cluster_async.rs
@@ -1670,119 +1670,28 @@ mod cluster_async {

#[test]
#[serial_test::serial]
fn test_async_cluster_refresh_topology_even_with_zero_retries() {
let name = "test_async_cluster_refresh_topology_even_with_zero_retries";

let should_refresh = atomic::AtomicBool::new(false);

let MockEnv {
runtime,
async_connection: mut connection,
handler: _handler,
..
} = MockEnv::with_client_builder(
ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(0)
// Disable the rate limiter to refresh slots immediately on the MOVED error.
.slots_refresh_rate_limit(Duration::from_secs(0), 0),
name,
move |cmd: &[u8], port| {
if !should_refresh.load(atomic::Ordering::SeqCst) {
respond_startup(name, cmd)?;
}

if contains_slice(cmd, b"PING") || contains_slice(cmd, b"SETNAME") {
return Err(Ok(Value::SimpleString("OK".into())));
}

if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
return Err(Ok(Value::Array(vec![
Value::Array(vec![
Value::Int(0),
Value::Int(1),
Value::Array(vec![
Value::BulkString(name.as_bytes().to_vec()),
Value::Int(6379),
]),
]),
Value::Array(vec![
Value::Int(2),
Value::Int(16383),
Value::Array(vec![
Value::BulkString(name.as_bytes().to_vec()),
Value::Int(6380),
]),
]),
])));
}

if contains_slice(cmd, b"GET") {
let get_response = Err(Ok(Value::BulkString(b"123".to_vec())));
match port {
6380 => get_response,
// Respond that the key exists on a node that does not yet have a connection:
_ => {
// Should not attempt to refresh slots more than once:
assert!(!should_refresh.swap(true, Ordering::SeqCst));
Err(parse_redis_value(
format!("-MOVED 123 {name}:6380\r\n").as_bytes(),
))
}
}
} else {
panic!("unexpected command {cmd:?}")
}
},
);

let value = runtime.block_on(
cmd("GET")
.arg("test")
.query_async::<_, Option<i32>>(&mut connection),
);

// The user should receive an initial error, because there are no retries and the first request failed.
assert_eq!(
value,
Err(RedisError::from((
ErrorKind::Moved,
"An error was signalled by the server",
"test_async_cluster_refresh_topology_even_with_zero_retries:6380".to_string()
)))
);

let value = runtime.block_on(
cmd("GET")
.arg("test")
.query_async::<_, Option<i32>>(&mut connection),
);

assert_eq!(value, Ok(Some(123)));
}

#[test]
#[serial_test::serial]
fn test_async_cluster_refresh_topology_is_blocking() {
// Test: Head-of-Line Blocking During Slot Refresh
fn test_async_cluster_refresh_topology_is_not_blocking() {
// Test: Non-Head-of-Line Blocking During Slot Refresh
//
// This test verifies that during cluster topology refresh operations triggered by
// MOVED errors, the implementation exhibits head-of-line blocking behavior.
// When a client receives a MOVED error (indicating topology changes), it needs to
// refresh its slot mapping. This process blocks all subsequent commands until the
// refresh completes.
// MOVED errors, the implementation does not exhibit head-of-line blocking behavior.
// When a client receives a MOVED error (indicating topology changes), it refreshes
// its slot mapping in the background, allowing other commands to proceed concurrently.
//
// The test employs the following strategy to verify the blocking behavior:
// The test employs the following strategy to verify the non-blocking behavior:
//
// 1. Trigger Slot Refresh: Send a blocking BLPOP command that will receive a MOVED error when
// slot 0 is migrated, initiating a topology refresh operation.
//
// 2. Atomically migrate slot and pause clients: Use SET SLOT and CLIENT PAUSE to artificially delay the node's
// response during the refresh operation.
//
// 3. Verify Blocking Behavior: While the refresh is in progress, send a GET command
// to a different node in the cluster and verify it times out due to being blocked.
// 3. Verify Non-Blocking Behavior: While the refresh is in progress, send a GET command
// to a different node in the cluster. Unlike the blocking implementation, this command
// should complete successfully without timing out.
//
// This test intentionally demonstrates how topology refresh operations can block
// subsequent commands, even those directed to healthy nodes in the cluster.
// This test intentionally demonstrates that topology refresh operations no longer block
// subsequent commands.

// Create a cluster with 3 nodes
let cluster = TestClusterContext::new_with_cluster_client_builder(
@@ -1839,16 +1748,14 @@ mod cluster_async {
// This GET should complete even while the topology refresh is still in progress
let get_result = tokio::time::timeout(
Duration::from_millis(1000),
client1.get::<_, redis::Value>(other_shard_key),
client1.get::<_, String>(other_shard_key),
)
.await;

// Assert that we got a timeout error due to head-of-line blocking
assert!(get_result.is_err());
assert!(matches!(
get_result.unwrap_err(),
tokio::time::error::Elapsed { .. }
));
// Assert that the GET succeeded (no timeout or error)
assert!(get_result.is_ok());
let result = get_result.unwrap().unwrap();
assert_eq!(result, "value2");

true
});
@@ -1889,7 +1796,7 @@

assert!(
result,
"The test should pass, demonstrating blocking behavior"
"The test should pass, demonstrating non blocking behavior"
);

Ok::<_, RedisError>(())
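As a companion to the rewritten test, this is a hedged sketch of the new assertion style: the GET issued during an in-flight refresh is wrapped in a timeout and must now complete, where the old blocking test asserted that it timed out. `fetch_other_shard_key` is a hypothetical stand-in for `client1.get::<_, String>(other_shard_key)`; no real cluster is involved.

```rust
// Sketch of the non-blocking assertion, assuming tokio; fetch_other_shard_key
// is a hypothetical stand-in for the GET sent to a node uninvolved in the
// in-flight slot refresh.
use std::time::Duration;

async fn fetch_other_shard_key() -> Result<String, String> {
    // Stand-in for client1.get::<_, String>(other_shard_key).
    Ok("value2".to_string())
}

#[tokio::main]
async fn main() {
    let get_result =
        tokio::time::timeout(Duration::from_millis(1000), fetch_other_shard_key()).await;

    // With the refresh running in the background, the GET must finish inside
    // the window instead of timing out as the old blocking test expected...
    assert!(get_result.is_ok());
    // ...and it must return the value written before the refresh was triggered.
    assert_eq!(get_result.unwrap().unwrap(), "value2");
}
```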