Skip to content

Commit 11f1534

Browse files
authored
Refactor: Trigger the listener added for the first time (#225)
* refactor: Trigger the listener added for the first time * refactor: Trigger the watch listener before adding to watchers
1 parent 479f3f3 commit 11f1534

File tree

1 file changed

+15
-1
lines changed
  • polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow

1 file changed

+15
-1
lines changed

polaris-discovery/polaris-discovery-client/src/main/java/com/tencent/polaris/discovery/client/flow/WatchFlow.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,18 @@
3636
import com.tencent.polaris.client.util.NamedThreadFactory;
3737
import com.tencent.polaris.client.util.Utils;
3838
import com.tencent.polaris.logging.LoggerFactory;
39+
import java.util.Arrays;
3940
import java.util.Collections;
4041
import java.util.HashSet;
42+
import java.util.List;
4143
import java.util.Map;
4244
import java.util.Set;
4345
import java.util.concurrent.ConcurrentHashMap;
4446
import java.util.concurrent.Executor;
4547
import java.util.concurrent.Executors;
4648
import java.util.concurrent.atomic.AtomicBoolean;
4749
import java.util.function.BiConsumer;
50+
import java.util.stream.Collectors;
4851
import org.slf4j.Logger;
4952

5053
/**
@@ -77,7 +80,18 @@ public WatchServiceResponse commonWatchService(CommonWatchServiceRequest request
7780
InstancesResponse response = syncFlow.commonSyncGetAllInstances(request.getAllRequest());
7881
watchers.computeIfAbsent(request.getSvcEventKey().getServiceKey(),
7982
key -> Collections.synchronizedSet(new HashSet<>()));
80-
boolean result = watchers.get(serviceKey).addAll(request.getWatchServiceRequest().getListeners());
83+
List<ServiceListener> addListeners = request.getWatchServiceRequest().getListeners();
84+
Set<ServiceListener> existListeners = watchers.get(serviceKey);
85+
List<ServiceListener> firstAddedListeners = addListeners.stream()
86+
.filter(serviceListener -> !existListeners.contains(serviceListener)).collect(Collectors.toList());
87+
if (CollectionUtils.isNotEmpty(firstAddedListeners)) {
88+
ServiceChangeEvent event = ServiceChangeEvent.builder().serviceKey(serviceKey)
89+
.addInstances(Arrays.asList(response.getInstances()))
90+
.allInstances(Arrays.asList(response.getInstances())).build();
91+
firstAddedListeners.forEach(
92+
serviceListener -> executor.execute(event.getServiceKey(), () -> serviceListener.onEvent(event)));
93+
}
94+
boolean result = existListeners.addAll(addListeners);
8195
return new WatchServiceResponse(response, result);
8296
}
8397

0 commit comments

Comments
 (0)