From ccf315782e175b24dabf2a1a9327d5a00a9dc657 Mon Sep 17 00:00:00 2001 From: laywin Date: Mon, 25 Dec 2023 17:13:54 +0800 Subject: [PATCH 1/3] zk push empty protect --- .../zk/ZookeeperRegisterServiceImpl.java | 51 +++++++++++++--- .../zk/ZookeeperRegisterServiceImplTest.java | 59 +++++++++++++++++-- 2 files changed, 96 insertions(+), 14 deletions(-) diff --git a/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java b/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java index 0d26f7b6867..e104f5b4890 100644 --- a/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java +++ b/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; +import io.seata.common.ConfigurationKeys; import io.seata.common.util.CollectionUtils; import io.seata.common.util.NetUtil; import io.seata.common.util.StringUtils; @@ -193,7 +194,7 @@ List doLookup(String clusterName) throws Exception { lock = CLUSTER_LOCK.get(clusterName); } synchronized (lock) { - if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) { + if (!LISTENER_SERVICE_MAP.containsKey(clusterName) && isCurrentCluster(clusterName)) { boolean exist = getClientInstance().exists(ROOT_PATH + clusterName); if (!exist) { return null; @@ -285,18 +286,17 @@ private void recover() throws Exception { private void subscribeCluster(String cluster) throws Exception { subscribe(cluster, (parentPath, currentChilds) -> { String clusterName = parentPath.replace(ROOT_PATH, ""); - if (CollectionUtils.isEmpty(currentChilds) && CLUSTER_ADDRESS_MAP.get(clusterName) != null) { - CLUSTER_ADDRESS_MAP.remove(clusterName); - } else if (!CollectionUtils.isEmpty(currentChilds)) { - refreshClusterAddressMap(clusterName, currentChilds); - } + refreshClusterAddressMap(clusterName, currentChilds); }); } private void refreshClusterAddressMap(String clusterName, List instances) { List newAddressList = new ArrayList<>(); - if (instances == null) { - CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); + if (CollectionUtils.isEmpty(instances)) { + if (!isCurrentCluster(clusterName)) { + CLUSTER_ADDRESS_MAP.remove(clusterName); + clearClusterListener(clusterName); + } return; } for (String path : instances) { @@ -310,6 +310,41 @@ private void refreshClusterAddressMap(String clusterName, List instances CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); } + /** + * + * clear cluster's zk listener + * + * @param clusterName clusterName + */ + private void clearClusterListener(String clusterName) { + List listeners = LISTENER_SERVICE_MAP.get(clusterName); + for (IZkChildListener listener : listeners) { + try { + unsubscribe(clusterName, listener); + } catch (Exception e) { + LOGGER.warn("unsubscribe cluster {} failed", clusterName); + } + } + } + + /*** + * + * check whether it is current cluster + * + * @param clusterName cluserName + */ + private boolean isCurrentCluster(String clusterName) { + String txServiceGroupName = ConfigurationFactory.getInstance() + .getConfig(ConfigurationKeys.TX_SERVICE_GROUP); + + if (StringUtils.isNotEmpty(txServiceGroupName)) { + String currentClusterName = getServiceGroup(txServiceGroupName); + return clusterName.equals(currentClusterName); + } + + return false; + } + private String getClusterName() { String clusterConfigName = String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_CLUSTER); return FILE_CONFIG.getConfig(clusterConfigName); diff --git a/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java b/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java index 9faaadf04d6..484b0cb6540 100644 --- a/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java +++ b/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java @@ -16,13 +16,10 @@ */ package io.seata.discovery.registry.zk; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - +import com.google.common.collect.Lists; import io.seata.common.util.NetUtil; +import io.seata.config.Configuration; +import io.seata.config.ConfigurationFactory; import io.seata.config.exception.ConfigNotFoundException; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; @@ -32,6 +29,21 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; +import org.mockito.MockedStatic; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; /** */ @@ -103,4 +115,39 @@ public void execute() throws Throwable { service.unsubscribe("default", listener2); } + @Test + public void testPushEmptyProtection() throws Exception { + + service.subscribe("cluster", (s, list) -> {}); + + // mock config + MockedStatic configurationFactoryMockedStatic = mockStatic(ConfigurationFactory.class); + Configuration configuration = mock(Configuration.class); + when(configuration.getConfig(anyString())).thenReturn("cluster"); + configurationFactoryMockedStatic.when(ConfigurationFactory::getInstance).thenReturn(configuration); + + // set CLUSTER_ADDRESS_MAP + Field field = ZookeeperRegisterServiceImpl.class.getDeclaredField("CLUSTER_ADDRESS_MAP"); + field.setAccessible(true); + ConcurrentMap> CLUSTER_ADDRESS_MAP = (ConcurrentMap>)field.get(null); + CLUSTER_ADDRESS_MAP.put("cluster", Lists.newArrayList(NetUtil.toInetSocketAddress("127.0.0.1:8091"))); + + // invoke + Method refreshClusterAddressMapMethod = ZookeeperRegisterServiceImpl.class.getDeclaredMethod("refreshClusterAddressMap", String.class, List.class); + refreshClusterAddressMapMethod.setAccessible(true); + refreshClusterAddressMapMethod.invoke(service, "cluster", null); + + // test the push empty protection situation + Assertions.assertEquals(1, CLUSTER_ADDRESS_MAP.get("cluster").size()); + + + when(configuration.getConfig(anyString())).thenReturn("mycluster"); + + refreshClusterAddressMapMethod.invoke(service, "cluster", null); + configurationFactoryMockedStatic.close(); + + // test the normal remove situation + Assertions.assertEquals(0, CLUSTER_ADDRESS_MAP.get("cluster").size()); + } + } From 4640842c4cad46d3bcbd8762426bb0117bd29757 Mon Sep 17 00:00:00 2001 From: laywin Date: Mon, 25 Dec 2023 17:20:10 +0800 Subject: [PATCH 2/3] add change log --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + 2 files changed, 2 insertions(+) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 93dd17f4ffd..0055bbd2df7 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -43,6 +43,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6195](https://github.com/apache/incubator-seata/pull/6195)] update the url in change log to apache/incubator-seata - [[#6200](https://github.com/apache/incubator-seata/pull/6200)] cancel required_status_checks - [[#6201](https://github.com/apache/incubator-seata/pull/6201)] restore required_status_checks kept to remove context validation +- [[#6206](https://github.com/apache/incubator-seata/pull/6206)] zk registry push empty protection optimize ### security: - [[#6069](https://github.com/apache/incubator-seata/pull/6069)] Upgrade Guava dependencies to fix security vulnerabilities diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 632833f9473..32d92964d41 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -43,6 +43,7 @@ - [[#6195](https://github.com/apache/incubator-seata/pull/6195)] 更新 change log 中的 seata url 为 apache/incubator-seata - [[#6200](https://github.com/apache/incubator-seata/pull/6200)] 去除required_status_checks检查 - [[#6201](https://github.com/apache/incubator-seata/pull/6201)] 恢复required_status_checks但去除context校验 +- [[#6206](https://github.com/apache/incubator-seata/pull/6206)] zk 注册中心推空保护优化 ### security: - [[#6069](https://github.com/apache/incubator-seata/pull/6069)] 升级Guava依赖版本,修复安全漏洞 From 0a47eff279544615fb23acb2700486147b0c6ab1 Mon Sep 17 00:00:00 2001 From: laywin Date: Mon, 25 Dec 2023 17:30:03 +0800 Subject: [PATCH 3/3] fix unit test --- .../registry/zk/ZookeeperRegisterServiceImplTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java b/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java index 484b0cb6540..d259c9c5ea3 100644 --- a/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java +++ b/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java @@ -79,6 +79,9 @@ public void buildZkTest() { @Test public void testAll() throws Exception { + System.setProperty("txServiceGroup", "default_tx_group"); + System.setProperty("service.vgroupMapping.default_tx_group", "default"); + service.register(new InetSocketAddress(NetUtil.getLocalAddress(), 33333)); Assertions.assertThrows(ConfigNotFoundException.class, new Executable() { @@ -147,7 +150,7 @@ public void testPushEmptyProtection() throws Exception { configurationFactoryMockedStatic.close(); // test the normal remove situation - Assertions.assertEquals(0, CLUSTER_ADDRESS_MAP.get("cluster").size()); + Assertions.assertNull(CLUSTER_ADDRESS_MAP.get("cluster")); } }