Skip to content

Commit ad025f7

Browse files
committed
Update Message Handler to support wildcards
1 parent 799496a commit ad025f7

File tree

1 file changed

+27
-16
lines changed

1 file changed

+27
-16
lines changed

up_client_mqtt5_python/mqtt5_utransport.py

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,9 @@ def _listen(self, client, userdata, msg):
141141

142142
message_type_handlers = {
143143
UMessageType.UMESSAGE_TYPE_UNSPECIFIED: self._handle_unspecified_message,
144-
UMessageType.UMESSAGE_TYPE_PUBLISH: self._handle_publish_message,
145-
UMessageType.UMESSAGE_TYPE_REQUEST: self._handle_publish_message,
144+
UMessageType.UMESSAGE_TYPE_PUBLISH: self._handle_gen_message,
145+
UMessageType.UMESSAGE_TYPE_REQUEST: self._handle_gen_message,
146+
UMessageType.UMESSAGE_TYPE_NOTIFICATION: self._handle_gen_message,
146147
UMessageType.UMESSAGE_TYPE_RESPONSE: self._handle_response_message,
147148
}
148149

@@ -167,14 +168,24 @@ def _handle_response_message(self, topic: str, umsg: UMessage):
167168

168169
del self.reqid_to_future[request_id_b]
169170

170-
def _handle_publish_message(self, topic: str, umsg: UMessage):
171-
if topic in self.topic_to_listener:
172-
logger.info("%s Handle Publish Message on Topic", self.__class__.__name__)
173-
174-
for listener in self.topic_to_listener[topic]:
175-
listener.on_receive(umsg)
176-
else:
177-
logger.info("%s %s not found in Listener Map", self.__class__.__name__, topic)
171+
def _handle_gen_message(self, topic: str, umsg: UMessage):
172+
173+
topic = topic.replace("FFFF", "+").replace("FF", "+")
174+
175+
pieces_of_topic = topic.split("/")
176+
for x in self.topic_to_listener.keys():
177+
pieces_of_x = x.split("/")
178+
if len(pieces_of_topic) == len(pieces_of_x):
179+
matches = True
180+
for i in range(len(pieces_of_topic)):
181+
if pieces_of_x[i] == "+":
182+
continue
183+
if pieces_of_x[i] != pieces_of_topic[i]:
184+
matches = False
185+
if matches:
186+
logger.info("%s Handle Message on Topic", self.__class__.__name__)
187+
for listener in self.topic_to_listener[x]:
188+
listener.on_receive(umsg)
178189

179190
def mqtt_topic_builder(self, source: UUri, sink: UUri = None) -> str:
180191
"""
@@ -189,15 +200,15 @@ def mqtt_topic_builder(self, source: UUri, sink: UUri = None) -> str:
189200
device = "c" if self.cloud_device else "d"
190201
if source != UUri():
191202
src_auth_name = source.authority_name if source != UUri() else "+"
192-
src_ue_id = uuri_field_resolver(source.ue_id, 0xFFFF, "ffff")
193-
src_ue_version_major = uuri_field_resolver(source.ue_version_major, 0xFF, "ff")
194-
src_resource_id = uuri_field_resolver(source.resource_id, 0xFFFF, "ffff")
203+
src_ue_id = uuri_field_resolver(source.ue_id, 0xFFFF, "+")
204+
src_ue_version_major = uuri_field_resolver(source.ue_version_major, 0xFF, "+")
205+
src_resource_id = uuri_field_resolver(source.resource_id, 0xFFFF, "+")
195206
topic = device + "/" + src_auth_name + "/" + src_ue_id + "/" + src_ue_version_major + "/" + src_resource_id
196207
if sink is not None and sink != UUri():
197208
sink_auth_name = sink.authority_name
198-
sink_ue_id = uuri_field_resolver(sink.ue_id, 0xFFFF, "ffff")
199-
sink_ue_version_major = uuri_field_resolver(sink.ue_version_major, 0xFF, "ff")
200-
sink_resource_id = uuri_field_resolver(sink.resource_id, 0xFFFF, "ffff")
209+
sink_ue_id = uuri_field_resolver(sink.ue_id, 0xFFFF, "+")
210+
sink_ue_version_major = uuri_field_resolver(sink.ue_version_major, 0xFF, "+")
211+
sink_resource_id = uuri_field_resolver(sink.resource_id, 0xFFFF, "+")
201212
topic += "/" + sink_auth_name + "/" + sink_ue_id + "/" + sink_ue_version_major + "/" + sink_resource_id
202213
return topic
203214

0 commit comments

Comments
 (0)