6 changes: 6 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,6 @@
{
"rust-analyzer.cargo.features": [
"server",
"web"
]
}
17 changes: 13 additions & 4 deletions Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "dioxus-query"
description = "Fully-typed, async, reusable cached state management for Dioxus 🧬"
version = "0.8.1"
version = "0.9.0"
edition = "2021"
license = "MIT"
authors = ["Marc Espín <[email protected]>"]
@@ -11,17 +11,26 @@ repository = "https://github.com/marc2332/dioxus-query"
keywords = ["dioxus", "async", "state", "synchronization"]
categories = ["gui", "asynchronous"]

[features]
server = ["dep:dioxus-fullstack", "dep:dioxus-fullstack-protocol", "serde"]
web = ["dep:dioxus-fullstack-protocol", "serde"]

[dependencies]
dioxus-lib = { version = "0.6", default-features = false, features = ["macro", "hooks", "signals"] }
dioxus = { version = "0.7.0-alpha.3", default-features = false, features = ["macro", "hooks", "signals"] }
dioxus-core = { version = "0.7.0-alpha.3", default-features = false }
futures-util = "0.3.28"
warnings = "0.2.1"
tokio = { version = "^1", features = ["sync", "time"] }

# Fullstack
dioxus-fullstack = { version = "0.7.0-alpha.3", optional = true }
dioxus-fullstack-protocol = { version = "0.7.0-alpha.3", optional = true }
serde = { version = "1.0.219", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasmtimer = "0.4.1"
web-time = "1.1.0"
tokio = { version = "^1", features = ["sync"] }

[dev-dependencies]
dioxus = { version = "0.6", features = ["desktop"] }
dioxus = { version = "0.7.0-alpha.3", features = ["desktop"] }
tokio = { version = "^1", features = ["time"] }
3 changes: 3 additions & 0 deletions rust-toolchain.toml
@@ -0,0 +1,3 @@
[toolchain]
channel = "1.88.0"
profile = "default"
14 changes: 7 additions & 7 deletions src/mutation.rs
@@ -1,4 +1,11 @@
use core::fmt;
use dioxus::prelude::*;
use dioxus::signals::{Readable, Writable};
use dioxus::{
hooks::{use_memo, use_reactive},
signals::CopyValue,
};
use dioxus_core::{provide_root_context, spawn_forever, use_drop, ReactiveContext, Task};
use std::{
cell::{Ref, RefCell},
collections::{HashMap, HashSet},
@@ -9,13 +16,6 @@ use std::{
sync::{Arc, Mutex},
time::Duration,
};

use dioxus_lib::prelude::*;
use dioxus_lib::signals::{Readable, Writable};
use dioxus_lib::{
hooks::{use_memo, use_reactive},
signals::CopyValue,
};
#[cfg(not(target_family = "wasm"))]
use tokio::time;
#[cfg(not(target_family = "wasm"))]
198 changes: 190 additions & 8 deletions src/query.rs
@@ -10,14 +10,15 @@ use std::{
time::Duration,
};

use ::warnings::Warning;
use dioxus_lib::prelude::Task;
use dioxus_lib::prelude::*;
use dioxus_lib::signals::{Readable, Writable};
use dioxus_lib::{
use dioxus::prelude::*;
use dioxus::signals::{Readable, Writable};
use dioxus::{
hooks::{use_memo, use_reactive},
signals::CopyValue,
};
use dioxus_core::{
provide_root_context, spawn_forever, use_drop, ReactiveContext, SuspendedFuture, Task,
};
use futures_util::stream::{FuturesUnordered, StreamExt};
use tokio::sync::Notify;
#[cfg(not(target_family = "wasm"))]
@@ -160,13 +161,20 @@ struct QuerySuspenseData {
task: Task,
}

#[cfg(any(feature = "web", feature = "server"))]
type QueryStorageEntry<Ok, Err> =
Option<dioxus_fullstack_protocol::SerializeContextEntry<Result<Ok, Err>>>;

pub struct QueryData<Q: QueryCapability> {
state: Rc<RefCell<QueryStateData<Q>>>,
reactive_contexts: Arc<Mutex<HashSet<ReactiveContext>>>,

suspense_task: Rc<RefCell<Option<QuerySuspenseData>>>,
interval_task: Rc<RefCell<Option<(Duration, Task)>>>,
clean_task: Rc<RefCell<Option<Task>>>,

#[cfg(any(feature = "web", feature = "server"))]
storage_entry: Rc<RefCell<QueryStorageEntry<Q::Ok, Q::Err>>>,
}

impl<Q: QueryCapability> Clone for QueryData<Q> {
@@ -178,6 +186,9 @@ impl<Q: QueryCapability> Clone for QueryData<Q> {
suspense_task: self.suspense_task.clone(),
interval_task: self.interval_task.clone(),
clean_task: self.clean_task.clone(),

#[cfg(any(feature = "web", feature = "server"))]
storage_entry: self.storage_entry.clone(),
}
}
}
@@ -199,6 +210,8 @@ impl<Q: QueryCapability> QueriesStorage<Q> {
suspense_task: Rc::default(),
interval_task: Rc::default(),
clean_task: Rc::default(),
#[cfg(any(feature = "web", feature = "server"))]
storage_entry: Rc::default(),
});
let query_data_clone = query_data.clone();

@@ -243,6 +256,85 @@ impl<Q: QueryCapability> QueriesStorage<Q> {
query_data.clone()
}

#[cfg(any(feature = "web", feature = "server"))]
fn insert_or_get_server_query(&mut self, query: Query<Q>) -> QueryData<Q>
where
Q::Ok: serde::Serialize + serde::de::DeserializeOwned,
Q::Err: serde::Serialize + serde::de::DeserializeOwned,
{
let query_clone = query.clone();
let mut storage = self.storage.write();

let query_data = storage.entry(query).or_insert_with(|| {
#[cfg(any(feature = "web", feature = "server"))]
let serialize_context = dioxus_fullstack_protocol::serialize_context();
#[cfg(any(feature = "web", feature = "server"))]
let storage_entry = serialize_context.create_entry::<Result<Q::Ok, Q::Err>>();

let mut state = QueryStateData::Pending;

#[cfg(feature = "web")]
if let Ok(res) = storage_entry.get() {
state = QueryStateData::Settled {
res,
settlement_instant: Instant::now(),
};
}

QueryData {
state: Rc::new(RefCell::new(state)),
reactive_contexts: Arc::default(),
suspense_task: Rc::default(),
interval_task: Rc::default(),
clean_task: Rc::default(),
#[cfg(any(feature = "web", feature = "server"))]
storage_entry: Rc::new(RefCell::new(Some(storage_entry))),
}
});
let query_data_clone = query_data.clone();

// Cancel clean task
if let Some(clean_task) = query_data.clean_task.take() {
clean_task.cancel();
}

// Start an interval task if necessary
// If multiple query subscribers use different intervals, the interval task
// will run using the shortest interval
let interval = query_clone.interval_time;
let interval_enabled = query_clone.interval_time != Duration::MAX;
let interval_task = &mut *query_data.interval_task.borrow_mut();

let create_interval_task = match interval_task {
None if interval_enabled => true,
Some((current_interval, current_interval_task)) if interval_enabled => {
let new_interval_is_shorter = *current_interval > interval;
if new_interval_is_shorter {
current_interval_task.cancel();
*interval_task = None;
}
new_interval_is_shorter
}
_ => false,
};
if create_interval_task {
let task = spawn_forever(async move {
loop {
// Wait for the configured interval time
time::sleep(interval).await;

// Run the query
QueriesStorage::<Q>::run_server_queries(&[(&query_clone, &query_data_clone)])
.await;
}
})
.expect("Failed to spawn interval task.");
*interval_task = Some((interval, task));
}

query_data.clone()
}

fn update_tasks(&mut self, query: Query<Q>) {
let mut storage_clone = self.storage;
let mut storage = self.storage.write();
@@ -285,6 +377,8 @@ impl<Q: QueryCapability> QueriesStorage<Q> {
suspense_task: Rc::default(),
interval_task: Rc::default(),
clean_task: Rc::default(),
#[cfg(any(feature = "web", feature = "server"))]
storage_entry: Rc::default(),
})
.clone();

@@ -405,6 +499,52 @@ impl<Q: QueryCapability> QueriesStorage<Q> {

tasks.count().await;
}

#[cfg(any(feature = "web", feature = "server"))]
async fn run_server_queries(queries: &[(&Query<Q>, &QueryData<Q>)])
where
Q::Ok: serde::Serialize + serde::de::DeserializeOwned,
Q::Err: serde::Serialize + serde::de::DeserializeOwned,
{
let tasks = FuturesUnordered::new();

for (query, query_data) in queries {
// Set to Loading
let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
.into_loading();
*query_data.state.borrow_mut() = res;
for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
reactive_context.mark_dirty();
}

tasks.push(Box::pin(async move {
// Run
let res = query.query.run(&query.keys).await;

// Cache the data if on the server
#[cfg(feature = "server")]
if let Some(storage_entry) = query_data.storage_entry.borrow_mut().take() {
Copilot AI Jul 26, 2025

The take() operation removes the storage entry permanently, which means subsequent query runs won't be able to cache data. Consider using as_ref() or cloning the entry instead of taking ownership.

Suggested change
if let Some(storage_entry) = query_data.storage_entry.borrow_mut().take() {
if let Some(storage_entry) = query_data.storage_entry.borrow().as_ref() {

Owner Author

No, it's fine to take it as this will only run once, plus I need the owned version in order to call the insert thingy.

storage_entry.insert(&res, std::panic::Location::caller());
}

// Set to settled
*query_data.state.borrow_mut() = QueryStateData::Settled {
res,
settlement_instant: Instant::now(),
};
for reactive_context in query_data.reactive_contexts.lock().unwrap().iter() {
reactive_context.mark_dirty();
}

// Notify the suspense task if any
if let Some(suspense_task) = &*query_data.suspense_task.borrow() {
suspense_task.notifier.notify_waiters();
};
}));
}

tasks.count().await;
}
}

pub struct GetQuery<Q: QueryCapability> {
@@ -610,9 +750,6 @@ impl<Q: QueryCapability> UseQuery<Q> {
Q::Ok: Clone,
Q::Err: Clone,
{
let _allow_write_in_component_body =
::warnings::Allow::new(warnings::signal_write_in_component_body::ID);

let storage = consume_context::<QueriesStorage<Q>>();
let mut storage = storage.storage.write_unchecked();
let query_data = storage.get_mut(&self.query.peek()).unwrap();
@@ -753,3 +890,48 @@ pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {

UseQuery { query }
}

#[cfg(any(feature = "web", feature = "server"))]
pub fn use_server_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q>
where
Q::Ok: serde::Serialize + serde::de::DeserializeOwned,
Q::Err: serde::Serialize + serde::de::DeserializeOwned,
{
let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
Some(storage) => storage,
None => provide_root_context(QueriesStorage::<Q>::new_in_root()),
};

let current_query = use_hook(|| Rc::new(RefCell::new(None)));

let query = use_memo(use_reactive!(|query| {
let query_data = storage.insert_or_get_server_query(query.clone());

// Update the query tasks if there has been a change in the query
if let Some(prev_query) = current_query.borrow_mut().take() {
storage.update_tasks(prev_query);
}

// Store this new query
current_query.borrow_mut().replace(query.clone());

// Immediately run the query if enabled and the value is stale
if query.enabled && query_data.state.borrow().is_stale(&query) {
let query = query.clone();
spawn(async move {
QueriesStorage::run_server_queries(&[(&query, &query_data)]).await;
});
}

query
}));

// Update the query tasks when the scope is dropped
use_drop({
move || {
storage.update_tasks(query.peek().clone());
}
});

UseQuery { query }
}
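
A note on the review thread above about take() vs as_ref(): the sketch below is a standalone illustration, using plain std types only. Entry and its consuming insert are hypothetical stand-ins for the PR's SerializeContextEntry; the point is why taking the entry out of the Rc<RefCell<Option<...>>> by value matches the cache-once behaviour the author describes.

use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical stand-in for the serialize-context entry; the owner's reply
// suggests the real insert needs the entry by value.
struct Entry;

impl Entry {
    fn insert(self, value: &str) {
        // The real entry would serialize `value` into the server response.
        println!("cached: {value}");
    }
}

fn main() {
    // Shared optional entry, shaped like `storage_entry: Rc<RefCell<Option<_>>>`.
    let storage_entry: Rc<RefCell<Option<Entry>>> = Rc::new(RefCell::new(Some(Entry)));

    // First run: take() moves the entry out (leaving None behind),
    // providing the owned value that a consuming insert requires.
    if let Some(entry) = storage_entry.borrow_mut().take() {
        entry.insert("first result");
    }

    // Later runs see None, so the result is never serialized twice,
    // which is the run-once behaviour the author describes.
    assert!(storage_entry.borrow().is_none());
}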
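
For orientation, component-side usage of the new use_server_query hook might look roughly like the sketch below. Only the hook's signature and serde bounds are taken from this diff; the dioxus_query::prelude path, the Query::new constructor, and the exact QueryCapability trait shape (Keys/Ok/Err associated types, async run) are assumptions based on the crate's existing non-fullstack API.

use dioxus::prelude::*;
use dioxus_query::prelude::*; // assumed prelude path
use serde::{Deserialize, Serialize};

// Ok/Err must be serde-serializable: the server embeds the settled result
// in the response and the web client restores it instead of refetching.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
struct User {
    name: String,
}

#[derive(Clone, PartialEq, Eq, Hash)]
struct GetUser;

// Assumed trait shape; the diff only shows `query.query.run(&query.keys)`.
impl QueryCapability for GetUser {
    type Ok = User;
    type Err = String;
    type Keys = u32;

    async fn run(&self, user_id: &Self::Keys) -> Result<Self::Ok, Self::Err> {
        // Typically a server function or HTTP call would go here.
        Ok(User {
            name: format!("user-{user_id}"),
        })
    }
}

#[component]
fn Profile(id: u32) -> Element {
    // On the server the result is serialized into the page; on the web it is
    // hydrated back, so the initial state can already be Settled.
    let _user = use_server_query(Query::new(id, GetUser));
    rsx!("render from the query state here")
}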