Skip to content

Commit 424f4fb

Browse files
Implement progressive call results
1 parent 2857299 commit 424f4fb

File tree

2 files changed

+42
-4
lines changed

2 files changed

+42
-4
lines changed

lib/src/session.dart

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class Session {
3030
}
3131

3232
final Map<int, Completer<Result>> _callRequests = {};
33+
final Map<int, Function(List<dynamic> args, Map<String, dynamic> kwargs)> _progressHandlerByRequestID = {};
3334
final Map<int, RegisterRequest> _registerRequests = {};
3435
final Map<int, Result Function(Invocation)> _registrations = {};
3536
final Map<int, UnregisterRequest> _unregisterRequests = {};
@@ -40,9 +41,18 @@ class Session {
4041

4142
void _processIncomingMessage(msg.Message message) {
4243
if (message is msg.Result) {
43-
var request = _callRequests.remove(message.requestID);
44-
if (request != null) {
45-
request.complete(Result(args: message.args, kwargs: message.kwargs, details: message.details));
44+
var progress = message.details["progress"] ?? false;
45+
if (progress) {
46+
var progressHandler = _progressHandlerByRequestID[message.requestID];
47+
if (progressHandler != null) {
48+
progressHandler(message.args, message.kwargs);
49+
}
50+
} else {
51+
var request = _callRequests.remove(message.requestID);
52+
if (request != null) {
53+
request.complete(Result(args: message.args, kwargs: message.kwargs, details: message.details));
54+
}
55+
_progressHandlerByRequestID.remove(message.requestID);
4656
}
4757
} else if (message is msg.Registered) {
4858
var request = _registerRequests.remove(message.requestID);
@@ -53,7 +63,15 @@ class Session {
5363
} else if (message is msg.Invocation) {
5464
var endpoint = _registrations[message.registrationID];
5565
if (endpoint != null) {
56-
Result result = endpoint(Invocation(args: message.args, kwargs: message.kwargs, details: message.details));
66+
var invocation = Invocation(args: message.args, kwargs: message.kwargs, details: message.details);
67+
if (message.details["receive_progress"] ?? false) {
68+
invocation.sendProgress = (args, kwargs) {
69+
var yield = msg.Yield(message.requestID, args: args, kwargs: kwargs, options: {"progress": true});
70+
var data = _wampSession.sendMessage(yield);
71+
_baseSession.send(data);
72+
};
73+
}
74+
Result result = endpoint(invocation);
5775
Object data = _wampSession.sendMessage(
5876
msg.Yield(message.requestID, args: result.args, kwargs: result.kwargs, options: result.details),
5977
);
@@ -133,6 +151,24 @@ class Session {
133151
return completer.future;
134152
}
135153

154+
Future<Result> callProgressive(
155+
String procedure,
156+
Function(List<dynamic> args, Map<String, dynamic> kwargs) progressHandler, {
157+
List<dynamic>? args,
158+
Map<String, dynamic>? kwargs,
159+
Map<String, dynamic>? options,
160+
}) async {
161+
var call = msg.Call(_nextID, procedure, args: args, kwargs: kwargs, options: {"receive_progress": true});
162+
163+
var completer = Completer<Result>();
164+
_callRequests[call.requestID] = completer;
165+
166+
_baseSession.send(_wampSession.sendMessage(call));
167+
_progressHandlerByRequestID[call.requestID] = progressHandler;
168+
169+
return completer.future;
170+
}
171+
136172
Future<Registration> register(String procedure, Result Function(Invocation) endpoint) {
137173
var register = msg.Register(_nextID, procedure);
138174

lib/src/types.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ class Invocation {
162162
final List<dynamic> args;
163163
final Map<String, dynamic> kwargs;
164164
final Map<String, dynamic> details;
165+
166+
late Function(List<dynamic> args, Map<String, dynamic> kwargs) sendProgress;
165167
}
166168

167169
class UnregisterRequest {

0 commit comments

Comments
 (0)