Skip to content

Commit cb3c582

Browse files
guanzhenxingclaudeAias00Copilot847850277
authored
Fix: Preserve Gateway's independent upstream health check state when receiving config updates from Admin (#6274)
* Fix: Preserve unhealthy upstream state when receiving config updates from admin When admin publishes configuration updates with upstreams marked as status=false, the gateway should preserve their unhealthy state and continue health checking instead of completely removing them. This allows the gateway's independent health check to recover upstreams when they become healthy. Changes: - UpstreamCacheManager: Refactored submit() method to preserve unhealthy state for both status=true and status=false upstreams - Added processOfflineUpstreams() to handle status=false upstreams with health check enabled, keeping them in unhealthy map for monitoring - Added processValidUpstreams() to check if valid upstreams were previously unhealthy and preserve that status - UpstreamCheckTask: Made removeFromMap() public to support state preservation Co-Authored-By: Claude <noreply@anthropic.com> * Test: Add tests for upstream unhealthy state preservation Add comprehensive tests to verify the fix for preserving unhealthy upstream state when receiving config updates from admin. UpstreamCacheManagerTest: - testSubmitWithStatusFalsePreservesUnhealthyState: Verify that upstreams with status=false that were previously unhealthy remain in unhealthy map - testSubmitWithNewOfflineUpstreamAddedToUnhealthy: Verify new upstreams with status=false are added to unhealthy map for monitoring - testSubmitPreservesUnhealthyForValidUpstream: Verify valid upstreams that were previously unhealthy remain in unhealthy map - testSubmitWithHealthCheckDisabledAndStatusFalse: Verify upstreams with healthCheckEnabled=false are removed, not added to unhealthy map UpstreamCheckTaskTest: - testPutToMap: Test adding upstreams to healthy map - testPutToMapUnhealthy: Test adding upstreams to unhealthy map - testRemoveFromMap: Test removing upstreams from healthy map - testRemoveFromMapUnhealthy: Test removing upstreams from unhealthy map - testMoveUpstreamBetweenMaps: Test moving upstreams between maps Co-Authored-By: Claude <noreply@anthropic.com> * Update shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTaskTest.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Opt: Move empty list check before initialization to avoid unnecessary processing Move the empty list check to the beginning of submit() method to avoid calling initializeUpstreamHealthStatus() and stream partitioning when the upstream list is empty. This is a minor performance optimization that reduces unnecessary method calls and stream operations when processing empty upstream lists. Changes: - Move isEmpty() check before initializeUpstreamHealthStatus() - Add early return for empty lists - Remove redundant isEmpty() check after partitioning Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: aias00 <liuhongyu@apache.org> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: zhengpeng <847850277@qq.com>
1 parent a6b8de1 commit cb3c582

File tree

4 files changed

+427
-34
lines changed

4 files changed

+427
-34
lines changed

shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java

Lines changed: 89 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.shenyu.common.utils.Singleton;
2727
import org.apache.shenyu.loadbalancer.entity.Upstream;
2828

29+
import java.util.ArrayList;
2930
import java.util.HashSet;
3031
import java.util.List;
3132
import java.util.Map;
@@ -146,53 +147,110 @@ public void removeByKey(final String key) {
146147
*/
147148
public void submit(final String selectorId, final List<Upstream> upstreamList) {
148149
List<Upstream> actualUpstreamList = Objects.isNull(upstreamList) ? Lists.newArrayList() : upstreamList;
149-
actualUpstreamList.forEach(upstream -> {
150-
if (!upstream.isHealthCheckEnabled()) {
151-
upstream.setStatus(true);
152-
upstream.setHealthy(true);
153-
}
154-
});
150+
151+
// Check if the list is empty first to avoid unnecessary processing
152+
if (actualUpstreamList.isEmpty()) {
153+
List<Upstream> existUpstreamList = MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> Lists.newArrayList());
154+
removeAllUpstreams(selectorId, existUpstreamList);
155+
return;
156+
}
157+
158+
initializeUpstreamHealthStatus(actualUpstreamList);
159+
155160
Map<Boolean, List<Upstream>> partitionedUpstreams = actualUpstreamList.stream()
156161
.collect(Collectors.partitioningBy(Upstream::isStatus));
157162
List<Upstream> validUpstreamList = partitionedUpstreams.get(true);
158163
List<Upstream> offlineUpstreamList = partitionedUpstreams.get(false);
159164
List<Upstream> existUpstreamList = MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> Lists.newArrayList());
160165

161-
if (actualUpstreamList.isEmpty()) {
162-
existUpstreamList.forEach(up -> task.triggerRemoveOne(selectorId, up));
163-
}
166+
processOfflineUpstreams(selectorId, offlineUpstreamList, existUpstreamList);
167+
processValidUpstreams(selectorId, validUpstreamList, existUpstreamList);
168+
169+
List<Upstream> healthyUpstreamList = task.getHealthyUpstreamListBySelectorId(selectorId);
170+
UPSTREAM_MAP.put(selectorId, Objects.isNull(healthyUpstreamList) ? Lists.newArrayList() : healthyUpstreamList);
171+
}
172+
173+
private void initializeUpstreamHealthStatus(final List<Upstream> upstreamList) {
174+
upstreamList.forEach(upstream -> {
175+
if (!upstream.isHealthCheckEnabled()) {
176+
upstream.setStatus(true);
177+
upstream.setHealthy(true);
178+
}
179+
});
180+
}
181+
182+
private void removeAllUpstreams(final String selectorId, final List<Upstream> existUpstreamList) {
183+
List<Upstream> toRemove = new ArrayList<>(existUpstreamList);
184+
toRemove.forEach(up -> task.triggerRemoveOne(selectorId, up));
185+
}
164186

165-
// Use a Set for O(1) lookups instead of nested loops
187+
private void processOfflineUpstreams(final String selectorId, final List<Upstream> offlineUpstreamList,
188+
final List<Upstream> existUpstreamList) {
189+
Map<String, Upstream> currentUnhealthyMap = getCurrentUnhealthyMap(selectorId);
166190
Set<Upstream> existUpstreamSet = new HashSet<>(existUpstreamList);
191+
167192
offlineUpstreamList.forEach(offlineUp -> {
193+
String key = upstreamMapKey(offlineUp);
168194
if (existUpstreamSet.contains(offlineUp)) {
169-
task.triggerRemoveOne(selectorId, offlineUp);
195+
if (currentUnhealthyMap.containsKey(key) && offlineUp.isHealthCheckEnabled()) {
196+
task.putToMap(task.getUnhealthyUpstream(), selectorId, offlineUp);
197+
task.removeFromMap(task.getHealthyUpstream(), selectorId, offlineUp);
198+
} else {
199+
task.triggerRemoveOne(selectorId, offlineUp);
200+
}
201+
} else if (offlineUp.isHealthCheckEnabled()) {
202+
task.putToMap(task.getUnhealthyUpstream(), selectorId, offlineUp);
203+
}
204+
});
205+
}
206+
207+
private void processValidUpstreams(final String selectorId, final List<Upstream> validUpstreamList,
208+
final List<Upstream> existUpstreamList) {
209+
if (validUpstreamList.isEmpty()) {
210+
return;
211+
}
212+
213+
updateExistingUpstreams(validUpstreamList, existUpstreamList);
214+
addNewUpstreams(selectorId, validUpstreamList, existUpstreamList);
215+
}
216+
217+
private void updateExistingUpstreams(final List<Upstream> validUpstreamList, final List<Upstream> existUpstreamList) {
218+
Map<String, Upstream> existUpstreamMap = existUpstreamList.stream()
219+
.collect(Collectors.toMap(this::upstreamMapKey, existUp -> existUp, (existing, replacement) -> existing));
220+
221+
validUpstreamList.forEach(validUp -> {
222+
Upstream matchedExistUp = existUpstreamMap.get(upstreamMapKey(validUp));
223+
if (Objects.nonNull(matchedExistUp)) {
224+
matchedExistUp.setWeight(validUp.getWeight());
225+
matchedExistUp.setHealthCheckEnabled(validUp.isHealthCheckEnabled());
226+
if (!matchedExistUp.isHealthCheckEnabled()) {
227+
matchedExistUp.setHealthy(true);
228+
}
170229
}
171230
});
231+
}
232+
233+
private void addNewUpstreams(final String selectorId, final List<Upstream> validUpstreamList,
234+
final List<Upstream> existUpstreamList) {
235+
Map<String, Upstream> currentUnhealthyMap = getCurrentUnhealthyMap(selectorId);
172236

173-
if (!validUpstreamList.isEmpty()) {
174-
// update upstream weight
175-
Map<String, Upstream> existUpstreamMap = existUpstreamList.stream()
176-
.collect(Collectors.toMap(this::upstreamMapKey, existUp -> existUp, (existing, replacement) -> existing));
177-
validUpstreamList.forEach(validUp -> {
178-
String key = upstreamMapKey(validUp);
179-
Upstream matchedExistUp = existUpstreamMap.get(key);
180-
if (Objects.nonNull(matchedExistUp)) {
181-
matchedExistUp.setWeight(validUp.getWeight());
182-
matchedExistUp.setHealthCheckEnabled(validUp.isHealthCheckEnabled());
183-
if (!matchedExistUp.isHealthCheckEnabled()) {
184-
matchedExistUp.setHealthy(true);
185-
}
237+
validUpstreamList.stream()
238+
.filter(validUp -> !existUpstreamList.contains(validUp))
239+
.forEach(up -> {
240+
Upstream prevUnhealthy = currentUnhealthyMap.get(upstreamMapKey(up));
241+
if (Objects.nonNull(prevUnhealthy)) {
242+
task.putToMap(task.getUnhealthyUpstream(), selectorId, up);
243+
} else {
244+
task.triggerAddOne(selectorId, up);
186245
}
187246
});
247+
}
188248

189-
validUpstreamList.stream()
190-
.filter(validUp -> !existUpstreamList.contains(validUp))
191-
.forEach(up -> task.triggerAddOne(selectorId, up));
192-
}
193-
194-
List<Upstream> healthyUpstreamList = task.getHealthyUpstreamListBySelectorId(selectorId);
195-
UPSTREAM_MAP.put(selectorId, Objects.isNull(healthyUpstreamList) ? Lists.newArrayList() : healthyUpstreamList);
249+
private Map<String, Upstream> getCurrentUnhealthyMap(final String selectorId) {
250+
List<Upstream> currentUnhealthy = task.getUnhealthyUpstream().get(selectorId);
251+
return Objects.isNull(currentUnhealthy)
252+
? Maps.newConcurrentMap()
253+
: currentUnhealthy.stream().collect(Collectors.toMap(this::upstreamMapKey, u -> u, (a, b) -> a));
196254
}
197255

198256
private String upstreamMapKey(final Upstream upstream) {

shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ private void finishHealthCheck() {
265265
public void triggerAddOne(final String selectorId, final Upstream upstream) {
266266
putToMap(healthyUpstream, selectorId, upstream);
267267
}
268-
268+
269269
/**
270270
* Remove a specific upstream via selectorId.
271271
*
@@ -277,7 +277,14 @@ public void triggerRemoveOne(final String selectorId, final Upstream upstream) {
277277
removeFromMap(unhealthyUpstream, selectorId, upstream);
278278
}
279279

280-
private void putToMap(final Map<String, List<Upstream>> map, final String selectorId, final Upstream upstream) {
280+
/**
281+
* Put upstream to specified map (for preserving health status).
282+
*
283+
* @param map the map to put upstream
284+
* @param selectorId the selector id
285+
* @param upstream the upstream
286+
*/
287+
public void putToMap(final Map<String, List<Upstream>> map, final String selectorId, final Upstream upstream) {
281288
synchronized (lock) {
282289
List<Upstream> list = MapUtils.computeIfAbsent(map, selectorId, k -> Lists.newArrayList());
283290
if (!list.contains(upstream)) {
@@ -286,7 +293,14 @@ private void putToMap(final Map<String, List<Upstream>> map, final String select
286293
}
287294
}
288295

289-
private void removeFromMap(final Map<String, List<Upstream>> map, final String selectorId, final Upstream upstream) {
296+
/**
297+
* Remove upstream from specified map.
298+
*
299+
* @param map the map to remove upstream from
300+
* @param selectorId the selector id
301+
* @param upstream the upstream
302+
*/
303+
public void removeFromMap(final Map<String, List<Upstream>> map, final String selectorId, final Upstream upstream) {
290304
synchronized (lock) {
291305
List<Upstream> list = map.get(selectorId);
292306
if (CollectionUtils.isNotEmpty(list)) {

shenyu-loadbalancer/src/test/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManagerTest.java

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.util.ArrayList;
3030
import java.util.List;
31+
import java.util.Objects;
3132

3233
/**
3334
* The type UpstreamCacheManager check task test.
@@ -106,4 +107,188 @@ public void testSubmitSyncsHealthCheckEnabled() {
106107
// Clean up
107108
upstreamCacheManager.removeByKey(testSelectorId);
108109
}
110+
111+
@Test
112+
@Order(6)
113+
public void testSubmitWithStatusFalsePreservesUnhealthyState() {
114+
final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance();
115+
final String testSelectorId = "PRESERVE_UNHEALTHY_TEST";
116+
117+
// First, submit healthy upstreams to establish baseline
118+
List<Upstream> initialList = new ArrayList<>(2);
119+
initialList.add(Upstream.builder()
120+
.protocol("http://")
121+
.url("upstream1:8080")
122+
.status(true)
123+
.healthCheckEnabled(true)
124+
.build());
125+
initialList.add(Upstream.builder()
126+
.protocol("http://")
127+
.url("upstream2:8080")
128+
.status(true)
129+
.healthCheckEnabled(true)
130+
.build());
131+
upstreamCacheManager.submit(testSelectorId, initialList);
132+
133+
// Simulate health check marking one as unhealthy
134+
UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
135+
if (Objects.nonNull(task)) {
136+
Upstream unhealthyUpstream = initialList.get(0);
137+
unhealthyUpstream.setHealthy(false);
138+
task.putToMap(task.getUnhealthyUpstream(), testSelectorId, unhealthyUpstream);
139+
task.removeFromMap(task.getHealthyUpstream(), testSelectorId, unhealthyUpstream);
140+
141+
// Verify it's in unhealthy map
142+
Assertions.assertNotNull(task.getUnhealthyUpstream().get(testSelectorId));
143+
Assertions.assertTrue(task.getUnhealthyUpstream().get(testSelectorId).stream()
144+
.anyMatch(u -> u.getUrl().equals("upstream1:8080")));
145+
}
146+
147+
// Now admin sends update with status=false for that upstream
148+
List<Upstream> updateList = new ArrayList<>(2);
149+
updateList.add(Upstream.builder()
150+
.protocol("http://")
151+
.url("upstream1:8080")
152+
.status(false)
153+
.healthCheckEnabled(true)
154+
.build());
155+
updateList.add(Upstream.builder()
156+
.protocol("http://")
157+
.url("upstream2:8080")
158+
.status(true)
159+
.healthCheckEnabled(true)
160+
.build());
161+
upstreamCacheManager.submit(testSelectorId, updateList);
162+
163+
// Verify: upstream1 should still be in unhealthy map (preserved state)
164+
if (Objects.nonNull(task)) {
165+
List<Upstream> unhealthyList = task.getUnhealthyUpstream().get(testSelectorId);
166+
Assertions.assertNotNull(unhealthyList);
167+
Assertions.assertTrue(unhealthyList.stream()
168+
.anyMatch(u -> u.getUrl().equals("upstream1:8080")),
169+
"upstream1 should be preserved in unhealthy map");
170+
}
171+
172+
// Clean up
173+
upstreamCacheManager.removeByKey(testSelectorId);
174+
}
175+
176+
@Test
177+
@Order(7)
178+
public void testSubmitWithNewOfflineUpstreamAddedToUnhealthy() {
179+
final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance();
180+
final String testSelectorId = "NEW_OFFLINE_UNHEALTHY_TEST";
181+
182+
// Submit a list with a new upstream having status=false
183+
List<Upstream> upstreamList = new ArrayList<>(1);
184+
upstreamList.add(Upstream.builder()
185+
.protocol("http://")
186+
.url("new-upstream:8080")
187+
.status(false)
188+
.healthCheckEnabled(true)
189+
.build());
190+
upstreamCacheManager.submit(testSelectorId, upstreamList);
191+
192+
// Verify: new upstream with status=false should be in unhealthy map for monitoring
193+
UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
194+
if (Objects.nonNull(task)) {
195+
List<Upstream> unhealthyList = task.getUnhealthyUpstream().get(testSelectorId);
196+
Assertions.assertNotNull(unhealthyList);
197+
Assertions.assertTrue(unhealthyList.stream()
198+
.anyMatch(u -> u.getUrl().equals("new-upstream:8080")),
199+
"New upstream with status=false should be in unhealthy map");
200+
}
201+
202+
// Clean up
203+
upstreamCacheManager.removeByKey(testSelectorId);
204+
}
205+
206+
@Test
207+
@Order(8)
208+
public void testSubmitPreservesUnhealthyForValidUpstream() {
209+
final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance();
210+
final String testSelectorId = "PRESERVE_UNHEALTHY_VALID_TEST";
211+
212+
// First submit and mark an upstream as unhealthy
213+
List<Upstream> initialList = new ArrayList<>(1);
214+
initialList.add(Upstream.builder()
215+
.protocol("http://")
216+
.url("recovering-upstream:8080")
217+
.status(true)
218+
.healthCheckEnabled(true)
219+
.build());
220+
upstreamCacheManager.submit(testSelectorId, initialList);
221+
222+
UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
223+
if (Objects.nonNull(task)) {
224+
// Manually mark as unhealthy
225+
Upstream unhealthyUpstream = initialList.get(0);
226+
unhealthyUpstream.setHealthy(false);
227+
task.putToMap(task.getUnhealthyUpstream(), testSelectorId, unhealthyUpstream);
228+
task.removeFromMap(task.getHealthyUpstream(), testSelectorId, unhealthyUpstream);
229+
230+
// Now admin sends update with status=true (valid) for the same upstream
231+
List<Upstream> updateList = new ArrayList<>(1);
232+
updateList.add(Upstream.builder()
233+
.protocol("http://")
234+
.url("recovering-upstream:8080")
235+
.status(true)
236+
.healthCheckEnabled(true)
237+
.build());
238+
upstreamCacheManager.submit(testSelectorId, updateList);
239+
240+
// Verify: should preserve unhealthy state since it was previously unhealthy
241+
List<Upstream> unhealthyList = task.getUnhealthyUpstream().get(testSelectorId);
242+
Assertions.assertNotNull(unhealthyList);
243+
Assertions.assertTrue(unhealthyList.stream()
244+
.anyMatch(u -> u.getUrl().equals("recovering-upstream:8080")),
245+
"Previously unhealthy upstream should remain in unhealthy map");
246+
}
247+
248+
// Clean up
249+
upstreamCacheManager.removeByKey(testSelectorId);
250+
}
251+
252+
@Test
253+
@Order(9)
254+
public void testSubmitWithHealthCheckDisabledAndStatusFalse() {
255+
final UpstreamCacheManager upstreamCacheManager = UpstreamCacheManager.getInstance();
256+
final String testSelectorId = "HEALTH_CHECK_DISABLED_STATUS_FALSE_TEST";
257+
258+
// Submit upstream with healthCheckEnabled=false and status=false
259+
// This upstream should be removed, not added to unhealthy map
260+
List<Upstream> upstreamList = new ArrayList<>(1);
261+
upstreamList.add(Upstream.builder()
262+
.protocol("http://")
263+
.url("no-check-upstream:8080")
264+
.status(false)
265+
.healthCheckEnabled(false)
266+
.build());
267+
upstreamCacheManager.submit(testSelectorId, upstreamList);
268+
269+
UpstreamCheckTask task = getUpstreamCheckTask(upstreamCacheManager);
270+
if (Objects.nonNull(task)) {
271+
// Verify: should NOT be in unhealthy map since health check is disabled
272+
List<Upstream> unhealthyList = task.getUnhealthyUpstream().get(testSelectorId);
273+
Assertions.assertTrue(Objects.isNull(unhealthyList) || unhealthyList.isEmpty(),
274+
"Upstream with healthCheckEnabled=false should not be in unhealthy map");
275+
}
276+
277+
// Clean up
278+
upstreamCacheManager.removeByKey(testSelectorId);
279+
}
280+
281+
/**
282+
* Helper method to get the UpstreamCheckTask using reflection.
283+
*/
284+
private UpstreamCheckTask getUpstreamCheckTask(final UpstreamCacheManager manager) {
285+
try {
286+
java.lang.reflect.Field field = UpstreamCacheManager.class.getDeclaredField("task");
287+
field.setAccessible(true);
288+
return (UpstreamCheckTask) field.get(manager);
289+
} catch (Exception e) {
290+
// If reflection fails, return null
291+
return null;
292+
}
293+
}
109294
}

0 commit comments

Comments
 (0)