@@ -1668,163 +1668,6 @@ mod cluster_async {
1668
1668
}
1669
1669
}
1670
1670
1671
- #[ test]
1672
- #[ serial_test:: serial]
1673
- fn test_async_cluster_refresh_topology_even_with_zero_retries ( ) {
1674
- // # Test: Non-Head-of-Line Blocking During Slot Refresh
1675
- //
1676
- // This test verifies that the Redis Cluster client implementation does not exhibit head-of-line blocking
1677
- // when a slot refresh operation is in progress. When the client receives a MOVED error and begins
1678
- // refreshing its slot mappings, it should still be able to process other requests to different keys
1679
- // without waiting for the slot refresh to complete.
1680
- //
1681
- // ## Test Flow:
1682
- //
1683
- // 1. Send a GET request for key "test". This returns a MOVED error from port 6379, triggering a slot refresh.
1684
- //
1685
- // 2. Immediately send a GET request for key "foo". If the client properly avoids head-of-line
1686
- // blocking, this request should succeed immediately without waiting for the slot refresh.
1687
- //
1688
- // 3. Send a second GET request for key "test". Since the slot refresh is still in progress,
1689
- // this MUST still receive a MOVED error, confirming that the refresh is still ongoing.
1690
- //
1691
- // 4. Wait for slot refresh to complete.
1692
- //
1693
- // 5. Send a final GET request for key "test", which should now succeed as the slot refresh has completed.
1694
- let name = "test_async_cluster_refresh_topology_even_with_zero_retries" ;
1695
-
1696
- // Flag to track when we've done the initial GET that triggered MOVED
1697
- let initiated_refresh = atomic:: AtomicBool :: new ( false ) ;
1698
-
1699
- let MockEnv {
1700
- runtime,
1701
- async_connection : mut connection,
1702
- handler : _handler,
1703
- ..
1704
- } = MockEnv :: with_client_builder (
1705
- ClusterClient :: builder ( vec ! [ & * format!( "redis://{name}" ) ] ) . retries ( 0 )
1706
- // Disable the rate limiter to refresh slots immediately on the MOVED error.
1707
- . slots_refresh_rate_limit ( Duration :: from_secs ( 0 ) , 0 ) ,
1708
- name,
1709
- move |cmd : & [ u8 ] , port| {
1710
- if !initiated_refresh. load ( atomic:: Ordering :: SeqCst ) {
1711
- respond_startup ( name, cmd) ?;
1712
- }
1713
-
1714
- if contains_slice ( cmd, b"PING" ) || contains_slice ( cmd, b"SETNAME" ) {
1715
- return Err ( Ok ( Value :: SimpleString ( "OK" . into ( ) ) ) ) ;
1716
- }
1717
-
1718
- if contains_slice ( cmd, b"CLUSTER" ) && contains_slice ( cmd, b"SLOTS" ) {
1719
- return Err ( Ok ( Value :: Array ( vec ! [
1720
- Value :: Array ( vec![
1721
- Value :: Int ( 0 ) ,
1722
- Value :: Int ( 1 ) ,
1723
- Value :: Array ( vec![
1724
- Value :: BulkString ( name. as_bytes( ) . to_vec( ) ) ,
1725
- Value :: Int ( 6379 ) ,
1726
- ] ) ,
1727
- ] ) ,
1728
- Value :: Array ( vec![
1729
- Value :: Int ( 2 ) ,
1730
- Value :: Int ( 16383 ) ,
1731
- Value :: Array ( vec![
1732
- Value :: BulkString ( name. as_bytes( ) . to_vec( ) ) ,
1733
- Value :: Int ( 6380 ) ,
1734
- ] ) ,
1735
- ] ) ,
1736
- ] ) ) ) ;
1737
- }
1738
-
1739
- if contains_slice ( cmd, b"GET" ) {
1740
- if contains_slice ( cmd, b"test" ) {
1741
- // Mark that we've started the refresh process
1742
- initiated_refresh. store ( true , Ordering :: SeqCst ) ;
1743
- // Port-based routing for 'test' key
1744
- if port == 6379 {
1745
- Err ( parse_redis_value (
1746
- format ! ( "-MOVED 6918 {name}:6380\r \n " ) . as_bytes ( ) ,
1747
- ) )
1748
- } else if port == 6380 {
1749
- Err ( Ok ( Value :: BulkString ( b"test-value" . to_vec ( ) ) ) )
1750
- } else {
1751
- Err ( Ok ( Value :: Nil ) )
1752
- }
1753
- } else if contains_slice ( cmd, b"foo" ) {
1754
- // For 'foo' key, always succeed regardless of port
1755
- Err ( Ok ( Value :: BulkString ( b"foo-value" . to_vec ( ) ) ) )
1756
- } else {
1757
- panic ! ( "unexpected key" )
1758
- }
1759
- } else {
1760
- panic ! ( "unexpected command {cmd:?}" )
1761
- }
1762
- } ,
1763
- ) ;
1764
-
1765
- // STEP 1: First GET to 'test' triggers MOVED error and slot refresh
1766
- let value = runtime. block_on (
1767
- cmd ( "GET" )
1768
- . arg ( "test" )
1769
- . query_async :: < _ , String > ( & mut connection) ,
1770
- ) ;
1771
-
1772
- // This should return MOVED error
1773
- assert ! ( value. is_err( ) ) ;
1774
- assert_eq ! ( value. unwrap_err( ) . kind( ) , ErrorKind :: Moved ) ;
1775
-
1776
- // STEP 2: While slot refresh is happening, send request to 'foo'
1777
- let foo_value = runtime. block_on (
1778
- cmd ( "GET" )
1779
- . arg ( "foo" )
1780
- . query_async :: < _ , String > ( & mut connection) ,
1781
- ) ;
1782
-
1783
- // This should succeed immediately, demonstrating non-blocking behavior
1784
- assert_eq ! (
1785
- foo_value,
1786
- Ok ( "foo-value" . to_string( ) ) ,
1787
- "Request for 'foo' should succeed even during slot refresh"
1788
- ) ;
1789
-
1790
- // STEP 3: Try 'test' again to verify refresh is still ongoing
1791
- let second_test_value = runtime. block_on (
1792
- cmd ( "GET" )
1793
- . arg ( "test" )
1794
- . query_async :: < _ , String > ( & mut connection) ,
1795
- ) ;
1796
-
1797
- // This should still fail with MOVED error
1798
- assert ! (
1799
- second_test_value. is_err( ) ,
1800
- "Second request for 'test' should fail"
1801
- ) ;
1802
- assert_eq ! (
1803
- second_test_value. unwrap_err( ) . kind( ) ,
1804
- ErrorKind :: Moved ,
1805
- "Second request for 'test' should fail with MOVED error"
1806
- ) ;
1807
-
1808
- // STEP 4: Wait for slot refresh to complete
1809
- runtime. block_on ( async {
1810
- sleep ( futures_time:: time:: Duration :: from_millis ( 200 ) ) . await ;
1811
- } ) ;
1812
-
1813
- // STEP 5: Final request after refresh should work
1814
- let final_value = runtime. block_on (
1815
- cmd ( "GET" )
1816
- . arg ( "test" )
1817
- . query_async :: < _ , String > ( & mut connection) ,
1818
- ) ;
1819
-
1820
- // This should now succeed
1821
- assert_eq ! (
1822
- final_value,
1823
- Ok ( "test-value" . to_string( ) ) ,
1824
- "Request for 'test' after refresh should succeed"
1825
- ) ;
1826
- }
1827
-
1828
1671
#[ test]
1829
1672
#[ serial_test:: serial]
1830
1673
fn test_async_cluster_refresh_topology_is_not_blocking ( ) {
@@ -1917,7 +1760,7 @@ mod cluster_async {
1917
1760
true
1918
1761
} ) ;
1919
1762
1920
- // STEP 4: Give the BLPOP time to start blocking
1763
+ // STEP 4: Give the BLPOP time to start blocking - it will have 1500 ms to wait for MOVED
1921
1764
sleep ( futures_time:: time:: Duration :: from_millis ( 500 ) ) . await ;
1922
1765
1923
1766
// STEP 5: Trigger migration to cause MOVED error and delay the response
0 commit comments