diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..ead7ed8
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,6 @@
+{
+    "rust-analyzer.cargo.features": [
+        "server",
+        "web"
+    ]
+}
diff --git a/Cargo.toml b/Cargo.toml
index ace363f..4a5ea40 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "dioxus-query"
 description = "Fully-typed, async, reusable cached state management for Dioxus 🧬"
-version = "0.8.1"
+version = "0.9.0"
 edition = "2021"
 license = "MIT"
 authors = ["Marc Espín "]
@@ -11,17 +11,26 @@ repository = "https://github.com/marc2332/dioxus-query"
 keywords = ["dioxus", "async", "state", "synchronization"]
 categories = ["gui", "asynchronous"]
 
+[features]
+server = ["dep:dioxus-fullstack", "dep:dioxus-fullstack-protocol", "serde"]
+web = ["dep:dioxus-fullstack-protocol", "serde"]
+
 [dependencies]
-dioxus-lib = { version = "0.6", default-features = false, features = ["macro", "hooks", "signals"] }
+dioxus = { version = "0.7.0-alpha.3", default-features = false, features = ["macro", "hooks", "signals"] }
+dioxus-core = { version = "0.7.0-alpha.3", default-features = false }
 futures-util = "0.3.28"
-warnings = "0.2.1"
 tokio = { version = "^1", features = ["sync", "time"] }
 
+# Fullstack
+dioxus-fullstack = { version = "0.7.0-alpha.3", optional = true }
+dioxus-fullstack-protocol = { version = "0.7.0-alpha.3", optional = true }
+serde = { version = "1.0.219", optional = true }
+
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 wasmtimer = "0.4.1"
 web-time = "1.1.0"
 tokio = { version = "^1", features = ["sync"] }
 
 [dev-dependencies]
-dioxus = { version = "0.6", features = ["desktop"] }
+dioxus = { version = "0.7.0-alpha.3", features = ["desktop"] }
 tokio = { version = "^1", features = ["time"] }
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
new file mode 100644
index 0000000..6beb755
--- /dev/null
+++ b/rust-toolchain.toml
@@ -0,0 +1,3 @@
+[toolchain]
+channel = "1.88.0"
+profile = "default"
\ No newline at end of file
diff --git a/src/mutation.rs b/src/mutation.rs
index 580cb09..8d4e722 100644
--- a/src/mutation.rs
+++ b/src/mutation.rs
@@ -1,4 +1,11 @@
 use core::fmt;
+use dioxus::prelude::*;
+use dioxus::signals::{Readable, Writable};
+use dioxus::{
+    hooks::{use_memo, use_reactive},
+    signals::CopyValue,
+};
+use dioxus_core::{provide_root_context, spawn_forever, use_drop, ReactiveContext, Task};
 use std::{
     cell::{Ref, RefCell},
     collections::{HashMap, HashSet},
@@ -9,13 +16,6 @@ use std::{
     sync::{Arc, Mutex},
     time::Duration,
 };
-
-use dioxus_lib::prelude::*;
-use dioxus_lib::signals::{Readable, Writable};
-use dioxus_lib::{
-    hooks::{use_memo, use_reactive},
-    signals::CopyValue,
-};
 #[cfg(not(target_family = "wasm"))]
 use tokio::time;
 #[cfg(not(target_family = "wasm"))]
diff --git a/src/query.rs b/src/query.rs
index 70b44a2..20e119a 100644
--- a/src/query.rs
+++ b/src/query.rs
@@ -10,14 +10,15 @@ use std::{
     time::Duration,
 };
 
-use ::warnings::Warning;
-use dioxus_lib::prelude::Task;
-use dioxus_lib::prelude::*;
-use dioxus_lib::signals::{Readable, Writable};
-use dioxus_lib::{
+use dioxus::prelude::*;
+use dioxus::signals::{Readable, Writable};
+use dioxus::{
     hooks::{use_memo, use_reactive},
     signals::CopyValue,
 };
+use dioxus_core::{
+    provide_root_context, spawn_forever, use_drop, ReactiveContext, SuspendedFuture, Task,
+};
 use futures_util::stream::{FuturesUnordered, StreamExt};
 use tokio::sync::Notify;
 #[cfg(not(target_family = "wasm"))]
@@ -160,6 +161,10 @@ struct QuerySuspenseData {
     task: Task,
 }
 
+#[cfg(any(feature = "web", feature = "server"))]
+type QueryStorageEntry<Q: QueryCapability> =
+    Option<dioxus_fullstack_protocol::SerializeContextEntry<Result<Q::Ok, Q::Err>>>;
+
 pub struct QueryData<Q: QueryCapability> {
     state: Rc<RefCell<QueryStateData<Q>>>,
     reactive_contexts: Arc<Mutex<HashSet<ReactiveContext>>>,
@@ -167,6 +172,9 @@ pub struct QueryData<Q: QueryCapability> {
     suspense_task: Rc<RefCell<Option<QuerySuspenseData>>>,
     interval_task: Rc<RefCell<Option<(Duration, Task)>>>,
     clean_task: Rc<RefCell<Option<Task>>>,
+
+    #[cfg(any(feature = "web", feature = "server"))]
+    storage_entry: Rc<RefCell<QueryStorageEntry<Q>>>,
 }
 
 impl<Q: QueryCapability> Clone for QueryData<Q> {
@@ -178,6 +186,9 @@ impl<Q: QueryCapability> Clone for QueryData<Q> {
             suspense_task: self.suspense_task.clone(),
             interval_task: self.interval_task.clone(),
             clean_task: self.clean_task.clone(),
+
+            #[cfg(any(feature = "web", feature = "server"))]
+            storage_entry: self.storage_entry.clone(),
         }
     }
 }
@@ -199,6 +210,8 @@ impl<Q: QueryCapability> QueriesStorage<Q> {
             suspense_task: Rc::default(),
             interval_task: Rc::default(),
             clean_task: Rc::default(),
+            #[cfg(any(feature = "web", feature = "server"))]
+            storage_entry: Rc::default(),
         });
 
         let query_data_clone = query_data.clone();
@@ -243,6 +256,85 @@ impl<Q: QueryCapability> QueriesStorage<Q> {
         query_data.clone()
     }
 
+    #[cfg(any(feature = "web", feature = "server"))]
+    fn insert_or_get_server_query(&mut self, query: Query<Q>) -> QueryData<Q>
+    where
+        Q::Ok: serde::Serialize + serde::de::DeserializeOwned,
+        Q::Err: serde::Serialize + serde::de::DeserializeOwned,
+    {
+        let query_clone = query.clone();
+        let mut storage = self.storage.write();
+
+        let query_data = storage.entry(query).or_insert_with(|| {
+            #[cfg(any(feature = "web", feature = "server"))]
+            let serialize_context = dioxus_fullstack_protocol::serialize_context();
+            #[cfg(any(feature = "web", feature = "server"))]
+            let storage_entry = serialize_context.create_entry::<Result<Q::Ok, Q::Err>>();
+
+            let mut state = QueryStateData::Pending;
+
+            #[cfg(feature = "web")]
+            if let Ok(res) = storage_entry.get() {
+                state = QueryStateData::Settled {
+                    res,
+                    settlement_instant: Instant::now(),
+                };
+            }
+
+            QueryData {
+                state: Rc::new(RefCell::new(state)),
+                reactive_contexts: Arc::default(),
+                suspense_task: Rc::default(),
+                interval_task: Rc::default(),
+                clean_task: Rc::default(),
+                #[cfg(any(feature = "web", feature = "server"))]
+                storage_entry: Rc::new(RefCell::new(Some(storage_entry))),
+            }
+        });
+        let query_data_clone = query_data.clone();
+
+        // Cancel clean task
+        if let Some(clean_task) = query_data.clean_task.take() {
+            clean_task.cancel();
+        }
+
+        // Start an interval task if necessary
+        // If multiple query subscribers use different intervals, the interval task
+        // will run using the shortest interval
+        let interval = query_clone.interval_time;
+        let interval_enabled = query_clone.interval_time != Duration::MAX;
+        let interval_task = &mut *query_data.interval_task.borrow_mut();
+
+        let create_interval_task = match interval_task {
+            None if interval_enabled => true,
+            Some((current_interval, current_interval_task)) if interval_enabled => {
+                let new_interval_is_shorter = *current_interval > interval;
+                if new_interval_is_shorter {
+                    current_interval_task.cancel();
+                    *interval_task = None;
+                }
+                new_interval_is_shorter
+            }
+            _ => false,
+        };
+        if create_interval_task {
+            let task = spawn_forever(async move {
+                loop {
+                    // Wait as long as the interval time is configured
+                    time::sleep(interval).await;
+
+                    // Run the query
+                    QueriesStorage::<Q>::run_server_queries(&[(&query_clone, &query_data_clone)])
+                        .await;
+                }
+            })
+            .expect("Failed to spawn interval task.");
+            *interval_task = Some((interval, task));
+        }
+
+        query_data.clone()
+    }
+
     fn update_tasks(&mut self, query: Query<Q>) {
         let mut storage_clone = self.storage;
         let mut storage = self.storage.write();
@@ -285,6 +377,8 @@ impl<Q: QueryCapability> QueriesStorage<Q> {
                 suspense_task: Rc::default(),
                interval_task: Rc::default(),
                clean_task: Rc::default(),
+                #[cfg(any(feature = "web", feature = "server"))]
+                storage_entry: Rc::default(),
            })
            .clone();
@@ -405,6 +499,52 @@ impl<Q: QueryCapability> QueriesStorage<Q> {
 
         tasks.count().await;
     }
+
+    #[cfg(any(feature = "web", feature = "server"))]
+    async fn run_server_queries(queries: &[(&Query<Q>, &QueryData<Q>)])
+    where
+        Q::Ok: serde::Serialize + serde::de::DeserializeOwned,
+        Q::Err: serde::Serialize + serde::de::DeserializeOwned,
+    {
+        let tasks = FuturesUnordered::new();
+
+        for (query, query_data) in queries {
+            // Set to Loading
+            let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
+                .into_loading();
+            *query_data.state.borrow_mut() = res;
+            for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
+                reactive_context.mark_dirty();
+            }
+
+            tasks.push(Box::pin(async move {
+                // Run
+                let res = query.query.run(&query.keys).await;
+
+                // Cache the data if on the server
+                #[cfg(feature = "server")]
+                if let Some(storage_entry) = query_data.storage_entry.borrow_mut().take() {
+                    storage_entry.insert(&res, std::panic::Location::caller());
+                }
+
+                // Set to settled
+                *query_data.state.borrow_mut() = QueryStateData::Settled {
+                    res,
+                    settlement_instant: Instant::now(),
+                };
+                for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
+                    reactive_context.mark_dirty();
+                }
+
+                // Notify the suspense task if any
+                if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
+                    suspense_task.notifier.notify_waiters();
+                };
+            }));
+        }
+
+        tasks.count().await;
+    }
 }
 
 pub struct GetQuery<Q: QueryCapability> {
@@ -610,9 +750,6 @@ impl<Q: QueryCapability> UseQuery<Q> {
         Q::Ok: Clone,
         Q::Err: Clone,
     {
-        let _allow_write_in_component_body =
-            ::warnings::Allow::new(warnings::signal_write_in_component_body::ID);
-
         let storage = consume_context::<QueriesStorage<Q>>();
         let mut storage = storage.storage.write_unchecked();
         let query_data = storage.get_mut(&self.query.peek()).unwrap();
@@ -753,3 +890,48 @@ pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
 
     UseQuery { query }
 }
+
+#[cfg(any(feature = "web", feature = "server"))]
+pub fn use_server_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q>
+where
+    Q::Ok: serde::Serialize + serde::de::DeserializeOwned,
+    Q::Err: serde::Serialize + serde::de::DeserializeOwned,
+{
+    let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
+        Some(storage) => storage,
+        None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
+    };
+
+    let current_query = use_hook(|| Rc::new(RefCell::new(None)));
+
+    let query = use_memo(use_reactive!(|query| {
+        let query_data = storage.insert_or_get_server_query(query.clone());
+
+        // Update the query tasks if there has been a change in the query
+        if let Some(prev_query) = current_query.borrow_mut().take() {
+            storage.update_tasks(prev_query);
+        }
+
+        // Store this new query
+        current_query.borrow_mut().replace(query.clone());
+
+        // Immediately run the query if enabled and the value is stale
+        if query.enabled && query_data.state.borrow().is_stale(&query) {
+            let query = query.clone();
+            spawn(async move {
+                QueriesStorage::run_server_queries(&[(&query, &query_data)]).await;
+            });
+        }
+
+        query
+    }));
+
+    // Update the query tasks when the scope is dropped
+    use_drop({
+        move || {
+            storage.update_tasks(query.peek().clone());
+        }
+    });
+
+    UseQuery { query }
+}
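
Taken together, the patch adds a server-aware variant of `use_query`: `use_server_query` runs the query on the server, stores the settled `Result` in the fullstack serialize context, and rehydrates it on the web instead of refetching after hydration. Below is a minimal consumer-side sketch; it is not part of the diff. The `GetUserName` type, the `Query::new` constructor, the `dioxus_query::prelude` import path, and the `.read().state()` read pattern are assumptions carried over from dioxus-query's existing `QueryCapability` API, and the app is assumed to be built with the crate's new `server` feature in the server build and `web` in the wasm build.

```rust
use dioxus::prelude::*;
use dioxus_query::prelude::*; // assumed re-export path for QueryCapability, Query, use_server_query

// Hypothetical query type (not from the diff). `Ok`/`Err` must be
// Serialize + DeserializeOwned because `use_server_query` caches the
// settled result for hydration.
#[derive(Clone, PartialEq, Eq, Hash)]
struct GetUserName;

impl QueryCapability for GetUserName {
    type Ok = String;
    type Err = ();
    type Keys = usize;

    async fn run(&self, user_id: &Self::Keys) -> Result<Self::Ok, Self::Err> {
        // Pretend this hits a database; with the `server` feature enabled this
        // runs on the server and the result is serialized for the client.
        Ok(format!("user-{user_id}"))
    }
}

#[component]
fn UserName(user_id: usize) -> Element {
    // Same call shape as `use_query`; the web build reads the serialized result
    // out of the hydration context instead of re-running the query.
    let user_name = use_server_query(Query::new(GetUserName, user_id));

    // Reading the state is assumed to work exactly as with `use_query`.
    rsx!( p { "{user_name.read().state():?}" } )
}
```

The `serde::Serialize + DeserializeOwned` bounds on `Ok`/`Err` come straight from the new `where` clauses above; types that cannot be serialized must keep using the plain `use_query` hook.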