Skip to content

Commit 41cd68a

Browse files
Move all notification logic to subscription_manager (#17)
- move all subscription change notification logic to one place (subscription_manager). - fix a functional gap where state-changes of remote subscriptions (e.g. from SUBSCRIBE_PENDING to SUBSCRIBED) haven't resulted in notifications at all
1 parent 5adff7e commit 41cd68a

7 files changed

+317
-176
lines changed

up-subscription/src/handlers/subscribe.rs

Lines changed: 9 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,16 @@ use up_rust::{
2424
UAttributes,
2525
};
2626

27-
use crate::{
28-
helpers, usubscription,
29-
{notification_manager::NotificationEvent, subscription_manager::SubscriptionEvent},
30-
};
27+
use crate::{helpers, subscription_manager::SubscriptionEvent, usubscription};
3128

3229
pub(crate) struct SubscriptionRequestHandler {
3330
subscription_sender: Sender<SubscriptionEvent>,
34-
notification_sender: Sender<NotificationEvent>,
3531
}
3632

3733
impl SubscriptionRequestHandler {
38-
pub(crate) fn new(
39-
subscription_sender: Sender<SubscriptionEvent>,
40-
notification_sender: Sender<NotificationEvent>,
41-
) -> Self {
34+
pub(crate) fn new(subscription_sender: Sender<SubscriptionEvent>) -> Self {
4235
Self {
4336
subscription_sender,
44-
notification_sender,
4537
}
4638
}
4739
}
@@ -108,25 +100,6 @@ impl RequestHandler for SubscriptionRequestHandler {
108100
));
109101
};
110102

111-
// Notify update channel
112-
let (respond_to, receive_from) = oneshot::channel::<()>();
113-
if let Err(e) = self
114-
.notification_sender
115-
.send(NotificationEvent::StateChange {
116-
subscriber: source.clone(),
117-
topic: subscription_request.topic.clone().unwrap_or_default(),
118-
status: status.clone(),
119-
respond_to,
120-
})
121-
.await
122-
{
123-
error!("Error initiating subscription-change update notification: {e}");
124-
}
125-
if let Err(e) = receive_from.await {
126-
// Not returning an error here, as update notification is not a core concern wrt the actual subscription management
127-
warn!("Error sending subscription-change update notification: {e}");
128-
};
129-
130103
// Build and return result
131104
let response = SubscriptionResponse {
132105
topic: Some(subscription_request.topic.unwrap_or_default()).into(),
@@ -164,12 +137,9 @@ mod tests {
164137

165138
let (subscription_sender, mut subscription_receiver) =
166139
mpsc::channel::<SubscriptionEvent>(1);
167-
let (notification_sender, mut notification_receiver) =
168-
mpsc::channel::<NotificationEvent>(1);
169140

170141
// create and spawn off handler, to make all the asnync goodness work
171-
let request_handler =
172-
SubscriptionRequestHandler::new(subscription_sender, notification_sender);
142+
let request_handler = SubscriptionRequestHandler::new(subscription_sender);
173143
tokio::spawn(async move {
174144
let result = request_handler
175145
.handle_request(
@@ -207,28 +177,6 @@ mod tests {
207177
}
208178
_ => panic!("Wrong event type"),
209179
}
210-
211-
// validate notification manager interaction
212-
let notification_event = notification_receiver.recv().await.unwrap();
213-
match notification_event {
214-
NotificationEvent::StateChange {
215-
subscriber,
216-
topic,
217-
status,
218-
respond_to: _,
219-
} => {
220-
assert_eq!(subscriber, test_lib::helpers::subscriber_uri1());
221-
assert_eq!(topic, test_lib::helpers::local_topic1_uri());
222-
assert_eq!(
223-
status,
224-
SubscriptionStatus {
225-
state: State::SUBSCRIBED.into(),
226-
..Default::default()
227-
}
228-
);
229-
}
230-
_ => panic!("Wrong event type"),
231-
}
232180
}
233181

234182
#[tokio::test]
@@ -245,11 +193,9 @@ mod tests {
245193
};
246194

247195
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
248-
let (notification_sender, _) = mpsc::channel::<NotificationEvent>(1);
249196

250197
// create handler and perform tested operation
251-
let request_handler =
252-
SubscriptionRequestHandler::new(subscription_sender, notification_sender);
198+
let request_handler = SubscriptionRequestHandler::new(subscription_sender);
253199

254200
let result = request_handler
255201
.handle_request(
@@ -276,11 +222,9 @@ mod tests {
276222
let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap();
277223

278224
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
279-
let (notification_sender, _) = mpsc::channel::<NotificationEvent>(1);
280225

281226
// create handler and perform tested operation
282-
let request_handler =
283-
SubscriptionRequestHandler::new(subscription_sender, notification_sender);
227+
let request_handler = SubscriptionRequestHandler::new(subscription_sender);
284228

285229
let result = request_handler
286230
.handle_request(
@@ -308,11 +252,9 @@ mod tests {
308252
};
309253

310254
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
311-
let (notification_sender, _) = mpsc::channel::<NotificationEvent>(1);
312255

313256
// create handler and perform tested operation
314-
let request_handler =
315-
SubscriptionRequestHandler::new(subscription_sender, notification_sender);
257+
let request_handler = SubscriptionRequestHandler::new(subscription_sender);
316258

317259
let result = request_handler
318260
.handle_request(RESOURCE_ID_SUBSCRIBE, &message_attributes, None)
@@ -339,11 +281,9 @@ mod tests {
339281
};
340282

341283
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
342-
let (notification_sender, _) = mpsc::channel::<NotificationEvent>(1);
343284

344285
// create handler and perform tested operation
345-
let request_handler =
346-
SubscriptionRequestHandler::new(subscription_sender, notification_sender);
286+
let request_handler = SubscriptionRequestHandler::new(subscription_sender);
347287

348288
let result = request_handler
349289
.handle_request(
@@ -385,11 +325,9 @@ mod tests {
385325

386326
let (subscription_sender, mut subscription_receiver) =
387327
mpsc::channel::<SubscriptionEvent>(1);
388-
let (notification_sender, _) = mpsc::channel::<NotificationEvent>(1);
389328

390329
// create and spawn off handler, to make all the asnync goodness work
391-
let request_handler =
392-
SubscriptionRequestHandler::new(subscription_sender, notification_sender);
330+
let request_handler = SubscriptionRequestHandler::new(subscription_sender);
393331
tokio::spawn(async move {
394332
let result = request_handler
395333
.handle_request(
@@ -450,11 +388,9 @@ mod tests {
450388
};
451389

452390
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
453-
let (notification_sender, _) = mpsc::channel::<NotificationEvent>(1);
454391

455392
// create handler and perform tested operation
456-
let request_handler =
457-
SubscriptionRequestHandler::new(subscription_sender, notification_sender);
393+
let request_handler = SubscriptionRequestHandler::new(subscription_sender);
458394

459395
let result = request_handler
460396
.handle_request(

up-subscription/src/handlers/unsubscribe.rs

Lines changed: 8 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,16 @@ use up_rust::{
2323
UAttributes,
2424
};
2525

26-
use crate::{
27-
helpers, notification_manager::NotificationEvent, subscription_manager::SubscriptionEvent,
28-
};
26+
use crate::{helpers, subscription_manager::SubscriptionEvent};
2927

3028
pub(crate) struct UnubscribeRequestHandler {
3129
subscription_sender: Sender<SubscriptionEvent>,
32-
notification_sender: Sender<NotificationEvent>,
3330
}
3431

3532
impl UnubscribeRequestHandler {
36-
pub(crate) fn new(
37-
subscription_sender: Sender<SubscriptionEvent>,
38-
notification_sender: Sender<NotificationEvent>,
39-
) -> Self {
33+
pub(crate) fn new(subscription_sender: Sender<SubscriptionEvent>) -> Self {
4034
Self {
4135
subscription_sender,
42-
notification_sender,
4336
}
4437
}
4538
}
@@ -78,31 +71,12 @@ impl RequestHandler for UnubscribeRequestHandler {
7871
"Error processing request".to_string(),
7972
));
8073
}
81-
let Ok(status) = receive_from.await else {
74+
let Ok(_status) = receive_from.await else {
8275
return Err(ServiceInvocationError::Internal(
8376
"Error processing request".to_string(),
8477
));
8578
};
8679

87-
// Notify update channel
88-
let (respond_to, receive_from) = oneshot::channel::<()>();
89-
if let Err(e) = self
90-
.notification_sender
91-
.send(NotificationEvent::StateChange {
92-
subscriber: source.clone(),
93-
topic: unsubscribe_request.topic.clone().unwrap_or_default(),
94-
status: status.clone(),
95-
respond_to,
96-
})
97-
.await
98-
{
99-
error!("Error initiating subscription-change update notification: {e}");
100-
}
101-
if let Err(e) = receive_from.await {
102-
// Not returning an error here, as update notification is not a core concern wrt the actual subscription management
103-
warn!("Error sending subscription-change update notification: {e}");
104-
};
105-
10680
// Build and return result
10781
let response_payload = UPayload::try_from_protobuf(UnsubscribeResponse::default())
10882
.map_err(|e| {
@@ -136,12 +110,9 @@ mod tests {
136110

137111
let (subscription_sender, mut subscription_receiver) =
138112
mpsc::channel::<SubscriptionEvent>(1);
139-
let (notification_sender, mut notification_receiver) =
140-
mpsc::channel::<NotificationEvent>(1);
141113

142114
// create and spawn off handler, to make all the asnync goodness work
143-
let request_handler =
144-
UnubscribeRequestHandler::new(subscription_sender, notification_sender);
115+
let request_handler = UnubscribeRequestHandler::new(subscription_sender);
145116
tokio::spawn(async move {
146117
let result = request_handler
147118
.handle_request(
@@ -179,28 +150,6 @@ mod tests {
179150
}
180151
_ => panic!("Wrong event type"),
181152
}
182-
183-
// validate notification manager interaction
184-
let notification_event = notification_receiver.recv().await.unwrap();
185-
match notification_event {
186-
NotificationEvent::StateChange {
187-
subscriber,
188-
topic,
189-
status,
190-
respond_to: _,
191-
} => {
192-
assert_eq!(subscriber, test_lib::helpers::subscriber_uri1());
193-
assert_eq!(topic, test_lib::helpers::local_topic1_uri());
194-
assert_eq!(
195-
status,
196-
SubscriptionStatus {
197-
state: State::UNSUBSCRIBED.into(),
198-
..Default::default()
199-
}
200-
);
201-
}
202-
_ => panic!("Wrong event type"),
203-
}
204153
}
205154

206155
#[tokio::test]
@@ -217,11 +166,9 @@ mod tests {
217166
};
218167

219168
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
220-
let (notification_sender, _) = mpsc::channel::<NotificationEvent>(1);
221169

222170
// create handler and perform tested operation
223-
let request_handler =
224-
UnubscribeRequestHandler::new(subscription_sender, notification_sender);
171+
let request_handler = UnubscribeRequestHandler::new(subscription_sender);
225172

226173
let result = request_handler
227174
.handle_request(
@@ -248,11 +195,9 @@ mod tests {
248195
let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap();
249196

250197
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
251-
let (notification_sender, _) = mpsc::channel::<NotificationEvent>(1);
252198

253199
// create handler and perform tested operation
254-
let request_handler =
255-
UnubscribeRequestHandler::new(subscription_sender, notification_sender);
200+
let request_handler = UnubscribeRequestHandler::new(subscription_sender);
256201

257202
let result = request_handler
258203
.handle_request(
@@ -280,11 +225,9 @@ mod tests {
280225
};
281226

282227
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
283-
let (notification_sender, _) = mpsc::channel::<NotificationEvent>(1);
284228

285229
// create handler and perform tested operation
286-
let request_handler =
287-
UnubscribeRequestHandler::new(subscription_sender, notification_sender);
230+
let request_handler = UnubscribeRequestHandler::new(subscription_sender);
288231

289232
let result = request_handler
290233
.handle_request(RESOURCE_ID_UNSUBSCRIBE, &message_attributes, None)
@@ -311,11 +254,9 @@ mod tests {
311254
};
312255

313256
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
314-
let (notification_sender, _) = mpsc::channel::<NotificationEvent>(1);
315257

316258
// create handler and perform tested operation
317-
let request_handler =
318-
UnubscribeRequestHandler::new(subscription_sender, notification_sender);
259+
let request_handler = UnubscribeRequestHandler::new(subscription_sender);
319260

320261
let result = request_handler
321262
.handle_request(

up-subscription/src/notification_manager.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use log::*;
1515
#[cfg(test)]
1616
use std::collections::HashMap;
1717
use std::sync::Arc;
18-
use tokio::sync::{mpsc::Receiver, oneshot, Notify};
18+
use tokio::sync::{mpsc::Receiver, mpsc::Sender, oneshot, Notify};
1919

2020
use up_rust::{
2121
core::usubscription::{
@@ -42,7 +42,7 @@ pub(crate) enum NotificationEvent {
4242
subscriber: UUri,
4343
},
4444
StateChange {
45-
subscriber: UUri,
45+
subscriber: Option<UUri>,
4646
topic: UUri,
4747
status: SubscriptionStatus,
4848
respond_to: oneshot::Sender<()>,
@@ -117,7 +117,7 @@ pub(crate) async fn notification_engine(
117117
let update = Update {
118118
topic: Some(topic).into(),
119119
subscriber: Some(SubscriberInfo {
120-
uri: Some(subscriber.clone()).into(),
120+
uri: subscriber.into(),
121121
..Default::default()
122122
})
123123
.into(),
@@ -201,3 +201,29 @@ pub(crate) async fn notification_engine(
201201
}
202202
}
203203
}
204+
205+
// Convenience wrapper for sending state change notification messages
206+
// susbcriber is an Option, because in the case ob remote subscription state changes, there is no subscriber (other than local usubscription service)
207+
pub(crate) async fn notify(
208+
notification_sender: Sender<NotificationEvent>,
209+
subscriber: Option<UUri>,
210+
topic: UUri,
211+
status: SubscriptionStatus,
212+
) {
213+
let (respond_to, receive_from) = oneshot::channel::<()>();
214+
if let Err(e) = notification_sender
215+
.send(NotificationEvent::StateChange {
216+
subscriber,
217+
topic,
218+
status,
219+
respond_to,
220+
})
221+
.await
222+
{
223+
error!("Error initiating subscription-change update notification: {e}");
224+
}
225+
if let Err(e) = receive_from.await {
226+
// Not returning an error here, as update notification is not a core concern wrt the actual subscription management
227+
warn!("Error sending subscription-change update notification: {e}");
228+
};
229+
}

0 commit comments

Comments
 (0)