Skip to content

Commit 3b2643d

Browse files
committed
Update RegisterAndInvoke to 1.5.8
* Update registerandinvoke feature file to 1.5.8 * Update Java and Python Test Agents to support 1.5.8
1 parent 76a0b73 commit 3b2643d

6 files changed

Lines changed: 64 additions & 49 deletions

File tree

test_agent/java/src/main/java/org/eclipse/uprotocol/TestAgent.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,19 @@ private static Object handleInvokeMethodCommand(Map<String, Object> jsonData) {
164164
Map<String, Object> data = (Map<String, Object>) jsonData.get("data");
165165
// Convert data and payload to protocol buffers
166166
UUri uri = (UUri) ProtoConverter.dictToProto(data, UUri.newBuilder());
167-
ByteString payloadBytes = (ByteString) data.get("data");
168-
UPayloadFormat format = (UPayloadFormat) data.get("format");
169-
UPayload payload = new UPayload(payloadBytes, format);
170-
CompletionStage<UPayload> responseFuture = transport.invokeMethod(uri, payload,
171-
CallOptions.DEFAULT);
167+
String payload = (String) data.get("payload");
168+
ByteString value = null;
169+
if (payload instanceof String && (payload).startsWith("BYTES:")) {
170+
String byteString = (payload).substring(6); // Remove 'BYTES:' prefix
171+
value = ByteString.copyFromUtf8(byteString);
172+
} else if (payload instanceof String) {
173+
value = ByteString.copyFromUtf8(payload);
174+
}
175+
176+
UPayload setPayload = new UPayload(value, UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY);
177+
CompletionStage<UPayload> responseFuture = transport.invokeMethod(uri, setPayload, CallOptions.DEFAULT);
172178
responseFuture.whenComplete((responseMessage, exception) -> {
173-
sendToTestManager(responseMessage, ActionCommands.INVOKE_METHOD_COMMAND, (String) jsonData.get("test_id"));
179+
sendToTestManager(Map.of("payload", responseMessage.data().toStringUtf8()), ActionCommands.INVOKE_METHOD_COMMAND, (String) jsonData.get("test_id"));
174180
});
175181
return null;
176182
}

test_agent/python/testagent.py

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,20 +66,17 @@ class SocketUListener(UListener):
6666
def on_receive(self, umsg: UMessage) -> None:
6767
logger.info("Listener received")
6868
if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST:
69-
attributes = UMessageBuilder.response(
70-
umsg.attributes.sink,
71-
umsg.attributes.source,
72-
umsg.attributes.id,
73-
).build()
7469
any_obj = any_pb2.Any()
7570
any_obj.Pack(StringValue(value="SuccessRPCResponse"))
76-
res_msg = UMessage(
77-
attributes=attributes,
78-
payload=UPayload(
79-
value=any_obj.SerializeToString(),
80-
format=UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
81-
),
71+
payload = UPayload(
72+
data=any_obj.SerializeToString(),
73+
format=UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
8274
)
75+
res_msg = UMessageBuilder.response(
76+
umsg.attributes.sink,
77+
umsg.attributes.source,
78+
umsg.attributes.id,
79+
).build_from_upayload(payload)
8380
transport.send(res_msg)
8481
else:
8582
send_to_test_manager(umsg, actioncommands.RESPONSE_ON_RECEIVE)
@@ -201,8 +198,13 @@ def handle_unregister_listener_command(json_msg):
201198

202199
def handle_invoke_method_command(json_msg):
203200
uri = dict_to_proto(json_msg["data"], UUri())
204-
payload = dict_to_proto(json_msg["data"]["payload"], UPayload())
205-
res_future: Future = transport.invoke_method(uri, payload, CallOptions(ttl=10000))
201+
logger.info(json_msg["data"]["payload"])
202+
value = json_msg["data"]["payload"]
203+
if isinstance(value, str) and "BYTES:" in value:
204+
value = value.replace("BYTES:", "")
205+
value = value.encode("utf-8")
206+
payload = UPayload(data = value)
207+
res_future: Future = transport.invoke_method(uri, payload, CallOptions(timeout=10000))
206208

207209
def handle_response(message):
208210
message: Message = message.result()

test_manager/features/tests/transport_rpc/register_and_invoke.feature

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,38 +25,31 @@
2525
Feature: Testing RPC Functionality
2626

2727
Scenario Outline: To test the registerlistener and invoke_method apis
28-
Given "<uE1>" creates data for "registerlistener"
29-
And sets "entity.name" to "body.access"
30-
And sets "resource.name" to "door"
31-
And sets "resource.instance" to "front_left"
32-
And sets "resource.message" to "Door"
28+
Given "uE1" creates data for "registerlistener"
29+
And sets "ue_id" to "12345"
30+
And sets "ue_version_major" to "1"
31+
And sets "resource_id" to "32769"
3332

3433
When sends "registerlistener" request
3534
Then the status received with "code" is "OK"
3635

37-
Given "<uE2>" creates data for "invokemethod"
38-
And sets "entity.name" to "body.access"
39-
And sets "resource.name" to "door"
40-
And sets "resource.instance" to "front_left"
41-
And sets "resource.message" to "Door"
42-
And sets "payload.format" to "UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY"
43-
And sets "payload.value" to b".type.googleapis.com/google.protobuf.Int32Value\x12\x02\x08\x03"
36+
Given "uE2" creates data for "invokemethod"
37+
And sets "ue_id" to "12345"
38+
And sets "ue_version_major" to "1"
39+
And sets "resource_id" to "32769"
40+
And sets "payload" to b".type.googleapis.com/google.protobuf.Int32Value\x12\x02\x08\x03"
4441

4542
When sends "invokemethod" request
46-
Then "<uE2>" receives data field "payload.value" as b"\n/type.googleapis.com/google.protobuf.StringValue\x12\x14\n\x12SuccessRPCResponse"
43+
Then "uE2" receives data field "payload" as b"\n/type.googleapis.com/google.protobuf.StringValue\x12\x14\n\x12SuccessRPCResponse"
4744

48-
Given "<uE1>" creates data for "unregisterlistener"
49-
And sets "entity.name" to "body.access"
50-
And sets "resource.name" to "door"
51-
And sets "resource.instance" to "front_left"
52-
And sets "resource.message" to "Door"
45+
Given "uE1" creates data for "unregisterlistener"
46+
And sets "ue_id" to "12345"
47+
And sets "ue_version_major" to "1"
48+
And sets "resource_id" to "32769"
5349

5450
When sends "unregisterlistener" request
5551
Then the status received with "code" is "OK"
5652

5753
Examples:
58-
| uE1 | uE2 |
59-
| java | java |
60-
| python | python |
61-
| java | python |
62-
| python | java |
54+
| ignore | ignore |
55+
| ignore | ignore |

test_manager/testData/workflow_test_data.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@
2525
"ue2": ["rust"],
2626
"transports": ["zenoh"]
2727
},
28+
{
29+
"feature_name" : "register_and_invoke",
30+
"path": "transport_rpc",
31+
"ue1": ["python", "java"],
32+
"ue2": ["python", "java"],
33+
"transports": ["socket"]
34+
},
2835
{
2936
"feature_name" : "uri_deserializer",
3037
"path": "serializers",

up_client_socket/java/src/main/java/org/eclipse/uprotocol/SocketUTransport.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class SocketUTransport implements UTransport, RpcClient {
5252
}
5353

5454
private final Socket socket;
55-
private final ConcurrentHashMap<UUID, CompletionStage<UMessage>> reqid_to_future;
55+
private final ConcurrentHashMap<UUID, CompletionStage<UPayload>> reqid_to_future;
5656
private final ConcurrentHashMap<UUri, ArrayList<UListener>> uri_to_listener;
5757
private final Object lock = new Object();
5858

@@ -164,10 +164,15 @@ private void notifyListeners(UUri uri, UMessage umsg) {
164164
* @param umsg The response message to handle.
165165
*/
166166
private void handleResponseMessage(UMessage umsg) {
167+
logger.info("HEYA");
168+
logger.info(umsg.toString());
169+
UPayload payload = new UPayload(umsg.getPayload(), umsg.getAttributes().getPayloadFormat());
167170
UUID requestId = umsg.getAttributes().getReqid();
168-
CompletionStage<UMessage> responseFuture = reqid_to_future.remove(requestId);
171+
logger.info(reqid_to_future.toString());
172+
CompletionStage<UPayload> responseFuture = reqid_to_future.remove(requestId);
169173
if (responseFuture != null) {
170-
responseFuture.toCompletableFuture().complete(umsg);
174+
logger.info("ITSCOMPLETE");
175+
responseFuture.toCompletableFuture().complete(payload);
171176
}
172177
}
173178

@@ -243,14 +248,16 @@ public CompletionStage<UStatus> unregisterListener(UUri sourceFilter, UUri sinkF
243248
public CompletionStage<UPayload> invokeMethod(UUri methodUri, UPayload requestPayload, CallOptions options) {
244249
UMessage umsg = UMessageBuilder.request(RESPONSE_URI, methodUri, options.timeout()).build(requestPayload);
245250
UUID requestId = umsg.getAttributes().getId();
246-
CompletionStage<UMessage> responseFuture = new CompletableFuture<>();
251+
CompletionStage<UPayload> responseFuture = new CompletableFuture<>();
247252
reqid_to_future.put(requestId, responseFuture);
248253

254+
logger.info("REQIDTOFUTURETIMAP");
255+
logger.info(reqid_to_future.toString());
249256
Thread timeoutThread = new Thread(() -> timeoutCounter(responseFuture.toCompletableFuture(), requestId, options.timeout()));
250257
timeoutThread.start();
251258

252259
send(umsg);
253-
return CompletableFuture.completedFuture(null);
260+
return responseFuture;
254261
}
255262

256263
/**
@@ -260,7 +267,7 @@ public CompletionStage<UPayload> invokeMethod(UUri methodUri, UPayload requestPa
260267
* @param requestId The request ID associated with the response.
261268
* @param timeout The timeout duration.
262269
*/
263-
private void timeoutCounter(CompletableFuture<UMessage> responseFuture, UUID requestId, int timeout) {
270+
private void timeoutCounter(CompletableFuture<UPayload> responseFuture, UUID requestId, int timeout) {
264271
try {
265272
Thread.sleep(timeout);
266273
if (!responseFuture.isDone()) {

up_client_socket/python/socket_transport.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ def invoke_method(self, method_uri: UUri, request_payload: UPayload, options: Ca
196196
response = Future()
197197
self.reqid_to_future[request_id.SerializeToString()] = response
198198
# Start a thread to count the timeout
199-
timeout_thread = threading.Thread(target=timeout_counter, args=(response, request_id, options.ttl))
199+
timeout_thread = threading.Thread(target=timeout_counter, args=(response, request_id, options.timeout))
200200
timeout_thread.start()
201201

202202
self.send(umsg)

0 commit comments

Comments
 (0)