Skip to content

Commit ff6aba8

Browse files
committed
WIP: Add dispatcher and task modules
1 parent 53cfce1 commit ff6aba8

File tree

4 files changed

+198
-1
lines changed

4 files changed

+198
-1
lines changed

src/app.rs

+24-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::{
44
app_style::user_requested_visuals_change,
55
args::Args,
66
column::Columns,
7+
dispatcher::{self, HandlerTable},
78
draft::Drafts,
89
error::{Error, FilterError},
910
filter::{self, FilterState},
@@ -16,6 +17,7 @@ use crate::{
1617
notes_holder::NotesHolderStorage,
1718
profile::Profile,
1819
subscriptions::{SubKind, Subscriptions},
20+
task,
1921
thread::Thread,
2022
timeline::{Timeline, TimelineId, TimelineKind, ViewFilter},
2123
ui::{self, DesktopSidePanel},
@@ -32,6 +34,7 @@ use egui_extras::{Size, StripBuilder};
3234

3335
use nostrdb::{Config, Filter, Ndb, Note, Transaction};
3436

37+
use futures::SinkExt;
3538
use std::collections::HashMap;
3639
use std::path::Path;
3740
use std::time::Duration;
@@ -61,6 +64,7 @@ pub struct Damus {
6164
pub img_cache: ImageCache,
6265
pub accounts: AccountManager,
6366
pub subscriptions: Subscriptions,
67+
pub dispatch: HandlerTable,
6468

6569
frame_history: crate::frame_history::FrameHistory,
6670

@@ -472,6 +476,11 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) {
472476
.insert("unknownids".to_string(), SubKind::OneShot);
473477
setup_initial_nostrdb_subs(&damus.ndb, &mut damus.note_cache, &mut damus.columns)
474478
.expect("home subscription failed");
479+
480+
let damusref = damus.reference();
481+
tokio::spawn(async move {
482+
task::setup_user_relays(damusref).await;
483+
});
475484
}
476485

477486
DamusState::NewTimelineSub(new_timeline_id) => {
@@ -511,14 +520,26 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) {
511520
damus.columns.attempt_perform_deletion_request();
512521
}
513522

514-
fn process_event(damus: &mut Damus, _subid: &str, event: &str) {
523+
fn process_event(damus: &mut Damus, subid: &str, event: &str) {
515524
#[cfg(feature = "profiling")]
516525
puffin::profile_function!();
517526

518527
debug!("processing event {}", event);
519528
if let Err(_err) = damus.ndb.process_event(event) {
520529
error!("error processing event {}", event);
521530
}
531+
532+
// Notify waiting subscribers that a pool event has happened
533+
if let Some(handler) = damus.dispatch.get(subid) {
534+
let mut handler_clone = handler.clone();
535+
tokio::spawn(async move {
536+
handler_clone
537+
.sender
538+
.send(dispatcher::Event::Pool)
539+
.await
540+
.ok();
541+
});
542+
}
522543
}
523544

524545
fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> {
@@ -726,6 +747,7 @@ impl Damus {
726747
debug,
727748
unknown_ids: UnknownIds::default(),
728749
subscriptions: Subscriptions::default(),
750+
dispatch: HandlerTable::default(),
729751
since_optimize: parsed_args.since_optimize,
730752
threads: NotesHolderStorage::default(),
731753
profiles: NotesHolderStorage::default(),
@@ -822,6 +844,7 @@ impl Damus {
822844
debug,
823845
unknown_ids: UnknownIds::default(),
824846
subscriptions: Subscriptions::default(),
847+
dispatch: HandlerTable::default(),
825848
since_optimize: true,
826849
threads: NotesHolderStorage::default(),
827850
profiles: NotesHolderStorage::default(),

src/dispatcher.rs

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use futures::channel::mpsc;
2+
use std::collections::HashMap;
3+
use std::error::Error;
4+
use std::fmt;
5+
use uuid::Uuid;
6+
7+
use nostrdb::Filter;
8+
9+
use crate::Damus;
10+
11+
#[allow(dead_code)] // until InternalError is used
12+
#[derive(Debug)]
13+
pub enum DispatcherError {
14+
InternalError(String),
15+
}
16+
17+
impl fmt::Display for DispatcherError {
18+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
19+
match self {
20+
DispatcherError::InternalError(msg) => write!(f, "Internal error: {}", msg),
21+
}
22+
}
23+
}
24+
25+
impl Error for DispatcherError {}
26+
27+
pub type DispatcherResult<T> = Result<T, DispatcherError>;
28+
29+
#[derive(Debug)]
30+
pub enum Event {
31+
Pool,
32+
}
33+
34+
/// Used by the relay code to dispatch events to a waiting handlers
35+
#[derive(Debug, Clone)]
36+
pub struct SubscriptionHandler {
37+
pub sender: mpsc::Sender<Event>,
38+
}
39+
40+
/// Maps subscription id to handler for the subscription
41+
pub type HandlerTable = HashMap<String, SubscriptionHandler>;
42+
43+
/// Used by async tasks to receive events
44+
#[allow(dead_code)] // until id is read
45+
#[derive(Debug)]
46+
pub struct Subscription {
47+
pub id: String,
48+
pub receiver: mpsc::Receiver<Event>,
49+
}
50+
51+
pub fn subscribe(
52+
damus: &mut Damus,
53+
filters: &[Filter],
54+
bufsz: usize,
55+
) -> DispatcherResult<Subscription> {
56+
let (sender, receiver) = mpsc::channel::<Event>(bufsz);
57+
let id = Uuid::new_v4().to_string();
58+
damus
59+
.dispatch
60+
.insert(id.clone(), SubscriptionHandler { sender });
61+
damus.pool.subscribe(id.clone(), filters.into());
62+
Ok(Subscription { id, receiver })
63+
}
64+
65+
pub fn _unsubscribe(_sub: Subscription) -> DispatcherResult<()> {
66+
unimplemented!()
67+
}

src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ mod app_style;
1111
mod args;
1212
mod colors;
1313
mod column;
14+
mod dispatcher;
1415
mod draft;
1516
mod filter;
1617
mod fonts;
@@ -32,6 +33,7 @@ pub mod relay_pool_manager;
3233
mod result;
3334
mod route;
3435
mod subscriptions;
36+
mod task;
3537
mod test_data;
3638
mod thread;
3739
mod time;

src/task.rs

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use futures::stream::StreamExt;
2+
use tracing::{debug, error};
3+
4+
use enostr::RelayPool;
5+
use nostrdb::{Filter, Ndb, Transaction};
6+
7+
use crate::dispatcher;
8+
use crate::note::NoteRef;
9+
use crate::{with_mut_damus, DamusRef};
10+
11+
pub async fn setup_user_relays(damusref: DamusRef) {
12+
debug!("do_setup_user_relays starting");
13+
14+
let filter = with_mut_damus(&damusref, |damus| {
15+
debug!("setup_user_relays: acquired damus for filter");
16+
17+
let account = damus
18+
.accounts
19+
.get_selected_account()
20+
.as_ref()
21+
.map(|a| a.pubkey.bytes())
22+
.expect("selected account");
23+
24+
// NIP-65
25+
Filter::new()
26+
.authors([account])
27+
.kinds([10002])
28+
.limit(1)
29+
.build()
30+
});
31+
32+
let mut sub = with_mut_damus(&damusref, |damus| {
33+
debug!("setup_user_relays: acquired damus for query + subscribe");
34+
let txn = Transaction::new(&damus.ndb).expect("transaction");
35+
let relays = query_nip65_relays(&damus.ndb, &txn, &filter);
36+
debug!("setup_user_relays: query #1 relays: {:#?}", relays);
37+
add_relays(&mut damus.pool, relays);
38+
39+
// Add a relay subscription to the pool
40+
dispatcher::subscribe(damus, &[filter.clone()], 10).expect("subscribe")
41+
});
42+
debug!("setup_user_relays: sub {}", sub.id);
43+
44+
loop {
45+
match sub.receiver.next().await {
46+
Some(ev) => {
47+
debug!("setup_user_relays: saw {:?}", ev);
48+
with_mut_damus(&damusref, |damus| {
49+
let txn = Transaction::new(&damus.ndb).expect("transaction");
50+
let relays = query_nip65_relays(&damus.ndb, &txn, &filter);
51+
debug!("setup_user_relays: query #2 relays: {:#?}", relays);
52+
add_relays(&mut damus.pool, relays);
53+
})
54+
}
55+
None => {
56+
debug!("setup_user_relays: saw None");
57+
break;
58+
}
59+
}
60+
}
61+
62+
debug!("do_setup_user_relays finished");
63+
}
64+
65+
fn _query_note_json(ndb: &Ndb, txn: &Transaction, filter: &Filter) -> Vec<String> {
66+
let lim = filter.limit().unwrap_or(crate::filter::default_limit()) as i32;
67+
let results = ndb
68+
.query(txn, &[filter.clone()], lim)
69+
.expect("query results");
70+
results
71+
.iter()
72+
.map(|qr| NoteRef::new(qr.note_key, qr.note.created_at()))
73+
.filter_map(|nr| ndb.get_note_by_key(txn, nr.key).ok())
74+
.map(|n| n.json().unwrap())
75+
.collect()
76+
}
77+
78+
fn query_nip65_relays(ndb: &Ndb, txn: &Transaction, filter: &Filter) -> Vec<String> {
79+
let lim = filter.limit().unwrap_or(crate::filter::default_limit()) as i32;
80+
let results = ndb
81+
.query(txn, &[filter.clone()], lim)
82+
.expect("query results");
83+
results
84+
.iter()
85+
.map(|qr| NoteRef::new(qr.note_key, qr.note.created_at()))
86+
.filter_map(|nr| ndb.get_note_by_key(txn, nr.key).ok())
87+
.flat_map(|n| {
88+
n.tags()
89+
.iter()
90+
.filter_map(|ti| ti.get_unchecked(1).variant().str())
91+
.map(|s| s.to_string())
92+
})
93+
.collect()
94+
}
95+
96+
fn add_relays(pool: &mut RelayPool, relays: Vec<String>) {
97+
let wakeup = move || {
98+
// FIXME - how do we repaint?
99+
};
100+
for relay in relays {
101+
if let Err(e) = pool.add_url(relay, wakeup.clone()) {
102+
error!("{:?}", e)
103+
}
104+
}
105+
}

0 commit comments

Comments
 (0)