Skip to content

Commit 2dd8090

Browse files
authored
feat: support kafka lane. (#1791)
1 parent d26a210 commit 2dd8090

File tree

30 files changed

+3989
-6
lines changed

30 files changed

+3989
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,4 @@
5353
- [feat: support traffic gray lane router](https://github.com/Tencent/spring-cloud-tencent/pull/1785)
5454
- [fix: fix NPE when application context is null #1787](https://github.com/Tencent/spring-cloud-tencent/pull/1787)
5555
- [fix: fix lane router property name.](https://github.com/Tencent/spring-cloud-tencent/pull/1789)
56+
- [feat: support kafka lane.](https://github.com/Tencent/spring-cloud-tencent/pull/1791)

spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/PolarisRefreshConfiguration.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2424
import org.springframework.context.annotation.Bean;
2525
import org.springframework.context.annotation.Configuration;
26+
import org.springframework.core.env.Environment;
2627

2728
/**
2829
* Configuration for listening the change of service status.
@@ -41,8 +42,8 @@ public PolarisServiceStatusChangeListener polarisServiceChangeListener(ServiceIn
4142

4243
@Bean
4344
@ConditionalOnMissingBean
44-
public ServiceInstanceChangeCallbackManager serviceInstanceChangeCallbackManager() {
45-
return new ServiceInstanceChangeCallbackManager();
45+
public ServiceInstanceChangeCallbackManager serviceInstanceChangeCallbackManager(Environment environment) {
46+
return new ServiceInstanceChangeCallbackManager(environment);
4647
}
4748

4849
@Bean

spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/ServiceInstanceChangeCallbackManager.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.beans.factory.config.BeanPostProcessor;
3636
import org.springframework.boot.context.event.ApplicationReadyEvent;
3737
import org.springframework.context.ApplicationListener;
38+
import org.springframework.core.env.Environment;
3839
import org.springframework.util.CollectionUtils;
3940

4041
/**
@@ -50,7 +51,10 @@ public class ServiceInstanceChangeCallbackManager implements ApplicationListener
5051

5152
private final ScheduledThreadPoolExecutor serviceChangeListenerExecutor;
5253

53-
public ServiceInstanceChangeCallbackManager() {
54+
private final Environment environment;
55+
56+
public ServiceInstanceChangeCallbackManager(Environment environment) {
57+
this.environment = environment;
5458
this.serviceChangeListenerExecutor = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("service-change-listener"));
5559
}
5660

@@ -101,6 +105,19 @@ public synchronized Object postProcessAfterInitialization(Object bean, String be
101105
if (clz.isAnnotationPresent(ServiceInstanceChangeListener.class)) {
102106
ServiceInstanceChangeListener serviceInstanceChangeListener = clz.getAnnotation(ServiceInstanceChangeListener.class);
103107
serviceName = serviceInstanceChangeListener.serviceName();
108+
String message = null;
109+
try {
110+
serviceName = environment.resolveRequiredPlaceholders(serviceName);
111+
}
112+
catch (Exception e) {
113+
// resolve failed, reset service name.
114+
message = e.getMessage();
115+
serviceName = null;
116+
}
117+
if (StringUtils.isBlank(serviceName)) {
118+
LOG.warn("resolve service name failed, bean name:{}, config service name:{}, message:{}",
119+
beanName, serviceInstanceChangeListener.serviceName(), message);
120+
}
104121
}
105122

106123
if (StringUtils.isBlank(serviceName)) {
@@ -123,8 +140,8 @@ public synchronized Object postProcessAfterInitialization(Object bean, String be
123140

124141
@Override
125142
public void onApplicationEvent(@NonNull ApplicationReadyEvent event) {
126-
PolarisDiscoveryClient polarisDiscoveryClient = ApplicationContextAwareUtils.getBeanIfExists(PolarisDiscoveryClient.class);
127-
PolarisReactiveDiscoveryClient polarisReactiveDiscoveryClient = ApplicationContextAwareUtils.getBeanIfExists(PolarisReactiveDiscoveryClient.class);
143+
PolarisDiscoveryClient polarisDiscoveryClient = ApplicationContextAwareUtils.getBeanIfExists(PolarisDiscoveryClient.class, false);
144+
PolarisReactiveDiscoveryClient polarisReactiveDiscoveryClient = ApplicationContextAwareUtils.getBeanIfExists(PolarisReactiveDiscoveryClient.class, false);
128145
for (String serviceName : callbackMap.keySet()) {
129146
try {
130147
if (polarisDiscoveryClient != null) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
3+
*
4+
* Copyright (C) 2021 Tencent. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package com.tencent.cloud.polaris.discovery.refresh;
19+
20+
import java.lang.reflect.Field;
21+
import java.util.List;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
24+
import com.tencent.cloud.polaris.registry.PolarisAutoServiceRegistration;
25+
import com.tencent.polaris.api.pojo.Instance;
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.extension.ExtendWith;
28+
29+
import org.springframework.beans.BeansException;
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.beans.factory.config.BeanPostProcessor;
32+
import org.springframework.boot.autoconfigure.SpringBootApplication;
33+
import org.springframework.boot.test.context.SpringBootTest;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.test.context.junit.jupiter.SpringExtension;
36+
37+
import static org.assertj.core.api.Assertions.assertThat;
38+
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
39+
40+
/**
41+
* Test for {@link ServiceInstanceChangeCallback}.
42+
*/
43+
@ExtendWith(SpringExtension.class)
44+
@SpringBootTest(webEnvironment = RANDOM_PORT,
45+
classes = ServiceInstanceChangeCallbackTest.TestApplication.class,
46+
properties = {"spring.config.location = classpath:application-test.yml",
47+
"spring.main.web-application-type = servlet",
48+
"spring.cloud.gateway.enabled = false"})
49+
public class ServiceInstanceChangeCallbackTest {
50+
51+
@Autowired
52+
ServiceInstanceChangeCallbackManager serviceInstanceChangeCallbackManager;
53+
54+
@Test
55+
public void test1() {
56+
// Get callbackMap from serviceInstanceChangeCallbackManager via reflection
57+
try {
58+
Field callbackMapField = ServiceInstanceChangeCallbackManager.class.getDeclaredField("callbackMap");
59+
callbackMapField.setAccessible(true);
60+
ConcurrentHashMap<String, List<ServiceInstanceChangeCallback>> callbackMap =
61+
(ConcurrentHashMap<String, List<ServiceInstanceChangeCallback>>) callbackMapField.get(serviceInstanceChangeCallbackManager);
62+
63+
// Verify
64+
assertThat(callbackMap.containsKey("java_provider_test")).isTrue();
65+
assertThat(callbackMap.containsKey("QuickstartCalleeService")).isTrue();
66+
// ignore error and empty
67+
assertThat(callbackMap.size()).isEqualTo(2);
68+
69+
}
70+
catch (Exception e) {
71+
throw new RuntimeException("Failed to get callbackMap via reflection", e);
72+
}
73+
}
74+
75+
@SpringBootApplication
76+
protected static class TestApplication {
77+
78+
@Bean
79+
public SelfServiceChangeCallback selfServiceChangeCallback() {
80+
return new SelfServiceChangeCallback();
81+
}
82+
83+
@Bean
84+
public CalleeServiceChangeCallback calleeServiceChangeCallback() {
85+
return new CalleeServiceChangeCallback();
86+
}
87+
88+
@Bean
89+
public ErrorServiceChangeCallback errorServiceChangeCallback() {
90+
return new ErrorServiceChangeCallback();
91+
}
92+
93+
@Bean
94+
public EmptyServiceChangeCallback emptyServiceChangeCallback() {
95+
return new EmptyServiceChangeCallback();
96+
}
97+
98+
@Bean
99+
public ParsingEmptyServiceChangeCallback parseEmptyServiceChangeCallback() {
100+
return new ParsingEmptyServiceChangeCallback();
101+
}
102+
103+
@Bean
104+
public TestBeanPostProcessor testBeanPostProcessor() {
105+
return new TestBeanPostProcessor();
106+
}
107+
108+
}
109+
110+
@ServiceInstanceChangeListener(serviceName = "${spring.application.name}")
111+
static class SelfServiceChangeCallback implements ServiceInstanceChangeCallback {
112+
113+
@Override
114+
public void callback(List<Instance> currentServiceInstances, List<Instance> addServiceInstances, List<Instance> deleteServiceInstances) {
115+
116+
}
117+
}
118+
119+
@ServiceInstanceChangeListener(serviceName = "${error.name}")
120+
static class ErrorServiceChangeCallback implements ServiceInstanceChangeCallback {
121+
122+
@Override
123+
public void callback(List<Instance> currentServiceInstances, List<Instance> addServiceInstances, List<Instance> deleteServiceInstances) {
124+
125+
}
126+
}
127+
128+
@ServiceInstanceChangeListener(serviceName = "${test.empty}")
129+
static class ParsingEmptyServiceChangeCallback implements ServiceInstanceChangeCallback {
130+
131+
@Override
132+
public void callback(List<Instance> currentServiceInstances, List<Instance> addServiceInstances, List<Instance> deleteServiceInstances) {
133+
134+
}
135+
}
136+
137+
@ServiceInstanceChangeListener(serviceName = "")
138+
static class EmptyServiceChangeCallback implements ServiceInstanceChangeCallback {
139+
140+
@Override
141+
public void callback(List<Instance> currentServiceInstances, List<Instance> addServiceInstances, List<Instance> deleteServiceInstances) {
142+
143+
}
144+
}
145+
146+
@ServiceInstanceChangeListener(serviceName = "QuickstartCalleeService")
147+
static class CalleeServiceChangeCallback implements ServiceInstanceChangeCallback {
148+
149+
@Override
150+
public void callback(List<Instance> currentServiceInstances, List<Instance> addServiceInstances, List<Instance> deleteServiceInstances) {
151+
152+
}
153+
}
154+
155+
static class TestBeanPostProcessor implements BeanPostProcessor {
156+
@Override
157+
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
158+
if (bean instanceof PolarisAutoServiceRegistration) {
159+
return org.mockito.Mockito.mock(PolarisAutoServiceRegistration.class);
160+
}
161+
return bean;
162+
}
163+
}
164+
}

spring-cloud-starter-tencent-polaris-discovery/src/test/resources/application-test.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,5 @@ spring:
3232
username: nacos
3333
password: nacos
3434
cluster-name: polaris
35+
test:
36+
empty:

spring-cloud-tencent-commons/src/main/java/com/tencent/cloud/common/util/ApplicationContextAwareUtils.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,17 @@ public static <T> T getBean(Class<T> requiredType) {
9393
}
9494

9595
public static <T> T getBeanIfExists(Class<T> requiredType) {
96+
return getBeanIfExists(requiredType, false);
97+
}
98+
99+
public static <T> T getBeanIfExists(Class<T> requiredType, boolean warnIfFailed) {
96100
try {
97101
return applicationContext.getBean(requiredType);
98102
}
99103
catch (Throwable e) {
100-
LOGGER.warn("get bean failed, bean type: {}", requiredType.getName());
104+
if (warnIfFailed) {
105+
LOGGER.warn("get bean failed, bean type: {}", requiredType.getName());
106+
}
101107
return null;
102108
}
103109
}

spring-cloud-tencent-dependencies/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,12 @@
226226
<version>${revision}</version>
227227
</dependency>
228228

229+
<dependency>
230+
<groupId>com.tencent.cloud</groupId>
231+
<artifactId>spring-cloud-starter-tencent-mq-plugin</artifactId>
232+
<version>${revision}</version>
233+
</dependency>
234+
229235
<!-- third part framework dependencies -->
230236
<dependency>
231237
<groupId>org.springdoc</groupId>

spring-cloud-tencent-plugin-starters/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
<module>spring-cloud-starter-tencent-traffic-mirroring-plugin</module>
2727
<module>spring-cloud-starter-tencent-fault-injection-plugin</module>
2828
<module>spring-cloud-starter-tencent-tsf-tls-plugin</module>
29+
<module>spring-cloud-starter-tencent-mq-plugin</module>
2930
</modules>
3031

3132
</project>
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xmlns="http://maven.apache.org/POM/4.0.0"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>spring-cloud-tencent-plugin-starters</artifactId>
7+
<groupId>com.tencent.cloud</groupId>
8+
<version>${revision}</version>
9+
<relativePath>../pom.xml</relativePath>
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>spring-cloud-starter-tencent-mq-plugin</artifactId>
14+
<name>Spring Cloud Starter Tencent Message Queue plugin</name>
15+
16+
17+
<dependencies>
18+
<dependency>
19+
<groupId>com.tencent.cloud</groupId>
20+
<artifactId>spring-cloud-starter-tencent-polaris-discovery</artifactId>
21+
</dependency>
22+
23+
<dependency>
24+
<groupId>org.springframework.boot</groupId>
25+
<artifactId>spring-boot-starter-aop</artifactId>
26+
<exclusions>
27+
<exclusion>
28+
<groupId>org.springframework.boot</groupId>
29+
<artifactId>spring-boot-starter-logging</artifactId>
30+
</exclusion>
31+
<exclusion>
32+
<groupId>org.springframework.boot</groupId>
33+
<artifactId>spring-boot-starter</artifactId>
34+
</exclusion>
35+
</exclusions>
36+
</dependency>
37+
38+
<dependency>
39+
<groupId>org.springframework.kafka</groupId>
40+
<artifactId>spring-kafka</artifactId>
41+
<scope>provided</scope>
42+
<optional>true</optional>
43+
</dependency>
44+
45+
<dependency>
46+
<groupId>org.springframework.boot</groupId>
47+
<artifactId>spring-boot-starter-json</artifactId>
48+
<scope>provided</scope>
49+
<optional>true</optional>
50+
</dependency>
51+
52+
</dependencies>
53+
54+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
3+
*
4+
* Copyright (C) 2021 Tencent. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package com.tencent.cloud.plugin.mq.lane;
19+
20+
import java.util.List;
21+
22+
import com.tencent.cloud.common.metadata.MetadataContext;
23+
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
24+
import com.tencent.polaris.api.pojo.Instance;
25+
import com.tencent.polaris.api.pojo.RegistryCacheValue;
26+
import com.tencent.polaris.api.pojo.ServiceEventKey;
27+
import com.tencent.polaris.api.utils.StringUtils;
28+
import com.tencent.polaris.client.pojo.ServiceInstancesByProto;
29+
30+
public abstract class AbstractActiveLane extends AbstractResourceEventListener {
31+
32+
public abstract boolean ifConsume(String laneId, MqLaneProperties mqLaneProperties);
33+
34+
public abstract String getLaneHeaderKey();
35+
36+
public String formatLaneId(String laneId) {
37+
return laneId;
38+
}
39+
40+
public abstract void callback(List<Instance> currentServiceInstances);
41+
42+
@Override
43+
public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue, RegistryCacheValue newValue) {
44+
45+
if (newValue.getEventType() == ServiceEventKey.EventType.INSTANCE
46+
&& newValue instanceof ServiceInstancesByProto
47+
&& StringUtils.equals(svcEventKey.getService(), MetadataContext.LOCAL_SERVICE)) {
48+
49+
ServiceInstancesByProto newIns = (ServiceInstancesByProto) newValue;
50+
callback(newIns.getInstances());
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)