Skip to content

Commit a2f9e6c

Browse files
committed
Made the SubReceiver scoped, unsubscribes when it goes out of scope
1 parent eedc105 commit a2f9e6c

File tree

1 file changed

+83
-21
lines changed

1 file changed

+83
-21
lines changed

src/submgr.rs

+83-21
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
#![allow(unused)]
22

3-
use std::collections::HashMap;
3+
use std::cmp::Ordering;
4+
use std::collections::BTreeMap;
45
use std::error::Error;
56
use std::fmt;
7+
use std::sync::{Arc, Mutex};
68

79
use enostr::Filter;
810
use nostrdb;
@@ -13,35 +15,44 @@ use nostrdb;
1315
/// already is one. Using a lame (but short) placeholder name instead
1416
/// for now ...
1517
///
16-
/// ```ignore
18+
/// ```no_run
1719
/// use std::error::Error;
20+
/// use std::sync::{Arc, Mutex};
1821
///
1922
/// use notedeck::submgr::{SubMgr, SubSpecBuilder, SubError};
2023
/// use enostr::Filter;
2124
///
2225
/// #[tokio::main]
2326
/// async fn main() -> Result<(), Box<dyn Error>> {
24-
/// let mut submgr = SubMgr::new();
27+
/// let submgr = SubMgr::new();
2528
///
29+
/// // Define a filter and build the subscription specification
2630
/// let filter = Filter::new().kinds(vec![1, 2, 3]).build();
27-
/// let ep = submgr.subscribe(SubSpecBuilder::new(vec![filter]).build())?;
31+
/// let spec = SubSpecBuilder::new(vec![filter]).build();
32+
///
33+
/// // Subscribe and obtain a SubReceiver
34+
/// let receiver = SubMgr::subscribe(submgr.clone(), spec)?;
35+
///
36+
/// // Process incoming note keys
2837
/// loop {
29-
/// match ep.next().await {
30-
/// Ok(nks) => {
31-
/// // process the note keys
38+
/// match receiver.next().await {
39+
/// Ok(note_keys) => {
40+
/// // Process the note keys
41+
/// println!("Received note keys: {:?}", note_keys);
3242
/// },
3343
/// Err(SubError::ReevaluateState) => {
34-
/// // not really an error, break out of loop and reevaluate state
44+
/// // Not really an error; break out to reevaluate the state
3545
/// break;
3646
/// },
3747
/// Err(err) => {
38-
/// // something bad happened
48+
/// // Handle other errors
3949
/// eprintln!("Error: {:?}", err);
4050
/// break;
4151
/// },
4252
/// }
4353
/// }
44-
/// submgr.unsubscribe(ep)?;
54+
///
55+
/// // The subscription will automatically be cleaned up when the receiver goes out of scope
4556
/// Ok(())
4657
/// }
4758
/// ```
@@ -73,9 +84,30 @@ impl Error for SubError {}
7384

7485
pub type SubResult<T> = Result<T, SubError>;
7586

87+
#[derive(Debug, Clone, Copy)]
7688
pub struct SubId(nostrdb::Subscription);
7789

78-
#[derive(Debug)]
90+
impl Ord for SubId {
91+
fn cmp(&self, other: &Self) -> Ordering {
92+
self.0.id().cmp(&other.0.id()) // Access the inner `u64` and compare
93+
}
94+
}
95+
96+
impl PartialOrd for SubId {
97+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
98+
Some(self.cmp(other)) // Delegate to `cmp`
99+
}
100+
}
101+
102+
impl PartialEq for SubId {
103+
fn eq(&self, other: &Self) -> bool {
104+
self.0.id() == other.0.id() // Compare the inner `u64`
105+
}
106+
}
107+
108+
impl Eq for SubId {}
109+
110+
#[derive(Debug, Clone)]
79111
pub enum SubConstraint {
80112
OneShot, // terminate subscription after initial query
81113
Local, // only query the local db, no remote subs
@@ -108,6 +140,7 @@ impl SubSpecBuilder {
108140
}
109141
}
110142

143+
#[derive(Debug, Clone)]
111144
pub struct SubSpec {
112145
rmtid: String,
113146
filters: Vec<Filter>,
@@ -118,32 +151,61 @@ pub struct SubSpec {
118151
}
119152

120153
pub struct SubMgr {
121-
subs: HashMap<SubId, (SubSpec, SubEndpoint)>,
154+
subs: BTreeMap<SubId, (SubSpec, SubSender)>,
122155
}
123156

124157
impl SubMgr {
125-
pub fn new() -> Self {
126-
SubMgr {
127-
subs: HashMap::new(),
128-
}
158+
pub fn new() -> Arc<Mutex<Self>> {
159+
Arc::new(Mutex::new(SubMgr {
160+
subs: BTreeMap::new(),
161+
}))
129162
}
130163

131-
pub fn subscribe(&mut self, sub: SubSpec) -> SubResult<SubEndpoint> {
132-
unimplemented!();
164+
pub fn subscribe(sub_mgr: Arc<Mutex<SubMgr>>, spec: SubSpec) -> SubResult<SubReceiver> {
165+
let mut mgr = sub_mgr.lock().unwrap();
166+
let (id, sender, receiver) = mgr.make_subscription(&spec)?;
167+
mgr.subs.insert(id, (spec, sender));
168+
Ok(SubReceiver {
169+
id,
170+
sub_mgr: sub_mgr.clone(),
171+
})
172+
}
173+
174+
pub fn unsubscribe(sub_mgr: Arc<Mutex<SubMgr>>, id: SubId) -> SubResult<()> {
175+
let mut mgr = sub_mgr.lock().unwrap();
176+
mgr.subs.remove(&id);
177+
Ok(())
133178
}
134179

135-
pub fn unsubscribe(&mut self, ep: SubEndpoint) -> SubResult<()> {
180+
fn make_subscription(&mut self, sub: &SubSpec) -> SubResult<(SubId, SubSender, SubReceiver)> {
136181
unimplemented!();
137182
}
138183
}
139184

140-
pub struct SubEndpoint {
185+
pub struct SubSender {
186+
// internals omitted ...
187+
}
188+
189+
pub struct SubReceiver {
190+
sub_mgr: Arc<Mutex<SubMgr>>,
141191
id: SubId,
142192
// internals omitted ...
143193
}
144194

145-
impl SubEndpoint {
195+
impl SubReceiver {
196+
pub fn new(id: SubId, sub_mgr: Arc<Mutex<SubMgr>>) -> Self {
197+
SubReceiver { id, sub_mgr }
198+
}
199+
146200
pub async fn next(&self) -> SubResult<Vec<nostrdb::NoteKey>> {
147201
unimplemented!();
148202
}
149203
}
204+
205+
impl Drop for SubReceiver {
206+
fn drop(&mut self) {
207+
if let Err(err) = SubMgr::unsubscribe(self.sub_mgr.clone(), self.id.clone()) {
208+
eprintln!("Failed to unsubscribe: {:?}", err);
209+
}
210+
}
211+
}

0 commit comments

Comments
 (0)