diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index fb0f41b9e4d..b6df1a6f4a6 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -44,6 +44,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6195](https://github.com/apache/incubator-seata/pull/6195)] update the url in change log to apache/incubator-seata - [[#6200](https://github.com/apache/incubator-seata/pull/6200)] cancel required_status_checks - [[#6201](https://github.com/apache/incubator-seata/pull/6201)] restore required_status_checks kept to remove context validation +- [[#6206](https://github.com/apache/incubator-seata/pull/6206)] zk registry push empty protection optimize - [[#6218](https://github.com/apache/incubator-seata/pull/6218)] remove Seata-Docker link ### security: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index e87037d13e4..83deff3ad0a 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -44,6 +44,7 @@ - [[#6195](https://github.com/apache/incubator-seata/pull/6195)] 更新 change log 中的 seata url 为 apache/incubator-seata - [[#6200](https://github.com/apache/incubator-seata/pull/6200)] 去除required_status_checks检查 - [[#6201](https://github.com/apache/incubator-seata/pull/6201)] 恢复required_status_checks但去除context校验 +- [[#6206](https://github.com/apache/incubator-seata/pull/6206)] zk 注册中心推空保护优化 - [[#6218](https://github.com/apache/incubator-seata/pull/6218)] 移除Seata-Docker链接 ### security: diff --git a/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java b/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java index 0d26f7b6867..e104f5b4890 100644 --- a/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java +++ b/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; +import io.seata.common.ConfigurationKeys; import io.seata.common.util.CollectionUtils; import io.seata.common.util.NetUtil; import io.seata.common.util.StringUtils; @@ -193,7 +194,7 @@ List doLookup(String clusterName) throws Exception { lock = CLUSTER_LOCK.get(clusterName); } synchronized (lock) { - if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) { + if (!LISTENER_SERVICE_MAP.containsKey(clusterName) && isCurrentCluster(clusterName)) { boolean exist = getClientInstance().exists(ROOT_PATH + clusterName); if (!exist) { return null; @@ -285,18 +286,17 @@ private void recover() throws Exception { private void subscribeCluster(String cluster) throws Exception { subscribe(cluster, (parentPath, currentChilds) -> { String clusterName = parentPath.replace(ROOT_PATH, ""); - if (CollectionUtils.isEmpty(currentChilds) && CLUSTER_ADDRESS_MAP.get(clusterName) != null) { - CLUSTER_ADDRESS_MAP.remove(clusterName); - } else if (!CollectionUtils.isEmpty(currentChilds)) { - refreshClusterAddressMap(clusterName, currentChilds); - } + refreshClusterAddressMap(clusterName, currentChilds); }); } private void refreshClusterAddressMap(String clusterName, List instances) { List newAddressList = new ArrayList<>(); - if (instances == null) { - CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); + if (CollectionUtils.isEmpty(instances)) { + if (!isCurrentCluster(clusterName)) { + CLUSTER_ADDRESS_MAP.remove(clusterName); + clearClusterListener(clusterName); + } return; } for (String path : instances) { @@ -310,6 +310,41 @@ private void refreshClusterAddressMap(String clusterName, List instances CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); } + /** + * + * clear cluster's zk listener + * + * @param clusterName clusterName + */ + private void clearClusterListener(String clusterName) { + List listeners = LISTENER_SERVICE_MAP.get(clusterName); + for (IZkChildListener listener : listeners) { + try { + unsubscribe(clusterName, listener); + } catch (Exception e) { + LOGGER.warn("unsubscribe cluster {} failed", clusterName); + } + } + } + + /*** + * + * check whether it is current cluster + * + * @param clusterName cluserName + */ + private boolean isCurrentCluster(String clusterName) { + String txServiceGroupName = ConfigurationFactory.getInstance() + .getConfig(ConfigurationKeys.TX_SERVICE_GROUP); + + if (StringUtils.isNotEmpty(txServiceGroupName)) { + String currentClusterName = getServiceGroup(txServiceGroupName); + return clusterName.equals(currentClusterName); + } + + return false; + } + private String getClusterName() { String clusterConfigName = String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_CLUSTER); return FILE_CONFIG.getConfig(clusterConfigName); diff --git a/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java b/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java index 9faaadf04d6..d259c9c5ea3 100644 --- a/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java +++ b/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java @@ -16,13 +16,10 @@ */ package io.seata.discovery.registry.zk; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - +import com.google.common.collect.Lists; import io.seata.common.util.NetUtil; +import io.seata.config.Configuration; +import io.seata.config.ConfigurationFactory; import io.seata.config.exception.ConfigNotFoundException; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; @@ -32,6 +29,21 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; +import org.mockito.MockedStatic; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; /** */ @@ -67,6 +79,9 @@ public void buildZkTest() { @Test public void testAll() throws Exception { + System.setProperty("txServiceGroup", "default_tx_group"); + System.setProperty("service.vgroupMapping.default_tx_group", "default"); + service.register(new InetSocketAddress(NetUtil.getLocalAddress(), 33333)); Assertions.assertThrows(ConfigNotFoundException.class, new Executable() { @@ -103,4 +118,39 @@ public void execute() throws Throwable { service.unsubscribe("default", listener2); } + @Test + public void testPushEmptyProtection() throws Exception { + + service.subscribe("cluster", (s, list) -> {}); + + // mock config + MockedStatic configurationFactoryMockedStatic = mockStatic(ConfigurationFactory.class); + Configuration configuration = mock(Configuration.class); + when(configuration.getConfig(anyString())).thenReturn("cluster"); + configurationFactoryMockedStatic.when(ConfigurationFactory::getInstance).thenReturn(configuration); + + // set CLUSTER_ADDRESS_MAP + Field field = ZookeeperRegisterServiceImpl.class.getDeclaredField("CLUSTER_ADDRESS_MAP"); + field.setAccessible(true); + ConcurrentMap> CLUSTER_ADDRESS_MAP = (ConcurrentMap>)field.get(null); + CLUSTER_ADDRESS_MAP.put("cluster", Lists.newArrayList(NetUtil.toInetSocketAddress("127.0.0.1:8091"))); + + // invoke + Method refreshClusterAddressMapMethod = ZookeeperRegisterServiceImpl.class.getDeclaredMethod("refreshClusterAddressMap", String.class, List.class); + refreshClusterAddressMapMethod.setAccessible(true); + refreshClusterAddressMapMethod.invoke(service, "cluster", null); + + // test the push empty protection situation + Assertions.assertEquals(1, CLUSTER_ADDRESS_MAP.get("cluster").size()); + + + when(configuration.getConfig(anyString())).thenReturn("mycluster"); + + refreshClusterAddressMapMethod.invoke(service, "cluster", null); + configurationFactoryMockedStatic.close(); + + // test the normal remove situation + Assertions.assertNull(CLUSTER_ADDRESS_MAP.get("cluster")); + } + }