-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathmulti_subscriber.rs
145 lines (127 loc) · 4.41 KB
/
multi_subscriber.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use enostr::{Filter, RelayPool};
use nostrdb::{Ndb, Subscription};
use tracing::{error, info};
use uuid::Uuid;
#[derive(Debug)]
pub struct MultiSubscriber {
pub filters: Vec<Filter>,
pub local_subid: Option<Subscription>,
pub remote_subid: Option<String>,
local_subscribers: u32,
remote_subscribers: u32,
}
impl MultiSubscriber {
/// Create a MultiSubscriber with an initial local subscription.
pub fn with_initial_local_sub(sub: Subscription, filters: Vec<Filter>) -> Self {
let mut msub = MultiSubscriber::new(filters);
msub.local_subid = Some(sub);
msub.local_subscribers = 1;
msub
}
pub fn new(filters: Vec<Filter>) -> Self {
Self {
filters,
local_subid: None,
remote_subid: None,
local_subscribers: 0,
remote_subscribers: 0,
}
}
fn unsubscribe_remote(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
let remote_subid = if let Some(remote_subid) = &self.remote_subid {
remote_subid
} else {
self.err_log(ndb, "unsubscribe_remote: nothing to unsubscribe from?");
return;
};
pool.unsubscribe(remote_subid.clone());
self.remote_subid = None;
}
/// Locally unsubscribe if we have one
fn unsubscribe_local(&mut self, ndb: &mut Ndb) {
let local_sub = if let Some(local_sub) = self.local_subid {
local_sub
} else {
self.err_log(ndb, "unsubscribe_local: nothing to unsubscribe from?");
return;
};
match ndb.unsubscribe(local_sub) {
Err(e) => {
self.err_log(ndb, &format!("Failed to unsubscribe: {e}"));
}
Ok(_) => {
self.local_subid = None;
}
}
}
pub fn unsubscribe(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) -> bool {
if self.local_subscribers == 0 && self.remote_subscribers == 0 {
self.err_log(
ndb,
"Called multi_subscriber unsubscribe when both sub counts are 0",
);
return false;
}
self.local_subscribers = self.local_subscribers.saturating_sub(1);
self.remote_subscribers = self.remote_subscribers.saturating_sub(1);
if self.local_subscribers == 0 && self.remote_subscribers == 0 {
self.info_log(ndb, "Locally unsubscribing");
self.unsubscribe_local(ndb);
self.unsubscribe_remote(ndb, pool);
self.local_subscribers = 0;
self.remote_subscribers = 0;
true
} else {
false
}
}
fn info_log(&self, ndb: &Ndb, msg: &str) {
info!(
"{msg}. {}/{}/{} active ndb/local/remote subscriptions.",
ndb.subscription_count(),
self.local_subscribers,
self.remote_subscribers,
);
}
fn err_log(&self, ndb: &Ndb, msg: &str) {
error!(
"{msg}. {}/{}/{} active ndb/local/remote subscriptions.",
ndb.subscription_count(),
self.local_subscribers,
self.remote_subscribers,
);
}
pub fn subscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
self.local_subscribers += 1;
self.remote_subscribers += 1;
if self.remote_subscribers == 1 {
if self.remote_subid.is_some() {
self.err_log(
ndb,
"Object is first subscriber, but it already had a subscription",
);
return;
} else {
let subid = Uuid::new_v4().to_string();
pool.subscribe(subid.clone(), self.filters.clone());
self.info_log(ndb, "First remote subscription");
self.remote_subid = Some(subid);
}
}
if self.local_subscribers == 1 {
if self.local_subid.is_some() {
self.err_log(ndb, "Should not have a local subscription already");
return;
}
match ndb.subscribe(&self.filters) {
Ok(sub) => {
self.info_log(ndb, "First local subscription");
self.local_subid = Some(sub);
}
Err(err) => {
error!("multi_subscriber: error subscribing locally: '{err}'")
}
}
}
}
}