Skip to content

Commit 5adff7e

Browse files
Add capability to persist subscription expiry property (#16)
In preparation for implementing subscription expiry, add capability to receive and persist subscription expiry information.
1 parent 24b9cc6 commit 5adff7e

10 files changed

+331
-93
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ This codebase is approaching beta state, mirroring progress in up-spec and up-ru
1616
- [x] feed back learnings and clarifications into up-spec usubscription documentation
1717
- [ ] implement any changes necessary to align with update/re-write of uSubscription specification
1818
- [ ] create a little demo application for interacting with up-subscription
19+
- [ ] tests for subscriber persistency
20+
- [ ] implement subscription expiry
1921

2022
## Getting Started
2123

up-subscription/src/handlers/fetch_subscribers.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ mod tests {
173173

174174
// create request and other required object(s)
175175
let subscribe_request =
176-
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri());
176+
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None);
177177
let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap();
178178
let message_attributes = UAttributes {
179179
source: Some(test_lib::helpers::subscriber_uri1()).into(),
@@ -205,7 +205,7 @@ mod tests {
205205

206206
// create request and other required object(s)
207207
let subscribe_request =
208-
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri());
208+
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None);
209209
let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap();
210210

211211
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);

up-subscription/src/handlers/fetch_subscriptions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ mod tests {
224224

225225
// create request and other required object(s)
226226
let subscribe_request =
227-
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri());
227+
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None);
228228
let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap();
229229
let message_attributes = UAttributes {
230230
source: Some(test_lib::helpers::subscriber_uri1()).into(),
@@ -256,7 +256,7 @@ mod tests {
256256

257257
// create request and other required object(s)
258258
let subscribe_request =
259-
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri());
259+
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None);
260260
let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap();
261261

262262
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);

up-subscription/src/handlers/subscribe.rs

Lines changed: 134 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use async_trait::async_trait;
1515
use log::*;
16+
use std::time::{SystemTime, UNIX_EPOCH};
1617
use tokio::{sync::mpsc::Sender, sync::oneshot};
1718

1819
use up_rust::{
@@ -24,7 +25,7 @@ use up_rust::{
2425
};
2526

2627
use crate::{
27-
helpers,
28+
helpers, usubscription,
2829
{notification_manager::NotificationEvent, subscription_manager::SubscriptionEvent},
2930
};
3031

@@ -66,11 +67,32 @@ impl RequestHandler for SubscriptionRequestHandler {
6667
));
6768
};
6869

70+
// Provisionally compute milliseconds to subscription expiry, from protobuf.google.Timestamp input in second granularity (we ignore the nanos).
71+
// Likely to change in the future, when we get rid of the protobuf.google.Timestamp type and track in milliseconds throughought.
72+
let expiry: Option<usubscription::ExpiryTimestamp> =
73+
match subscription_request.attributes.expire.seconds.try_into() {
74+
Ok(0) => None,
75+
Ok(seconds) => Some(seconds * 1000),
76+
Err(_) => None,
77+
};
78+
// Check if the expiry timestamp is in the future
79+
if let Some(expiry_ms) = expiry {
80+
let now_ms = SystemTime::now()
81+
.duration_since(UNIX_EPOCH)
82+
.expect("Time went backwards")
83+
.as_millis();
84+
if now_ms > expiry_ms {
85+
return Err(ServiceInvocationError::InvalidArgument(
86+
"Subscription Expiry time already passed".to_string(),
87+
));
88+
}
89+
}
6990
// Interact with subscription manager backend
7091
let (respond_to, receive_from) = oneshot::channel::<SubscriptionStatus>();
7192
let se = SubscriptionEvent::AddSubscription {
7293
subscriber: source.clone(),
7394
topic: topic.clone(),
95+
expiry,
7496
respond_to,
7597
};
7698

@@ -133,7 +155,7 @@ mod tests {
133155

134156
// create request and other required object(s)
135157
let subscribe_request =
136-
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri());
158+
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None);
137159
let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap();
138160
let message_attributes = UAttributes {
139161
source: Some(test_lib::helpers::subscriber_uri1()).into(),
@@ -172,6 +194,7 @@ mod tests {
172194
SubscriptionEvent::AddSubscription {
173195
subscriber,
174196
topic,
197+
expiry: None,
175198
respond_to,
176199
} => {
177200
assert_eq!(subscriber, test_lib::helpers::subscriber_uri1());
@@ -214,7 +237,7 @@ mod tests {
214237

215238
// create request and other required object(s)
216239
let subscribe_request =
217-
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri());
240+
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None);
218241
let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap();
219242
let message_attributes = UAttributes {
220243
source: Some(test_lib::helpers::subscriber_uri1()).into(),
@@ -249,7 +272,7 @@ mod tests {
249272

250273
// create request and other required object(s)
251274
let subscribe_request =
252-
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri());
275+
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None);
253276
let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap();
254277

255278
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
@@ -336,4 +359,111 @@ mod tests {
336359
_ => panic!("Wrong error type"),
337360
}
338361
}
362+
363+
#[tokio::test]
364+
async fn test_future_subscription() {
365+
helpers::init_once();
366+
367+
let future_secs = SystemTime::now()
368+
.duration_since(UNIX_EPOCH)
369+
.unwrap()
370+
.as_secs()
371+
+ 600;
372+
373+
// create request and other required object(s)
374+
// SubscriptionRequest currently uses protobuf.google.Timestamp for expiry attribute, which tracks timestamp in second granularity (we ignore the nanos)
375+
// Also, protobuf can only do signed ints, and uses an i64 for the seconds... so we have to force our timestamp into that.
376+
let subscribe_request = test_lib::helpers::subscription_request(
377+
test_lib::helpers::local_topic1_uri(),
378+
Some(u32::try_from(future_secs).unwrap()),
379+
);
380+
let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap();
381+
let message_attributes = UAttributes {
382+
source: Some(test_lib::helpers::subscriber_uri1()).into(),
383+
..Default::default()
384+
};
385+
386+
let (subscription_sender, mut subscription_receiver) =
387+
mpsc::channel::<SubscriptionEvent>(1);
388+
let (notification_sender, _) = mpsc::channel::<NotificationEvent>(1);
389+
390+
// create and spawn off handler, to make all the asnync goodness work
391+
let request_handler =
392+
SubscriptionRequestHandler::new(subscription_sender, notification_sender);
393+
tokio::spawn(async move {
394+
let result = request_handler
395+
.handle_request(
396+
RESOURCE_ID_SUBSCRIBE,
397+
&message_attributes,
398+
Some(request_payload),
399+
)
400+
.await
401+
.unwrap();
402+
403+
let response: SubscriptionResponse = result.unwrap().extract_protobuf().unwrap();
404+
assert_eq!(
405+
response.topic.unwrap_or_default(),
406+
test_lib::helpers::local_topic1_uri()
407+
);
408+
assert_eq!(response.status.unwrap().state, State::SUBSCRIBED.into());
409+
});
410+
411+
// validate subscription manager interaction
412+
let subscription_event = subscription_receiver.recv().await.unwrap();
413+
match subscription_event {
414+
SubscriptionEvent::AddSubscription {
415+
subscriber,
416+
topic,
417+
expiry,
418+
respond_to,
419+
} => {
420+
assert_eq!(subscriber, test_lib::helpers::subscriber_uri1());
421+
assert_eq!(topic, test_lib::helpers::local_topic1_uri());
422+
// we're passing in seconds above, because of how SubscriptionRequest is defined - but internally we are handling milliseconds; therefore *1000
423+
assert_eq!(
424+
expiry,
425+
Some((future_secs as usubscription::ExpiryTimestamp) * 1000)
426+
);
427+
428+
let _ = respond_to.send(SubscriptionStatus {
429+
state: State::SUBSCRIBED.into(),
430+
..Default::default()
431+
});
432+
}
433+
_ => panic!("Wrong event type"),
434+
}
435+
}
436+
437+
#[tokio::test]
438+
async fn test_expired_subscription() {
439+
helpers::init_once();
440+
441+
// create request and other required object(s)
442+
let subscribe_request = test_lib::helpers::subscription_request(
443+
test_lib::helpers::local_topic1_uri(),
444+
Some(10),
445+
);
446+
let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap();
447+
let message_attributes = UAttributes {
448+
source: Some(test_lib::helpers::subscriber_uri1()).into(),
449+
..Default::default()
450+
};
451+
452+
let (subscription_sender, _) = mpsc::channel::<SubscriptionEvent>(1);
453+
let (notification_sender, _) = mpsc::channel::<NotificationEvent>(1);
454+
455+
// create handler and perform tested operation
456+
let request_handler =
457+
SubscriptionRequestHandler::new(subscription_sender, notification_sender);
458+
459+
let result = request_handler
460+
.handle_request(
461+
up_rust::core::usubscription::RESOURCE_ID_SUBSCRIBE,
462+
&message_attributes,
463+
Some(request_payload),
464+
)
465+
.await;
466+
467+
assert!(result.is_err_and(|e| matches!(e, ServiceInvocationError::InvalidArgument(_))));
468+
}
339469
}

up-subscription/src/handlers/unsubscribe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ mod tests {
303303

304304
// create request and other required object(s)
305305
let subscribe_request =
306-
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri());
306+
test_lib::helpers::subscription_request(test_lib::helpers::local_topic1_uri(), None);
307307
let request_payload = UPayload::try_from_protobuf(subscribe_request.clone()).unwrap();
308308
let message_attributes = UAttributes {
309309
source: Some(test_lib::helpers::subscriber_uri1()).into(),

0 commit comments

Comments
 (0)