@@ -217,37 +217,38 @@ impl TransportMode {
217217}
218218
219219async fn process_incoming_message (
220- registered_listeners : & Arc < RwLock < RegisteredListeners > > ,
220+ registered_listeners : & dyn listener_registry:: ListenerRegistry ,
221+ message_mapper : & dyn mapping:: MessageMapper ,
221222 mqtt_message : paho_mqtt:: Message ,
222223) {
223224 // extract uProtocol message from MQTT PUBLISH packet
224- let umessage = match mapping:: create_uattributes_from_mqtt_properties ( mqtt_message. properties ( ) )
225- {
226- Ok ( uattributes) => UMessage {
227- attributes : Some ( uattributes) . into ( ) ,
228- payload : Some ( Bytes :: copy_from_slice ( mqtt_message. payload ( ) ) ) ,
229- ..Default :: default ( )
230- } ,
231- Err ( e) => {
232- debug ! ( "Failed to map MQTT PUBLISH packet to uProtocol message: {e}" ) ;
233- return ;
234- }
235- } ;
225+ let umessage =
226+ match message_mapper. create_uattributes_from_mqtt_properties ( mqtt_message. properties ( ) ) {
227+ Ok ( uattributes) => UMessage {
228+ attributes : Some ( uattributes) . into ( ) ,
229+ payload : Some ( Bytes :: copy_from_slice ( mqtt_message. payload ( ) ) ) ,
230+ ..Default :: default ( )
231+ } ,
232+ Err ( e) => {
233+ // [impl->dsn~utransport-registerlistener-discard-invalid-messages~1]
234+ debug ! ( "Failed to map MQTT PUBLISH packet to uProtocol message: {e}" ) ;
235+ return ;
236+ }
237+ } ;
236238
237239 // [impl->dsn~utransport-registerlistener-start-invoking-listeners~1]
238240 // [impl->dsn~utransport-unregisterlistener-stop-invoking-listeners~1]
239241 let listeners_to_invoke = {
240- let registered_listeners_read = registered_listeners. read ( ) . await ;
241242 #[ cfg( not( test) ) ]
242243 {
243- determine_listeners ( & registered_listeners_read , & mqtt_message)
244+ determine_listeners ( registered_listeners , & mqtt_message)
244245 }
245246 #[ cfg( test) ]
246247 {
247- let mut listeners = determine_listeners ( & registered_listeners_read , & mqtt_message) ;
248+ let mut listeners = determine_listeners ( registered_listeners , & mqtt_message) ;
248249 if listeners. is_empty ( ) {
249250 if let Some ( ignored_message_handler) =
250- registered_listeners_read . get_ignored_message_listener ( )
251+ registered_listeners . get_ignored_message_listener ( )
251252 {
252253 debug ! ( "No listeners registered for MQTT message, invoking ignored message listener instead." ) ;
253254 listeners. insert ( ignored_message_handler) ;
@@ -269,7 +270,7 @@ async fn process_incoming_message(
269270}
270271
271272fn determine_listeners (
272- registered_listeners : & RegisteredListeners ,
273+ registered_listeners : & dyn listener_registry :: ListenerRegistry ,
273274 mqtt_message : & paho_mqtt:: Message ,
274275) -> HashSet < ComparableListener > {
275276 let subscription_ids: Vec < SubscriptionIdentifier > = mqtt_message
@@ -285,7 +286,6 @@ fn determine_listeners(
285286 . iter ( )
286287 . filter_map ( |& id| registered_listeners. determine_listeners_for_subscription_id ( id) )
287288 . flatten ( )
288- . cloned ( )
289289 . collect ( )
290290 }
291291}
@@ -327,6 +327,7 @@ pub struct Mqtt5Transport {
327327 /// Client instance for connecting to mqtt broker.
328328 mqtt_client : Arc < dyn MqttClientOperations > ,
329329 registered_listeners : Arc < RwLock < RegisteredListeners > > ,
330+ message_mapper : Arc < dyn mapping:: MessageMapper > ,
330331 /// My authority
331332 authority_name : String ,
332333 /// The transport's mode of operation.
@@ -369,6 +370,7 @@ impl Mqtt5Transport {
369370 let mut transport = Self {
370371 mqtt_client : Arc :: new ( client_operations) ,
371372 registered_listeners,
373+ message_mapper : Arc :: new ( mapping:: DefaultMessageValidator ) ,
372374 authority_name : authority,
373375 mode : options. mode ,
374376 message_callback_handle : None ,
@@ -381,7 +383,13 @@ impl Mqtt5Transport {
381383
382384 #[ cfg( test) ]
383385 pub ( crate ) async fn process_incoming_message ( & self , mqtt_message : paho_mqtt:: Message ) {
384- process_incoming_message ( & self . registered_listeners , mqtt_message) . await ;
386+ let registered_listeners_read = self . registered_listeners . read ( ) . await ;
387+ process_incoming_message (
388+ & * registered_listeners_read,
389+ & * self . message_mapper ,
390+ mqtt_message,
391+ )
392+ . await ;
385393 }
386394
387395 /// Establishes the initial connection to the MQTT broker.
@@ -424,6 +432,7 @@ impl Mqtt5Transport {
424432 fn create_cb_message_handler ( & mut self , mut message_stream : Receiver < Option < Message > > ) {
425433 let cloned_client_operations = self . mqtt_client . clone ( ) ;
426434 let cloned_registered_listeners = self . registered_listeners . clone ( ) ;
435+ let cloned_message_mapper = self . message_mapper . clone ( ) ;
427436 let handle = tokio:: spawn ( async move {
428437 while let Some ( msg_opt) = message_stream. next ( ) . await {
429438 let Some ( msg) = msg_opt else {
@@ -439,7 +448,9 @@ impl Mqtt5Transport {
439448 . get_string( paho_mqtt:: PropertyCode :: ContentType )
440449 . unwrap_or_else( || "N/A" . to_string( ) )
441450 ) ;
442- process_incoming_message ( & cloned_registered_listeners, msg) . await ;
451+ let registered_listeners_read = cloned_registered_listeners. read ( ) . await ;
452+ process_incoming_message ( & * registered_listeners_read, & * cloned_message_mapper, msg)
453+ . await ;
443454 }
444455 } ) ;
445456 self . message_callback_handle = Some ( handle) ;
@@ -467,7 +478,9 @@ impl Mqtt5Transport {
467478 payload : Option < Bytes > ,
468479 ) -> Result < ( ) , UStatus > {
469480 // put metadata into MQTT 5 message properties
470- let props = mapping:: create_mqtt_properties_from_uattributes ( attributes) ?;
481+ let props = self
482+ . message_mapper
483+ . create_mqtt_properties_from_uattributes ( attributes) ?;
471484
472485 // Get mqtt topic string from source and sink uuris
473486 let src_uri = attributes. source . as_ref ( ) . ok_or_else ( || {
@@ -602,6 +615,7 @@ mod tests {
602615 use super :: * ;
603616
604617 fn create_mqtt_publish_message (
618+ message_mapper : & dyn mapping:: MessageMapper ,
605619 uuid : & UUID ,
606620 source : & UUri ,
607621 payload : & str ,
@@ -613,13 +627,14 @@ mod tests {
613627 let mqtt_topic = TransportMode :: InVehicle
614628 . to_mqtt_topic ( source, None , "test_authority" )
615629 . expect ( "failed to create MQTT topic string" ) ;
616- let props = crate :: mapping:: create_mqtt_properties_from_uattributes (
617- umessage
618- . attributes
619- . as_ref ( )
620- . expect ( "UMessage has no attributes" ) ,
621- )
622- . expect ( "invalid uattributes" ) ;
630+ let props = message_mapper
631+ . create_mqtt_properties_from_uattributes (
632+ umessage
633+ . attributes
634+ . as_ref ( )
635+ . expect ( "UMessage has no attributes" ) ,
636+ )
637+ . expect ( "invalid uattributes" ) ;
623638 paho_mqtt:: MessageBuilder :: new ( )
624639 . topic ( mqtt_topic)
625640 . payload ( payload)
@@ -651,6 +666,7 @@ mod tests {
651666 let mqtt_transport = Mqtt5Transport {
652667 mqtt_client : Arc :: new ( client_operations) ,
653668 registered_listeners : Arc :: new ( RwLock :: new ( RegisteredListeners :: default ( ) ) ) ,
669+ message_mapper : Arc :: new ( mapping:: DefaultMessageValidator ) ,
654670 authority_name : "test" . to_string ( ) ,
655671 mode : TransportMode :: InVehicle ,
656672 message_callback_handle : None ,
@@ -672,13 +688,17 @@ mod tests {
672688 client_operations. expect_subscribe ( ) . return_const ( Ok ( ( ) ) ) ;
673689 client_operations. expect_unsubscribe ( ) . return_const ( Ok ( ( ) ) ) ;
674690
691+ let message_mapper = mapping:: DefaultMessageValidator :: default ( ) ;
692+
675693 let message_id_1 = UUID :: build ( ) ;
676- let message_1 = create_mqtt_publish_message ( & message_id_1, & source, "some payload" ) ;
694+ let message_1 =
695+ create_mqtt_publish_message ( & message_mapper, & message_id_1, & source, "some payload" ) ;
677696 let message_id_2 = UUID :: build ( ) ;
678- let message_2 = create_mqtt_publish_message ( & message_id_2, & source, "some payload" ) ;
697+ let message_2 =
698+ create_mqtt_publish_message ( & message_mapper, & message_id_2, & source, "some payload" ) ;
679699 let message_id_3 = UUID :: build ( ) ;
680- let message_3 = create_mqtt_publish_message ( & message_id_3 , & source , "some payload" ) ;
681-
700+ let message_3 =
701+ create_mqtt_publish_message ( & message_mapper , & message_id_3 , & source , "some payload" ) ;
682702 let mut expected_ignored_message_ids = VecDeque :: new ( ) ;
683703 expected_ignored_message_ids. push_front ( message_id_1. clone ( ) ) ;
684704 expected_ignored_message_ids. push_front ( message_id_3. clone ( ) ) ;
@@ -700,6 +720,7 @@ mod tests {
700720 let transport = Mqtt5Transport {
701721 mqtt_client : Arc :: new ( client_operations) ,
702722 registered_listeners : Arc :: new ( RwLock :: new ( listener_registry) ) ,
723+ message_mapper : Arc :: new ( mapping:: DefaultMessageValidator ) ,
703724 authority_name : "test" . to_string ( ) ,
704725 mode : TransportMode :: InVehicle ,
705726 message_callback_handle : None ,
@@ -765,6 +786,7 @@ mod tests {
765786 let mqtt_transport = Mqtt5Transport {
766787 mqtt_client : Arc :: new ( client_operations) ,
767788 registered_listeners : Arc :: new ( RwLock :: new ( registered_listeners) ) ,
789+ message_mapper : Arc :: new ( mapping:: DefaultMessageValidator ) ,
768790 authority_name : "test" . to_string ( ) ,
769791 mode : TransportMode :: InVehicle ,
770792 message_callback_handle : None ,
@@ -945,6 +967,7 @@ mod tests {
945967 let mqtt_transport = Mqtt5Transport {
946968 mqtt_client : Arc :: new ( client_operations) ,
947969 registered_listeners : Arc :: new ( RwLock :: new ( RegisteredListeners :: default ( ) ) ) ,
970+ message_mapper : Arc :: new ( mapping:: DefaultMessageValidator ) ,
948971 authority_name : "vin.vehicles" . to_string ( ) ,
949972 mode : TransportMode :: InVehicle ,
950973 message_callback_handle : None ,
@@ -962,10 +985,50 @@ mod tests {
962985 let mqtt_transport = Mqtt5Transport {
963986 mqtt_client : Arc :: new ( client_operations) ,
964987 registered_listeners : Arc :: new ( RwLock :: new ( RegisteredListeners :: default ( ) ) ) ,
988+ message_mapper : Arc :: new ( mapping:: DefaultMessageValidator ) ,
965989 authority_name : "vin.vehicles" . to_string ( ) ,
966990 mode : TransportMode :: InVehicle ,
967991 message_callback_handle : None ,
968992 } ;
969993 mqtt_transport. shutdown ( ) . await ;
970994 }
995+
996+ #[ tokio:: test]
997+ // [utest->dsn~utransport-registerlistener-discard-invalid-messages~1]
998+ async fn test_process_incoming_message_discards_invalid_message ( ) {
999+ use listener_registry:: MockListenerRegistry ;
1000+ use mapping:: MockMessageMapper ;
1001+
1002+ // Create a mock message validator that returns an error (simulating invalid message)
1003+ let mut mock_validator = MockMessageMapper :: new ( ) ;
1004+ mock_validator
1005+ . expect_create_uattributes_from_mqtt_properties ( )
1006+ . once ( )
1007+ . returning ( |_| {
1008+ Err ( UStatus :: fail_with_code (
1009+ UCode :: INVALID_ARGUMENT ,
1010+ "Invalid MQTT message properties" ,
1011+ ) )
1012+ } ) ;
1013+
1014+ // Create a mock listener registry that should NOT be called
1015+ let mut mock_registry = MockListenerRegistry :: new ( ) ;
1016+ mock_registry. expect_determine_listeners_for_topic ( ) . never ( ) ;
1017+ mock_registry
1018+ . expect_determine_listeners_for_subscription_id ( )
1019+ . never ( ) ;
1020+
1021+ // Create a test MQTT message
1022+ let mqtt_message = paho_mqtt:: MessageBuilder :: new ( )
1023+ . topic ( "test.authority/A000/0/2/8A50" )
1024+ . payload ( "test payload" )
1025+ . finalize ( ) ;
1026+
1027+ // Process the message - it should be discarded due to validation error
1028+ process_incoming_message ( & mock_registry, & mock_validator, mqtt_message) . await ;
1029+
1030+ // If the test completes without panic, it means:
1031+ // 1. The validator was called exactly once
1032+ // 2. The listener registry methods were never called (message was discarded)
1033+ }
9711034}
0 commit comments