2020import java .io .IOException ;
2121import java .net .URI ;
2222import java .net .URISyntaxException ;
23+ import java .net .URLDecoder ;
24+ import java .net .URLEncoder ;
2325import java .time .Duration ;
2426import java .util .*;
2527import java .util .concurrent .ExecutorService ;
@@ -285,6 +287,32 @@ public long housekeeping() {
285287 return now + heartbeatInterval ;
286288 }
287289
290+ /***
291+ * Creates a zeroMQ subscription topic from the path and query part of a URI.
292+ * Since different URIs can represent the same endpoint (encoding, parameter ordering), we need to normalise the URI.
293+ * Otherwise, subscriptions cannot work reliably across different implementations.
294+ * This implementation ensures:
295+ * - path and each query key and value are individually encoded by encoding all special characters even if not mandated by the URI RFC
296+ * - the individual key-value pairs are sorted alphabetically by key
297+ * The java URI implementation
298+ */
299+ public static String getZMQTopicFromURI (final URI uri ) {
300+ if (uri .getRawQuery () == null ) {
301+ return URLEncoder .encode (uri .getPath (), UTF_8 ).replace ("+" , "%20" ).replace ("%2F" , "/" ) + "#" ;
302+ } // no query parameters
303+ Map <String , String > queryParameters = new TreeMap <>();
304+ Arrays .stream (uri .getRawQuery ().split ("&" ))
305+ .map (s -> s .split ("=" , 2 ))
306+ .forEach (a -> queryParameters .put (URLEncoder .encode (URLDecoder .decode (a [0 ], UTF_8 ), UTF_8 ).replace ("+" , "%20" ), URLEncoder .encode (URLDecoder .decode (a [1 ], UTF_8 ), UTF_8 ).replace ("+" , "%20" )));
307+ StringBuilder sb = new StringBuilder (URLEncoder .encode (uri .getPath (), UTF_8 ).replace ("+" , "%20" ).replace ("%2F" , "/" ) + "?" );
308+ queryParameters .forEach ((k , v ) -> {
309+ sb .append (k ).append ('=' ).append (v ).append ('&' );
310+ });
311+ sb .setLength (sb .length () - 1 );
312+ sb .append ('#' );
313+ return sb .toString ();
314+ }
315+
288316 @ Override
289317 public void subscribe (final String reqId , final URI endpoint , final byte [] rbacToken ) {
290318 subscriptions .put (reqId , endpoint );
@@ -296,7 +324,7 @@ public void subscribe(final String reqId, final URI endpoint, final byte[] rbacT
296324 LOGGER .atError ().addArgument (reqId ).addArgument (endpoint ).log ("subscription error (reqId: {}) for endpoint: {}" );
297325 }
298326 } else { // mds
299- final String id = endpoint . getPath () + '?' + endpoint . getQuery () + '#' ;
327+ final String id = getZMQTopicFromURI ( endpoint ) ;
300328 socket .subscribe (id .getBytes (UTF_8 ));
301329 }
302330 }
@@ -311,7 +339,8 @@ public void unsubscribe(final String reqId) {
311339 LOGGER .atError ().addArgument (reqId ).addArgument (endpoint ).log ("unsubscribe error (reqId: {}) for endpoint: {}" );
312340 }
313341 } else { // mds
314- socket .unsubscribe (serviceId );
342+ final String id = getZMQTopicFromURI (endpoint );
343+ socket .unsubscribe (id .getBytes (UTF_8 ));
315344 }
316345 }
317346
0 commit comments