Skip to content

Commit e7123f3

Browse files
committed
Fix notification manager implementation and req traces
1 parent 2bb27b7 commit e7123f3

File tree

10 files changed

+365
-148
lines changed

10 files changed

+365
-148
lines changed

up-subscription/src/configuration.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ use up_rust::{
1919
LocalUriProvider, UUri,
2020
};
2121

22+
/// What uSubscription service uses as the resource ID for LocalUriProvider::get_source_uri()
23+
pub(crate) const SOURCE_URI_RESOURCE_ID: u16 = 0x00FF;
24+
2225
/// Default subscription and notification command channel buffer size
2326
pub(crate) const DEFAULT_COMMAND_BUFFER_SIZE: usize = 1024;
2427

@@ -128,7 +131,7 @@ impl LocalUriProvider for USubscriptionConfiguration {
128131
&self.authority_name,
129132
USUBSCRIPTION_TYPE_ID,
130133
USUBSCRIPTION_VERSION_MAJOR,
131-
0x0,
134+
SOURCE_URI_RESOURCE_ID,
132135
)
133136
.expect("Error constructing usubscription UUri")
134137
}

up-subscription/src/handlers/reset.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ mod tests {
8989
use super::*;
9090
use tokio::sync::mpsc::{self};
9191

92+
use up_rust::UUri;
93+
9294
use crate::{helpers, tests::test_lib};
9395

9496
// [utest->dsn~usubscription-reset-protobuf~1]
@@ -254,9 +256,12 @@ mod tests {
254256
helpers::init_once();
255257

256258
// create request and other required object(s)
259+
let bad_source =
260+
UUri::try_from("up://LOCAL/1000/1/F").expect("Error during test case setup");
261+
257262
let request_payload = UPayload::try_from_protobuf(ResetRequest::default()).unwrap();
258263
let message_attributes = UAttributes {
259-
source: Some(test_lib::helpers::subscriber_uri1()).into(),
264+
source: Some(bad_source).into(),
260265
..Default::default()
261266
};
262267

up-subscription/src/handlers/unregister_for_notifications.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ impl RequestHandler for UnregisterNotificationsRequestHandler {
7070
// Interact with notification manager backend
7171
let se = NotificationEvent::RemoveNotifyee {
7272
subscriber: source.clone(),
73+
topic: topic.clone(),
7374
};
7475

7576
if let Err(e) = self.notification_sender.send(se).await {
@@ -139,8 +140,9 @@ mod tests {
139140
// validate subscription manager interaction
140141
let notification_event = notification_receiver.recv().await.unwrap();
141142
match notification_event {
142-
NotificationEvent::RemoveNotifyee { subscriber } => {
143+
NotificationEvent::RemoveNotifyee { subscriber, topic } => {
143144
assert_eq!(subscriber, test_lib::helpers::subscriber_uri1());
145+
assert_eq!(topic, test_lib::helpers::local_topic1_uri());
144146
}
145147
_ => panic!("Wrong event type"),
146148
}

up-subscription/src/notification_manager.rs

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
********************************************************************************/
1313

1414
use log::*;
15-
use std::collections::HashMap;
1615
use std::sync::Arc;
1716
use tokio::sync::{mpsc::Receiver, mpsc::Sender, oneshot, Notify};
1817

@@ -21,7 +20,7 @@ use up_rust::{
2120
usubscription_uri, SubscriberInfo, SubscriptionStatus, Update,
2221
RESOURCE_ID_SUBSCRIPTION_CHANGE,
2322
},
24-
UMessageBuilder, UTransport, UUID,
23+
LocalUriProvider, UMessageBuilder, UTransport, UUID,
2524
};
2625

2726
use crate::{
@@ -31,6 +30,9 @@ use crate::{
3130
USubscriptionConfiguration,
3231
};
3332

33+
// From usubscription.proto, the uprotocol.notification_topic resource ID
34+
pub(crate) const SOURCE_URI_RESOURCE_ID: u16 = 0x8000;
35+
3436
// This is the core business logic for tracking and sending subscription update notifications. It is currently implemented as a single
3537
// event-consuming function `notification_engine()`, which is supposed to be spawned into a task, and process the various notification
3638
// `Events` that it can receive via tokio mpsc channel.
@@ -44,6 +46,7 @@ pub(crate) enum NotificationEvent {
4446
},
4547
RemoveNotifyee {
4648
subscriber: SubscriberUUri,
49+
topic: TopicUUri,
4750
},
4851
StateChange {
4952
subscriber: Option<SubscriberUUri>,
@@ -55,12 +58,12 @@ pub(crate) enum NotificationEvent {
5558
respond_to: oneshot::Sender<Result<(), PersistencyError>>,
5659
},
5760
GetNotificationTopics {
58-
respond_to: oneshot::Sender<HashMap<SubscriberUUri, TopicUUri>>,
61+
respond_to: oneshot::Sender<Vec<(SubscriberUUri, TopicUUri)>>,
5962
},
6063
// Purely for use during testing: force-set new notifyees ledger
6164
#[cfg(test)]
6265
SetNotificationTopics {
63-
notification_topics_replacement: HashMap<SubscriberUUri, TopicUUri>,
66+
notification_topics_replacement: Vec<(SubscriberUUri, TopicUUri)>,
6467
respond_to: oneshot::Sender<()>,
6568
},
6669
}
@@ -93,9 +96,15 @@ impl PartialEq for NotificationEvent {
9396
},
9497
) => s1 == s2 && t1 == t2,
9598
(
96-
NotificationEvent::RemoveNotifyee { subscriber: s1 },
97-
NotificationEvent::RemoveNotifyee { subscriber: s2 },
98-
) => s1 == s2,
99+
NotificationEvent::RemoveNotifyee {
100+
subscriber: s1,
101+
topic: t1,
102+
},
103+
NotificationEvent::RemoveNotifyee {
104+
subscriber: s2,
105+
topic: t2,
106+
},
107+
) => s1 == s2 && t1 == t2,
99108
// Don't care about the test-only variants
100109
_ => false,
101110
}
@@ -129,6 +138,7 @@ pub(crate) async fn notification_engine(
129138
},
130139
};
131140
match event {
141+
// [impl->req~usubscription-register-notifications~1]
132142
NotificationEvent::AddNotifyee { subscriber, topic } => {
133143
if !topic.is_event() {
134144
error!("Topic UUri is not a valid event target");
@@ -142,8 +152,9 @@ pub(crate) async fn notification_engine(
142152
}
143153
};
144154
}
145-
NotificationEvent::RemoveNotifyee { subscriber } => {
146-
match notifications.remove_notifyee(&subscriber) {
155+
// [impl->req~usubscription-unregister-notifications~1]
156+
NotificationEvent::RemoveNotifyee { subscriber, topic } => {
157+
match notifications.remove_notifyee(&subscriber, &topic) {
147158
Ok(_) => {}
148159
Err(e) => {
149160
error!("Persistency failure {e}")
@@ -158,7 +169,7 @@ pub(crate) async fn notification_engine(
158169
} => {
159170
// [impl->dsn~usubscription-change-notification-type~1]
160171
let update = Update {
161-
topic: Some(topic).into(),
172+
topic: Some(topic.clone()).into(),
162173
subscriber: Some(SubscriberInfo {
163174
uri: subscriber.into(),
164175
..Default::default()
@@ -188,29 +199,27 @@ pub(crate) async fn notification_engine(
188199
}
189200
}
190201

191-
// Send Update message to any dedicated registered notification-subscribers
202+
// Send Update notification message to any dedicated registered notification-subscribers
192203
// [impl->req~usubscription-register-notifications~1]
193-
if let Ok(topics) = notifications.get_topics() {
194-
for topic_entry in topics {
204+
if let Ok(subscribers) = notifications.get_subscribers_registered_for_topic(&topic)
205+
{
206+
for subscribers_entry in subscribers {
195207
debug!(
196-
"Sending notification to ({}): topic {}, subscriber {}, status {}",
197-
topic_entry.to_uri(INCLUDE_SCHEMA),
208+
"Sending notification to ({}), about topic {} changing state to {}",
209+
subscribers_entry.to_uri(INCLUDE_SCHEMA),
198210
update
199211
.topic
200212
.as_ref()
201213
.unwrap_or_default()
202214
.to_uri(INCLUDE_SCHEMA),
203-
update
204-
.subscriber
205-
.uri
206-
.as_ref()
207-
.unwrap_or_default()
208-
.to_uri(INCLUDE_SCHEMA),
209215
update.status.as_ref().unwrap_or_default()
210216
);
211217

212-
match UMessageBuilder::publish(topic_entry.clone())
213-
.build_with_protobuf_payload(&update)
218+
match UMessageBuilder::notification(
219+
configuration.get_resource_uri(SOURCE_URI_RESOURCE_ID),
220+
subscribers_entry.clone(),
221+
)
222+
.build_with_protobuf_payload(&update)
214223
{
215224
Ok(update_msg) => {
216225
let _r = up_transport.send(update_msg).await.inspect_err(|e|
@@ -254,7 +263,7 @@ pub(crate) async fn notification_engine(
254263

255264
// Convenience wrapper for sending state change notification messages
256265
// `susbcriber` is an Option, because in the case ob remote subscription state changes, there is no subscriber (other than local usubscription service)
257-
pub(crate) async fn notify(
266+
pub(crate) async fn notify_state_change(
258267
notification_sender: Sender<NotificationEvent>,
259268
subscriber: Option<SubscriberUUri>,
260269
topic: TopicUUri,
@@ -282,9 +291,9 @@ pub(crate) async fn notify(
282291
// Might return an empty list if data retrieval fails for some reason, but will perform the reset in any case.
283292
pub(crate) async fn reset(
284293
notification_sender: Sender<NotificationEvent>,
285-
) -> Result<HashMap<SubscriberUUri, TopicUUri>, Box<dyn std::error::Error>> {
294+
) -> Result<Vec<(SubscriberUUri, TopicUUri)>, Box<dyn std::error::Error>> {
286295
// Get current notification registrations
287-
let (respond_to, receive_from) = oneshot::channel::<HashMap<SubscriberUUri, TopicUUri>>();
296+
let (respond_to, receive_from) = oneshot::channel::<Vec<(SubscriberUUri, TopicUUri)>>();
288297
notification_sender
289298
.send(NotificationEvent::GetNotificationTopics { respond_to })
290299
.await?;

0 commit comments

Comments
 (0)