Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 45 additions & 20 deletions src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
*/
public class SubsMapHelper {
private static final Logger log = LoggerFactory.getLogger(SubsMapHelper.class);
private final IgniteCache<IgniteRegistrationInfo, Boolean> map;
private final IgniteCache<String, Set<IgniteRegistrationInfo>> map;
private final RegistrationListener registrationListener;
private final ConcurrentMap<String, Set<RegistrationInfo>> localSubs = new ConcurrentHashMap<>();
private final Throttling throttling;
Expand All @@ -54,7 +54,7 @@ public SubsMapHelper(Ignite ignite, RegistrationListener registrationListener, V
this.registrationListener = registrationListener;
throttling = new Throttling(vertxInternal, a -> getAndUpdate(a, vertxInternal));
shutdown = false;
map.query(new ContinuousQuery<IgniteRegistrationInfo, Boolean>()
map.query(new ContinuousQuery<String, Set<IgniteRegistrationInfo>>()
.setAutoUnsubscribe(true)
.setTimeInterval(100L)
.setPageSize(128)
Expand All @@ -66,11 +66,9 @@ public List<RegistrationInfo> get(String address) {
return null;
}
try {
List<Cache.Entry<IgniteRegistrationInfo, Boolean>> remote = map
.query(new ScanQuery<IgniteRegistrationInfo, Boolean>((k, v) -> k.address().equals(address)))
.getAll();
Set<IgniteRegistrationInfo> remote = map.get(address);
List<RegistrationInfo> infos;
int size = remote.size();
int size = (remote != null) ? remote.size() : 0;
Set<RegistrationInfo> local = localSubs.get(address);
if (local != null) {
synchronized (local) {
Expand All @@ -86,8 +84,10 @@ public List<RegistrationInfo> get(String address) {
} else {
infos = new ArrayList<>(size);
}
for (Cache.Entry<IgniteRegistrationInfo, Boolean> info : remote) {
infos.add(info.getKey().registrationInfo());
if (remote != null) {
for (IgniteRegistrationInfo info : remote) {
infos.add(info.registrationInfo());
}
}

return infos;
Expand All @@ -105,7 +105,15 @@ public Void put(String address, RegistrationInfo registrationInfo) {
localSubs.compute(address, (add, curr) -> addToSet(registrationInfo, curr));
fireRegistrationUpdateEvent(address);
} else {
map.put(new IgniteRegistrationInfo(address, registrationInfo), Boolean.TRUE);
Set<IgniteRegistrationInfo> remoteInfos = map.get(address);
if (remoteInfos == null) {
Comment thread
mutexd marked this conversation as resolved.
Outdated
Set<IgniteRegistrationInfo> newInfoSet = new HashSet<>();
newInfoSet.add(new IgniteRegistrationInfo(address, registrationInfo));
map.put(address, newInfoSet);
} else {
remoteInfos.add(new IgniteRegistrationInfo(address, registrationInfo));
map.put(address, remoteInfos);
}
Comment thread
mutexd marked this conversation as resolved.
Outdated
}
} catch (IllegalStateException | CacheException e) {
throw new VertxException(e);
Expand All @@ -128,7 +136,14 @@ public Void remove(String address, RegistrationInfo registrationInfo) {
localSubs.computeIfPresent(address, (add, curr) -> removeFromSet(registrationInfo, curr));
fireRegistrationUpdateEvent(address);
} else {
map.remove(new IgniteRegistrationInfo(address, registrationInfo), Boolean.TRUE);
Set<IgniteRegistrationInfo> originalInfos = map.get(address);
if (originalInfos != null && originalInfos.remove(new IgniteRegistrationInfo(address, registrationInfo))) {
if (originalInfos.isEmpty()) {
map.remove(address);
} else {
map.put(address, originalInfos);
}
}
Comment thread
mutexd marked this conversation as resolved.
Outdated
}
} catch (IllegalStateException | CacheException e) {
throw new VertxException(e, true);
Expand All @@ -142,15 +157,25 @@ private Set<RegistrationInfo> removeFromSet(RegistrationInfo registrationInfo, S
}

public void removeAllForNode(String nodeId) {
List<Cache.Entry<IgniteRegistrationInfo, Boolean>> toRemove = map
.query(new ScanQuery<IgniteRegistrationInfo, Boolean>((k, v) -> k.registrationInfo().nodeId().equals(nodeId)))
.getAll();
for (Cache.Entry<IgniteRegistrationInfo, Boolean> info : toRemove) {
try {
map.remove(info.getKey(), Boolean.TRUE);
} catch (IllegalStateException | CacheException t) {
log.warn("Could not remove subscriber: " + t.getMessage());
try {
List<Cache.Entry<String, Set<IgniteRegistrationInfo>>> allEntries = map
.query(new ScanQuery<String, Set<IgniteRegistrationInfo>>(null))
Comment thread
mutexd marked this conversation as resolved.
.getAll();
if (allEntries == null || allEntries.isEmpty()) {
return;
}
for (Cache.Entry<String, Set<IgniteRegistrationInfo>> entry : allEntries) {
String address = entry.getKey();
Set<IgniteRegistrationInfo> registrations = entry.getValue();
boolean modified = registrations.removeIf(info -> info.registrationInfo().nodeId().equals(nodeId));
if (registrations.isEmpty()) {
map.remove(address);
} else if (modified) {
map.put(address, registrations);
}
}
} catch (IllegalStateException | CacheException t) {
log.warn("Could not remove subscribers for nodeId " + nodeId + ": " + t.getMessage());
}
}

Expand All @@ -177,10 +202,10 @@ private Future<List<RegistrationInfo>> getAndUpdate(String address, VertxInterna
return prom.future();
}

private void listen(final Iterable<CacheEntryEvent<? extends IgniteRegistrationInfo, ? extends Boolean>> events, final VertxInternal vertxInternal) {
private void listen(final Iterable<CacheEntryEvent<? extends String, ? extends Set<IgniteRegistrationInfo>>> events, final VertxInternal vertxInternal) {
vertxInternal.executeBlocking(() -> {
StreamSupport.stream(events.spliterator(), false)
.map(e -> e.getKey().address())
.map(Cache.Entry::getKey)
.distinct()
.forEach(this::fireRegistrationUpdateEvent);
return null;
Expand Down
Loading