Skip to content

Commit 12fd29c

Browse files
authored
Merge branch 'master' into fix-6279
2 parents 1222b6a + cb3c582 commit 12fd29c

File tree

4 files changed

+427
-34
lines changed

4 files changed

+427
-34
lines changed

shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java

Lines changed: 89 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.shenyu.common.utils.Singleton;
2727
import org.apache.shenyu.loadbalancer.entity.Upstream;
2828

29+
import java.util.ArrayList;
2930
import java.util.HashSet;
3031
import java.util.List;
3132
import java.util.Map;
@@ -146,53 +147,110 @@ public void removeByKey(final String key) {
146147
*/
147148
public void submit(final String selectorId, final List<Upstream> upstreamList) {
148149
List<Upstream> actualUpstreamList = Objects.isNull(upstreamList) ? Lists.newArrayList() : upstreamList;
149-
actualUpstreamList.forEach(upstream -> {
150-
if (!upstream.isHealthCheckEnabled()) {
151-
upstream.setStatus(true);
152-
upstream.setHealthy(true);
153-
}
154-
});
150+
151+
// Check if the list is empty first to avoid unnecessary processing
152+
if (actualUpstreamList.isEmpty()) {
153+
List<Upstream> existUpstreamList = MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> Lists.newArrayList());
154+
removeAllUpstreams(selectorId, existUpstreamList);
155+
return;
156+
}
157+
158+
initializeUpstreamHealthStatus(actualUpstreamList);
159+
155160
Map<Boolean, List<Upstream>> partitionedUpstreams = actualUpstreamList.stream()
156161
.collect(Collectors.partitioningBy(Upstream::isStatus));
157162
List<Upstream> validUpstreamList = partitionedUpstreams.get(true);
158163
List<Upstream> offlineUpstreamList = partitionedUpstreams.get(false);
159164
List<Upstream> existUpstreamList = MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> Lists.newArrayList());
160165

161-
if (actualUpstreamList.isEmpty()) {
162-
existUpstreamList.forEach(up -> task.triggerRemoveOne(selectorId, up));
163-
}
166+
processOfflineUpstreams(selectorId, offlineUpstreamList, existUpstreamList);
167+
processValidUpstreams(selectorId, validUpstreamList, existUpstreamList);
168+
169+
List<Upstream> healthyUpstreamList = task.getHealthyUpstreamListBySelectorId(selectorId);
170+
UPSTREAM_MAP.put(selectorId, Objects.isNull(healthyUpstreamList) ? Lists.newArrayList() : healthyUpstreamList);
171+
}
172+
173+
private void initializeUpstreamHealthStatus(final List<Upstream> upstreamList) {
174+
upstreamList.forEach(upstream -> {
175+
if (!upstream.isHealthCheckEnabled()) {
176+
upstream.setStatus(true);
177+
upstream.setHealthy(true);
178+
}
179+
});
180+
}
181+
182+
private void removeAllUpstreams(final String selectorId, final List<Upstream> existUpstreamList) {
183+
List<Upstream> toRemove = new ArrayList<>(existUpstreamList);
184+
toRemove.forEach(up -> task.triggerRemoveOne(selectorId, up));
185+
}
164186

165-
// Use a Set for O(1) lookups instead of nested loops
187+
private void processOfflineUpstreams(final String selectorId, final List<Upstream> offlineUpstreamList,
188+
final List<Upstream> existUpstreamList) {
189+
Map<String, Upstream> currentUnhealthyMap = getCurrentUnhealthyMap(selectorId);
166190
Set<Upstream> existUpstreamSet = new HashSet<>(existUpstreamList);
191+
167192
offlineUpstreamList.forEach(offlineUp -> {
193+
String key = upstreamMapKey(offlineUp);
168194
if (existUpstreamSet.contains(offlineUp)) {
169-
task.triggerRemoveOne(selectorId, offlineUp);
195+
if (currentUnhealthyMap.containsKey(key) && offlineUp.isHealthCheckEnabled()) {
196+
task.putToMap(task.getUnhealthyUpstream(), selectorId, offlineUp);
197+
task.removeFromMap(task.getHealthyUpstream(), selectorId, offlineUp);
198+
} else {
199+
task.triggerRemoveOne(selectorId, offlineUp);
200+
}
201+
} else if (offlineUp.isHealthCheckEnabled()) {
202+
task.putToMap(task.getUnhealthyUpstream(), selectorId, offlineUp);
203+
}
204+
});
205+
}
206+
207+
private void processValidUpstreams(final String selectorId, final List<Upstream> validUpstreamList,
208+
final List<Upstream> existUpstreamList) {
209+
if (validUpstreamList.isEmpty()) {
210+
return;
211+
}
212+
213+
updateExistingUpstreams(validUpstreamList, existUpstreamList);
214+
addNewUpstreams(selectorId, validUpstreamList, existUpstreamList);
215+
}
216+
217+
private void updateExistingUpstreams(final List<Upstream> validUpstreamList, final List<Upstream> existUpstreamList) {
218+
Map<String, Upstream> existUpstreamMap = existUpstreamList.stream()
219+
.collect(Collectors.toMap(this::upstreamMapKey, existUp -> existUp, (existing, replacement) -> existing));
220+
221+
validUpstreamList.forEach(validUp -> {
222+
Upstream matchedExistUp = existUpstreamMap.get(upstreamMapKey(validUp));
223+
if (Objects.nonNull(matchedExistUp)) {
224+
matchedExistUp.setWeight(validUp.getWeight());
225+
matchedExistUp.setHealthCheckEnabled(validUp.isHealthCheckEnabled());
226+
if (!matchedExistUp.isHealthCheckEnabled()) {
227+
matchedExistUp.setHealthy(true);
228+
}
170229
}
171230
});
231+
}
232+
233+
private void addNewUpstreams(final String selectorId, final List<Upstream> validUpstreamList,
234+
final List<Upstream> existUpstreamList) {
235+
Map<String, Upstream> currentUnhealthyMap = getCurrentUnhealthyMap(selectorId);
172236

173-
if (!validUpstreamList.isEmpty()) {
174-
// update upstream weight
175-
Map<String, Upstream> existUpstreamMap = existUpstreamList.stream()
176-
.collect(Collectors.toMap(this::upstreamMapKey, existUp -> existUp, (existing, replacement) -> existing));
177-
validUpstreamList.forEach(validUp -> {
178-
String key = upstreamMapKey(validUp);
179-
Upstream matchedExistUp = existUpstreamMap.get(key);
180-
if (Objects.nonNull(matchedExistUp)) {
181-
matchedExistUp.setWeight(validUp.getWeight());
182-
matchedExistUp.setHealthCheckEnabled(validUp.isHealthCheckEnabled());
183-
if (!matchedExistUp.isHealthCheckEnabled()) {
184-
matchedExistUp.setHealthy(true);
185-
}
237+
validUpstreamList.stream()
238+
.filter(validUp -> !existUpstreamList.contains(validUp))
239+
.forEach(up -> {
240+
Upstream prevUnhealthy = currentUnhealthyMap.get(upstreamMapKey(up));
241+
if (Objects.nonNull(prevUnhealthy)) {
242+
task.putToMap(task.getUnhealthyUpstream(), selectorId, up);
243+
} else {
244+
task.triggerAddOne(selectorId, up);
186245
}
187246
});
247+
}
188248

189-
validUpstreamList.stream()
190-
.filter(validUp -> !existUpstreamList.contains(validUp))
191-
.forEach(up -> task.triggerAddOne(selectorId, up));
192-
}
193-
194-
List<Upstream> healthyUpstreamList = task.getHealthyUpstreamListBySelectorId(selectorId);
195-
UPSTREAM_MAP.put(selectorId, Objects.isNull(healthyUpstreamList) ? Lists.newArrayList() : healthyUpstreamList);
249+
private Map<String, Upstream> getCurrentUnhealthyMap(final String selectorId) {
250+
List<Upstream> currentUnhealthy = task.getUnhealthyUpstream().get(selectorId);
251+
return Objects.isNull(currentUnhealthy)
252+
? Maps.newConcurrentMap()
253+
: currentUnhealthy.stream().collect(Collectors.toMap(this::upstreamMapKey, u -> u, (a, b) -> a));
196254
}
197255

198256
private String upstreamMapKey(final Upstream upstream) {

shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ private void finishHealthCheck() {
265265
public void triggerAddOne(final String selectorId, final Upstream upstream) {
266266
putToMap(healthyUpstream, selectorId, upstream);
267267
}
268-
268+
269269
/**
270270
* Remove a specific upstream via selectorId.
271271
*
@@ -277,7 +277,14 @@ public void triggerRemoveOne(final String selectorId, final Upstream upstream) {
277277
removeFromMap(unhealthyUpstream, selectorId, upstream);
278278
}
279279

280-
private void putToMap(final Map<String, List<Upstream>> map, final String selectorId, final Upstream upstream) {
280+
/**
281+
* Put upstream to specified map (for preserving health status).
282+
*
283+
* @param map the map to put upstream
284+
* @param selectorId the selector id
285+
* @param upstream the upstream
286+
*/
287+
public void putToMap(final Map<String, List<Upstream>> map, final String selectorId, final Upstream upstream) {
281288
synchronized (lock) {
282289
List<Upstream> list = MapUtils.computeIfAbsent(map, selectorId, k -> Lists.newArrayList());
283290
if (!list.contains(upstream)) {
@@ -286,7 +293,14 @@ private void putToMap(final Map<String, List<Upstream>> map, final String select
286293
}
287294
}
288295

289-
private void removeFromMap(final Map<String, List<Upstream>> map, final String selectorId, final Upstream upstream) {
296+
/**
297+
* Remove upstream from specified map.
298+
*
299+
* @param map the map to remove upstream from
300+
* @param selectorId the selector id
301+
* @param upstream the upstream
302+
*/
303+
public void removeFromMap(final Map<String, List<Upstream>> map, final String selectorId, final Upstream upstream) {
290304
synchronized (lock) {
291305
List<Upstream> list = map.get(selectorId);
292306
if (CollectionUtils.isNotEmpty(list)) {

shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManagerTest.java

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.util.ArrayList;
3030
import java.util.List;
31+
import java.util.Objects;
3132

3233
/**
3334
* The type UpstreamCacheManager check task test.
@@ -106,4 +107,188 @@ public void testSubmitSyncsHealthCheckEnabled() {
106107
// Clean up
107108
upstreamCacheManager.removeByKey(testSelectorId);
108109
}
110+
111+
@Test
112+
@Order(6)
113+
public void testSubmitWithStatusFalsePreservesUnhealthyState() {
114+
final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance();
115+
final String testSelectorId = "PRESERVE_UNHEALTHY_TEST";
116+
117+
// First, submit healthy upstreams to establish baseline
118+
List<Upstream> initialList = new ArrayList<>(2);
119+
initialList.add(Upstream.builder()
120+
.protocol("http://")
121+
.url("upstream1:8080")
122+
.status(true)
123+
.healthCheckEnabled(true)
124+
.build());
125+
initialList.add(Upstream.builder()
126+
.protocol("http://")
127+
.url("upstream2:8080")
128+
.status(true)
129+
.healthCheckEnabled(true)
130+
.build());
131+
upstreamCacheManager.submit(testSelectorId, initialList);
132+
133+
// Simulate health check marking one as unhealthy
134+
UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
135+
if (Objects.nonNull(task)) {
136+
Upstream unhealthyUpstream = initialList.get(0);
137+
unhealthyUpstream.setHealthy(false);
138+
task.putToMap(task.getUnhealthyUpstream(), testSelectorId, unhealthyUpstream);
139+
task.removeFromMap(task.getHealthyUpstream(), testSelectorId, unhealthyUpstream);
140+
141+
// Verify it's in unhealthy map
142+
Assertions.assertNotNull(task.getUnhealthyUpstream().get(testSelectorId));
143+
Assertions.assertTrue(task.getUnhealthyUpstream().get(testSelectorId).stream()
144+
.anyMatch(u -> u.getUrl().equals("upstream1:8080")));
145+
}
146+
147+
// Now admin sends update with status=false for that upstream
148+
List<Upstream> updateList = new ArrayList<>(2);
149+
updateList.add(Upstream.builder()
150+
.protocol("http://")
151+
.url("upstream1:8080")
152+
.status(false)
153+
.healthCheckEnabled(true)
154+
.build());
155+
updateList.add(Upstream.builder()
156+
.protocol("http://")
157+
.url("upstream2:8080")
158+
.status(true)
159+
.healthCheckEnabled(true)
160+
.build());
161+
upstreamCacheManager.submit(testSelectorId, updateList);
162+
163+
// Verify: upstream1 should still be in unhealthy map (preserved state)
164+
if (Objects.nonNull(task)) {
165+
List<Upstream> unhealthyList = task.getUnhealthyUpstream().get(testSelectorId);
166+
Assertions.assertNotNull(unhealthyList);
167+
Assertions.assertTrue(unhealthyList.stream()
168+
.anyMatch(u -> u.getUrl().equals("upstream1:8080")),
169+
"upstream1 should be preserved in unhealthy map");
170+
}
171+
172+
// Clean up
173+
upstreamCacheManager.removeByKey(testSelectorId);
174+
}
175+
176+
@Test
177+
@Order(7)
178+
public void testSubmitWithNewOfflineUpstreamAddedToUnhealthy() {
179+
final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance();
180+
final String testSelectorId = "NEW_OFFLINE_UNHEALTHY_TEST";
181+
182+
// Submit a list with a new upstream having status=false
183+
List<Upstream> upstreamList = new ArrayList<>(1);
184+
upstreamList.add(Upstream.builder()
185+
.protocol("http://")
186+
.url("new-upstream:8080")
187+
.status(false)
188+
.healthCheckEnabled(true)
189+
.build());
190+
upstreamCacheManager.submit(testSelectorId, upstreamList);
191+
192+
// Verify: new upstream with status=false should be in unhealthy map for monitoring
193+
UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
194+
if (Objects.nonNull(task)) {
195+
List<Upstream> unhealthyList = task.getUnhealthyUpstream().get(testSelectorId);
196+
Assertions.assertNotNull(unhealthyList);
197+
Assertions.assertTrue(unhealthyList.stream()
198+
.anyMatch(u -> u.getUrl().equals("new-upstream:8080")),
199+
"New upstream with status=false should be in unhealthy map");
200+
}
201+
202+
// Clean up
203+
upstreamCacheManager.removeByKey(testSelectorId);
204+
}
205+
206+
@Test
207+
@Order(8)
208+
public void testSubmitPreservesUnhealthyForValidUpstream() {
209+
final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance();
210+
final String testSelectorId = "PRESERVE_UNHEALTHY_VALID_TEST";
211+
212+
// First submit and mark an upstream as unhealthy
213+
List<Upstream> initialList = new ArrayList<>(1);
214+
initialList.add(Upstream.builder()
215+
.protocol("http://")
216+
.url("recovering-upstream:8080")
217+
.status(true)
218+
.healthCheckEnabled(true)
219+
.build());
220+
upstreamCacheManager.submit(testSelectorId, initialList);
221+
222+
UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
223+
if (Objects.nonNull(task)) {
224+
// Manually mark as unhealthy
225+
Upstream unhealthyUpstream = initialList.get(0);
226+
unhealthyUpstream.setHealthy(false);
227+
task.putToMap(task.getUnhealthyUpstream(), testSelectorId, unhealthyUpstream);
228+
task.removeFromMap(task.getHealthyUpstream(), testSelectorId, unhealthyUpstream);
229+
230+
// Now admin sends update with status=true (valid) for the same upstream
231+
List<Upstream> updateList = new ArrayList<>(1);
232+
updateList.add(Upstream.builder()
233+
.protocol("http://")
234+
.url("recovering-upstream:8080")
235+
.status(true)
236+
.healthCheckEnabled(true)
237+
.build());
238+
upstreamCacheManager.submit(testSelectorId, updateList);
239+
240+
// Verify: should preserve unhealthy state since it was previously unhealthy
241+
List<Upstream> unhealthyList = task.getUnhealthyUpstream().get(testSelectorId);
242+
Assertions.assertNotNull(unhealthyList);
243+
Assertions.assertTrue(unhealthyList.stream()
244+
.anyMatch(u -> u.getUrl().equals("recovering-upstream:8080")),
245+
"Previously unhealthy upstream should remain in unhealthy map");
246+
}
247+
248+
// Clean up
249+
upstreamCacheManager.removeByKey(testSelectorId);
250+
}
251+
252+
@Test
253+
@Order(9)
254+
public void testSubmitWithHealthCheckDisabledAndStatusFalse() {
255+
final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance();
256+
final String testSelectorId = "HEALTH_CHECK_DISABLED_STATUS_FALSE_TEST";
257+
258+
// Submit upstream with healthCheckEnabled=false and status=false
259+
// This upstream should be removed, not added to unhealthy map
260+
List<Upstream> upstreamList = new ArrayList<>(1);
261+
upstreamList.add(Upstream.builder()
262+
.protocol("http://")
263+
.url("no-check-upstream:8080")
264+
.status(false)
265+
.healthCheckEnabled(false)
266+
.build());
267+
upstreamCacheManager.submit(testSelectorId, upstreamList);
268+
269+
UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
270+
if (Objects.nonNull(task)) {
271+
// Verify: should NOT be in unhealthy map since health check is disabled
272+
List<Upstream> unhealthyList = task.getUnhealthyUpstream().get(testSelectorId);
273+
Assertions.assertTrue(Objects.isNull(unhealthyList) || unhealthyList.isEmpty(),
274+
"Upstream with healthCheckEnabled=false should not be in unhealthy map");
275+
}
276+
277+
// Clean up
278+
upstreamCacheManager.removeByKey(testSelectorId);
279+
}
280+
281+
/**
282+
* Helper method to get the UpstreamCheckTask using reflection.
283+
*/
284+
private UpstreamCheckTask getUpstreamCheckTask(final UpstreamCacheManager manager) {
285+
try {
286+
java.lang.reflect.Field field = UpstreamCacheManager.class.getDeclaredField("task");
287+
field.setAccessible(true);
288+
return (UpstreamCheckTask) field.get(manager);
289+
} catch (Exception e) {
290+
// If reflection fails, return null
291+
return null;
292+
}
293+
}
109294
}

0 commit comments

Comments
 (0)