diff --git a/Cargo.lock b/Cargo.lock index 8edacc812..de8c0733e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "ab_glyph" @@ -1510,9 +1510,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -1525,9 +1525,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1535,15 +1535,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -1552,15 +1552,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1569,21 +1569,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -2615,6 +2615,7 @@ dependencies = [ "ehttp 0.2.0", "enostr", "env_logger 0.10.2", + "futures", "hex", "image", "indexmap", diff --git a/Cargo.toml b/Cargo.toml index e0d60cb6f..d095f801f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ dirs = "5.0.1" tracing-appender = "0.2.3" urlencoding = "2.1.3" open = "5.3.0" +futures = "0.3.31" [dev-dependencies] tempfile = "3.13.0" diff --git a/enostr/src/relay/pool.rs b/enostr/src/relay/pool.rs index de4e1efc1..4a87ef39c 100644 --- a/enostr/src/relay/pool.rs +++ b/enostr/src/relay/pool.rs @@ -2,6 +2,9 @@ use crate::relay::{Relay, RelayStatus}; use crate::{ClientMessage, Result}; use nostrdb::Filter; +use std::collections::BTreeSet; +use std::collections::HashMap; +use std::collections::HashSet; use std::time::{Duration, Instant}; use url::Url; @@ -10,7 +13,7 @@ use url::Url; use ewebsock::{WsEvent, WsMessage}; #[cfg(not(target_arch = "wasm32"))] -use tracing::{debug, error}; +use tracing::{debug, error, info}; #[derive(Debug)] pub struct PoolEvent<'a> { @@ -42,7 +45,16 @@ impl PoolRelay { pub struct RelayPool { pub relays: Vec, + pub subs: HashMap>, pub ping_rate: Duration, + /// Used when there are no others + pub bootstrapping_relays: BTreeSet, + /// Locally specified relays + pub local_relays: BTreeSet, + /// NIP-65 specified relays + pub advertised_relays: BTreeSet, + /// If non-empty force the relay pool to use exactly this set + pub forced_relays: BTreeSet, } impl Default for RelayPool { @@ -56,7 +68,12 @@ impl RelayPool { pub fn new() -> RelayPool { RelayPool { relays: vec![], + subs: HashMap::new(), ping_rate: Duration::from_secs(25), + bootstrapping_relays: BTreeSet::new(), + local_relays: BTreeSet::new(), + advertised_relays: BTreeSet::new(), + forced_relays: BTreeSet::new(), } } @@ -85,9 +102,11 @@ impl RelayPool { for relay in &mut self.relays { relay.relay.send(&ClientMessage::close(subid.clone())); } + self.subs.remove(&subid); } pub fn subscribe(&mut self, subid: String, filter: Vec) { + self.subs.insert(subid.clone(), filter.clone()); for relay in &mut self.relays { relay.relay.subscribe(subid.clone(), filter.clone()); } @@ -148,30 +167,112 @@ impl RelayPool { } } + pub fn configure_relays( + &mut self, + wakeup: impl Fn() + Send + Sync + Clone + 'static, + ) -> Result<()> { + let urls = if !self.forced_relays.is_empty() { + debug!("using forced relays"); + self.forced_relays.iter().cloned().collect::>() + } else { + let mut combined_relays = self + .local_relays + .union(&self.advertised_relays) + .cloned() + .collect::>(); + + // If the combined set is empty, use `bootstrapping_relays`. + if combined_relays.is_empty() { + debug!("using bootstrapping relays"); + combined_relays = self.bootstrapping_relays.clone(); + } else { + debug!("using local+advertised relays"); + } + + // Collect the resulting set into a vector. + combined_relays.into_iter().collect::>() + }; + + self.set_relays(&urls, wakeup) + } + // Adds a websocket url to the RelayPool. - pub fn add_url( + fn add_url( &mut self, url: String, wakeup: impl Fn() + Send + Sync + Clone + 'static, ) -> Result<()> { - let url = Self::canonicalize_url(url); + let url = Self::canonicalize_url(&url); // Check if the URL already exists in the pool. if self.has(&url) { return Ok(()); } let relay = Relay::new(url, wakeup)?; - let pool_relay = PoolRelay::new(relay); + let mut pool_relay = PoolRelay::new(relay); + + // Add all of the existing subscriptions to the new relay + for (subid, filters) in &self.subs { + pool_relay.relay.subscribe(subid.clone(), filters.clone()); + } self.relays.push(pool_relay); Ok(()) } - // standardize the format (ie, trailing slashes) - fn canonicalize_url(url: String) -> String { + // Add and remove relays to match the provided list + pub fn set_relays( + &mut self, + urls: &Vec, + wakeup: impl Fn() + Send + Sync + Clone + 'static, + ) -> Result<()> { + // Canonicalize the new URLs. + let new_urls = urls + .iter() + .map(|u| Self::canonicalize_url(u)) + .collect::>(); + + // Get the old URLs from the existing relays. + let old_urls = self + .relays + .iter() + .map(|pr| pr.relay.url.clone()) + .collect::>(); + + debug!("old relays: {:?}", old_urls); + debug!("new relays: {:?}", new_urls); + + if new_urls.len() == 0 { + info!("bootstrapping, not clearing the relay list ..."); + return Ok(()); + } + + // Remove the relays that are in old_urls but not in new_urls. + let to_remove: HashSet<_> = old_urls.difference(&new_urls).cloned().collect(); + for url in &to_remove { + debug!("removing relay {}", url); + } + self.relays.retain(|pr| !to_remove.contains(&pr.relay.url)); + + // FIXME - how do we close connections the removed relays? + + // Add the relays that are in new_urls but not in old_urls. + let to_add: HashSet<_> = new_urls.difference(&old_urls).cloned().collect(); + for url in to_add { + debug!("adding relay {}", url); + if let Err(e) = self.add_url(url.clone(), wakeup.clone()) { + error!("Failed to add relay with URL {}: {:?}", url, e); + } + } + + Ok(()) + } + + // standardize the format (ie, trailing slashes) to avoid dups + pub fn canonicalize_url(url: &String) -> String { match Url::parse(&url) { Ok(parsed_url) => parsed_url.to_string(), - Err(_) => url, // If parsing fails, return the original URL. + Err(_) => url.clone(), // If parsing fails, return the original URL. } } diff --git a/src/app.rs b/src/app.rs index 0f475170c..2d7d26ec4 100644 --- a/src/app.rs +++ b/src/app.rs @@ -18,6 +18,7 @@ use crate::{ storage::{Directory, FileKeyStorage, KeyStorageType}, subscriptions::{SubKind, Subscriptions}, support::Support, + task, thread::Thread, timeline::{Timeline, TimelineId, TimelineKind, ViewFilter}, ui::{self, DesktopSidePanel}, @@ -34,6 +35,7 @@ use egui_extras::{Size, StripBuilder}; use nostrdb::{Config, Filter, Ndb, Note, Transaction}; +use std::collections::BTreeSet; use std::collections::HashMap; use std::path::Path; use std::time::Duration; @@ -73,31 +75,6 @@ pub struct Damus { pub textmode: bool, } -fn relay_setup(pool: &mut RelayPool, ctx: &egui::Context) { - let ctx = ctx.clone(); - let wakeup = move || { - ctx.request_repaint(); - }; - if let Err(e) = pool.add_url("ws://localhost:8080".to_string(), wakeup.clone()) { - error!("{:?}", e) - } - if let Err(e) = pool.add_url("wss://relay.damus.io".to_string(), wakeup.clone()) { - error!("{:?}", e) - } - //if let Err(e) = pool.add_url("wss://pyramid.fiatjaf.com".to_string(), wakeup.clone()) { - //error!("{:?}", e) - //} - if let Err(e) = pool.add_url("wss://nos.lol".to_string(), wakeup.clone()) { - error!("{:?}", e) - } - if let Err(e) = pool.add_url("wss://nostr.wine".to_string(), wakeup.clone()) { - error!("{:?}", e) - } - if let Err(e) = pool.add_url("wss://purplepag.es".to_string(), wakeup) { - error!("{:?}", e) - } -} - fn send_initial_timeline_filter( ndb: &Ndb, can_since_optimize: bool, @@ -475,6 +452,8 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) { .insert("unknownids".to_string(), SubKind::OneShot); setup_initial_nostrdb_subs(&damus.ndb, &mut damus.note_cache, &mut damus.columns) .expect("home subscription failed"); + + task::spawn_track_user_relays(damus); } DamusState::NewTimelineSub(new_timeline_id) => { @@ -703,23 +682,29 @@ impl Damus { } // setup relays if we have them - let pool = if parsed_args.relays.is_empty() { - let mut pool = RelayPool::new(); - relay_setup(&mut pool, &cc.egui_ctx); - pool - } else { - let ctx = cc.egui_ctx.clone(); - let wakeup = move || { - ctx.request_repaint(); - }; - let mut pool = RelayPool::new(); - for relay in parsed_args.relays { - if let Err(e) = pool.add_url(relay.clone(), wakeup.clone()) { - error!("error adding relay {}: {}", relay, e); - } - } - pool - }; + let mut pool = RelayPool::new(); + let bootstrapping_urls = [ + "ws://localhost:8080", + "wss://relay.damus.io", + //"wss://pyramid.fiatjaf.com", + "wss://nos.lol", + "wss://nostr.wine", + "wss://purplepag.es", + ]; + pool.bootstrapping_relays = bootstrapping_urls + .iter() + .map(|&s| s.to_string()) + .map(|s| RelayPool::canonicalize_url(&s)) + .collect(); + // normally empty + pool.forced_relays = parsed_args + .relays + .into_iter() + .map(|s| RelayPool::canonicalize_url(&s)) + .collect::>(); + // avoid relay thrash, don't call configure_relays here + // because the initial advertised set will be registered + // shortly and it will be called then let account = accounts .get_selected_account() diff --git a/src/lib.rs b/src/lib.rs index 4ed39bfd3..31faee147 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ mod result; mod route; mod subscriptions; mod support; +mod task; mod test_data; mod thread; mod time; diff --git a/src/relay_pool_manager.rs b/src/relay_pool_manager.rs index df339e46e..4dddd5e7c 100644 --- a/src/relay_pool_manager.rs +++ b/src/relay_pool_manager.rs @@ -41,9 +41,14 @@ impl<'a> RelayPoolManager<'a> { indices.iter().for_each(|index| self.remove_relay(*index)); } + // This wants to add the relay to one of the pool collections + // (bootstrapping, local, advertised, or forced) and then call + // configure_relays ... + /* pub fn add_relay(&mut self, ctx: &egui::Context, relay_url: String) { let _ = self.pool.add_url(relay_url, create_wakeup(ctx)); } + */ } pub fn create_wakeup(ctx: &egui::Context) -> impl Fn() + Send + Sync + Clone + 'static { diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 000000000..74a0ef5ce --- /dev/null +++ b/src/task.rs @@ -0,0 +1,149 @@ +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::Poll; +use tokio::task; + +use tracing::{debug, error}; + +use enostr::RelayPool; +use nostrdb::{Filter, Ndb, NoteKey, Transaction}; +use uuid::Uuid; + +use crate::note::NoteRef; +use crate::Damus; + +pub fn spawn_track_user_relays(damus: &mut Damus) { + // This is only safe because we are absolutely single threaded ... + let damus_ptr = &mut *damus as *mut Damus; + spawn_sendable(async move { + let damus = unsafe { &mut *damus_ptr }; + track_user_relays(damus).await; + }); +} + +pub async fn track_user_relays(damus: &mut Damus) { + debug!("track_user_relays starting"); + + let filter = user_relay_filter(damus); + + // Do we have a user relay list stored in nostrdb? Start with that ... + let txn = Transaction::new(&damus.ndb).expect("transaction"); + let lim = filter.limit().unwrap_or(crate::filter::default_limit()) as i32; + let nks = damus + .ndb + .query(&txn, &[filter.clone()], lim) + .expect("query user relays results") + .iter() + .map(|qr| qr.note_key) + .collect(); + let relays = handle_nip65_relays(&damus.ndb, &txn, &nks); + debug!("track_user_relays: initial: {:#?}", relays); + set_advertised_relays(&mut damus.pool, relays); + drop(txn); + + // Subscribe to user relay list updates + let ndbsub = damus + .ndb + .subscribe(&[filter.clone()]) + .expect("ndb subscription"); + let poolid = Uuid::new_v4().to_string(); + damus.pool.subscribe(poolid.clone(), vec![filter.clone()]); + + // Wait for updates to the subscription + loop { + match damus.ndb.wait_for_notes(ndbsub, 10).await { + Ok(nks) => { + let txn = Transaction::new(&damus.ndb).expect("transaction"); + let relays = handle_nip65_relays(&damus.ndb, &txn, &nks); + debug!("track_user_relays: update: {:#?}", relays); + set_advertised_relays(&mut damus.pool, relays); + } + Err(err) => error!("err: {:?}", err), + } + } +} + +fn user_relay_filter(damus: &mut Damus) -> Filter { + let account = damus + .accounts + .get_selected_account() + .as_ref() + .map(|a| a.pubkey.bytes()) + .expect("selected account"); + + // NIP-65 + Filter::new() + .authors([account]) + .kinds([10002]) + .limit(1) + .build() +} + +// useful for debugging +fn _query_note_json(ndb: &Ndb, txn: &Transaction, filter: &Filter) -> Vec { + let lim = filter.limit().unwrap_or(crate::filter::default_limit()) as i32; + let results = ndb + .query(txn, &[filter.clone()], lim) + .expect("query results"); + results + .iter() + .map(|qr| NoteRef::new(qr.note_key, qr.note.created_at())) + .filter_map(|nr| ndb.get_note_by_key(txn, nr.key).ok()) + .map(|n| n.json().unwrap()) + .collect() +} + +fn handle_nip65_relays(ndb: &Ndb, txn: &Transaction, nks: &Vec) -> Vec { + nks.iter() + .filter_map(|nk| ndb.get_note_by_key(txn, *nk).ok()) + .flat_map(|n| { + n.tags() + .iter() + .filter_map(|ti| ti.get_unchecked(1).variant().str()) + .map(|s| s.to_string()) + }) + .collect() +} + +fn set_advertised_relays(pool: &mut RelayPool, relays: Vec) { + let wakeup = move || { + // FIXME - how do we repaint? + }; + pool.advertised_relays = relays + .into_iter() + .map(|s| RelayPool::canonicalize_url(&s)) + .collect(); + if let Err(e) = pool.configure_relays(wakeup) { + error!("{:?}", e) + } +} + +// Generic task spawning helpers + +struct SendableFuture { + future: Pin>, + _marker: PhantomData<*const ()>, +} + +unsafe impl Send for SendableFuture {} + +impl Future for SendableFuture { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + self.get_mut().future.as_mut().poll(cx) + } +} + +pub fn spawn_sendable(future: F) +where + F: Future + 'static, +{ + let future = Box::pin(future); + let sendable_future = SendableFuture { + future, + _marker: PhantomData, + }; + task::spawn(sendable_future); +} diff --git a/src/test_data.rs b/src/test_data.rs index b271dad30..a879723c6 100644 --- a/src/test_data.rs +++ b/src/test_data.rs @@ -9,17 +9,19 @@ use crate::{user_account::UserAccount, Damus}; pub fn sample_pool() -> RelayPool { let mut pool = RelayPool::new(); let wakeup = move || {}; - - pool.add_url("wss://relay.damus.io".to_string(), wakeup); - pool.add_url("wss://eden.nostr.land".to_string(), wakeup); - pool.add_url("wss://nostr.wine".to_string(), wakeup); - pool.add_url("wss://nos.lol".to_string(), wakeup); - pool.add_url("wss://test_relay_url_long_00000000000000000000000000000000000000000000000000000000000000000000000000000000000".to_string(), wakeup); + let bootstrapping_urls = [ + "wss://relay.damus.io", + "wss://eden.nostr.land", + "wss://nostr.wine", + "wss://nos.lol", + "wss://test_relay_url_long_00000000000000000000000000000000000000000000000000000000000000000000000000000000000", + ]; + pool.bootstrapping_relays = bootstrapping_urls.iter().map(|&s| s.to_string()).collect(); for _ in 0..20 { - pool.add_url("tmp".to_string(), wakeup); + pool.local_relays.insert("tmp".to_string()); } - + pool.configure_relays(wakeup); pool }