Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 103 additions & 30 deletions src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.ScanQuery;

import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.event.CacheEntryEvent;
import javax.cache.processor.MutableEntry;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
Expand All @@ -43,7 +46,7 @@
*/
public class SubsMapHelper {
private static final Logger log = LoggerFactory.getLogger(SubsMapHelper.class);
private final IgniteCache<IgniteRegistrationInfo, Boolean> map;
private final IgniteCache<String, Set<IgniteRegistrationInfo>> map;
private final RegistrationListener registrationListener;
private final ConcurrentMap<String, Set<RegistrationInfo>> localSubs = new ConcurrentHashMap<>();
private final Throttling throttling;
Expand All @@ -54,23 +57,21 @@ public SubsMapHelper(Ignite ignite, RegistrationListener registrationListener, V
this.registrationListener = registrationListener;
throttling = new Throttling(vertxInternal, a -> getAndUpdate(a, vertxInternal));
shutdown = false;
map.query(new ContinuousQuery<IgniteRegistrationInfo, Boolean>()
.setAutoUnsubscribe(true)
.setTimeInterval(100L)
.setPageSize(128)
.setLocalListener(l -> listen(l, vertxInternal)));
map.query(new ContinuousQuery<String, Set<IgniteRegistrationInfo>>()
.setAutoUnsubscribe(true)
.setTimeInterval(100L)
.setPageSize(128)
.setLocalListener(l -> listen(l, vertxInternal)));
}

public List<RegistrationInfo> get(String address) {
if (shutdown) {
return null;
return Collections.emptyList();
}
try {
List<Cache.Entry<IgniteRegistrationInfo, Boolean>> remote = map
.query(new ScanQuery<IgniteRegistrationInfo, Boolean>((k, v) -> k.address().equals(address)))
.getAll();
Set<IgniteRegistrationInfo> remote = map.get(address);
List<RegistrationInfo> infos;
int size = remote.size();
int size = (remote != null) ? remote.size() : 0;
Set<RegistrationInfo> local = localSubs.get(address);
if (local != null) {
synchronized (local) {
Expand All @@ -86,8 +87,10 @@ public List<RegistrationInfo> get(String address) {
} else {
infos = new ArrayList<>(size);
}
for (Cache.Entry<IgniteRegistrationInfo, Boolean> info : remote) {
infos.add(info.getKey().registrationInfo());
if (remote != null) {
for (IgniteRegistrationInfo info : remote) {
infos.add(info.registrationInfo());
}
}

return infos;
Expand All @@ -105,10 +108,10 @@ public Void put(String address, RegistrationInfo registrationInfo) {
localSubs.compute(address, (add, curr) -> addToSet(registrationInfo, curr));
fireRegistrationUpdateEvent(address);
} else {
map.put(new IgniteRegistrationInfo(address, registrationInfo), Boolean.TRUE);
map.invoke(address, new AddRegistrationProcessor(new IgniteRegistrationInfo(address, registrationInfo)));
}
} catch (IllegalStateException | CacheException e) {
throw new VertxException(e);
throw new VertxException(e, true);
}
return null;
}
Expand All @@ -128,7 +131,7 @@ public Void remove(String address, RegistrationInfo registrationInfo) {
localSubs.computeIfPresent(address, (add, curr) -> removeFromSet(registrationInfo, curr));
fireRegistrationUpdateEvent(address);
} else {
map.remove(new IgniteRegistrationInfo(address, registrationInfo), Boolean.TRUE);
map.invoke(address, new RemoveRegistrationProcessor(new IgniteRegistrationInfo(address, registrationInfo)));
}
} catch (IllegalStateException | CacheException e) {
throw new VertxException(e, true);
Expand All @@ -142,15 +145,19 @@ private Set<RegistrationInfo> removeFromSet(RegistrationInfo registrationInfo, S
}

public void removeAllForNode(String nodeId) {
List<Cache.Entry<IgniteRegistrationInfo, Boolean>> toRemove = map
.query(new ScanQuery<IgniteRegistrationInfo, Boolean>((k, v) -> k.registrationInfo().nodeId().equals(nodeId)))
.getAll();
for (Cache.Entry<IgniteRegistrationInfo, Boolean> info : toRemove) {
try {
map.remove(info.getKey(), Boolean.TRUE);
} catch (IllegalStateException | CacheException t) {
log.warn("Could not remove subscriber: " + t.getMessage());
try {
List<Cache.Entry<String, Set<IgniteRegistrationInfo>>> allEntries = map
.query(new ScanQuery<String, Set<IgniteRegistrationInfo>>(null))
.getAll();
if (allEntries == null || allEntries.isEmpty()) {
return;
}
Set<String> keys = allEntries.stream()
.map(Cache.Entry::getKey)
.collect(Collectors.toSet());
map.invokeAll(keys, new RemoveNodeRegistrationsProcessor(nodeId));
} catch (IllegalStateException | CacheException t) {
log.warn("Could not remove subscribers for nodeId " + nodeId + ": " + t.getMessage());
}
}

Expand All @@ -169,21 +176,87 @@ private Future<List<RegistrationInfo>> getAndUpdate(String address, VertxInterna
registrationListener.registrationsUpdated(new RegistrationUpdateEvent(address, registrationInfos));
});
vertxInternal.executeBlocking(() ->
get(address), false
get(address), false
).onComplete(prom);
} else {
prom.complete();
}
return prom.future();
}

private void listen(final Iterable<CacheEntryEvent<? extends IgniteRegistrationInfo, ? extends Boolean>> events, final VertxInternal vertxInternal) {
private void listen(final Iterable<CacheEntryEvent<? extends String, ? extends Set<IgniteRegistrationInfo>>> events, final VertxInternal vertxInternal) {
vertxInternal.executeBlocking(() -> {
StreamSupport.stream(events.spliterator(), false)
.map(e -> e.getKey().address())
.distinct()
.forEach(this::fireRegistrationUpdateEvent);
.map(Cache.Entry::getKey)
.distinct()
.forEach(this::fireRegistrationUpdateEvent);
return null;
}, false);
}
}

private static class AddRegistrationProcessor implements CacheEntryProcessor<String, Set<IgniteRegistrationInfo>, Void> {
private static final long serialVersionUID = 1L;
private final IgniteRegistrationInfo info;

AddRegistrationProcessor(IgniteRegistrationInfo info) {
this.info = info;
}

@Override
public Void process(MutableEntry<String, Set<IgniteRegistrationInfo>> entry, Object... arguments) {
Set<IgniteRegistrationInfo> current = entry.getValue();
if (current == null) {
current = new HashSet<>();
}
current.add(info);
entry.setValue(current);
return null;
}
}

private static class RemoveRegistrationProcessor implements CacheEntryProcessor<String, Set<IgniteRegistrationInfo>, Void> {
private static final long serialVersionUID = 1L;
private final IgniteRegistrationInfo info;

RemoveRegistrationProcessor(IgniteRegistrationInfo info) {
this.info = info;
}

@Override
public Void process(MutableEntry<String, Set<IgniteRegistrationInfo>> entry, Object... arguments) {
Set<IgniteRegistrationInfo> current = entry.getValue();
if (current != null) {
current.remove(info);
if (current.isEmpty()) {
entry.remove();
} else {
entry.setValue(current);
}
}
return null;
}
}

private static class RemoveNodeRegistrationsProcessor implements CacheEntryProcessor<String, Set<IgniteRegistrationInfo>, Void> {
private static final long serialVersionUID = 1L;
private final String nodeId;

RemoveNodeRegistrationsProcessor(String nodeId) {
this.nodeId = nodeId;
}

@Override
public Void process(MutableEntry<String, Set<IgniteRegistrationInfo>> entry, Object... arguments) {
Set<IgniteRegistrationInfo> current = entry.getValue();
if (current != null) {
current.removeIf(info -> info.registrationInfo().nodeId().equals(nodeId));
if (current.isEmpty()) {
entry.remove();
} else {
entry.setValue(current);
}
}
return null;
}
}
}
Loading