Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions up_transport_zenoh/uptransportzenoh.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ async def register_listener(
if flag & (MessageFlag.PUBLISH | MessageFlag.NOTIFICATION):
# Get Zenoh key
zenoh_key = ZenohUtils.to_zenoh_key_string(self.authority_name, source_filter, sink_filter)
logging.debug(f"Using zenoh key: {zenoh_key}")
return self.register_publish_notification_listener(zenoh_key, listener)

async def unregister_listener(
Expand Down
17 changes: 13 additions & 4 deletions up_transport_zenoh/zenohutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from uprotocol.v1.ustatus_pb2 import UStatus
from zenoh import Priority, ZBytes

UATTRIBUTE_VERSION: int = 1
UATTRIBUTE_VERSION: int = 10

# Configure the logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
Expand All @@ -45,11 +45,18 @@ class ZenohUtils:
def uri_to_zenoh_key(authority_name: str, uri: UUri) -> str:
authority = authority_name if not uri.authority_name else uri.authority_name
ue_id = "*" if uri.ue_id == UriFactory.WILDCARD_ENTITY_ID else f"{uri.ue_id:X}"
if ue_id == "*":
ue_type = "*"
ue_instance = "*"
else:
ue_id = int(ue_id)
ue_type = (ue_id >> 16) & 0xFFFF # Get upper 16 bits
ue_instance = ue_id & 0xFFFF # Get lower 16 bits
ue_version_major = (
"*" if uri.ue_version_major == UriFactory.WILDCARD_ENTITY_VERSION else f"{uri.ue_version_major:X}"
)
resource_id = "*" if uri.resource_id == UriFactory.WILDCARD_RESOURCE_ID else f"{uri.resource_id:X}"
return f"{authority}/{ue_id}/{ue_version_major}/{resource_id}"
return f"{authority}/{ue_type}/{ue_instance}/{ue_version_major}/{resource_id}"

@staticmethod
def get_uauth_from_uuri(uri: UUri) -> Union[str, UStatus]:
Expand All @@ -70,7 +77,9 @@ def get_uauth_from_uuri(uri: UUri) -> Union[str, UStatus]:
@staticmethod
def to_zenoh_key_string(authority_name: str, src_uri: UUri, dst_uri: UUri = None) -> str:
src = ZenohUtils.uri_to_zenoh_key(authority_name, src_uri)
dst = ZenohUtils.uri_to_zenoh_key(authority_name, dst_uri) if dst_uri and dst_uri != UUri() else "{}/{}/{}/{}"
dst = (
ZenohUtils.uri_to_zenoh_key(authority_name, dst_uri) if dst_uri and dst_uri != UUri() else "{}/{}/{}/{}/{}"
)
return f"up/{src}/{dst}"

@staticmethod
Expand Down Expand Up @@ -215,7 +224,7 @@ def get_listener_message_type(source_uuri: UUri, sink_uuri: UUri = None) -> Unio
# Error handling
if flag == MessageFlag(0):
raise UStatusError.from_code_message(
code=UCode.INTERNAL, message="Wrong combination of source UUri and sink " "UUri"
code=UCode.INTERNAL, message="Wrong combination of source UUri and sink UUri"
)
else:
return flag