Skip to content

Commit ff90eb3

Browse files
BeiKeJieDeLiuLangMaoslievrly
authored andcommitted
bugfix: address list change events leak
1 parent 98cd83b commit ff90eb3

File tree

1 file changed

+50
-7
lines changed

1 file changed

+50
-7
lines changed

discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java

+50-7
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.seata.config.Configuration;
3535
import io.seata.config.ConfigurationFactory;
3636
import io.seata.discovery.registry.RegistryService;
37+
import java.util.Objects;
3738
import org.slf4j.Logger;
3839
import org.slf4j.LoggerFactory;
3940

@@ -85,7 +86,7 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener>
8586
private final static long LIFE_KEEP_CRITICAL = 6;
8687
private static volatile EtcdRegistryServiceImpl instance;
8788
private static volatile Client client;
88-
private ConcurrentMap<String, List<InetSocketAddress>> clusterAddressMap;
89+
private ConcurrentMap<String, Pair<Long /*revision*/, List<InetSocketAddress>>> clusterAddressMap;
8990
private ConcurrentMap<String, Set<Watch.Listener>> listenerMap;
9091
private ConcurrentMap<String, EtcdWatcher> watcherMap;
9192
private static long leaseId = 0;
@@ -158,7 +159,7 @@ private void doUnregister(InetSocketAddress address) throws Exception {
158159
public void subscribe(String cluster, Watch.Listener listener) throws Exception {
159160
listenerMap.putIfAbsent(cluster, new HashSet<>());
160161
listenerMap.get(cluster).add(listener);
161-
EtcdWatcher watcher = watcherMap.computeIfAbsent(cluster, w -> new EtcdWatcher(listener));
162+
EtcdWatcher watcher = watcherMap.computeIfAbsent(cluster, w -> new EtcdWatcher(cluster, listener));
162163
executorService.submit(watcher);
163164
}
164165

@@ -212,7 +213,7 @@ public void onCompleted() {
212213
});
213214

214215
}
215-
return clusterAddressMap.get(cluster);
216+
return clusterAddressMap.get(cluster).getValue();
216217
}
217218

218219
@Override
@@ -244,7 +245,7 @@ private void refreshCluster(String cluster) throws Exception {
244245
String[] instanceInfo = keyValue.getValue().toString(UTF_8).split(":");
245246
return new InetSocketAddress(instanceInfo[0], Integer.parseInt(instanceInfo[1]));
246247
}).collect(Collectors.toList());
247-
clusterAddressMap.put(cluster, instanceList);
248+
clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList));
248249
}
249250

250251
/**
@@ -390,16 +391,23 @@ public Boolean call() {
390391
private class EtcdWatcher implements Runnable {
391392
private final Watch.Listener listener;
392393
private Watch.Watcher watcher;
394+
private String cluster;
393395

394-
public EtcdWatcher(Watch.Listener listener) {
396+
public EtcdWatcher(String cluster, Watch.Listener listener) {
397+
this.cluster = cluster;
395398
this.listener = listener;
396399
}
397400

398401
@Override
399402
public void run() {
400403
Watch watchClient = getClient().getWatchClient();
401-
WatchOption watchOption = WatchOption.newBuilder().withPrefix(buildRegistryKeyPrefix()).build();
402-
this.watcher = watchClient.watch(buildRegistryKeyPrefix(), watchOption, this.listener);
404+
WatchOption.Builder watchOptionBuilder = WatchOption.newBuilder().withPrefix(buildRegistryKeyPrefix());
405+
Pair<Long /*revision*/, List<InetSocketAddress>> addressPair = clusterAddressMap.get(cluster);
406+
if (Objects.nonNull(addressPair)) {
407+
// Maybe addressPair isn't newest now, but it's ok
408+
watchOptionBuilder.withRevision(addressPair.getKey());
409+
}
410+
this.watcher = watchClient.watch(buildRegistryKeyPrefix(), watchOptionBuilder.build(), this.listener);
403411
}
404412

405413
/**
@@ -409,4 +417,39 @@ public void stop() {
409417
this.watcher.close();
410418
}
411419
}
420+
421+
private static class Pair<K,V> {
422+
423+
/**
424+
* Key of this <code>Pair</code>.
425+
*/
426+
private K key;
427+
428+
/**
429+
* Value of this this <code>Pair</code>.
430+
*/
431+
private V value;
432+
433+
/**
434+
* Creates a new pair
435+
* @param key The key for this pair
436+
* @param value The value to use for this pair
437+
*/
438+
public Pair(K key, V value) {
439+
this.key = key;
440+
this.value = value;
441+
}
442+
443+
/**
444+
* Gets the key for this pair.
445+
* @return key for this pair
446+
*/
447+
public K getKey() { return key; }
448+
449+
/**
450+
* Gets the value for this pair.
451+
* @return value for this pair
452+
*/
453+
public V getValue() { return value; }
454+
}
412455
}

0 commit comments

Comments
 (0)