Skip to content

Commit e563950

Browse files
authored
Merge pull request #160 from zyclonite/fix_subsmaphelper_cursor_5_0
Continous query cursor cleanup, localSubs clear on leave
2 parents e816387 + c1f1fcf commit e563950

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

src/main/java/io/vertx/spi/cluster/ignite/impl/SubsMapHelper.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.ignite.IgniteCache;
2929
import org.apache.ignite.cache.CacheEntryProcessor;
3030
import org.apache.ignite.cache.query.ContinuousQuery;
31+
import org.apache.ignite.cache.query.QueryCursor;
3132
import org.apache.ignite.cache.query.ScanQuery;
3233

3334
import javax.cache.Cache;
@@ -50,14 +51,15 @@ public class SubsMapHelper {
5051
private final RegistrationListener registrationListener;
5152
private final ConcurrentMap<String, Set<RegistrationInfo>> localSubs = new ConcurrentHashMap<>();
5253
private final Throttling throttling;
54+
private final QueryCursor<Cache.Entry<String, Set<IgniteRegistrationInfo>>> continuousQuery;
5355
private volatile boolean shutdown;
5456

5557
public SubsMapHelper(Ignite ignite, RegistrationListener registrationListener, VertxInternal vertxInternal) {
5658
map = ignite.getOrCreateCache("__vertx.subs");
5759
this.registrationListener = registrationListener;
5860
throttling = new Throttling(vertxInternal, a -> getAndUpdate(a, vertxInternal));
5961
shutdown = false;
60-
map.query(new ContinuousQuery<String, Set<IgniteRegistrationInfo>>()
62+
continuousQuery = map.query(new ContinuousQuery<String, Set<IgniteRegistrationInfo>>()
6163
.setAutoUnsubscribe(true)
6264
.setTimeInterval(100L)
6365
.setPageSize(128)
@@ -163,13 +165,22 @@ public void removeAllForNode(String nodeId) {
163165

164166
public void leave() {
165167
shutdown = true;
168+
try {
169+
continuousQuery.close();
170+
} catch (Exception e) {
171+
log.debug("Failed to close continuous query cursor: " + e.getMessage());
172+
}
173+
localSubs.clear();
166174
}
167175

168176
private void fireRegistrationUpdateEvent(String address) {
169177
throttling.onEvent(address);
170178
}
171179

172180
private Future<List<RegistrationInfo>> getAndUpdate(String address, VertxInternal vertxInternal) {
181+
if (shutdown) {
182+
return Future.succeededFuture();
183+
}
173184
Promise<List<RegistrationInfo>> prom = Promise.promise();
174185
if (registrationListener.wantsUpdatesFor(address)) {
175186
prom.future().onSuccess(registrationInfos -> {
@@ -185,6 +196,9 @@ private Future<List<RegistrationInfo>> getAndUpdate(String address, VertxInterna
185196
}
186197

187198
private void listen(final Iterable<CacheEntryEvent<? extends String, ? extends Set<IgniteRegistrationInfo>>> events, final VertxInternal vertxInternal) {
199+
if (shutdown) {
200+
return;
201+
}
188202
vertxInternal.executeBlocking(() -> {
189203
StreamSupport.stream(events.spliterator(), false)
190204
.map(Cache.Entry::getKey)
@@ -259,4 +273,4 @@ public Void process(MutableEntry<String, Set<IgniteRegistrationInfo>> entry, Obj
259273
return null;
260274
}
261275
}
262-
}
276+
}

0 commit comments

Comments
 (0)