
Commit a8e775a

Adding e2e maint notifications tests covering node migration and rebind
1 parent 3b21ec0 commit a8e775a

3 files changed, +232 -27 lines changed

tests/test_scenario/fault_injector_client.py

Lines changed: 34 additions & 23 deletions
@@ -113,6 +113,7 @@ def execute_migrate(
         endpoint_config: Dict[str, Any],
         target_node: str,
         empty_node: str,
+        skip_end_notification: bool = False,
     ) -> str:
         pass

@@ -452,6 +453,7 @@ def execute_migrate(
         endpoint_config: Dict[str, Any],
         target_node: str,
         empty_node: str,
+        skip_end_notification: bool = False,
     ) -> str:
         """Execute rladmin migrate command and wait for completion."""
         command = f"migrate node {target_node} all_shards target_node {empty_node}"
@@ -554,15 +556,20 @@ def __init__(self, oss_cluster: bool = False):
         self.proxy_helper.set_cluster_slots(
             self.CLUSTER_SLOTS_INTERCEPTOR_NAME, self.DEFAULT_CLUSTER_SLOTS
         )
-        logging.info("Sleeping for 2 seconds to allow proxy to apply the changes...")
-        time.sleep(2)

         self.seq_id = 0

     def _get_seq_id(self):
         self.seq_id += 1
         return self.seq_id

+    def get_operation_result(
+        self,
+        action_id: str,
+        timeout: int = 60,
+    ) -> Dict[str, Any]:
+        return {"status": "done"}
+
     def find_target_node_and_empty_node(
         self,
         endpoint_config: Dict[str, Any],
@@ -633,7 +640,11 @@ def execute_failover(
         return {"status": "done"}

     def execute_migrate(
-        self, endpoint_config: Dict[str, Any], target_node: str, empty_node: str
+        self,
+        endpoint_config: Dict[str, Any],
+        target_node: str,
+        empty_node: str,
+        skip_end_notification: bool = False,
     ) -> str:
         """
         Simulate migrate command execution.
@@ -661,25 +672,27 @@ def execute_migrate(
             time.sleep(self.SLEEP_TIME_BETWEEN_START_END_NOTIFICATIONS)

         if self.oss_cluster:
-            # intercept cluster slots
-            self.proxy_helper.set_cluster_slots(
-                self.CLUSTER_SLOTS_INTERCEPTOR_NAME,
-                [
-                    SlotsRange("127.0.0.1", self.NODE_PORT_2, 0, 200),
-                    SlotsRange("127.0.0.1", self.NODE_PORT_1, 201, 8191),
-                    SlotsRange("127.0.0.1", self.NODE_PORT_2, 8192, 16383),
-                ],
-            )
-            # send smigrated
-            end_maint_notif = RespTranslator.oss_maint_notification_to_resp(
-                f"SMIGRATED {self._get_seq_id()} 127.0.0.1:{self.NODE_PORT_2} 0-200"
-            )
+            if not skip_end_notification:
+                # intercept cluster slots
+                self.proxy_helper.set_cluster_slots(
+                    self.CLUSTER_SLOTS_INTERCEPTOR_NAME,
+                    [
+                        SlotsRange("127.0.0.1", self.NODE_PORT_2, 0, 200),
+                        SlotsRange("127.0.0.1", self.NODE_PORT_1, 201, 8191),
+                        SlotsRange("127.0.0.1", self.NODE_PORT_2, 8192, 16383),
+                    ],
+                )
+                # send smigrated
+                end_maint_notif = RespTranslator.oss_maint_notification_to_resp(
+                    f"SMIGRATED {self._get_seq_id()} 127.0.0.1:{self.NODE_PORT_2} 0-200"
+                )
+                self.proxy_helper.send_notification(self.NODE_PORT_1, end_maint_notif)
         else:
             # send migrated
             end_maint_notif = RespTranslator.re_cluster_maint_notification_to_resp(
                 f"MIGRATED {self._get_seq_id()} [1]"
             )
-        self.proxy_helper.send_notification(self.NODE_PORT_1, end_maint_notif)
+            self.proxy_helper.send_notification(self.NODE_PORT_1, end_maint_notif)

         return "done"

@@ -695,17 +708,15 @@ def execute_rebind(self, endpoint_config: Dict[str, Any], endpoint_id: str) -> s
         """
         sleep_time = self.SLEEP_TIME_BETWEEN_START_END_NOTIFICATIONS
         if self.oss_cluster:
-            # send smigrating
-            maint_start_notif = RespTranslator.oss_maint_notification_to_resp(
-                f"SMIGRATING {self._get_seq_id()} 0-8191"
-            )
+            # smigrating should be sent as part of the migrate flow
+            pass
         else:
             # send moving
             sleep_time = self.MOVING_TTL
             maint_start_notif = RespTranslator.re_cluster_maint_notification_to_resp(
                 f"MOVING {self._get_seq_id()} {sleep_time} 127.0.0.1:{self.NODE_PORT_3}"
             )
-        self.proxy_helper.send_notification(self.NODE_PORT_1, maint_start_notif)
+            self.proxy_helper.send_notification(self.NODE_PORT_1, maint_start_notif)

         # sleep to allow the client to receive the notification
         time.sleep(sleep_time)
@@ -725,7 +736,7 @@ def execute_rebind(self, endpoint_config: Dict[str, Any], endpoint_id: str) -> s
             )
             self.proxy_helper.send_notification(self.NODE_PORT_1, smigrated_node_1)
         else:
-            # TODO drop connections to node 1
+            # TODO drop connections to node 1 to simulate that the node is removed
            pass

         return "done"

tests/test_scenario/maint_notifications_helpers.py

Lines changed: 4 additions & 1 deletion
@@ -127,9 +127,12 @@ def execute_migrate(
         endpoint_config: Dict[str, Any],
         target_node: str,
         empty_node: str,
+        skip_end_notification: bool = False,
     ) -> str:
         """Execute rladmin migrate command and wait for completion."""
-        return fault_injector.execute_migrate(endpoint_config, target_node, empty_node)
+        return fault_injector.execute_migrate(
+            endpoint_config, target_node, empty_node, skip_end_notification
+        )

     @staticmethod
     def execute_rebind(
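
At the helper layer the new flag is a straight pass-through to whichever fault injector is in use, and it defaults to False, so existing migrate-only callers keep their current behavior. A hedged fragment of the keyword-style call the tests below use; the import path and the fixture-provided arguments are assumptions:

# Assumed import path; ClusterOperations is the helper edited in this file.
from tests.test_scenario.maint_notifications_helpers import ClusterOperations

# fault_injector_client, endpoints_config, and the node ids come from test fixtures.
action_id = ClusterOperations.execute_migrate(
    fault_injector=fault_injector_client,
    endpoint_config=endpoints_config,
    target_node=target_node,
    empty_node=empty_node,
    skip_end_notification=True,  # defer the SMIGRATED end notification to the rebind step
)

Only the combined migrate-and-bind flow opts in with True; the existing migrate-only tests keep passing the default.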

tests/test_scenario/test_maint_notifications.py

Lines changed: 194 additions & 3 deletions
@@ -73,12 +73,14 @@ def _execute_migration(
         endpoints_config: Dict[str, Any],
         target_node: str,
         empty_node: str,
+        skip_end_notification: bool = False,
     ):
         migrate_action_id = ClusterOperations.execute_migrate(
             fault_injector=fault_injector_client,
             endpoint_config=endpoints_config,
             target_node=target_node,
             empty_node=empty_node,
+            skip_end_notification=skip_end_notification,
         )

         self._migration_executed = True
@@ -118,6 +120,7 @@ def _execute_migrate_bind_flow(
             endpoints_config=endpoints_config,
             target_node=target_node,
             empty_node=empty_node,
+            skip_end_notification=True,
         )
         self._execute_bind(
             fault_injector_client=fault_injector_client,
@@ -1364,7 +1367,6 @@ def execute_commands(duration: int, errors: Queue):
                    )
            logging.debug(f"{threading.current_thread().name}: Thread ended")

-        logging.info("Creating one connection in the pool.")
         # get the node covering first shard - it is the node we will failover
         target_node = (
             cluster_client_maint_notifications.nodes_manager.get_node_from_slot(0)
@@ -1447,7 +1449,7 @@ def test_notification_handling_during_migration_without_node_replacement(
             cluster_client_maint_notifications.nodes_manager.nodes_cache.copy()
         )

-        logging.info("Executing failover command...")
+        logging.info("Executing migrate command...")
         migration_thread = Thread(
             target=self._execute_migration,
             name="migration_thread",
@@ -1566,7 +1568,7 @@ def execute_commands(duration: int, errors: Queue):
            thread.start()
            threads.append(thread)

-        logging.info("Executing failover command...")
+        logging.info("Executing migrate command...")
         migration_thread = Thread(
             target=self._execute_migration,
             name="migration_thread",
@@ -1605,3 +1607,192 @@ def execute_commands(duration: int, errors: Queue):

         # validate no errors were raised in the command execution threads
         assert errors.empty(), f"Errors occurred in threads: {errors.queue}"
+
+    @pytest.mark.timeout(300)  # 5 minutes timeout for this test
+    def test_notification_handling_during_migration_and_re_bind(
+        self,
+        cluster_client_maint_notifications: RedisCluster,
+        fault_injector_client_oss_api: FaultInjectorClient,
+        cluster_endpoints_config: Dict[str, Any],
+    ):
+        """
+        Test that push notifications are received when executing RE cluster migrate and bind operations.
+
+        """
+        # get the node covering the first shard - it is the node whose slots will be migrated
+        target_node = (
+            cluster_client_maint_notifications.nodes_manager.get_node_from_slot(0)
+        )
+        logging.info(
+            f"Creating one connection in the pool using node {target_node.name}."
+        )
+        conn = target_node.redis_connection.connection_pool.get_connection()
+        cluster_nodes = (
+            cluster_client_maint_notifications.nodes_manager.nodes_cache.copy()
+        )
+
+        logging.info("Executing migrate and bind flow ...")
+        migrate_and_bind_thread = Thread(
+            target=self._execute_migrate_bind_flow,
+            name="migrate_and_bind_thread",
+            args=(
+                fault_injector_client_oss_api,
+                cluster_endpoints_config,
+                self.target_node.node_id,
+                self.empty_node.node_id,
+                self.endpoint_id,
+            ),
+        )
+        migrate_and_bind_thread.start()
+
+        logging.info("Waiting for SMIGRATING push notifications...")
+        ClientValidations.wait_push_notification(
+            cluster_client_maint_notifications,
+            timeout=SMIGRATING_TIMEOUT,
+            connection=conn,
+        )
+
+        logging.info("Validating connection maintenance state...")
+        assert conn.maintenance_state == MaintenanceState.MAINTENANCE
+        assert conn._sock.gettimeout() == RELAXED_TIMEOUT
+        assert conn.should_reconnect() is False
+
+        assert len(cluster_nodes) == len(
+            cluster_client_maint_notifications.nodes_manager.nodes_cache
+        )
+        for node_key in cluster_nodes.keys():
+            assert (
+                node_key in cluster_client_maint_notifications.nodes_manager.nodes_cache
+            )
+
+        logging.info("Waiting for SMIGRATED push notifications...")
+        ClientValidations.wait_push_notification(
+            cluster_client_maint_notifications,
+            timeout=SMIGRATED_TIMEOUT,
+            connection=conn,
+        )
+
+        logging.info("Validating connection state after SMIGRATED ...")
+
+        assert conn.should_reconnect() is True
+
+        # the overall number of nodes should be the same - one removed and one added
+        assert len(cluster_nodes) == len(
+            cluster_client_maint_notifications.nodes_manager.nodes_cache
+        )
+        assert (
+            target_node.name
+            not in cluster_client_maint_notifications.nodes_manager.nodes_cache
+        )
+
+        logging.info("Releasing connection back to the pool...")
+        target_node.redis_connection.connection_pool.release(conn)
+
+        migrate_and_bind_thread.join()
+
+    @pytest.mark.timeout(300)  # 5 minutes timeout for this test
+    def test_command_execution_during_migration_and_re_bind(
+        self,
+        fault_injector_client_oss_api: FaultInjectorClient,
+        cluster_endpoints_config: Dict[str, Any],
+    ):
+        """
+        Test that commands keep executing successfully while RE cluster migrate and bind operations are in progress.
+        """

+        errors = Queue()
+        if isinstance(fault_injector_client_oss_api, ProxyServerFaultInjector):
+            execution_duration = 20
+        else:
+            execution_duration = 180
+
+        socket_timeout = 0.5
+
+        cluster_client_maint_notifications = _get_cluster_client_maint_notifications(
+            endpoints_config=cluster_endpoints_config,
+            disable_retries=True,
+            socket_timeout=socket_timeout,
+            enable_maintenance_notifications=True,
+        )
+
+        def execute_commands(duration: int, errors: Queue):
+            start = time.time()
+            while time.time() - start < duration:
+                try:
+                    # the slot is covered by the first shard - this one will have slots migrated
+                    cluster_client_maint_notifications.set("key:{3}", "value")
+                    cluster_client_maint_notifications.get("key:{3}")
+                    # execute also commands that will run on the second shard
+                    cluster_client_maint_notifications.set("key:{0}", "value")
+                    cluster_client_maint_notifications.get("key:{0}")
+                except Exception as e:
+                    logging.error(
+                        f"Error in thread {threading.current_thread().name}: {e}"
+                    )
+                    errors.put(
+                        f"Command failed in thread {threading.current_thread().name}: {e}"
+                    )
+            logging.debug(f"{threading.current_thread().name}: Thread ended")
+
+        # get the node covering the first shard - it is the node we will migrate and remove
+        target_node = (
+            cluster_client_maint_notifications.nodes_manager.get_node_from_slot(0)
+        )
+
+        cluster_nodes = (
+            cluster_client_maint_notifications.nodes_manager.nodes_cache.copy()
+        )
+
+        threads = []
+        for i in range(10):
+            thread = Thread(
+                target=execute_commands,
+                name=f"command_execution_thread_{i}",
+                args=(
+                    execution_duration,
+                    errors,
+                ),
+            )
+            thread.start()
+            threads.append(thread)
+
+        logging.info("Executing migrate and bind flow...")
+        migrate_and_bind_thread = Thread(
+            target=self._execute_migrate_bind_flow,
+            name="migrate_and_bind_thread",
+            args=(
+                fault_injector_client_oss_api,
+                cluster_endpoints_config,
+                self.target_node.node_id,
+                self.empty_node.node_id,
+                self.endpoint_id,
+            ),
+        )
+        migrate_and_bind_thread.start()
+
+        for thread in threads:
+            thread.join()
+
+        migrate_and_bind_thread.join()
+
+        # validate cluster nodes
+        assert len(cluster_nodes) == len(
+            cluster_client_maint_notifications.nodes_manager.nodes_cache
+        )
+        assert (
+            target_node.name
+            not in cluster_client_maint_notifications.nodes_manager.nodes_cache
+        )
+
+        for (
+            node
+        ) in cluster_client_maint_notifications.nodes_manager.nodes_cache.values():
+            # validate connection settings
+            self._validate_default_state(
+                node.redis_connection,
+                expected_matching_conns_count=10,
+                configured_timeout=socket_timeout,
+            )
+
+        # validate no errors were raised in the command execution threads
+        assert errors.empty(), f"Errors occurred in threads: {errors.queue}"
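
To run just the two new scenarios locally, an invocation along the following lines should work; it assumes the scenario-test environment (proxy or fault injector) is available and that the pytest-timeout plugin implied by @pytest.mark.timeout(300) is installed:

# Hypothetical local run; the -k expression matches both new tests by name.
import pytest

pytest.main(
    [
        "tests/test_scenario/test_maint_notifications.py",
        "-k", "migration_and_re_bind",
        "--log-cli-level=INFO",
    ]
)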
