Skip to content

Commit d6cf8d9

Browse files
wirew0rmRalphSteinhagen
authored andcommitted
Client: improve handling of reply context
- Future: set atomic to ready only after the reply has been set - ReplyQuery: only include entries from reply context, do not duplicate entries from request url - DataSourcePublisher.get: allow access to the reply context from the returned future - DataSourcePublsher subscription listener: fix handling of Context type none Signed-off-by: Alexander Krimm <[email protected]>
1 parent c2bd196 commit d6cf8d9

File tree

4 files changed

+40
-27
lines changed

4 files changed

+40
-27
lines changed

client/src/main/java/io/opencmw/client/DataSourcePublisher.java

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.Map;
2222
import java.util.Objects;
23-
import java.util.concurrent.ConcurrentHashMap;
24-
import java.util.concurrent.ExecutorService;
25-
import java.util.concurrent.Executors;
26-
import java.util.concurrent.Future;
23+
import java.util.concurrent.*;
2724
import java.util.concurrent.atomic.AtomicBoolean;
2825
import java.util.concurrent.atomic.AtomicInteger;
2926
import java.util.concurrent.atomic.AtomicReference;
@@ -317,9 +314,9 @@ protected boolean handleDataSourceSockets() {
317314
return dataAvailable;
318315
}
319316

320-
protected <R> ThePromisedFuture<R, ?> newRequestFuture(final URI endpoint, final Class<R> requestedDomainObjType, final Command requestType, final String requestId) {
317+
protected <R, C> ThePromisedFuture<R, C> newRequestFuture(final URI endpoint, final Class<R> requestedDomainObjType, final Command requestType, final String requestId) {
321318
FilterRegistry.checkClassForNewFilters(requestedDomainObjType);
322-
final ThePromisedFuture<R, ?> requestFuture = new ThePromisedFuture<>(endpoint, requestedDomainObjType, null, requestType, requestId, null);
319+
final ThePromisedFuture<R, C> requestFuture = new ThePromisedFuture<>(endpoint, requestedDomainObjType, null, requestType, requestId, null);
323320
final Object oldEntry = requests.put(requestId, requestFuture);
324321
assert oldEntry == null : "requestID '" + requestId + "' already present in requestFutureMap";
325322
return requestFuture;
@@ -367,19 +364,19 @@ protected void internalEventHandler(final RingBufferEvent event, final long sequ
367364
replyDomainObject = ioClassSerialiser.deserialiseObject(reqClassType);
368365
ioClassSerialiser.setDataBuffer(byteBuffer); // allow received byte array to be released
369366
}
370-
if (notifyFuture) {
371-
domainObject.future.castAndSetReply(replyDomainObject); // notify callback
367+
final Object contextObject;
368+
if (domainObject.future.contextType == null || domainObject.future.contextType.equals(Map.class)) {
369+
contextObject = QueryParameterParser.getMap(endpointURI.getQuery());
370+
} else {
371+
contextObject = QueryParameterParser.parseQueryParameter(domainObject.future.contextType, endpointURI.getQuery());
372372
}
373373
if (domainObject.future.listener != null) {
374374
final var finalDomainObj = replyDomainObject;
375-
final Object contextObject;
376-
if (domainObject.future.contextType == null) {
377-
contextObject = QueryParameterParser.getMap(endpointURI.getQuery());
378-
} else {
379-
contextObject = QueryParameterParser.parseQueryParameter(domainObject.future.contextType, endpointURI.getQuery());
380-
}
381375
executor.submit(() -> domainObject.future.notifyListener(finalDomainObj, contextObject)); // NOPMD - threads are ok, not a webapp
382376
}
377+
if (notifyFuture) {
378+
domainObject.future.castAndSetReplyWithContext(replyDomainObject, contextObject); // notify callback
379+
}
383380
} catch (Exception e) { // NOPMD: exception is forwarded to client
384381
final var sw = new StringWriter();
385382
final var pw = new PrintWriter(sw);
@@ -453,18 +450,18 @@ private Client() { // accessed via outer class method
453450
clientSocket.connect(inprocCtrl);
454451
}
455452

456-
public <R, C> Future<R> get(URI endpoint, final C requestContext, final Class<R> requestedDomainObjType, final RbacProvider... rbacProvider) {
453+
public <R, C> ThePromisedFuture<R, C> get(URI endpoint, final C requestContext, final Class<R> requestedDomainObjType, final RbacProvider... rbacProvider) {
457454
final String requestId = clientId + internalReqIdGenerator.incrementAndGet();
458455
final URI endpointQuery = getEndpointQuery(endpoint, requestContext);
459-
final ThePromisedFuture<R, ?> rThePromisedFuture = newRequestFuture(endpointQuery, requestedDomainObjType, Command.GET_REQUEST, requestId);
456+
final ThePromisedFuture<R, C> rThePromisedFuture = newRequestFuture(endpointQuery, requestedDomainObjType, Command.GET_REQUEST, requestId);
460457
request(requestId, Command.GET_REQUEST, endpointQuery, null, requestContext, rbacProvider);
461458
return rThePromisedFuture;
462459
}
463460

464-
public <R, C> Future<R> set(final URI endpoint, final R requestBody, final C requestContext, final Class<R> requestedDomainObjType, final RbacProvider... rbacProvider) {
461+
public <R, C> ThePromisedFuture<R, C> set(final URI endpoint, final R requestBody, final C requestContext, final Class<R> requestedDomainObjType, final RbacProvider... rbacProvider) {
465462
final String requestId = clientId + internalReqIdGenerator.incrementAndGet();
466463
final URI endpointQuery = getEndpointQuery(endpoint, requestContext);
467-
final ThePromisedFuture<R, ?> rThePromisedFuture = newRequestFuture(endpointQuery, requestedDomainObjType, Command.SET_REQUEST, requestId);
464+
final ThePromisedFuture<R, C> rThePromisedFuture = newRequestFuture(endpointQuery, requestedDomainObjType, Command.SET_REQUEST, requestId);
468465
request(requestId, Command.SET_REQUEST, endpointQuery, requestBody, requestContext, rbacProvider);
469466
return rThePromisedFuture;
470467
}
@@ -551,8 +548,9 @@ private <R, C> void request(final String requestId, final Command requestType, f
551548
}
552549
}
553550

554-
protected static class ThePromisedFuture<R, C> extends CustomFuture<R> { // NOPMD - no need for setters/getters here
551+
public static class ThePromisedFuture<R, C> extends CustomFuture<R> { // NOPMD - no need for setters/getters here
555552
private final URI endpoint;
553+
private C replyContext;
556554
private final Class<R> requestedDomainObjType;
557555
private final Class<C> contextType;
558556
private final Command requestType;
@@ -586,10 +584,12 @@ public String getInternalRequestID() {
586584
}
587585

588586
public void notifyListener(final Object obj, final Object contextObject) {
589-
if (obj == null || !requestedDomainObjType.isAssignableFrom(obj.getClass()) || !contextType.isAssignableFrom(contextObject.getClass())) {
587+
if (obj == null || !requestedDomainObjType.isAssignableFrom(obj.getClass())) {
590588
LOGGER.atError().addArgument(requestedDomainObjType.getName()).addArgument(obj == null ? "null" : obj.getClass().getName()).log("Got wrong type for notification, got {} expected {}");
589+
} else if (contextType != null && !contextType.isAssignableFrom(contextObject.getClass())) {
590+
LOGGER.atError().addArgument(contextObject.getClass().getName()).addArgument(contextType.getName()).log("Got wrong context type for notification, got {} expected {}");
591591
} else {
592-
//noinspection unchecked - cast is checked dynamically
592+
// noinspection unchecked - cast is checked dynamically
593593
listener.dataUpdate((R) obj, (C) contextObject); // NOPMD NOSONAR - cast is checked before implicitly
594594
}
595595
}
@@ -598,6 +598,17 @@ public void notifyListener(final Object obj, final Object contextObject) {
598598
protected void castAndSetReply(final Object newValue) {
599599
this.setReply((R) newValue);
600600
}
601+
602+
@SuppressWarnings("unchecked")
603+
protected void castAndSetReplyWithContext(final Object newValue, final Object contextObject) {
604+
this.replyContext = (C) contextObject;
605+
this.setReply((R) newValue);
606+
}
607+
608+
public C getReplyContext() throws ExecutionException, InterruptedException {
609+
super.get();
610+
return this.replyContext;
611+
}
601612
}
602613

603614
protected static class InternalDomainObject {

client/src/test/java/io/opencmw/client/OpenCmwDataSourceTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.io.IOException;
99
import java.net.URI;
1010
import java.time.Duration;
11+
import java.util.List;
1112
import java.util.Map;
1213
import java.util.Objects;
1314
import java.util.Queue;
@@ -257,18 +258,21 @@ void testGetRequest() throws InterruptedException, ExecutionException, TimeoutEx
257258
// get request
258259
final URI requestURI = URI.create(brokerAddress + "/testWorker?ctx=FAIR.SELECTOR.C=3&contentType=application/octet-stream");
259260
LOGGER.atDebug().addArgument(requestURI).log("requesting GET from endpoint: {}");
260-
final Future<TestObject> future;
261+
final DataSourcePublisher.ThePromisedFuture<TestObject, Map<String, List<String>>> future;
261262
try (final DataSourcePublisher.Client client = dataSourcePublisher.getClient()) {
262263
future = client.get(requestURI, null, TestObject.class); // uri_without_query oder serviceName + resolver, requestContext, type
263264
}
264265

265266
// assert result
266267
final TestObject result = future.get(1000, TimeUnit.MILLISECONDS);
267268
assertEquals(referenceObject, result);
269+
final Map<String, List<String>> context = future.getReplyContext();
270+
assertEquals("FAIR.SELECTOR.C=3:P=5", context.get("ctx").get(0));
268271

269272
eventStore.stop();
270273
dataSourcePublisher.stop();
271274
}
275+
272276
@Test
273277
void testGetRequestWithContext() throws InterruptedException, ExecutionException, TimeoutException {
274278
final TestObject referenceObject = new TestObject("asdf", 42);

core/src/main/java/io/opencmw/utils/CustomFuture.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,18 +92,18 @@ public boolean isDone() {
9292
* @throws IllegalStateException in case this method has been already called or Future has been cancelled
9393
*/
9494
public void setReply(final T newValue) {
95+
this.reply = newValue;
9596
if (done.getAndSet(true)) {
9697
throw new IllegalStateException("future is not running anymore (either cancelled or already notified)");
9798
}
98-
this.reply = newValue;
9999
notifyListener();
100100
}
101101

102102
public void setException(final Throwable exception) {
103+
this.exception = exception;
103104
if (done.getAndSet(true)) {
104105
throw new IllegalStateException("future is not running anymore (either cancelled or already notified)");
105106
}
106-
this.exception = exception;
107107
notifyListener();
108108
}
109109

server/src/main/java/io/opencmw/server/MajordomoWorker.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,7 @@ protected void serialiseData(final IoClassSerialiser classSerialiser, final IoBu
214214
final URI reqTopic = rawCtx.req.topic;
215215
rawCtx.rep.topic = new URI(reqTopic.getScheme(), reqTopic.getAuthority(), reqTopic.getPath(), replyQuery, reqTopic.getFragment());
216216
} else {
217-
final String oldQuery = rawCtx.rep.topic.getQuery();
218-
final String newQuery = oldQuery == null || oldQuery.isBlank() ? replyQuery : (oldQuery + "&" + replyQuery);
219-
rawCtx.rep.topic = new URI(rawCtx.rep.topic.getScheme(), rawCtx.rep.topic.getAuthority(), rawCtx.rep.topic.getPath(), newQuery, null);
217+
rawCtx.rep.topic = new URI(rawCtx.rep.topic.getScheme(), rawCtx.rep.topic.getAuthority(), rawCtx.rep.topic.getPath(), replyQuery, null);
220218
}
221219
final MimeType replyMimeType = QueryParameterParser.getMimeType(replyQuery);
222220
// no MIME type given -> stick with the one specified in the request (if it exists) or keep default: copy of raw binary data

0 commit comments

Comments
 (0)