@@ -1834,6 +1834,137 @@ mod cluster_async {
1834
1834
) ;
1835
1835
}
1836
1836
1837
#[test]
#[serial_test::serial]
fn test_async_cluster_refresh_topology_non_blocking() {
    // # Test: Non-Head-of-Line Blocking During Slot Refresh
    //
    // ## Purpose
    // Checks that a command routed to a healthy node still succeeds right after a
    // request that is meant to trigger a topology refresh (`spawn_refresh_slots_task`),
    // i.e. that the refresh does not hold up unrelated commands.
    //
    // ## Background
    // When a cluster client receives a MOVED error it has to refresh its slot map.
    // If that refresh were performed inline, every subsequent command would queue
    // behind it (head-of-line blocking) and could hit the response timeout.
    //
    // ## Test Strategy
    // 1. **Seed data**: write two keys whose slots are expected to live on different nodes.
    // 2. **Trigger the refresh and stall the node**: send an atomic pipeline containing
    //    `GET key1` followed by `CLIENT PAUSE` to a deliberately wrong master. The GET is
    //    intended to produce a MOVED error; the pause is intended to keep that node from
    //    answering promptly afterwards.
    // 3. **Probe**: once the pipeline call has returned, immediately issue `GET key2`.
    //    Note that the pipeline is awaited first; the probe runs while the pause window
    //    is presumably still open, not concurrently with the pipeline itself.
    // 4. **Assert**: the probe must return `"value2"` without an error.
    //
    // ## Expected Outcomes
    // - Blocking refresh: the probe is expected to fail (time out) because it waits for
    //   the refresh.
    // - Background refresh: the probe succeeds while the refresh is still in progress.
    //
    // NOTE(review): the test does not measure elapsed time; "non-blocking" is inferred
    // solely from the probe succeeding within the response timeout.

    // Duration of CLIENT PAUSE in milliseconds. The client's response timeout is set to
    // the same value, so both must be changed together.
    const PAUSE_MS: u64 = 2000;
    // Total number of hash slots in a cluster.
    const TOTAL_SLOTS: u16 = 16384;
    // Approximate slots per node, assuming the 3 masters split the slot space evenly
    // (16384 / 3, rounded down) — TODO confirm against the test cluster's actual layout.
    const SLOTS_PER_NODE: u16 = 5461;
    // Offset added to key1's slot to pick a slot owned by another node; it is larger than
    // SLOTS_PER_NODE so the shifted slot falls outside key1's node range.
    const WRONG_ROUTE_SLOT_SHIFT: u16 = 6000;

    let cluster = TestClusterContext::new_with_cluster_client_builder(
        3, // Use 3 nodes for a realistic test scenario
        0,
        |builder| {
            builder
                .retries(3)
                .use_protocol(ProtocolVersion::RESP3)
                // Match the CLIENT PAUSE duration.
                .response_timeout(Duration::from_millis(PAUSE_MS))
        },
        false,
    );

    block_on_all(async move {
        let mut connection = cluster.async_connection(None).await;

        // 1. Set up initial data - create keys on two different nodes.
        let key1 = "test_key1"; // Used to trigger the MOVED error
        let key2 = "test_key2"; // Used to test non-blocking behavior

        // Compute the slots so we can sanity-check that the keys are on different nodes.
        let slot1 = get_slot(key1.as_bytes());
        let slot2 = get_slot(key2.as_bytes());

        assert_ne!(
            slot1 / SLOTS_PER_NODE,
            slot2 / SLOTS_PER_NODE,
            "Test keys must be on different nodes for a valid test"
        );

        // Set initial values.
        let _: () = connection.set(key1, "value1").await?;
        let _: () = connection.set(key2, "value2").await?;

        // 2. Build a route to a master that should not own key1's slot.
        let wrong_route = Route::new(
            (slot1 + WRONG_ROUTE_SLOT_SHIFT) % TOTAL_SLOTS,
            SlotAddr::Master,
        );

        // 3. Build the pipeline that should trigger the background refresh and pause the node.
        let mut pipe = redis::pipe();
        pipe.atomic()
            .cmd("GET")
            .arg(key1)
            .ignore()
            .cmd("CLIENT")
            .arg("PAUSE")
            .arg(PAUSE_MS)
            .ignore();

        // Send the pipeline to the wrong route to provoke MOVED and the background refresh.
        connection
            .route_pipeline(
                &pipe,
                0,
                2,
                Some(SingleNodeRoutingInfo::SpecificNode(wrong_route)),
                None,
            )
            .await?;

        // 4. Immediately read the second key; this must not be held up by the refresh.
        let result: RedisResult<String> = connection.get(key2).await;

        // 5. The probe must succeed. `expect` keeps the underlying error in the panic
        //    message, which a bare `is_ok()` assertion would discard.
        let value =
            result.expect("The second GET command should succeed while refresh is in progress");

        assert_eq!(
            value, "value2",
            "The second GET command returned incorrect value"
        );

        Ok::<_, RedisError>(())
    })
    .unwrap();
}
1967
+
1837
1968
#[ test]
1838
1969
fn test_async_cluster_update_slots_based_on_moved_error_indicates_slot_migration ( ) {
1839
1970
// This test simulates the scenario where the client receives a MOVED error indicating that a key is now
0 commit comments