Skip to content

Implement progressive call results #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions examples/rpc_progressive_call_results/callee.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import "dart:io";
import "package:xconn/xconn.dart";

const procedureDownload = "io.xconn.progress.download";

Future<void> main() async {
var client = Client();
var session = await client.connect("ws://localhost:8080/ws", "realm1");

// Define function to handle received Invocation for "io.xconn.progress.download"
Result downloadHandler(Invocation inv) {
const int fileSize = 100; // Simulate a file size of 100 units

for (int i = 0; i <= fileSize; i += 10) {
final int progress = i * 100 ~/ fileSize; // Calculate progress percentage
inv.sendProgress([progress], null); // Send progress
sleep(const Duration(milliseconds: 500));
}

return Result(args: ["Download complete!"]);
}

// Register procedure "io.xconn.progress.download"
var registration = await session.register(procedureDownload, downloadHandler);
print("Registered procedure $procedureDownload successfully");

// Define a signal handler to catch the interrupt signal (Ctrl+C)
ProcessSignal.sigint.watch().listen((signal) async {
await session.unregister(registration);
await session.close();
});
}
24 changes: 24 additions & 0 deletions examples/rpc_progressive_call_results/caller.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import "dart:io";

import "package:xconn/xconn.dart";

const procedureDownload = "io.xconn.progress.download";

Future<void> main() async {
var client = Client();
var session = await client.connect("ws://localhost:8080/ws", "realm1");

// Call procedure "io.xconn.progress.download"
var result = await session.callProgress(
procedureDownload,
(Result result) {
var progress = result.args[0]; // Current progress
print("Download progress: $progress%");
},
);

print(result.args[0]);

await session.close();
exit(0);
}
53 changes: 45 additions & 8 deletions lib/src/session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Session {
}

final Map<int, Completer<Result>> _callRequests = {};
final Map<int, Function(Result result)> _progressHandlers = {};
final Map<int, RegisterRequest> _registerRequests = {};
final Map<int, Result Function(Invocation)> _registrations = {};
final Map<int, UnregisterRequest> _unregisterRequests = {};
Expand All @@ -49,9 +50,18 @@ class Session {

void _processIncomingMessage(msg.Message message) {
if (message is msg.Result) {
var request = _callRequests.remove(message.requestID);
if (request != null) {
request.complete(Result(args: message.args, kwargs: message.kwargs, details: message.details));
var progress = message.details["progress"] ?? false;
if (progress) {
var progressHandler = _progressHandlers[message.requestID];
if (progressHandler != null) {
progressHandler(Result(args: message.args, kwargs: message.kwargs));
}
} else {
var request = _callRequests.remove(message.requestID);
if (request != null) {
request.complete(Result(args: message.args, kwargs: message.kwargs, details: message.details));
}
_progressHandlers.remove(message.requestID);
}
} else if (message is msg.Registered) {
var request = _registerRequests.remove(message.requestID);
Expand All @@ -62,9 +72,17 @@ class Session {
} else if (message is msg.Invocation) {
var endpoint = _registrations[message.registrationID];
if (endpoint != null) {
var invocation = Invocation(args: message.args, kwargs: message.kwargs, details: message.details);
if (message.details["receive_progress"] ?? false) {
invocation.sendProgress = (args, kwargs) {
var yield = msg.Yield(message.requestID, args: args, kwargs: kwargs, options: {"progress": true});
var data = _wampSession.sendMessage(yield);
_baseSession.send(data);
};
}
msg.Message msgToSend;
try {
var result = endpoint(Invocation(args: message.args, kwargs: message.kwargs, details: message.details));
var result = endpoint(invocation);
msgToSend = msg.Yield(message.requestID, args: result.args, kwargs: result.kwargs, options: result.details);
} on ApplicationError catch (e) {
msgToSend = msg.Error(message.messageType(), message.requestID, e.message, args: e.args, kwargs: e.kwargs);
Expand Down Expand Up @@ -158,6 +176,15 @@ class Session {
}
}

Future<Result> _call(msg.Call call) {
var completer = Completer<Result>();
_callRequests[call.requestID] = completer;

_baseSession.send(_wampSession.sendMessage(call));

return completer.future;
}

Future<Result> call(
String procedure, {
List<dynamic>? args,
Expand All @@ -166,12 +193,22 @@ class Session {
}) {
var call = msg.Call(_nextID, procedure, args: args, kwargs: kwargs, options: options);

var completer = Completer<Result>();
_callRequests[call.requestID] = completer;
return _call(call);
}

_baseSession.send(_wampSession.sendMessage(call));
Future<Result> callProgress(
String procedure,
Function(Result result) progressHandler, {
List<dynamic>? args,
Map<String, dynamic>? kwargs,
Map<String, dynamic>? options,
}) {
var call = msg.Call(_nextID, procedure, args: args, kwargs: kwargs, options: options);

return completer.future;
call.options["receive_progress"] = true;
_progressHandlers[call.requestID] = progressHandler;

return _call(call);
}

Future<Registration> register(
Expand Down
2 changes: 2 additions & 0 deletions lib/src/types.dart
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ class Invocation {
final List<dynamic> args;
final Map<String, dynamic> kwargs;
final Map<String, dynamic> details;

late Function(List<dynamic>? args, Map<String, dynamic>? kwargs) sendProgress;
}

class UnregisterRequest {
Expand Down
26 changes: 26 additions & 0 deletions test/session_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,30 @@ void main() {
test("unsubscribe from a topic", () async {
await session.unsubscribe(subscription);
});

test("progressive call results", () async {
await session.register("foo.bar.progress", (inv) {
// Send progress
for (int i = 1; i <= 3; i++) {
inv.sendProgress([i], null);
}
// Return final result
return Result(args: ["done"]);
});

// Store received progress updates
final List<int> progressUpdates = [];

var result = await session.callProgress("foo.bar.progress", (Result result) {
var progress = result.args[0];

// Collect received progress
progressUpdates.add(progress);
});

// Verify progressive updates received correctly
expect(progressUpdates, [1, 2, 3]);
// Verify the final result
expect(result.args[0], "done");
});
}