Skip to content

Commit 60eca66

Browse files
authored
Merge pull request #126 from SkyeBeFreeman/main
fix:fix service instance update bug when using multi-discovery.
2 parents 564b877 + 647195b commit 60eca66

File tree

7 files changed

+217
-81
lines changed

7 files changed

+217
-81
lines changed

polaris-common/polaris-model/src/main/java/com/tencent/polaris/client/pojo/ServiceInstancesByProto.java

Lines changed: 65 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,71 @@ public class ServiceInstancesByProto implements ServiceInstances, RegistryCacheV
6161

6262
private final int hashCode;
6363

64+
/**
65+
* 构造函数
66+
*
67+
* @param response 应答Proto
68+
* @param oldSvcInstances 旧的实例列表
69+
* @param loadFromFile 是否从缓存文件加载
70+
*/
71+
public ServiceInstancesByProto(ResponseProto.DiscoverResponse response, ServiceInstancesByProto oldSvcInstances,
72+
boolean loadFromFile) {
73+
this.service = response.getService();
74+
List<Instance> tmpInstances = new ArrayList<>();
75+
Map<String, InstanceByProto> tmpInstanceMap = new HashMap<>();
76+
Map<Node, InstanceByProto> tmpNodeMap = new HashMap<>();
77+
int totalWeight = 0;
78+
ServiceKey svcKey = new ServiceKey(this.service.getNamespace().getValue(), this.service.getName().getValue());
79+
// TODO 需要判断不同来源的实例列表覆盖情况
80+
if (CollectionUtils.isNotEmpty(response.getInstancesList())) {
81+
for (ServiceProto.Instance instance : response.getInstancesList()) {
82+
String instID = instance.getId().getValue();
83+
InstanceLocalValue instanceLocalValue = null;
84+
if (null != oldSvcInstances) {
85+
InstanceByProto oldInstance = oldSvcInstances.getInstance(instID);
86+
if (null != oldInstance) {
87+
instanceLocalValue = oldInstance.getInstanceLocalValue();
88+
}
89+
}
90+
if (null == instanceLocalValue) {
91+
//创建一个新的本地缓存实例
92+
instanceLocalValue = new DefaultInstanceLocalValue();
93+
}
94+
InstanceByProto targetInstance = new InstanceByProto(svcKey, instance, instanceLocalValue);
95+
totalWeight += targetInstance.getWeight();
96+
tmpInstances.add(targetInstance);
97+
tmpInstanceMap.put(instID, targetInstance);
98+
tmpNodeMap.put(new Node(targetInstance.getHost(), targetInstance.getPort()), targetInstance);
99+
}
100+
}
101+
Collections.sort(tmpInstances);
102+
hashCode = Objects.hash(svcKey, tmpInstances);
103+
this.svcKey = svcKey;
104+
this.instanceIdMap = Collections.unmodifiableMap(tmpInstanceMap);
105+
this.nodeMap = Collections.unmodifiableMap(tmpNodeMap);
106+
this.instances = Collections.unmodifiableList(tmpInstances);
107+
this.totalWeight = totalWeight;
108+
this.initialized = true;
109+
this.metadata = Collections.unmodifiableMap(this.service.getMetadataMap());
110+
this.loadedFromFile = loadFromFile;
111+
}
112+
113+
/**
114+
* 创建空的服务对象
115+
*/
116+
public ServiceInstancesByProto() {
117+
this.service = null;
118+
this.svcKey = null;
119+
this.initialized = false;
120+
this.instances = Collections.emptyList();
121+
this.instanceIdMap = Collections.emptyMap();
122+
this.nodeMap = Collections.emptyMap();
123+
this.metadata = Collections.emptyMap();
124+
this.loadedFromFile = false;
125+
this.totalWeight = 0;
126+
hashCode = Objects.hash(instances);
127+
}
128+
64129
@Override
65130
public ServiceKey getServiceKey() {
66131
return svcKey;
@@ -134,70 +199,6 @@ public InstanceByProto getInstanceByNode(Node node) {
134199
return nodeMap.get(node);
135200
}
136201

137-
/**
138-
* 构造函数
139-
*
140-
* @param response 应答Proto
141-
* @param oldSvcInstances 旧的实例列表
142-
* @param loadFromFile 是否从缓存文件加载
143-
*/
144-
public ServiceInstancesByProto(ResponseProto.DiscoverResponse response, ServiceInstancesByProto oldSvcInstances,
145-
boolean loadFromFile) {
146-
this.service = response.getService();
147-
List<Instance> tmpInstances = new ArrayList<>();
148-
Map<String, InstanceByProto> tmpInstanceMap = new HashMap<>();
149-
Map<Node, InstanceByProto> tmpNodeMap = new HashMap<>();
150-
int totalWeight = 0;
151-
ServiceKey svcKey = new ServiceKey(this.service.getNamespace().getValue(), this.service.getName().getValue());
152-
if (CollectionUtils.isNotEmpty(response.getInstancesList())) {
153-
for (ServiceProto.Instance instance : response.getInstancesList()) {
154-
String instID = instance.getId().getValue();
155-
InstanceLocalValue instanceLocalValue = null;
156-
if (null != oldSvcInstances) {
157-
InstanceByProto oldInstance = oldSvcInstances.getInstance(instID);
158-
if (null != oldInstance) {
159-
instanceLocalValue = oldInstance.getInstanceLocalValue();
160-
}
161-
}
162-
if (null == instanceLocalValue) {
163-
//创建一个新的本地缓存实例
164-
instanceLocalValue = new DefaultInstanceLocalValue();
165-
}
166-
InstanceByProto targetInstance = new InstanceByProto(svcKey, instance, instanceLocalValue);
167-
totalWeight += targetInstance.getWeight();
168-
tmpInstances.add(targetInstance);
169-
tmpInstanceMap.put(instID, targetInstance);
170-
tmpNodeMap.put(new Node(targetInstance.getHost(), targetInstance.getPort()), targetInstance);
171-
}
172-
}
173-
Collections.sort(tmpInstances);
174-
hashCode = Objects.hash(svcKey, tmpInstances);
175-
this.svcKey = svcKey;
176-
this.instanceIdMap = Collections.unmodifiableMap(tmpInstanceMap);
177-
this.nodeMap = Collections.unmodifiableMap(tmpNodeMap);
178-
this.instances = Collections.unmodifiableList(tmpInstances);
179-
this.totalWeight = totalWeight;
180-
this.initialized = true;
181-
this.metadata = Collections.unmodifiableMap(this.service.getMetadataMap());
182-
this.loadedFromFile = loadFromFile;
183-
}
184-
185-
/**
186-
* 创建空的服务对象
187-
*/
188-
public ServiceInstancesByProto() {
189-
this.service = null;
190-
this.svcKey = null;
191-
this.initialized = false;
192-
this.instances = Collections.emptyList();
193-
this.instanceIdMap = Collections.emptyMap();
194-
this.nodeMap = Collections.emptyMap();
195-
this.metadata = Collections.emptyMap();
196-
this.loadedFromFile = false;
197-
this.totalWeight = 0;
198-
hashCode = Objects.hash(instances);
199-
}
200-
201202
@Override
202203
@SuppressWarnings("checkstyle:all")
203204
public String toString() {

polaris-plugins/polaris-plugins-connector/connector-common/src/main/java/com/tencent/polaris/plugins/connector/common/DestroyableServerConnector.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919

2020
import com.tencent.polaris.api.control.Destroyable;
2121
import com.tencent.polaris.api.plugin.server.ServerConnector;
22-
import com.tencent.polaris.api.pojo.DefaultInstance;
2322
import com.tencent.polaris.api.pojo.ServiceEventKey;
2423
import com.tencent.polaris.api.pojo.Services;
2524
import com.tencent.polaris.logging.LoggerFactory;
2625
import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Status;
27-
import java.util.List;
2826
import java.util.Map;
2927
import java.util.concurrent.ConcurrentHashMap;
3028
import org.slf4j.Logger;
@@ -84,7 +82,7 @@ public void addLongRunningTask(ServiceUpdateTask serviceUpdateTask) {
8482
* @param serviceUpdateTask
8583
* @return instance
8684
*/
87-
public List<DefaultInstance> syncGetServiceInstances(ServiceUpdateTask serviceUpdateTask) {
85+
public ServiceInstancesResponse syncGetServiceInstances(ServiceUpdateTask serviceUpdateTask) {
8886
return null;
8987
}
9088

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making Polaris available.
3+
*
4+
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package com.tencent.polaris.plugins.connector.common;
19+
20+
import com.tencent.polaris.api.pojo.DefaultInstance;
21+
import java.util.List;
22+
23+
/**
24+
* Response of GetServiceInstances request.
25+
*
26+
* @author Haotian Zhang
27+
*/
28+
public class ServiceInstancesResponse {
29+
30+
private String revision;
31+
32+
private List<DefaultInstance> serviceInstanceList;
33+
34+
public ServiceInstancesResponse(String revision, List<DefaultInstance> serviceInstanceList) {
35+
this.revision = revision;
36+
this.serviceInstanceList = serviceInstanceList;
37+
}
38+
39+
public String getRevision() {
40+
return revision;
41+
}
42+
43+
public void setRevision(String revision) {
44+
this.revision = revision;
45+
}
46+
47+
public List<DefaultInstance> getServiceInstanceList() {
48+
return serviceInstanceList;
49+
}
50+
51+
public void setServiceInstanceList(List<DefaultInstance> serviceInstanceList) {
52+
this.serviceInstanceList = serviceInstanceList;
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making Polaris available.
3+
*
4+
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package com.tencent.polaris.plugins.connector.composite;
19+
20+
import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_CONSUL;
21+
import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_GRPC;
22+
23+
import com.google.common.collect.Lists;
24+
import com.tencent.polaris.api.utils.StringUtils;
25+
import java.util.List;
26+
27+
/**
28+
* Revision handler for multi-discovery server.
29+
*
30+
* @author Haotian Zhang
31+
*/
32+
public class CompositeRevision {
33+
34+
private static final List<String> ORDER_LIST = Lists.newArrayList(SERVER_CONNECTOR_GRPC, SERVER_CONNECTOR_CONSUL);
35+
36+
private static final String BIG_SEPARATOR = ";";
37+
38+
private static final String LIL_SEPARATOR = ":";
39+
40+
private final String[] content = new String[ORDER_LIST.size()];
41+
42+
/**
43+
* Set revision of corresponding server connector by name.
44+
*
45+
* @param name name of server connector
46+
* @param revision revision
47+
*/
48+
public void setRevision(String name, String revision) {
49+
if (ORDER_LIST.contains(name)) {
50+
content[ORDER_LIST.indexOf(name)] = revision;
51+
}
52+
}
53+
54+
/**
55+
* Generate composite revision string.
56+
*
57+
* @return revision
58+
*/
59+
public String getCompositeRevisionString() {
60+
StringBuilder revision = new StringBuilder();
61+
for (int i = 0; i < ORDER_LIST.size(); i++) {
62+
if (StringUtils.isNotBlank(content[i])) {
63+
revision.append(ORDER_LIST.get(i));
64+
revision.append(LIL_SEPARATOR);
65+
revision.append(content[i]);
66+
revision.append(BIG_SEPARATOR);
67+
}
68+
}
69+
return revision.toString();
70+
}
71+
}

polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
package com.tencent.polaris.plugins.connector.composite;
1919

20+
import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_GRPC;
21+
2022
import com.google.protobuf.BoolValue;
2123
import com.google.protobuf.StringValue;
2224
import com.google.protobuf.UInt32Value;
23-
import com.tencent.polaris.api.config.plugin.DefaultPlugins;
2425
import com.tencent.polaris.api.exception.ErrorCode;
2526
import com.tencent.polaris.api.exception.PolarisException;
2627
import com.tencent.polaris.api.plugin.server.ServerEvent;
@@ -35,6 +36,7 @@
3536
import com.tencent.polaris.client.pb.ServiceProto.Service;
3637
import com.tencent.polaris.logging.LoggerFactory;
3738
import com.tencent.polaris.plugins.connector.common.DestroyableServerConnector;
39+
import com.tencent.polaris.plugins.connector.common.ServiceInstancesResponse;
3840
import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask;
3941
import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant.Status;
4042
import com.tencent.polaris.plugins.connector.grpc.GrpcServiceUpdateTask;
@@ -60,7 +62,7 @@ public CompositeServiceUpdateTask(ServiceEventHandler handler, DestroyableServer
6062
protected void execute() {
6163
CompositeConnector connector = (CompositeConnector) serverConnector;
6264
for (DestroyableServerConnector sc : connector.getServerConnectors()) {
63-
if (DefaultPlugins.SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) {
65+
if (SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) {
6466
GrpcServiceUpdateTask grpcServiceUpdateTask = new GrpcServiceUpdateTask(serviceEventHandler, sc);
6567
grpcServiceUpdateTask.execute(this);
6668
return;
@@ -91,13 +93,19 @@ public boolean notifyServerEvent(ServerEvent serverEvent) {
9193
if (EventType.INSTANCE.equals(serviceEventKey.getEventType())) {
9294
// Get instance information list except polaris.
9395
List<DefaultInstance> extendInstanceList = new ArrayList<>();
96+
CompositeRevision compositeRevision = new CompositeRevision();
97+
compositeRevision.setRevision(SERVER_CONNECTOR_GRPC,
98+
discoverResponse.getService().getRevision().getValue());
9499
for (DestroyableServerConnector sc : connector.getServerConnectors()) {
95-
if (!DefaultPlugins.SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) {
96-
List<DefaultInstance> instanceList = sc.syncGetServiceInstances(this);
97-
if (extendInstanceList.isEmpty()) {
98-
extendInstanceList.addAll(instanceList);
99-
} else {
100-
// TODO 多数据源合并去重
100+
if (!SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) {
101+
ServiceInstancesResponse serviceInstancesResponse = sc.syncGetServiceInstances(this);
102+
if (serviceInstancesResponse != null) {
103+
compositeRevision.setRevision(sc.getName(), serviceInstancesResponse.getRevision());
104+
if (extendInstanceList.isEmpty()) {
105+
extendInstanceList.addAll(serviceInstancesResponse.getServiceInstanceList());
106+
} else {
107+
// TODO 多数据源合并去重
108+
}
101109
}
102110
}
103111
}
@@ -124,6 +132,10 @@ public boolean notifyServerEvent(ServerEvent serverEvent) {
124132
}
125133
newDiscoverResponseBuilder.addInstances(instanceBuilder.build());
126134
}
135+
Service.Builder newServiceBuilder = Service.newBuilder()
136+
.mergeFrom(newDiscoverResponseBuilder.getService());
137+
newServiceBuilder.setRevision(StringValue.of(compositeRevision.getCompositeRevisionString()));
138+
newDiscoverResponseBuilder.setService(newServiceBuilder.build());
127139
}
128140
if (!newDiscoverResponseBuilder.getInstancesList().isEmpty()) {
129141
serverEvent.setError(null);
@@ -132,7 +144,7 @@ public boolean notifyServerEvent(ServerEvent serverEvent) {
132144
// Get instance information list except polaris.
133145
List<ServiceInfo> extendServiceList = new ArrayList<>();
134146
for (DestroyableServerConnector sc : connector.getServerConnectors()) {
135-
if (!DefaultPlugins.SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) {
147+
if (!SERVER_CONNECTOR_GRPC.equals(sc.getName()) && sc.isDiscoveryEnable()) {
136148
Services services = sc.syncGetServices(this);
137149
if (extendServiceList.isEmpty()) {
138150
extendServiceList.addAll(services.getServices());

0 commit comments

Comments
 (0)