2626import io .vertx .core .spi .cluster .RegistrationUpdateEvent ;
2727import org .apache .ignite .Ignite ;
2828import org .apache .ignite .IgniteCache ;
29+ import org .apache .ignite .cache .CacheEntryProcessor ;
2930import org .apache .ignite .cache .query .ContinuousQuery ;
3031import org .apache .ignite .cache .query .ScanQuery ;
3132
3233import javax .cache .Cache ;
3334import javax .cache .CacheException ;
3435import javax .cache .event .CacheEntryEvent ;
36+ import javax .cache .processor .MutableEntry ;
3537import java .util .*;
3638import java .util .concurrent .ConcurrentHashMap ;
3739import java .util .concurrent .ConcurrentMap ;
40+ import java .util .stream .Collectors ;
3841import java .util .stream .StreamSupport ;
3942
4043/**
4346 */
4447public class SubsMapHelper {
4548 private static final Logger log = LoggerFactory .getLogger (SubsMapHelper .class );
46- private final IgniteCache <IgniteRegistrationInfo , Boolean > map ;
49+ private final IgniteCache <String , Set < IgniteRegistrationInfo > > map ;
4750 private final RegistrationListener registrationListener ;
4851 private final ConcurrentMap <String , Set <RegistrationInfo >> localSubs = new ConcurrentHashMap <>();
4952 private final Throttling throttling ;
@@ -54,23 +57,21 @@ public SubsMapHelper(Ignite ignite, RegistrationListener registrationListener, V
5457 this .registrationListener = registrationListener ;
5558 throttling = new Throttling (vertxInternal , a -> getAndUpdate (a , vertxInternal ));
5659 shutdown = false ;
57- map .query (new ContinuousQuery <IgniteRegistrationInfo , Boolean >()
58- .setAutoUnsubscribe (true )
59- .setTimeInterval (100L )
60- .setPageSize (128 )
61- .setLocalListener (l -> listen (l , vertxInternal )));
60+ map .query (new ContinuousQuery <String , Set < IgniteRegistrationInfo > >()
61+ .setAutoUnsubscribe (true )
62+ .setTimeInterval (100L )
63+ .setPageSize (128 )
64+ .setLocalListener (l -> listen (l , vertxInternal )));
6265 }
6366
6467 public List <RegistrationInfo > get (String address ) {
6568 if (shutdown ) {
66- return null ;
69+ return Collections . emptyList () ;
6770 }
6871 try {
69- List <Cache .Entry <IgniteRegistrationInfo , Boolean >> remote = map
70- .query (new ScanQuery <IgniteRegistrationInfo , Boolean >((k , v ) -> k .address ().equals (address )))
71- .getAll ();
72+ Set <IgniteRegistrationInfo > remote = map .get (address );
7273 List <RegistrationInfo > infos ;
73- int size = remote .size ();
74+ int size = ( remote != null ) ? remote .size () : 0 ;
7475 Set <RegistrationInfo > local = localSubs .get (address );
7576 if (local != null ) {
7677 synchronized (local ) {
@@ -86,8 +87,10 @@ public List<RegistrationInfo> get(String address) {
8687 } else {
8788 infos = new ArrayList <>(size );
8889 }
89- for (Cache .Entry <IgniteRegistrationInfo , Boolean > info : remote ) {
90- infos .add (info .getKey ().registrationInfo ());
90+ if (remote != null ) {
91+ for (IgniteRegistrationInfo info : remote ) {
92+ infos .add (info .registrationInfo ());
93+ }
9194 }
9295
9396 return infos ;
@@ -105,10 +108,10 @@ public Void put(String address, RegistrationInfo registrationInfo) {
105108 localSubs .compute (address , (add , curr ) -> addToSet (registrationInfo , curr ));
106109 fireRegistrationUpdateEvent (address );
107110 } else {
108- map .put ( new IgniteRegistrationInfo (address , registrationInfo ), Boolean . TRUE );
111+ map .invoke ( address , new AddRegistrationProcessor ( new IgniteRegistrationInfo (address , registrationInfo )) );
109112 }
110113 } catch (IllegalStateException | CacheException e ) {
111- throw new VertxException (e );
114+ throw new VertxException (e , true );
112115 }
113116 return null ;
114117 }
@@ -128,7 +131,7 @@ public Void remove(String address, RegistrationInfo registrationInfo) {
128131 localSubs .computeIfPresent (address , (add , curr ) -> removeFromSet (registrationInfo , curr ));
129132 fireRegistrationUpdateEvent (address );
130133 } else {
131- map .remove ( new IgniteRegistrationInfo (address , registrationInfo ), Boolean . TRUE );
134+ map .invoke ( address , new RemoveRegistrationProcessor ( new IgniteRegistrationInfo (address , registrationInfo )) );
132135 }
133136 } catch (IllegalStateException | CacheException e ) {
134137 throw new VertxException (e , true );
@@ -142,15 +145,19 @@ private Set<RegistrationInfo> removeFromSet(RegistrationInfo registrationInfo, S
142145 }
143146
144147 public void removeAllForNode (String nodeId ) {
145- List <Cache .Entry <IgniteRegistrationInfo , Boolean >> toRemove = map
146- .query (new ScanQuery <IgniteRegistrationInfo , Boolean >((k , v ) -> k .registrationInfo ().nodeId ().equals (nodeId )))
147- .getAll ();
148- for (Cache .Entry <IgniteRegistrationInfo , Boolean > info : toRemove ) {
149- try {
150- map .remove (info .getKey (), Boolean .TRUE );
151- } catch (IllegalStateException | CacheException t ) {
152- log .warn ("Could not remove subscriber: " + t .getMessage ());
148+ try {
149+ List <Cache .Entry <String , Set <IgniteRegistrationInfo >>> allEntries = map
150+ .query (new ScanQuery <String , Set <IgniteRegistrationInfo >>(null ))
151+ .getAll ();
152+ if (allEntries == null || allEntries .isEmpty ()) {
153+ return ;
153154 }
155+ Set <String > keys = allEntries .stream ()
156+ .map (Cache .Entry ::getKey )
157+ .collect (Collectors .toSet ());
158+ map .invokeAll (keys , new RemoveNodeRegistrationsProcessor (nodeId ));
159+ } catch (IllegalStateException | CacheException t ) {
160+ log .warn ("Could not remove subscribers for nodeId " + nodeId + ": " + t .getMessage ());
154161 }
155162 }
156163
@@ -169,21 +176,87 @@ private Future<List<RegistrationInfo>> getAndUpdate(String address, VertxInterna
169176 registrationListener .registrationsUpdated (new RegistrationUpdateEvent (address , registrationInfos ));
170177 });
171178 vertxInternal .executeBlocking (() ->
172- get (address ), false
179+ get (address ), false
173180 ).onComplete (prom );
174181 } else {
175182 prom .complete ();
176183 }
177184 return prom .future ();
178185 }
179186
180- private void listen (final Iterable <CacheEntryEvent <? extends IgniteRegistrationInfo , ? extends Boolean >> events , final VertxInternal vertxInternal ) {
187+ private void listen (final Iterable <CacheEntryEvent <? extends String , ? extends Set < IgniteRegistrationInfo > >> events , final VertxInternal vertxInternal ) {
181188 vertxInternal .executeBlocking (() -> {
182189 StreamSupport .stream (events .spliterator (), false )
183- . map ( e -> e . getKey (). address () )
184- .distinct ()
185- .forEach (this ::fireRegistrationUpdateEvent );
190+ . map ( Cache . Entry :: getKey )
191+ .distinct ()
192+ .forEach (this ::fireRegistrationUpdateEvent );
186193 return null ;
187194 }, false );
188195 }
189- }
196+
197+ private static class AddRegistrationProcessor implements CacheEntryProcessor <String , Set <IgniteRegistrationInfo >, Void > {
198+ private static final long serialVersionUID = 1L ;
199+ private final IgniteRegistrationInfo info ;
200+
201+ AddRegistrationProcessor (IgniteRegistrationInfo info ) {
202+ this .info = info ;
203+ }
204+
205+ @ Override
206+ public Void process (MutableEntry <String , Set <IgniteRegistrationInfo >> entry , Object ... arguments ) {
207+ Set <IgniteRegistrationInfo > current = entry .getValue ();
208+ if (current == null ) {
209+ current = new HashSet <>();
210+ }
211+ current .add (info );
212+ entry .setValue (current );
213+ return null ;
214+ }
215+ }
216+
217+ private static class RemoveRegistrationProcessor implements CacheEntryProcessor <String , Set <IgniteRegistrationInfo >, Void > {
218+ private static final long serialVersionUID = 1L ;
219+ private final IgniteRegistrationInfo info ;
220+
221+ RemoveRegistrationProcessor (IgniteRegistrationInfo info ) {
222+ this .info = info ;
223+ }
224+
225+ @ Override
226+ public Void process (MutableEntry <String , Set <IgniteRegistrationInfo >> entry , Object ... arguments ) {
227+ Set <IgniteRegistrationInfo > current = entry .getValue ();
228+ if (current != null ) {
229+ current .remove (info );
230+ if (current .isEmpty ()) {
231+ entry .remove ();
232+ } else {
233+ entry .setValue (current );
234+ }
235+ }
236+ return null ;
237+ }
238+ }
239+
240+ private static class RemoveNodeRegistrationsProcessor implements CacheEntryProcessor <String , Set <IgniteRegistrationInfo >, Void > {
241+ private static final long serialVersionUID = 1L ;
242+ private final String nodeId ;
243+
244+ RemoveNodeRegistrationsProcessor (String nodeId ) {
245+ this .nodeId = nodeId ;
246+ }
247+
248+ @ Override
249+ public Void process (MutableEntry <String , Set <IgniteRegistrationInfo >> entry , Object ... arguments ) {
250+ Set <IgniteRegistrationInfo > current = entry .getValue ();
251+ if (current != null ) {
252+ current .removeIf (info -> info .registrationInfo ().nodeId ().equals (nodeId ));
253+ if (current .isEmpty ()) {
254+ entry .remove ();
255+ } else {
256+ entry .setValue (current );
257+ }
258+ }
259+ return null ;
260+ }
261+ }
262+ }
0 commit comments