1212 ********************************************************************************/
1313
1414use log:: * ;
15- use std:: collections:: HashMap ;
1615use std:: sync:: Arc ;
1716use tokio:: sync:: { mpsc:: Receiver , mpsc:: Sender , oneshot, Notify } ;
1817
@@ -21,7 +20,7 @@ use up_rust::{
2120 usubscription_uri, SubscriberInfo , SubscriptionStatus , Update ,
2221 RESOURCE_ID_SUBSCRIPTION_CHANGE ,
2322 } ,
24- UMessageBuilder , UTransport , UUID ,
23+ LocalUriProvider , UMessageBuilder , UTransport , UUID ,
2524} ;
2625
2726use crate :: {
@@ -31,6 +30,9 @@ use crate::{
3130 USubscriptionConfiguration ,
3231} ;
3332
33+ // From usubscription.proto, the uprotocol.notification_topic resource ID
34+ pub ( crate ) const SOURCE_URI_RESOURCE_ID : u16 = 0x8000 ;
35+
3436// This is the core business logic for tracking and sending subscription update notifications. It is currently implemented as a single
3537// event-consuming function `notification_engine()`, which is supposed to be spawned into a task, and process the various notification
3638// `Events` that it can receive via tokio mpsc channel.
@@ -44,6 +46,7 @@ pub(crate) enum NotificationEvent {
4446 } ,
4547 RemoveNotifyee {
4648 subscriber : SubscriberUUri ,
49+ topic : TopicUUri ,
4750 } ,
4851 StateChange {
4952 subscriber : Option < SubscriberUUri > ,
@@ -55,12 +58,12 @@ pub(crate) enum NotificationEvent {
5558 respond_to : oneshot:: Sender < Result < ( ) , PersistencyError > > ,
5659 } ,
5760 GetNotificationTopics {
58- respond_to : oneshot:: Sender < HashMap < SubscriberUUri , TopicUUri > > ,
61+ respond_to : oneshot:: Sender < Vec < ( SubscriberUUri , TopicUUri ) > > ,
5962 } ,
6063 // Purely for use during testing: force-set new notifyees ledger
6164 #[ cfg( test) ]
6265 SetNotificationTopics {
63- notification_topics_replacement : HashMap < SubscriberUUri , TopicUUri > ,
66+ notification_topics_replacement : Vec < ( SubscriberUUri , TopicUUri ) > ,
6467 respond_to : oneshot:: Sender < ( ) > ,
6568 } ,
6669}
@@ -93,9 +96,15 @@ impl PartialEq for NotificationEvent {
9396 } ,
9497 ) => s1 == s2 && t1 == t2,
9598 (
96- NotificationEvent :: RemoveNotifyee { subscriber : s1 } ,
97- NotificationEvent :: RemoveNotifyee { subscriber : s2 } ,
98- ) => s1 == s2,
99+ NotificationEvent :: RemoveNotifyee {
100+ subscriber : s1,
101+ topic : t1,
102+ } ,
103+ NotificationEvent :: RemoveNotifyee {
104+ subscriber : s2,
105+ topic : t2,
106+ } ,
107+ ) => s1 == s2 && t1 == t2,
99108 // Don't care about the test-only variants
100109 _ => false ,
101110 }
@@ -129,6 +138,7 @@ pub(crate) async fn notification_engine(
129138 } ,
130139 } ;
131140 match event {
141+ // [impl->req~usubscription-register-notifications~1]
132142 NotificationEvent :: AddNotifyee { subscriber, topic } => {
133143 if !topic. is_event ( ) {
134144 error ! ( "Topic UUri is not a valid event target" ) ;
@@ -142,8 +152,9 @@ pub(crate) async fn notification_engine(
142152 }
143153 } ;
144154 }
145- NotificationEvent :: RemoveNotifyee { subscriber } => {
146- match notifications. remove_notifyee ( & subscriber) {
155+ // [impl->req~usubscription-unregister-notifications~1]
156+ NotificationEvent :: RemoveNotifyee { subscriber, topic } => {
157+ match notifications. remove_notifyee ( & subscriber, & topic) {
147158 Ok ( _) => { }
148159 Err ( e) => {
149160 error ! ( "Persistency failure {e}" )
@@ -158,7 +169,7 @@ pub(crate) async fn notification_engine(
158169 } => {
159170 // [impl->dsn~usubscription-change-notification-type~1]
160171 let update = Update {
161- topic : Some ( topic) . into ( ) ,
172+ topic : Some ( topic. clone ( ) ) . into ( ) ,
162173 subscriber : Some ( SubscriberInfo {
163174 uri : subscriber. into ( ) ,
164175 ..Default :: default ( )
@@ -188,29 +199,27 @@ pub(crate) async fn notification_engine(
188199 }
189200 }
190201
191- // Send Update message to any dedicated registered notification-subscribers
202+ // Send Update notification message to any dedicated registered notification-subscribers
192203 // [impl->req~usubscription-register-notifications~1]
193- if let Ok ( topics) = notifications. get_topics ( ) {
194- for topic_entry in topics {
204+ if let Ok ( subscribers) = notifications. get_subscribers_registered_for_topic ( & topic)
205+ {
206+ for subscribers_entry in subscribers {
195207 debug ! (
196- "Sending notification to ({}): topic {}, subscriber {}, status {}" ,
197- topic_entry . to_uri( INCLUDE_SCHEMA ) ,
208+ "Sending notification to ({}), about topic {} changing state to {}" ,
209+ subscribers_entry . to_uri( INCLUDE_SCHEMA ) ,
198210 update
199211 . topic
200212 . as_ref( )
201213 . unwrap_or_default( )
202214 . to_uri( INCLUDE_SCHEMA ) ,
203- update
204- . subscriber
205- . uri
206- . as_ref( )
207- . unwrap_or_default( )
208- . to_uri( INCLUDE_SCHEMA ) ,
209215 update. status. as_ref( ) . unwrap_or_default( )
210216 ) ;
211217
212- match UMessageBuilder :: publish ( topic_entry. clone ( ) )
213- . build_with_protobuf_payload ( & update)
218+ match UMessageBuilder :: notification (
219+ configuration. get_resource_uri ( SOURCE_URI_RESOURCE_ID ) ,
220+ subscribers_entry. clone ( ) ,
221+ )
222+ . build_with_protobuf_payload ( & update)
214223 {
215224 Ok ( update_msg) => {
216225 let _r = up_transport. send ( update_msg) . await . inspect_err ( |e|
@@ -254,7 +263,7 @@ pub(crate) async fn notification_engine(
254263
255264// Convenience wrapper for sending state change notification messages
256265// `susbcriber` is an Option, because in the case ob remote subscription state changes, there is no subscriber (other than local usubscription service)
257- pub ( crate ) async fn notify (
266+ pub ( crate ) async fn notify_state_change (
258267 notification_sender : Sender < NotificationEvent > ,
259268 subscriber : Option < SubscriberUUri > ,
260269 topic : TopicUUri ,
@@ -282,9 +291,9 @@ pub(crate) async fn notify(
282291// Might return an empty list if data retrieval fails for some reason, but will perform the reset in any case.
283292pub ( crate ) async fn reset (
284293 notification_sender : Sender < NotificationEvent > ,
285- ) -> Result < HashMap < SubscriberUUri , TopicUUri > , Box < dyn std:: error:: Error > > {
294+ ) -> Result < Vec < ( SubscriberUUri , TopicUUri ) > , Box < dyn std:: error:: Error > > {
286295 // Get current notification registrations
287- let ( respond_to, receive_from) = oneshot:: channel :: < HashMap < SubscriberUUri , TopicUUri > > ( ) ;
296+ let ( respond_to, receive_from) = oneshot:: channel :: < Vec < ( SubscriberUUri , TopicUUri ) > > ( ) ;
288297 notification_sender
289298 . send ( NotificationEvent :: GetNotificationTopics { respond_to } )
290299 . await ?;
0 commit comments