diff --git a/README.adoc b/README.adoc index e7df5c2..8df54f7 100644 --- a/README.adoc +++ b/README.adoc @@ -66,20 +66,19 @@ $ python setup.py bdist_wheel === Usage -$ pip install uprotocol_vsomeip_python-0.1.0.dev0-py3-none-any.whl +$ pip install uprotocol_vsomeip_python-0.2.0.dev0-py3-none-any.whl uPClient vsomeip Transport [source] ---- -from uprotocol_vsomeip.vsomeip_utransport import VsomeipTransport -from uprotocol_vsomeip.vsomeip_utransport import VsomeipHelper +from uprotocol_vsomeip.vsomeip_utransport import VsomeipHelper, VsomeipTransport # Create a helper class and override the services_info method to start the mock services by Vsomeip class Helper(VsomeipHelper): def services_info(self) -> List[VsomeipHelper.UEntityInfo]: - return [VsomeipHelper.UEntityInfo(Name="entity", Id=1, Events=[0, 1, 2, 3, 4, 5, 6, 7, 8, 10], Port=30509, MajorVersion=1)] + return [VsomeipHelper.UEntityInfo(Name="entity", Id=1, Events=[0x8000], Port=30509, MajorVersion=1)] # Create an object of Vsomeip transport to use and pass on the above helper class created someip = VsomeipTransport(helper=Helper()) diff --git a/requirements.txt b/requirements.txt index 4048dbf..a648766 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -up-python==0.1.2-dev -vsomeip_py @git+https://github.com/COVESA/vsomeip_py.git@16f3e4c \ No newline at end of file +vsomeip_py @git+https://github.com/COVESA/vsomeip_py.git@825061d +up-python==0.2.0.dev0 \ No newline at end of file diff --git a/setup.py b/setup.py index 95f21bd..46dcfe5 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ setup( name=project_name, - version="0.1.0-dev", + version="0.2.0-dev", python_requires=">=3.8", description="", author="", diff --git a/uprotocol_vsomeip/examples/publish.py b/uprotocol_vsomeip/examples/publish.py new file mode 100644 index 0000000..fc879a1 --- /dev/null +++ b/uprotocol_vsomeip/examples/publish.py @@ -0,0 +1,72 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio +import logging +from typing import List + +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.v1.uattributes_pb2 import UPayloadFormat +from uprotocol.v1.uri_pb2 import UUri + +from uprotocol_vsomeip.vsomeip_utransport import VsomeipHelper, VsomeipTransport + +logger = logging.getLogger() +LOG_FORMAT = "%(asctime)s [%(levelname)s] @ %(filename)s.%(module)s.%(funcName)s:%(lineno)d \n %(message)s" +logging.basicConfig(format=LOG_FORMAT, level=logging.getLevelName("DEBUG")) + + +class Helper(VsomeipHelper): + """ + Helper class to provide list of services to be offered + """ + + def services_info(self) -> List[VsomeipHelper.UEntityInfo]: + return [ + VsomeipHelper.UEntityInfo( + service_id=1, + events=[0x8000], + port=30509, + major_version=1, + ) + ] + + +someip = VsomeipTransport(helper=Helper(), source=UUri(ue_id=1, ue_version_major=1, resource_id=0)) +uuri = UUri(ue_id=1, ue_version_major=1, resource_id=0x8000) + + +async def publish(): + """ + Publish data to a topic + """ + data = "Hello World!" + payload = UPayload.pack_from_data_and_format(data.encode("utf-8"), UPayloadFormat.UPAYLOAD_FORMAT_TEXT) + message = UMessageBuilder.publish(uuri).build_from_upayload(payload) + logger.debug("Sending %s to %s...", data, uuri) + await someip.send(message) + + +async def main() -> None: + """ + Main function to demonstrate publish and subscribe + """ + while True: + await publish() + await asyncio.sleep(1) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/uprotocol_vsomeip/examples/publish_subscribe.py b/uprotocol_vsomeip/examples/publish_subscribe_unsubscribe.py similarity index 53% rename from uprotocol_vsomeip/examples/publish_subscribe.py rename to uprotocol_vsomeip/examples/publish_subscribe_unsubscribe.py index 0bcb656..36053be 100644 --- a/uprotocol_vsomeip/examples/publish_subscribe.py +++ b/uprotocol_vsomeip/examples/publish_subscribe_unsubscribe.py @@ -12,17 +12,18 @@ SPDX-License-Identifier: Apache-2.0 """ +import asyncio import logging import time from typing import List -from uprotocol.proto.uattributes_pb2 import UPriority -from uprotocol.proto.umessage_pb2 import UMessage -from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat -from uprotocol.proto.uri_pb2 import UEntity, UResource, UUri -from uprotocol.proto.ustatus_pb2 import UStatus -from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder from uprotocol.transport.ulistener import UListener +from uprotocol.v1.uattributes_pb2 import UPayloadFormat +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus from uprotocol_vsomeip.vsomeip_utransport import VsomeipHelper, VsomeipTransport @@ -30,10 +31,6 @@ LOG_FORMAT = "%(asctime)s [%(levelname)s] @ %(filename)s.%(module)s.%(funcName)s:%(lineno)d \n %(message)s" logging.basicConfig(format=LOG_FORMAT, level=logging.getLevelName("DEBUG")) -""" -Publish Subscribe Example -""" - class Helper(VsomeipHelper): """ @@ -43,32 +40,27 @@ class Helper(VsomeipHelper): def services_info(self) -> List[VsomeipHelper.UEntityInfo]: return [ VsomeipHelper.UEntityInfo( - Name="publisher", - Id=1, - Events=[0, 1, 2, 3, 4, 5, 6, 7, 8, 10], - Port=30509, - MajorVersion=1, + service_id=1, + events=[0x8000], + port=30509, + major_version=1, ) ] -someip = VsomeipTransport(helper=Helper()) -uuri = UUri( - entity=UEntity(name="publisher", id=1, version_major=1, version_minor=1), - resource=UResource(name="door", instance="front_left", message="Door", id=5), -) +someip = VsomeipTransport(helper=Helper(), source=UUri(ue_id=1, ue_version_major=1, resource_id=0)) +uuri = UUri(ue_id=1, ue_version_major=1, resource_id=0x8000) -def publish(): +async def publish(): """ Publish data to a topic """ data = "Hello World!" - attributes = UAttributesBuilder.publish(uuri, UPriority.UPRIORITY_CS4).build() - payload = UPayload(value=data.encode("utf-8"), format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT) - message = UMessage(attributes=attributes, payload=payload) - logger.debug(f"Sending {data} to {uuri}...") - someip.send(message) + payload = UPayload.pack_from_data_and_format(data.encode("utf-8"), UPayloadFormat.UPAYLOAD_FORMAT_TEXT) + message = UMessageBuilder.publish(uuri).build_from_upayload(payload) + logger.debug("Sending %s to %s...", data, uuri) + await someip.send(message) class MyListener(UListener): @@ -76,7 +68,7 @@ class MyListener(UListener): Listener class to define callback """ - def on_receive(self, message: UMessage) -> UStatus: + async def on_receive(self, message: UMessage) -> UStatus: """ on_receive call back method :param message: @@ -84,8 +76,8 @@ def on_receive(self, message: UMessage) -> UStatus: """ logger.debug( "listener -> id: %s, data: %s", - message.attributes.source.resource.id, - message.payload.value, + message.attributes.source.resource_id, + message.payload, ) return UStatus(message="Received event") @@ -93,26 +85,33 @@ def on_receive(self, message: UMessage) -> UStatus: listener = MyListener() -def subscribe(): +async def subscribe(): """ Subscribe to a topic """ - someip.register_listener(uuri, listener) + await someip.register_listener(source_filter=uuri, listener=listener) -def unsubscribe(): +async def unsubscribe(): """ Unsubscribe to a topic """ - someip.unregister_listener(uuri, listener) + await someip.unregister_listener(source_filter=uuri, listener=listener) -if __name__ == "__main__": - subscribe() +async def main() -> None: + """ + Main function to demonstrate publish and subscribe + """ + await subscribe() time.sleep(1) - publish() + await publish() time.sleep(5) - unsubscribe() + await unsubscribe() time.sleep(1) - publish() + await publish() time.sleep(5) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/uprotocol_vsomeip/examples/rpc.py b/uprotocol_vsomeip/examples/rpc.py deleted file mode 100644 index f92594b..0000000 --- a/uprotocol_vsomeip/examples/rpc.py +++ /dev/null @@ -1,117 +0,0 @@ -""" -SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation - -See the NOTICE file(s) distributed with this work for additional -information regarding copyright ownership. - -This program and the accompanying materials are made available under the -terms of the Apache License Version 2.0 which is available at - - http://www.apache.org/licenses/LICENSE-2.0 - -SPDX-License-Identifier: Apache-2.0 -""" - -import logging -import time -from datetime import datetime -from typing import List - -from uprotocol.proto.uattributes_pb2 import CallOptions -from uprotocol.proto.umessage_pb2 import UMessage -from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat -from uprotocol.proto.uri_pb2 import UEntity, UUri -from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder -from uprotocol.transport.ulistener import UListener -from uprotocol.uri.factory.uresource_builder import UResourceBuilder - -from uprotocol_vsomeip.vsomeip_utransport import VsomeipHelper, VsomeipTransport - -logger = logging.getLogger() -LOG_FORMAT = "%(asctime)s [%(levelname)s] @ %(filename)s.%(module)s.%(funcName)s:%(lineno)d \n %(message)s" -logging.basicConfig(format=LOG_FORMAT, level=logging.getLevelName("DEBUG")) - -""" -RPC Example -""" - - -class Helper(VsomeipHelper): - """ - Helper class to provide list of services to be offered - """ - - def services_info(self) -> List[VsomeipHelper.UEntityInfo]: - return [ - VsomeipHelper.UEntityInfo( - Name="rpc_server", - Id=17, - Events=[0, 10, 11], - Port=30511, - MajorVersion=1, - ) - ] - - -someip = VsomeipTransport(helper=Helper()) -uuri = UUri( - entity=UEntity(name="rpc_server", id=17, version_major=1, version_minor=0), - resource=UResourceBuilder.for_rpc_request("getTime", 5678), -) - - -class RPCRequestListener(UListener): - """ - Listener class to define callback - """ - - def on_receive(self, msg: UMessage): - """ - on_receive call back method - :param msg: UMessage object received - :return: None - """ - print("on rpc request received") - attributes = msg.attributes - payload = msg.payload - value = "".join(chr(c) for c in payload.value) - source = attributes.source - sink = attributes.sink - logger.debug(f"Receive {value} from {source} to {sink}") - response_payload = format(datetime.utcnow()).encode("utf-8") - payload = UPayload(value=response_payload, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT) - attributes = UAttributesBuilder.response(msg.attributes).build() - someip.send(UMessage(attributes=attributes, payload=payload)) - - -def service(): - """ - Register an RPC Method to a Service - """ - listener = RPCRequestListener() - someip.register_listener(uuri, listener) - - -def client(): - """ - Client requesting for an RPC method - """ - data = "GetCurrentTime" - payload = UPayload( - length=0, - format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT, - value=bytes([ord(c) for c in data]), - ) - logger.debug(f"Send request to {uuri.entity}/{uuri.resource}") - res_future = someip.invoke_method(uuri, payload, CallOptions(ttl=1000)) - - while not res_future.done(): - time.sleep(1) - - print("FUTURE RESULT", res_future.result()) - - -if __name__ == "__main__": - service() - time.sleep(3) - client() diff --git a/uprotocol_vsomeip/examples/rpc_client.py b/uprotocol_vsomeip/examples/rpc_client.py new file mode 100644 index 0000000..df74f4d --- /dev/null +++ b/uprotocol_vsomeip/examples/rpc_client.py @@ -0,0 +1,50 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio +import logging + +from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient +from uprotocol.communication.upayload import UPayload +from uprotocol.v1.uattributes_pb2 import UPayloadFormat +from uprotocol.v1.uri_pb2 import UUri + +from uprotocol_vsomeip.vsomeip_utransport import VsomeipTransport + +logger = logging.getLogger() +LOG_FORMAT = "%(asctime)s [%(levelname)s] @ %(filename)s.%(module)s.%(funcName)s:%(lineno)d \n %(message)s" +logging.basicConfig(format=LOG_FORMAT, level=logging.getLevelName("DEBUG")) + + +def create_method_uri(): + """ + Create a method URI + """ + return UUri(authority_name="", ue_id=1, ue_version_major=1, resource_id=3) + + +async def send_rpc_request_to_someip(): + """ + Function to send an RPC Request + """ + transport = VsomeipTransport(source=UUri(authority_name="vehicle", ue_id=1)) + data = "GetCurrentTime" + payload = UPayload(format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT, data=bytes([ord(c) for c in data])) + rpc_client = InMemoryRpcClient(transport) + response_payload = await rpc_client.invoke_method(create_method_uri(), payload, None) + print("RESPONSE....", response_payload) + + +if __name__ == "__main__": + asyncio.run(send_rpc_request_to_someip()) diff --git a/uprotocol_vsomeip/examples/rpc_server.py b/uprotocol_vsomeip/examples/rpc_server.py new file mode 100644 index 0000000..6e5dd27 --- /dev/null +++ b/uprotocol_vsomeip/examples/rpc_server.py @@ -0,0 +1,87 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio +import logging +from datetime import datetime +from typing import List + +from uprotocol.communication.inmemoryrpcserver import InMemoryRpcServer +from uprotocol.communication.requesthandler import RequestHandler +from uprotocol.communication.upayload import UPayload +from uprotocol.v1.uattributes_pb2 import UPayloadFormat +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri + +from uprotocol_vsomeip.vsomeip_utransport import VsomeipHelper, VsomeipTransport + +logger = logging.getLogger() +LOG_FORMAT = "%(asctime)s [%(levelname)s] @ %(filename)s.%(module)s.%(funcName)s:%(lineno)d \n %(message)s" +logging.basicConfig(format=LOG_FORMAT, level=logging.getLevelName("DEBUG")) + + +class Helper(VsomeipHelper): + """ + Helper class to provide list of services to be offered + """ + + def services_info(self) -> List[VsomeipHelper.UEntityInfo]: + return [ + VsomeipHelper.UEntityInfo( + service_id=1, + events=[0, 10, 11], + port=30511, + major_version=1, + ) + ] + + +class MyRequestHandler(RequestHandler): + """ + Request Handler for the Service + """ + + def handle_request(self, msg: UMessage) -> UPayload: + logger.debug("Request Received by Service Request Handler") + attributes = msg.attributes + payload = msg.payload + source = attributes.source + sink = attributes.sink + logger.debug("Receive %s from %s to %s", payload, source, sink) + response_payload = format(datetime.utcnow()).encode("utf-8") + payload = UPayload(data=response_payload, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT) + return payload + + +def create_method_uri(): + """ + Create a method URI + """ + return UUri(authority_name="", ue_id=1, ue_version_major=1, resource_id=3) + + +async def register_rpc(): + """ + Main function to Register RPC to a service + """ + transport = VsomeipTransport(helper=Helper(), source=UUri(ue_id=1, ue_version_major=1, resource_id=0)) + rpc_server = InMemoryRpcServer(transport) + await rpc_server.register_request_handler(create_method_uri(), MyRequestHandler()) + + while True: + await asyncio.sleep(1) + + +if __name__ == "__main__": + asyncio.run(register_rpc()) diff --git a/uprotocol_vsomeip/examples/subscribe.py b/uprotocol_vsomeip/examples/subscribe.py new file mode 100644 index 0000000..8af9168 --- /dev/null +++ b/uprotocol_vsomeip/examples/subscribe.py @@ -0,0 +1,83 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio +import logging + +from uprotocol.client.usubscription.v3.inmemoryusubcriptionclient import ( + InMemoryUSubscriptionClient, +) +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + +from uprotocol_vsomeip.vsomeip_utransport import VsomeipTransport + +logger = logging.getLogger() +LOG_FORMAT = "%(asctime)s [%(levelname)s] @ %(filename)s.%(module)s.%(funcName)s:%(lineno)d \n %(message)s" +logging.basicConfig(format=LOG_FORMAT, level=logging.getLevelName("DEBUG")) + +source = UUri(authority_name="vehicle", ue_id=18) +someip = VsomeipTransport(source=UUri(ue_id=1, ue_version_major=1, resource_id=0)) +uuri = UUri(ue_id=1, ue_version_major=1, resource_id=0x8000) + + +class MyListener(UListener): + """ + Listener class to define callback + """ + + async def on_receive(self, message: UMessage) -> UStatus: + """ + on_receive call back method + :param message: + :return: UStatus + """ + logger.debug( + "listener -> id: %s, data: %s", + message.attributes.source.resource_id, + message.payload, + ) + return UStatus(message="Received event") + + +listener = MyListener() + + +async def subscribe_to_someip_if_subscription_service_is_not_running(): + """ + Subscribe to a topic + """ + await someip.register_listener(source_filter=uuri, listener=listener) + + +async def subscribe_if_subscription_service_is_running(): + client = InMemoryUSubscriptionClient(someip) + await client.subscribe(uuri, MyListener()) + while True: + await asyncio.sleep(1) + + +async def main() -> None: + """ + Main function to demonstrate publish and subscribe + """ + await subscribe_to_someip_if_subscription_service_is_not_running() + while True: + await asyncio.sleep(1) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/uprotocol_vsomeip/helper.py b/uprotocol_vsomeip/helper.py index 718d06d..fb9ae30 100644 --- a/uprotocol_vsomeip/helper.py +++ b/uprotocol_vsomeip/helper.py @@ -23,13 +23,18 @@ class VsomeipHelper: @dataclass class UEntityInfo: - """ """ + """ + UEntityInfo + """ - Name: str - Id: int - Events: List[int] - Port: int - MajorVersion: int + service_id: int + events: List[int] + port: int + major_version: int def services_info(self) -> List[UEntityInfo]: + """ + services_info + :return: List of services + """ return [] diff --git a/uprotocol_vsomeip/message_conversion.py b/uprotocol_vsomeip/message_conversion.py new file mode 100644 index 0000000..b1e4ef1 --- /dev/null +++ b/uprotocol_vsomeip/message_conversion.py @@ -0,0 +1,73 @@ +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.v1.uattributes_pb2 import UPayloadFormat +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri + +from uprotocol_vsomeip.utils import Utils + + +class VsomeipToUMessage: + """ + Convert a VSOMEIP message to a UProtocol message. + """ + + @staticmethod + def convert_to_publish_message(service_id, instance, event_id, data) -> UMessage: + """ + Convert a VSOMEIP message to a UProtocol Publish message. + :param service_id: The service ID. + :param instance: The instance. + :param event_id: The event ID. + :param data: bytearray data from someip + + :return: The UProtocol message. + """ + payload = UPayload.pack_from_data_and_format(bytes(data), UPayloadFormat.UPAYLOAD_FORMAT_UNSPECIFIED) + ue_id = Utils.pack_to_u32(instance, service_id) + # fixing major version to 1 as callback doesn't have major version + message = UMessageBuilder.publish( + source=UUri( + ue_id=ue_id, resource_id=event_id, ue_version_major=1 + ) # todo: Need to reconstruct resource_id as well + ).build_from_upayload(payload) + + return message + + @staticmethod + def convert_to_request_message(service_id, instance, method_id, data, source_uri) -> UMessage: + """ + Convert a VSOMEIP message to a UProtocol Request message. + :param service_id: The service ID. + :param instance: The instance. + :param method_id: The RPC method ID. + :param data: bytearray data from someip + :param source_uri: The source URI. + + :return: The UProtocol message. + """ + payload = UPayload.pack_from_data_and_format(bytes(data), UPayloadFormat.UPAYLOAD_FORMAT_UNSPECIFIED) + message = UMessageBuilder.request( + source=source_uri, + sink=UUri( + ue_id=Utils.pack_to_u32(instance, service_id), + ue_version_major=1, + resource_id=method_id, + ), + ttl=10000, + ).build_from_upayload(payload) + return message + + @staticmethod + def convert_to_response_message(request_attributes, data) -> UMessage: + """ + Convert a VSOMEIP message to a UProtocol Response message. + :param request_attributes: The request attributes. + :param data: bytearray data from someip + + :return: The UProtocol message. + """ + payload = UPayload.pack_from_data_and_format(bytes(data), UPayloadFormat.UPAYLOAD_FORMAT_UNSPECIFIED) + message = UMessageBuilder.response_for_request(request_attributes).build_from_upayload(payload) + + return message diff --git a/uprotocol_vsomeip/utils.py b/uprotocol_vsomeip/utils.py new file mode 100644 index 0000000..b74619a --- /dev/null +++ b/uprotocol_vsomeip/utils.py @@ -0,0 +1,195 @@ +from enum import IntFlag +from typing import Union + +from uprotocol.communication.ustatuserror import UStatusError +from uprotocol.uri.factory.uri_factory import UriFactory +from uprotocol.uri.serializer.uriserializer import UriSerializer +from uprotocol.uri.validator.urivalidator import UriValidator +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.uri_pb2 import UUri + + +class MessageFlag(IntFlag): + """ + Message Flags for different types of messages + """ + + PUBLISH = 1 + NOTIFICATION = 2 + REQUEST = 4 + RESPONSE = 8 + + +class Utils: + """ + Utils + """ + + @staticmethod + def split_u32_to_u16(value): + """ + Split a 32-bit unsigned integer into two 16-bit unsigned integers + :param value: 32-bit unsigned integer + :return: Tuple of two 16-bit unsigned integers + """ + if not 0 <= value <= 0xFFFFFFFF: + raise ValueError("Value must be an unsigned 32-bit integer (0 to 4294967295)") + + most_significant_bits = (value >> 16) & 0xFFFF + least_significant_bits = value & 0xFFFF + + return most_significant_bits, least_significant_bits + + @staticmethod + def split_u32_to_u8(value): + """ + Split a 32-bit unsigned integer into four 8-bit unsigned integers + :param value: 32-bit unsigned integer + :return: Tuple of four 8-bit unsigned integers + """ + if not 0 <= value <= 0xFFFFFFFF: + raise ValueError("Value must be an unsigned 32-bit integer (0 to 4294967295)") + + byte1 = (value >> 24) & 0xFF + byte2 = (value >> 16) & 0xFF + byte3 = (value >> 8) & 0xFF + byte4 = value & 0xFF + + return byte1, byte2, byte3, byte4 + + @staticmethod + def pack_to_u32(value_one: int, value_two: int): + """ + Pack two integers into a 32-bit unsigned integer + :return: + """ + return ((value_one & 0xFFFF) << 16) | (value_two & 0xFFFF) + + @staticmethod + def any_uuri() -> UUri: + """ + Create a UUri that matches any URI + :return: UUri that matches any URI + """ + return UUri( + authority_name="*", + ue_id=0x0000_FFFF, # any instance, any service + ue_version_major=0xFF, # any + resource_id=0xFFFF, # any + ) + + @staticmethod + def get_listener_message_type(source_uuri: UUri, sink_uuri: UUri = None) -> Union[MessageFlag, UStatusError]: + """ + The table for mapping resource ID to message type: + + | src rid | sink rid | Publish | Notification | Request | Response | + |-------------|----------|---------|--------------|---------|----------| + | [8000-FFFF) | None | V | | | | + | [8000-FFFF) | 0 | | V | | | + | 0 | (0-8000) | | | V | | + | (0-8000) | 0 | | | | V | + | FFFF | 0 | | V | | V | + | FFFF | (0-8000) | | | V | | + | 0 | FFFF | | | V | | + | (0-8000) | FFFF | | | | V | + | [8000-FFFF) | FFFF | | V | | | + | FFFF | FFFF | | V | V | V | + + Some organization: + - Publish: {[8000-FFFF), None} + - Notification: {[8000-FFFF), 0}, {[8000-FFFF), FFFF}, {FFFF, 0}, {FFFF, FFFF} + - Request: {0, (0-8000)}, {0, FFFF}, {FFFF, (0-8000)}, {FFFF, FFFF} + - Response: {(0-8000), 0}, {(0-8000), FFFF}, (FFFF, 0), {FFFF, FFFF} + + :param source_uuri: The source UUri. + :param sink_uuri: Optional sink UUri for request-response types. + :return: MessageFlag indicating the type of message. + :raises Exception: If the combination of source UUri and sink UUri is invalid. + """ + flag = MessageFlag(0) + + rpc_range = range(1, 0x7FFF) + nonrpc_range = range(0x8000, 0xFFFE) + wildcard_resource_id = UriFactory.WILDCARD_RESOURCE_ID + + src_resource = source_uuri.resource_id + + # Notification / Request / Response + if sink_uuri: + dst_resource = sink_uuri.resource_id + + if ( + (src_resource in nonrpc_range and dst_resource == 0) + or (src_resource in nonrpc_range and dst_resource == wildcard_resource_id) + or (src_resource == wildcard_resource_id and dst_resource == 0) + or (src_resource == wildcard_resource_id and dst_resource == wildcard_resource_id) + ): + flag |= MessageFlag.NOTIFICATION + + if ( + (src_resource == 0 and dst_resource in rpc_range) + or (src_resource == 0 and dst_resource == wildcard_resource_id) + or (src_resource == wildcard_resource_id and dst_resource in rpc_range) + or (src_resource == wildcard_resource_id and dst_resource == wildcard_resource_id) + ): + flag |= MessageFlag.REQUEST + + if ( + (src_resource in rpc_range and dst_resource == 0) + or (src_resource in rpc_range and dst_resource == wildcard_resource_id) + or (src_resource == wildcard_resource_id and dst_resource == 0) + or (src_resource == wildcard_resource_id and dst_resource == wildcard_resource_id) + ): + flag |= MessageFlag.RESPONSE + + if dst_resource == wildcard_resource_id and ( + src_resource in nonrpc_range or src_resource == wildcard_resource_id + ): + flag |= MessageFlag.PUBLISH + + # Publish + elif src_resource in nonrpc_range or src_resource == wildcard_resource_id: + flag |= MessageFlag.PUBLISH + + # Error handling + if flag == MessageFlag(0): + raise UStatusError.from_code_message( + code=UCode.INTERNAL, + message="Wrong combination of source UUri and sink UUri", + ) + else: + return flag + + @staticmethod + def get_uuri_string(uri: UUri) -> str: + """ + Get the string representation of the UUri. + + :param uri: Source/Sink UUri + :return: String representation of the UUri. + """ + if uri is None: + return "" + return UriSerializer.serialize(uri) + + @staticmethod + def matches(source: str, sink: str, attributes) -> bool: + """ + Return True if the source and sink URIs match the attributes. + + :param source: The source URI. + :param sink: The sink URI. + :param attributes: The attributes to compare against + + :return: True if the source and sink URIs match the attributes. + """ + if attributes is None: + return False + source = UriSerializer.deserialize(source) + sink = UriSerializer.deserialize(sink) + if source == UriFactory.ANY: + return UriValidator.matches(sink, attributes.sink) + if sink == UriFactory.ANY: + return UriValidator.matches(source, attributes.source) + return UriValidator.matches(source, attributes.source) and UriValidator.matches(sink, attributes.sink) diff --git a/uprotocol_vsomeip/vsomeip_utransport.py b/uprotocol_vsomeip/vsomeip_utransport.py index c9a3355..b3f2dbc 100644 --- a/uprotocol_vsomeip/vsomeip_utransport.py +++ b/uprotocol_vsomeip/vsomeip_utransport.py @@ -12,52 +12,58 @@ SPDX-License-Identifier: Apache-2.0 """ +import asyncio import json import logging import os +import random import socket import sys -import threading import time from builtins import str -from concurrent.futures import Future from enum import Enum +from threading import Lock from typing import Final, Tuple -from uprotocol.proto.uattributes_pb2 import CallOptions, UMessageType, UPriority -from uprotocol.proto.umessage_pb2 import UMessage -from uprotocol.proto.upayload_pb2 import UPayload, UPayloadFormat -from uprotocol.proto.uri_pb2 import UEntity, UUri -from uprotocol.proto.ustatus_pb2 import UCode, UStatus -from uprotocol.rpc.rpcclient import RpcClient -from uprotocol.transport.builder.uattributesbuilder import UAttributesBuilder from uprotocol.transport.ulistener import UListener from uprotocol.transport.utransport import UTransport +from uprotocol.transport.validator.uattributesvalidator import Validators +from uprotocol.uri.factory.uri_factory import UriFactory from uprotocol.uri.validator.urivalidator import UriValidator -from uprotocol.uuid.serializer.longuuidserializer import LongUuidSerializer +from uprotocol.uuid.serializer.uuidserializer import UuidSerializer +from uprotocol.v1.uattributes_pb2 import UMessageType +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus from vsomeip_py import vsomeip from .helper import VsomeipHelper +from .message_conversion import VsomeipToUMessage +from .utils import MessageFlag, Utils _logger = logging.getLogger("vsomeip_utransport" + "." + __name__) IS_WINDOWS: Final = sys.platform.startswith("win") -class VsomeipTransport(UTransport, RpcClient): +class VsomeipTransport(UTransport): """ Vsomeip Transport """ - _futures = {} - _registers = {} - _responses = {} + _event_listeners = {} + _request_listeners = {} + _response_listeners = {} _instances = {} _configuration = {} _requests = {} - _published = {} - _lock = threading.Lock() + _responses = {} + _lock = Lock() + _response_lock = Lock() + _request_lock = Lock() + _subscribe_lock = Lock() + _uuid_mapping_for_responses = {} - INSTANCE_ID: Final = 0x0000 MINOR_VERSION: Final = 0x0000 class VSOMEIPType(Enum): @@ -70,7 +76,7 @@ class VSOMEIPType(Enum): def __init__( self, - source: UUri = UUri(), + source: UUri, unicast: str = "127.0.0.1", multicast: Tuple[str, int] = ("224.244.224.245", 30490), helper: VsomeipHelper = VsomeipHelper(), @@ -100,13 +106,6 @@ def __init__( if self._helper.services_info(): self._create_services() - @staticmethod - def _replace_special_chars(string: str) -> str: - """ - Replace . with _ to name the vsomeip application - """ - return string.replace(".", "_") - def _create_services(self) -> None: """ Instantiate all COVESA Services @@ -124,35 +123,41 @@ def _create_services(self) -> None: self._configuration["service-discovery"]["port"] = str(self._multicast[1]) for service in services: - service_name = service.Name - service_id = service.Id - service_name = ( - self._replace_special_chars(service_name) + "_" + VsomeipTransport.VSOMEIPType.SERVICE.value - ) + instance_id, service_id = Utils.split_u32_to_u16(service.service_id) + + if instance_id == 0x0000: + instance_id = 1 + service_name = str(service_id) + "_" + VsomeipTransport.VSOMEIPType.SERVICE.value if service_name not in self._instances: self._configuration["applications"].append( - {"id": str(len(self._instances) + 1), "name": service_name} + { + "id": str(random.randint(1, 1000000)), + "name": service_name, + } # todo: do better way to generate id ) - self._configuration["services"].append( { - "instance": str(self.INSTANCE_ID), + "instance": str(instance_id), "service": str(service_id), - "unreliable": str(service.Port), + "unreliable": str(service.port), } ) instance = vsomeip.vSOMEIP( name=service_name, id=service_id, - instance=self.INSTANCE_ID, + instance=instance_id, configuration=self._configuration, - version=(service.MajorVersion, self.MINOR_VERSION), + version=(service.major_version, self.MINOR_VERSION), ) if service_id not in service_instances: service_instances[service_id] = {} service_instances[service_id]["instance"] = instance - service_instances[service_id]["events"] = service.Events + for event in service.events: + _, event_id = Utils.split_u32_to_u16(event) + if "events" not in service_instances[service_id]: + service_instances[service_id]["events"] = [] + service_instances[service_id]["events"].append(event_id) self._instances[service_name] = instance for _, service in service_instances.items(): @@ -162,30 +167,38 @@ def _create_services(self) -> None: service["instance"].offer(events=service["events"]) - def _get_instance(self, entity: UEntity, entity_type: VSOMEIPType) -> vsomeip.vSOMEIP: + def _get_instance(self, uuri: UUri, entity_type: VSOMEIPType) -> vsomeip.vSOMEIP: """ configure and create instances of vsomeip - :param entity: uEntity object + :param uuri: UUri object :param entity_type: client/service """ - entity_id = entity.id - entity_name = self._replace_special_chars(entity.name) + "_" + entity_type.value + instance_id, entity_id = Utils.split_u32_to_u16(uuri.ue_id) + if instance_id == 0x0000: + instance_id = 1 + + entity_name = str(entity_id) + "_" + entity_type.value with self._lock: if entity_name not in self._instances and entity_type == VsomeipTransport.VSOMEIPType.CLIENT: - self._configuration["applications"].append({"id": str(len(self._instances)), "name": entity_name}) + self._configuration["applications"].append( + { + "id": str(random.randint(1, 1000)), + "name": entity_name, + } # todo: do better way to generate id + ) self._configuration["clients"].append( { - "instance": str(self.INSTANCE_ID), + "instance": str(instance_id), "service": str(entity_id), } ) instance = vsomeip.vSOMEIP( name=entity_name, id=entity_id, - instance=self.INSTANCE_ID, + instance=instance_id, configuration=self._configuration, - version=(entity.version_major, self.MINOR_VERSION), + version=(uuri.ue_version_major, self.MINOR_VERSION), ) instance.create() instance.register() @@ -195,256 +208,454 @@ def _get_instance(self, entity: UEntity, entity_type: VSOMEIPType) -> vsomeip.vS if entity_name in self._instances: return self._instances[entity_name] - def _invoke_handler(self, message_type: int, _: int, __: int, data: bytearray, request_id: int) -> bytearray: + def _on_event_handler( + self, + message_type: int, + service_id: int, + instance: int, + event_id: int, + data: bytearray, + _: int, + ) -> None: """ - callback for RPC method to set Future + Handle responses from SOME/IP service with callback to Publish UListener registered + + :param message_type: The SOME/IP message type + :param service_id: The service ID + :param instance: instance of the service + :param event_id: The event/topic ID + :param data: The data + :return: None """ if message_type == vsomeip.vSOMEIP.Message_Type.REQUEST.value: - return - - req_id = LongUuidSerializer.instance().serialize(self._requests[request_id].attributes.id) - parsed_message = UMessage( - payload=UPayload( - value=bytes(data), - format=UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY, - ), - attributes=self._requests[request_id].attributes, - ) + return None + + if instance == 1: + instance_id = 0x0000 + parsed_message = VsomeipToUMessage.convert_to_publish_message(service_id, instance_id, event_id, data) - if not self._futures[req_id].done(): - self._futures[req_id].set_result(parsed_message) - else: - _logger.info("Future result state is already finished or cancelled") + for listener in self._event_listeners[service_id][event_id]: + if listener is not None: + asyncio.run(listener.on_receive(parsed_message)) # call actual callback now... + return None - def _on_event_handler(self, message_type: int, service_id: int, event_id: int, data: bytearray, _: int) -> None: + def _on_service_sending_response_handler( + self, + message_type: int, + _: int, + __: int, + ___: int, + ____: bytearray, + request_id: int, + ) -> bytearray: """ - handle responses from service with callback to listener registered + Service return the send response set for the initial request message + :param message_type: The SOME/IP message type + :param request_id: The SOME/IP request id, application_id+session_id + :return: Response data """ - if message_type == vsomeip.vSOMEIP.Message_Type.REQUEST.value: - return None - - payload_data = bytes(data) - message = self._published[service_id][event_id] # Note: is mem shared copy + if message_type != vsomeip.vSOMEIP.Message_Type.REQUEST.value: + return None # do nothing - message_payload = message.payload - if message.payload.value != payload_data: - hint = UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY - message_payload = UPayload(value=payload_data, hint=hint) + response_data = None - parsed_message = UMessage(payload=message_payload, attributes=message.attributes) + req_id = self._uuid_mapping_for_responses[request_id] + timed_out = 120 # ~ 3sec. + while True: # todo: with locks instead + timed_out = timed_out - 1 + if timed_out < 0: + break + if req_id not in self._responses: + time.sleep(0.025) + continue + response_data = self._responses[req_id][0].payload + del self._responses[req_id][0] + break - for _, listener in self._registers[service_id][event_id]: - if listener: - listener.on_receive(parsed_message) # call actual callback now... - return None + return bytearray(response_data) # note: return data is what is sent over transport (i.e. someip) as response - def _on_method_handler( + def _on_method_request_received_handler( self, message_type: int, service_id: int, + instance: int, method_id: int, data: bytearray, request_id: int, ) -> None: """ - handle responses from service with callback to listener registered + Handle responses from service on a method request with callback to Request UListener registered + + :param message_type: The SOME/IP message type + :param service_id: The service ID + :param instance: instance of the service + :param method_id: The RPC method ID + :param data: The data + :param request_id: The SOME/IP request id, application_id+session_id + :return: None """ if message_type != vsomeip.vSOMEIP.Message_Type.REQUEST.value: return None - payload_data = bytes(data) - message = self._requests[request_id] # Note: is mem shared copy + if instance == 1: + instance_id = 0x0000 + # todo: if instance not 1 from someip then how to reconstruct? + parsed_message = VsomeipToUMessage.convert_to_request_message( + service_id, instance_id, method_id, data, self.get_source() + ) - message_payload = message.payload - if message.payload.value != payload_data: - hint = UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY - message_payload = UPayload(value=payload_data, hint=hint) + self._uuid_mapping_for_responses[request_id] = UuidSerializer.serialize(parsed_message.attributes.id) - parsed_message = UMessage(payload=message_payload, attributes=message.attributes) + for (source_uri, sink_uri), listener in self._request_listeners.items(): + is_match = Utils.matches(source_uri, sink_uri, parsed_message.attributes) + if is_match and listener is not None: + asyncio.run(listener.on_receive(parsed_message)) # call actual callback now... - for _, listener in self._registers[service_id][method_id]: - if listener: - listener.on_receive(parsed_message) # call actual callback now... + return None + + def _on_client_receiving_response_handler( + self, + message_type: int, + _: int, + __: int, + ___: int, + data: bytearray, + request_id: int, + ) -> None: + """ + Handle response received by client with callback to the response UListener registered + + :param message_type: The SOME/IP message type + :param data: The data + :param request_id: The SOME/IP request id, application_id+session_id + :return: None + """ + if message_type != vsomeip.vSOMEIP.Message_Type.RESPONSE.value: + return None + + message = self._requests[request_id] # Note: is mem shared copy + parsed_message = VsomeipToUMessage.convert_to_response_message(message.attributes, data) + + for (source_uri, sink_uri), listener in self._response_listeners.items(): + is_match = Utils.matches(source_uri, sink_uri, parsed_message.attributes) + if is_match and listener is not None: + asyncio.run(listener.on_receive(parsed_message)) # call actual callback now... return None - def _for_response_handler(self, message_type: int, _: int, __: int, ___: bytearray, request_id: int) -> bytearray: + def _send_publish(self, message) -> UStatus: """ - Return from the send response set for the response of the initial request message + Send a Publish message over SOME/IP transport + + :param message: UProtocol message + :return: UStatus """ - if message_type != vsomeip.vSOMEIP.Message_Type.REQUEST.value: - return None # do nothing + _logger.debug("SEND PUBLISH") + uri = message.attributes.source + if not UriValidator.is_topic(uri): + return UStatus(code=UCode.INVALID_ARGUMENT, message="uri provided is not a topic") + instance = self._get_instance(uri, VsomeipTransport.VSOMEIPType.SERVICE) + _, event_id = Utils.split_u32_to_u16(uri.resource_id) + payload_data = bytearray(message.payload) + try: + instance.offer(events=[event_id]) + if payload_data: + instance.notify(id=event_id, data=payload_data) + except Exception as ex: + return UStatus(message=str(ex), code=UCode.UNKNOWN) + return UStatus(message="publish", code=UCode.OK) + + def _send_request(self, message) -> UStatus: + """ + Send a Request message over SOME/IP transport - response_data = None + :param message: UProtocol message + :return: UStatus + """ + _logger.debug("SEND REQUEST") + uri = message.attributes.sink + if not UriValidator.is_rpc_method(uri): + return UStatus( + code=UCode.INVALID_ARGUMENT, + message="uri provided is not an RPC request", + ) + instance = self._get_instance(uri, VsomeipTransport.VSOMEIPType.CLIENT) + _, method_id = Utils.split_u32_to_u16(uri.resource_id) + payload_data = bytearray(message.payload) + try: + request_id = instance.request(id=method_id, data=payload_data) + self._requests[request_id] = message # Important: in memory ONLY, thus stored per application level + except Exception as ex: + return UStatus(message=str(ex), code=UCode.UNKNOWN) + return UStatus(message="request", code=UCode.OK) - message = self._requests[request_id] # Note: is mem shared copy, CHEATER!!! - req_id = LongUuidSerializer.instance().serialize(message.attributes.id) + def _send_response(self, message) -> UStatus: + """ + Save a Response message against the uProtocol request id - timed_out = 120 # ~ 3sec. - while True: # todo: with locks instead - timed_out = timed_out - 1 - if timed_out < 0: - break + :param message: UProtocol message + :return: UStatus + """ + _logger.debug("SEND RESPONSE") + uri = message.attributes.sink + if not UriValidator.is_rpc_response(uri): + return UStatus( + code=UCode.INVALID_ARGUMENT, + message="uri provided is not an RPC response", + ) + req_id = UuidSerializer().serialize(message.attributes.reqid) + try: if req_id not in self._responses: - time.sleep(0.025) - continue - response_data = self._responses[req_id][0].payload.value - del self._responses[req_id][0] - break + self._responses[req_id] = [] + self._responses[req_id].append(message) - return bytearray(response_data) # note: return data is what is sent over transport (i.e. someip) as response + except Exception as ex: + return UStatus(message=str(ex), code=UCode.UNKNOWN) + return UStatus(message="response", code=UCode.OK) - def send(self, message: UMessage) -> UStatus: + def _register_request_listener(self, source_filter, listener, sink_filter) -> UStatus: """ - Service/Client Sends a message (in parts) over the transport. + Register a Request UListener for source and sink filters to be called when + a message is received. - :param message: UMessage to be sent. - :return: UStatus with UCode set to the status code (successful or failure). + :param source_filter: The source address pattern + :param sink_filter: The sink address pattern + :param listener: The Request UListener that will execute when the message is received on the given UUri. + :return: Returns UStatus + """ + try: + source_uri = Utils.get_uuri_string(source_filter) + sink_uri = Utils.get_uuri_string(sink_filter) + + with self._request_lock: + self._request_listeners[source_uri, sink_uri] = listener + _, resource_id = Utils.split_u32_to_u16(sink_filter.resource_id) + instance = self._get_instance(sink_filter, VsomeipTransport.VSOMEIPType.SERVICE) + instance.on_message(resource_id, self._on_method_request_received_handler) + instance.on_message( + resource_id, self._on_service_sending_response_handler + ) # handles returning a response of data + except Exception as err: + return UStatus(message=str(err), code=UCode.UNKNOWN) + return UStatus(message="listener", code=UCode.OK) + + def _register_response_listener(self, source_filter, listener, sink_filter) -> UStatus: """ - if message.attributes.type == UMessageType.UMESSAGE_TYPE_PUBLISH: - _logger.debug("SEND PUBLISH") - uri = message.attributes.source - status = UriValidator.validate(uri) - if status.is_failure(): - return status.to_status() - instance = self._get_instance(uri.entity, VsomeipTransport.VSOMEIPType.SERVICE) - - event_id = uri.resource.id - service_id = uri.entity.id - payload_data = bytearray(message.payload.value) - try: - instance.offer(events=[event_id]) - if payload_data: - if service_id not in self._published: - self._published[service_id] = {} - self._published[service_id][event_id] = message - instance.notify(id=event_id, data=payload_data) - except Exception as ex: - return UStatus(message=str(ex), code=UCode.UNKNOWN) - return UStatus(message="publish", code=UCode.OK) - if message.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST: - _logger.debug("SEND REQUEST") - uri = message.attributes.sink - status = UriValidator.validate(uri) - if status.is_failure(): - return status.to_status() - instance = self._get_instance(uri.entity, VsomeipTransport.VSOMEIPType.CLIENT) - - method_id = uri.resource.id - payload_data = bytearray(message.payload.value) - try: - request_id = instance.request(id=method_id, data=payload_data) - self._requests[request_id] = message # Important: in memory ONLY, thus stored per application level - except Exception as ex: - return UStatus(message=str(ex), code=UCode.UNKNOWN) - return UStatus(message="request", code=UCode.OK) - if message.attributes.type == UMessageType.UMESSAGE_TYPE_RESPONSE: - _logger.debug("SEND RESPONSE") - uri = message.attributes.source - status = UriValidator.validate(uri) - if status.is_failure(): - return status.to_status() - req_id = LongUuidSerializer.instance().serialize(message.attributes.reqid) - try: - if req_id not in self._responses: - self._responses[req_id] = [] - self._responses[req_id].append(message) - except NotImplementedError as ex: - raise ex - except Exception as ex: - return UStatus(message=str(ex), code=UCode.UNKNOWN) - return UStatus(message="response", code=UCode.OK) - return UStatus(message="", code=UCode.UNIMPLEMENTED) - - def register_listener(self, uri: UUri, listener: UListener) -> UStatus: - """ - Register a listener for topic to be called when a message is received. - - :param uri: UUri to listen for messages from. - :param listener: The UListener that will be executed when the message - is received on the given UUri. - - :return: Returns UStatus with UCode.OK if the listener is registered - correctly, otherwise it returns with the appropriate failure. - """ - is_method = UriValidator.validate_rpc_method(uri).is_success() - resource_id = uri.resource.id - service_id = uri.entity.id - - if service_id not in self._registers: - self._registers[service_id] = {} - if resource_id not in self._registers[service_id]: - self._registers[service_id][resource_id] = [] - self._registers[service_id][resource_id].append((uri, listener)) + Register a Response UListener for source and sink filters to be called when + a message is received. + :param source_filter: The source address pattern + :param sink_filter: The sink address pattern + :param listener: The Response UListener that will execute when the message is received on the given UUri. + :return: Returns UStatus + """ try: - if is_method: - instance = self._get_instance(uri.entity, VsomeipTransport.VSOMEIPType.SERVICE) - instance.on_message(resource_id, self._on_method_handler) # handles the UListener - instance.on_message(resource_id, self._for_response_handler) # handles returning a response of data - else: - instance = self._get_instance(uri.entity, VsomeipTransport.VSOMEIPType.CLIENT) - instance.on_event(resource_id, self._on_event_handler) + source_uri = Utils.get_uuri_string(source_filter) + sink_uri = Utils.get_uuri_string(sink_filter) + + with self._response_lock: + self._response_listeners[source_uri, sink_uri] = listener + _, resource_id = Utils.split_u32_to_u16(sink_filter.resource_id) + instance = self._get_instance(sink_filter, VsomeipTransport.VSOMEIPType.CLIENT) + for resource_id in range(0, 0xFF): # todo: should register to a range? as its wildcard? + instance.on_message(resource_id, self._on_client_receiving_response_handler) except Exception as err: return UStatus(message=str(err), code=UCode.UNKNOWN) return UStatus(message="listener", code=UCode.OK) - def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: + def _register_publish_listener(self, source_filter, listener, _) -> UStatus: """ - Unregister a listener for topic. Messages arriving at this topic will - no longer be processed by this listener. + Register a Publish UListener for source and sink filters to be called when + a message is received. - :param topic: UUri to the listener was registered for. - :param listener: UListener that will no longer want to be registered to - receive messages. + :param source_filter: The source address pattern + :param listener: The Publish UListener that will execute when the message is received on the given UUri. + :return: Returns UStatus + """ + try: + with self._subscribe_lock: + _, service_id = Utils.split_u32_to_u16(source_filter.ue_id) + _, event_id = Utils.split_u32_to_u16(source_filter.resource_id) + if service_id not in self._event_listeners: + self._event_listeners[service_id] = {} + if event_id not in self._event_listeners[service_id]: + self._event_listeners[service_id][event_id] = [] + + self._event_listeners[service_id][event_id].append(listener) + + instance = self._get_instance(source_filter, VsomeipTransport.VSOMEIPType.CLIENT) + instance.on_event(source_filter.resource_id, self._on_event_handler) + except Exception as err: + return UStatus(message=str(err), code=UCode.UNKNOWN) + return UStatus(message="listener", code=UCode.OK) + + def _unregister_request_listener(self, source_filter, listener, sink_filter) -> UStatus: + """ + Unregister a Request UListener for UUri source and sink filters - :return: Returns UStatus with UCode.OK if the listener is unregistered - correctly, otherwise it returns with the appropriate failure. + :param source_filter: Source address pattern + :param sink_filter: Sink address pattern + :param listener: The Request UListener that will no longer want to be registered + :return: Returns UStatus """ - instance = self._get_instance(topic.entity, VsomeipTransport.VSOMEIPType.SERVICE) - service_id = topic.entity.id - event_id = topic.resource.id try: - instance.remove(event_id) - if service_id in self._registers: - if event_id in self._registers[service_id]: - if (topic, listener) in self._registers[service_id][event_id]: - self._registers[service_id][event_id].remove((topic, listener)) + source_uri = Utils.get_uuri_string(source_filter) + sink_uri = Utils.get_uuri_string(sink_filter) + + with self._request_lock: + if self._request_listeners[(source_uri, sink_uri)] != listener: + raise Exception("Listener not registered") + del self._request_listeners[(source_uri, sink_uri)] + _, resource_id = Utils.split_u32_to_u16(sink_filter.resource_id) + if resource_id == 0: + instance = self._get_instance(sink_filter, VsomeipTransport.VSOMEIPType.SERVICE) + instance.remove(resource_id) except Exception as err: return UStatus(message=str(err), code=UCode.UNKNOWN) return UStatus(message="unregister listener", code=UCode.OK) - def invoke_method(self, method_uri: UUri, request_payload: UPayload, options: CallOptions) -> Future: + def _unregister_response_listener(self, source_filter, listener, sink_filter) -> UStatus: """ - API for clients to invoke a method (send an RPC request) and - receive the response (the returned Future UMessage). + Unregister a Response UListener for UUri source and sink filters - :param method_uri: The method URI to be invoked - :param request_payload: The request payload to be sent to the service. - :param options: RPC method invocation call options, see CallOptions + :param source_filter: Source address pattern + :param sink_filter: Sink address pattern + :param listener: The Response UListener that will no longer want to be registered + :return: Returns UStatus + """ + try: + source_uri = Utils.get_uuri_string(source_filter) + sink_uri = Utils.get_uuri_string(sink_filter) + + with self._response_lock: + if self._response_listeners[(source_uri, sink_uri)] != listener: + raise Exception("Listener not registered") + del self._response_listeners[(source_uri, sink_uri)] + _, resource_id = Utils.split_u32_to_u16(sink_filter.resource_id) + if resource_id != 0: + instance = self._get_instance(sink_filter, VsomeipTransport.VSOMEIPType.CLIENT) + instance.remove(resource_id) + except Exception as err: + return UStatus(message=str(err), code=UCode.UNKNOWN) + return UStatus(message="unregister listener", code=UCode.OK) - :return: Returns the CompletableFuture with the result or exception. + def _unregister_publish_listener(self, source_filter, listener, _) -> UStatus: """ - _logger.debug("INVOKE METHOD") - if method_uri is None or method_uri == UUri(): - raise ValueError("Method Uri is empty") - if request_payload is None: - raise ValueError("Payload is None") - if options is None: - raise ValueError("Call Options cannot be None") - timeout = options.ttl - if timeout <= 0: - raise ValueError("TTl is invalid or missing") + Unregister a Publish UListener for UUri source and sink filters - source = self._source - attributes = UAttributesBuilder.request(source, method_uri, UPriority.UPRIORITY_CS4, options.ttl).build() - instance = self._get_instance(method_uri.entity, VsomeipTransport.VSOMEIPType.CLIENT) - method_id = method_uri.resource.id - uuid = LongUuidSerializer.instance().serialize(attributes.id) + :param source_filter: Source address pattern + :param listener: The Request UListener that will no longer want to be registered + :return: Returns UStatus + """ + try: + with self._subscribe_lock: + _, service_id = Utils.split_u32_to_u16(source_filter.ue_id) + _, resource_id = Utils.split_u32_to_u16(source_filter.resource_id) + for listener_registered in self._event_listeners[service_id][resource_id]: + if listener_registered == listener: + self._event_listeners[service_id][resource_id].remove(listener) + instance = self._get_instance(source_filter, VsomeipTransport.VSOMEIPType.CLIENT) + instance.remove(resource_id) + except Exception as err: + return UStatus(message=str(err), code=UCode.UNKNOWN) + return UStatus(message="unregister listener", code=UCode.OK) + + async def send(self, message: UMessage) -> UStatus: + """ + Service/Client Sends a message (in parts) over the transport. + + :param message: UMessage to be sent. + :return: UStatus with UCode set to the status code (successful or failure). + """ + attributes = message.attributes + message_type = attributes.type + if not attributes.source: + return UStatus( + code=UCode.INVALID_ARGUMENT, + message="attributes.source shouldn't be empty", + ) + + if message_type == UMessageType.UMESSAGE_TYPE_PUBLISH: + Validators.PUBLISH.validator().validate(attributes) + return self._send_publish(message) + + if message_type == UMessageType.UMESSAGE_TYPE_NOTIFICATION: + Validators.NOTIFICATION.validator().validate(attributes) + return self._send_publish(message) + + if message_type == UMessageType.UMESSAGE_TYPE_REQUEST: + Validators.REQUEST.validator().validate(attributes) + return self._send_request(message) + + if message_type == UMessageType.UMESSAGE_TYPE_RESPONSE: + Validators.RESPONSE.validator().validate(attributes) + return self._send_response(message) + + return UStatus(code=UCode.INVALID_ARGUMENT, message="Invalid Message type in UAttributes") + + async def register_listener( + self, + source_filter: UUri, + listener: UListener, + sink_filter: UUri = UriFactory.ANY, + ) -> UStatus: + """ + Register a listener for source and sink filters to be called when + a message is received. + + :param source_filter: The source address pattern + :param sink_filter: The sink address pattern + :param listener: The UListener that will execute when the message is received on the given UUri. + :return: Returns UStatus + """ + flag = Utils.get_listener_message_type(source_filter, sink_filter) + + if flag & MessageFlag.REQUEST: + return self._register_request_listener(source_filter, listener, sink_filter) + + if flag & MessageFlag.RESPONSE: + return self._register_response_listener(source_filter, listener, sink_filter) + + if flag & (MessageFlag.PUBLISH | MessageFlag.NOTIFICATION): + return self._register_publish_listener(source_filter, listener, sink_filter) + + async def unregister_listener( + self, + source_filter: UUri, + listener: UListener, + sink_filter: UUri = UriFactory.ANY, + ) -> UStatus: + """ + Unregister UListener for UUri source and sink filters. Messages + arriving at this topic will no longer be processed by this listener. + + :param source_filter: Source address pattern + :param sink_filter: Sink address pattern + :param listener: The UListener that will no longer want to be registered + :return: Returns UStatus + """ + flag = Utils.get_listener_message_type(source_filter, sink_filter) - self._futures[uuid] = Future() - instance.on_message(method_id, self._invoke_handler) - message = UMessage(attributes=attributes, payload=request_payload) - self.send(message) + if flag & MessageFlag.REQUEST: + return self._unregister_request_listener(source_filter, listener, sink_filter) - return self._futures[uuid] + if flag & MessageFlag.RESPONSE: + return self._unregister_response_listener(source_filter, listener, sink_filter) + + if flag & (MessageFlag.PUBLISH | MessageFlag.NOTIFICATION): + return self._unregister_publish_listener(source_filter, listener, sink_filter) + + def get_source(self) -> UUri: + """ + Get the source URI of the transport. + + :return: source URI + """ + return self._source + + async def close(self) -> None: + """ + Close the connection to the transport that will trigger any registered listeners + to be unregistered. + """ + pass