Skip to content

Commit 09d4702

Browse files
implement progressive call results
1 parent 9236159 commit 09d4702

File tree

8 files changed

+111
-36
lines changed

8 files changed

+111
-36
lines changed

autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -322,23 +322,32 @@ private void onPreSessionMessage(IMessage message) throws Exception {
322322
private void onMessage(IMessage message) throws Exception {
323323
if (message instanceof Result) {
324324
Result msg = (Result) message;
325+
325326
CallRequest request = getOrDefault(mCallRequests, msg.request, null);
326327
if (request == null) {
327328
throw new ProtocolError(String.format(
328329
"RESULT received for non-pending request ID %s", msg.request));
329330
}
330331

331-
mCallRequests.remove(msg.request);
332-
if (request.resultTypeRef != null) {
333-
// FIXME: check args length > 1 and == 0, and kwargs != null
334-
// we cannot currently POJO automap these cases!
335-
request.onReply.complete(mSerializer.convertValue(
336-
msg.args.get(0), request.resultTypeRef));
337-
} else if (request.resultTypeClass != null) {
338-
request.onReply.complete(mSerializer.convertValue(
339-
msg.args.get(0), request.resultTypeClass));
332+
if (msg.options.containsKey("progress") && (Boolean) msg.options.get("progress")) {
333+
if (request.options.progressHandler == null) {
334+
throw new ProtocolError("Caller not accepting progressive call result");
335+
}
336+
337+
request.options.progressHandler.onProgress(new CallResult(msg.args, msg.kwargs));
340338
} else {
341-
request.onReply.complete(new CallResult(msg.args, msg.kwargs));
339+
mCallRequests.remove(msg.request);
340+
if (request.resultTypeRef != null) {
341+
// FIXME: check args length > 1 and == 0, and kwargs != null
342+
// we cannot currently POJO automap these cases!
343+
request.onReply.complete(mSerializer.convertValue(
344+
msg.args.get(0), request.resultTypeRef));
345+
} else if (request.resultTypeClass != null) {
346+
request.onReply.complete(mSerializer.convertValue(
347+
msg.args.get(0), request.resultTypeClass));
348+
} else {
349+
request.onReply.complete(new CallResult(msg.args, msg.kwargs));
350+
}
342351
}
343352
} else if (message instanceof Subscribed) {
344353
Subscribed msg = (Subscribed) message;
@@ -452,10 +461,21 @@ private void onMessage(IMessage message) throws Exception {
452461
long callerSessionID = getOrDefault(msg.details, "caller", -1L);
453462
String callerAuthID = getOrDefault(msg.details, "caller_authid", null);
454463
String callerAuthRole = getOrDefault(msg.details, "caller_authrole", null);
455-
456-
InvocationDetails details = new InvocationDetails(
457-
registration, registration.procedure, callerSessionID, callerAuthID, callerAuthRole, this);
458464

465+
Boolean progress = getOrDefault(msg.details, "receive_progress", false);
466+
InvocationDetails details;
467+
if (progress) {
468+
details = new InvocationDetails(
469+
registration, registration.procedure, callerSessionID, callerAuthID, callerAuthRole, this,
470+
(args, kwargs) -> {
471+
HashMap<String, Object> options = new HashMap<>();
472+
options.put("progress", true);
473+
send(new Yield(msg.request, args, kwargs, options));
474+
});
475+
} else {
476+
details = new InvocationDetails(
477+
registration, registration.procedure, callerSessionID, callerAuthID, callerAuthRole, this, null);
478+
}
459479
runAsync(() -> {
460480
Object result;
461481
if (registration.endpoint instanceof Supplier) {
@@ -494,22 +514,22 @@ private void onMessage(IMessage message) throws Exception {
494514
}
495515

496516
} else {
497-
send(new Yield(msg.request, invocRes.results, invocRes.kwresults));
517+
send(new Yield(msg.request, invocRes.results, invocRes.kwresults, null));
498518
}
499519
}, mExecutor);
500520
} else if (result instanceof InvocationResult) {
501521
InvocationResult res = (InvocationResult) result;
502-
send(new Yield(msg.request, res.results, res.kwresults));
522+
send(new Yield(msg.request, res.results, res.kwresults, null));
503523
} else if (result instanceof List) {
504-
send(new Yield(msg.request, (List) result, null));
524+
send(new Yield(msg.request, (List) result, null, null));
505525
} else if (result instanceof Map) {
506-
send(new Yield(msg.request, null, (Map) result));
526+
send(new Yield(msg.request, null, (Map) result, null));
507527
} else if (result instanceof Void) {
508-
send(new Yield(msg.request, null, null));
528+
send(new Yield(msg.request, null, null, null));
509529
} else {
510530
List<Object> item = new ArrayList<>();
511531
item.add(result);
512-
send(new Yield(msg.request, item, null));
532+
send(new Yield(msg.request, item, null, null));
513533
}
514534
}, mExecutor).whenCompleteAsync((aVoid, throwable) -> {
515535
// FIXME: implement better errors
@@ -1082,9 +1102,10 @@ private <T> CompletableFuture<T> reallyCall(
10821102
resultTypeReference, resultTypeClass));
10831103

10841104
if (options == null) {
1085-
send(new Call(requestID, procedure, args, kwargs, 0));
1105+
send(new Call(requestID, procedure, args, kwargs, 0, false));
10861106
} else {
1087-
send(new Call(requestID, procedure, args, kwargs, options.timeout));
1107+
boolean receiveProgress = options.progressHandler != null;
1108+
send(new Call(requestID, procedure, args, kwargs, options.timeout, receiveProgress));
10881109
}
10891110
return future;
10901111
}
@@ -1286,7 +1307,15 @@ private CompletableFuture<SessionDetails> reallyJoin(
12861307
roles.put("publisher", new HashMap<>());
12871308
roles.put("subscriber", new HashMap<>());
12881309
roles.put("caller", new HashMap<>());
1289-
roles.put("callee", new HashMap<>());
1310+
1311+
Map<String, Object> calleeFeatures = new HashMap<>();
1312+
calleeFeatures.put("progressive_call_results", true);
1313+
calleeFeatures.put("call_canceling", true);
1314+
1315+
Map<String, Object> callee = new HashMap<>();
1316+
callee.put("features", calleeFeatures);
1317+
roles.put("callee", callee);
1318+
12901319
if (mAuthenticators == null) {
12911320
send(new Hello(realm, roles));
12921321
} else {
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.crossbar.autobahn.wamp.interfaces;
2+
3+
import java.util.List;
4+
import java.util.Map;
5+
6+
public interface Progress {
7+
void sendProgress(List<Object> args, Map<String, Object> kwargs);
8+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.crossbar.autobahn.wamp.interfaces;
2+
3+
import io.crossbar.autobahn.wamp.types.CallResult;
4+
5+
public interface ProgressHandler {
6+
void onProgress(CallResult result);
7+
}

autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Call.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ public class Call implements IMessage {
3434
public final List<Object> args;
3535
public final Map<String, Object> kwargs;
3636
public final int timeout;
37+
public final boolean receiveProgress;
3738

38-
public Call(long request, String procedure, List<Object> args, Map<String, Object> kwargs, int timeout) {
39+
public Call(long request, String procedure, List<Object> args, Map<String, Object> kwargs,
40+
int timeout, boolean receiveProgress) {
3941
this.request = request;
4042
this.procedure = procedure;
4143
this.args = args;
@@ -45,6 +47,7 @@ public Call(long request, String procedure, List<Object> args, Map<String, Objec
4547
} else {
4648
this.timeout = timeout;
4749
}
50+
this.receiveProgress = receiveProgress;
4851
}
4952

5053
public static Call parse(List<Object> wmsg) {
@@ -69,7 +72,9 @@ public static Call parse(List<Object> wmsg) {
6972

7073
int timeout = getOrDefault(options, "timeout", TIMEOUT_DEFAULT);
7174

72-
return new Call(request, procedure, args, kwargs, timeout);
75+
boolean receiveProgress = getOrDefault(options, "receive_progress", false);
76+
77+
return new Call(request, procedure, args, kwargs, timeout, receiveProgress);
7378
}
7479

7580
@Override
@@ -78,6 +83,7 @@ public List<Object> marshal() {
7883
marshaled.add(MESSAGE_TYPE);
7984
marshaled.add(request);
8085
Map<String, Object> options = new HashMap<>();
86+
options.put("receive_progress", receiveProgress);
8187
if (timeout > TIMEOUT_DEFAULT) {
8288
options.put("timeout", timeout);
8389
}

autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Result.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,20 @@ public class Result implements IMessage {
2626
public final long request;
2727
public final List<Object> args;
2828
public final Map<String, Object> kwargs;
29+
public final Map<String, Object> options;
2930

30-
public Result(long request, List<Object> args, Map<String, Object> kwargs) {
31+
public Result(long request, List<Object> args, Map<String, Object> kwargs, Map<String, Object> options) {
3132
this.request = request;
3233
this.args = args;
3334
this.kwargs = kwargs;
35+
this.options = options;
3436
}
3537

3638
public static Result parse(List<Object> wmsg) {
3739
MessageUtil.validateMessage(wmsg, MESSAGE_TYPE, "RESULT", 3, 5);
3840

3941
long request = MessageUtil.parseLong(wmsg.get(1));
42+
Map<String, Object> options = (Map<String, Object>) wmsg.get(2);
4043
List<Object> args = null;
4144
if (wmsg.size() > 3) {
4245
if (wmsg.get(3) instanceof byte[]) {
@@ -48,15 +51,19 @@ public static Result parse(List<Object> wmsg) {
4851
if (wmsg.size() > 4) {
4952
kwargs = (Map<String, Object>) wmsg.get(4);
5053
}
51-
return new Result(request, args, kwargs);
54+
return new Result(request, args, kwargs, options);
5255
}
5356

5457
@Override
5558
public List<Object> marshal() {
5659
List<Object> marshaled = new ArrayList<>();
5760
marshaled.add(MESSAGE_TYPE);
5861
marshaled.add(request);
59-
marshaled.add(Collections.emptyMap());
62+
if (options == null) {
63+
marshaled.add(Collections.emptyMap());
64+
} else {
65+
marshaled.add(options);
66+
}
6067
if (kwargs != null) {
6168
if (args == null) {
6269
// Empty args.

autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Yield.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@ public class Yield implements IMessage {
2828
public final long request;
2929
public final List<Object> args;
3030
public final Map<String, Object> kwargs;
31+
public final Map<String, Object> options;
3132

32-
public Yield(long request, List<Object> args, Map<String, Object> kwargs) {
33+
public Yield(long request, List<Object> args, Map<String, Object> kwargs, Map<String, Object> options) {
3334
this.request = request;
3435
this.args = args;
3536
this.kwargs = kwargs;
37+
this.options = options;
3638
}
3739

3840
public static Yield parse(List<Object> wmsg) {
@@ -50,16 +52,19 @@ public static Yield parse(List<Object> wmsg) {
5052
if (wmsg.size() > 4) {
5153
kwargs = (Map<String, Object>) wmsg.get(4);
5254
}
53-
return new Yield(MessageUtil.parseLong(wmsg.get(1)), args, kwargs);
55+
return new Yield(MessageUtil.parseLong(wmsg.get(1)), args, kwargs, options);
5456
}
5557

5658
@Override
5759
public List<Object> marshal() {
5860
List<Object> marshaled = new ArrayList<>();
5961
marshaled.add(MESSAGE_TYPE);
6062
marshaled.add(request);
61-
// Empty options.
62-
marshaled.add(Collections.emptyMap());
63+
if (options == null) {
64+
marshaled.add(Collections.emptyMap());
65+
} else {
66+
marshaled.add(options);
67+
}
6368
if (kwargs != null) {
6469
if (args == null) {
6570
// Empty args.

autobahn/src/main/java/io/crossbar/autobahn/wamp/types/CallOptions.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,22 @@
1111

1212
package io.crossbar.autobahn.wamp.types;
1313

14+
import io.crossbar.autobahn.wamp.interfaces.ProgressHandler;
15+
1416
public class CallOptions {
15-
public final int timeout;
17+
public int timeout;
18+
public ProgressHandler progressHandler;
1619

1720
public CallOptions(int timeout) {
1821
this.timeout = timeout;
1922
}
23+
24+
public CallOptions(ProgressHandler progressHandler) {
25+
this.progressHandler = progressHandler;
26+
}
27+
28+
public CallOptions(int timeout, ProgressHandler progressHandler) {
29+
this.timeout = timeout;
30+
this.progressHandler = progressHandler;
31+
}
2032
}

autobahn/src/main/java/io/crossbar/autobahn/wamp/types/InvocationDetails.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
package io.crossbar.autobahn.wamp.types;
1313

1414
import io.crossbar.autobahn.wamp.Session;
15+
import io.crossbar.autobahn.wamp.interfaces.Progress;
1516

1617
public class InvocationDetails {
1718

@@ -33,18 +34,18 @@ public class InvocationDetails {
3334
// The WAMP session on which this event is delivered.
3435
public final Session session;
3536

36-
// FIXME
37-
// we need a progress() callback here to allow
38-
// the user to produce progressive results.
37+
// callback produce progressive results.
38+
public final Progress progress;
3939

4040
// XXXX - Tentative, the constructor parameter order may change.
4141
public InvocationDetails(Registration registration, String procedure, long callerSessionID,
42-
String callerAuthID, String callerAuthRole, Session session) {
42+
String callerAuthID, String callerAuthRole, Session session, Progress progress) {
4343
this.registration = registration;
4444
this.procedure = procedure;
4545
this.callerSessionID = callerSessionID;
4646
this.callerAuthID = callerAuthID;
4747
this.callerAuthRole = callerAuthRole;
4848
this.session = session;
49+
this.progress = progress;
4950
}
5051
}

0 commit comments

Comments
 (0)