diff --git a/libsql-server/Cargo.toml b/libsql-server/Cargo.toml index 4caf0d2676..da5f39df60 100644 --- a/libsql-server/Cargo.toml +++ b/libsql-server/Cargo.toml @@ -76,6 +76,7 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } http-body = "0.4" url = { version = "2.3", features = ["serde"] } uuid = { version = "1.3", features = ["v4", "serde"] } +moka = { version = "0.12.1", features = ["future"] } [dev-dependencies] arbitrary = { version = "1.3.0", features = ["derive_arbitrary"] } diff --git a/libsql-server/src/error.rs b/libsql-server/src/error.rs index 4aced4ba63..a7421447fb 100644 --- a/libsql-server/src/error.rs +++ b/libsql-server/src/error.rs @@ -92,6 +92,9 @@ pub enum Error { UrlParseError(#[from] url::ParseError), #[error("Namespace store has shutdown")] NamespaceStoreShutdown, + // This is for errors returned by moka + #[error(transparent)] + Ref(#[from] std::sync::Arc), } trait ResponseError: std::error::Error { @@ -105,6 +108,12 @@ trait ResponseError: std::error::Error { impl ResponseError for Error {} impl IntoResponse for Error { + fn into_response(self) -> axum::response::Response { + (&self).into_response() + } +} + +impl IntoResponse for &Error { fn into_response(self) -> axum::response::Response { use Error::*; @@ -150,6 +159,7 @@ impl IntoResponse for Error { PrimaryStreamInterupted => self.format_err(StatusCode::INTERNAL_SERVER_ERROR), UrlParseError(_) => self.format_err(StatusCode::BAD_REQUEST), NamespaceStoreShutdown => self.format_err(StatusCode::SERVICE_UNAVAILABLE), + Ref(this) => this.as_ref().into_response(), } } } @@ -224,7 +234,7 @@ pub enum LoadDumpError { impl ResponseError for LoadDumpError {} -impl IntoResponse for LoadDumpError { +impl IntoResponse for &LoadDumpError { fn into_response(self) -> axum::response::Response { use LoadDumpError::*; @@ -244,7 +254,7 @@ impl IntoResponse for LoadDumpError { impl ResponseError for ForkError {} -impl IntoResponse for ForkError { +impl IntoResponse for &ForkError { fn into_response(self) -> axum::response::Response { match self { ForkError::Internal(_) diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 25699cd360..bb4501d9b5 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -1,21 +1,21 @@ -use std::collections::hash_map::Entry; -use std::collections::HashMap; use std::fmt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Weak}; use anyhow::Context as _; -use async_lock::{RwLock, RwLockUpgradableReadGuard}; +use async_lock::RwLock; use bottomless::bottomless_wal::CreateBottomlessWal; use bottomless::replicator::Options; use bytes::Bytes; use chrono::NaiveDateTime; use enclose::enclose; -use futures_core::Stream; +use futures::TryFutureExt; +use futures_core::{Future, Stream}; use hyper::Uri; use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient; use libsql_sys::wal::{Sqlite3WalManager, WalManager}; +use moka::future::Cache; use parking_lot::Mutex; use rusqlite::ErrorCode; use tokio::io::AsyncBufReadExt; @@ -24,7 +24,6 @@ use tokio::task::JoinSet; use tokio::time::{Duration, Instant}; use tokio_util::io::StreamReader; use tonic::transport::Channel; -use tracing::trace; use uuid::Uuid; use crate::auth::Authenticated; @@ -336,6 +335,8 @@ impl MakeNamespace for ReplicaNamespaceMaker { } } +type NamespaceEntry = Arc>>>; + /// Stores and manage a set of namespaces. pub struct NamespaceStore { inner: Arc>, @@ -350,7 +351,7 @@ impl Clone for NamespaceStore { } struct NamespaceStoreInner { - store: RwLock>>, + store: Cache>, /// The namespace factory, to create new namespaces. make_namespace: M, allow_lazy_creation: bool, @@ -360,9 +361,24 @@ struct NamespaceStoreInner { impl NamespaceStore { pub fn new(make_namespace: M, allow_lazy_creation: bool, snapshot_at_shutdown: bool) -> Self { + let store = Cache::>::builder() + .async_eviction_listener(|name, ns, _| { + Box::pin(async move { + tracing::info!("namespace `{name}` deallocated"); + // shutdown namespace + if let Some(ns) = ns.write().await.take() { + if let Err(e) = ns.destroy().await { + tracing::error!("error deallocating `{name}`: {e}") + } + } + }) + }) + // TODO(marin): configurable capacity + .max_capacity(25) + .build(); Self { inner: Arc::new(NamespaceStoreInner { - store: Default::default(), + store, make_namespace, allow_lazy_creation, has_shutdown: AtomicBool::new(false), @@ -375,18 +391,19 @@ impl NamespaceStore { if self.inner.has_shutdown.load(Ordering::Relaxed) { return Err(Error::NamespaceStoreShutdown); } - let mut lock = self.inner.store.write().await; let mut bottomless_db_id_init = NamespaceBottomlessDbIdInit::FetchFromConfig; - if let Some(ns) = lock.remove(&namespace) { - bottomless_db_id_init = NamespaceBottomlessDbIdInit::Provided( - NamespaceBottomlessDbId::from_config(&ns.db_config_store.get()), - ); + if let Some(ns) = self.inner.store.remove(&namespace).await { // FIXME: when destroying, we are waiting for all the tasks associated with the - // allocation to finnish, which create a lot of contention on the lock. Need to use a + // allocation to finish, which create a lot of contention on the lock. Need to use a // conccurent hashmap to deal with this issue. // deallocate in-memory resources - ns.destroy().await?; + if let Some(ns) = ns.write().await.take() { + bottomless_db_id_init = NamespaceBottomlessDbIdInit::Provided( + NamespaceBottomlessDbId::from_config(&ns.db_config_store.get()), + ); + ns.destroy().await?; + } } // destroy on-disk database and backups @@ -400,24 +417,29 @@ impl NamespaceStore { Ok(()) } - async fn reset( + pub async fn reset( &self, namespace: NamespaceName, restore_option: RestoreOption, - ) -> crate::Result<()> { - if self.inner.has_shutdown.load(Ordering::Relaxed) { - return Err(Error::NamespaceStoreShutdown); - } - let mut lock = self.inner.store.write().await; - if let Some(ns) = lock.remove(&namespace) { + ) -> anyhow::Result<()> { + // The process for reseting is as follow: + // - get a lock on the namespace entry, if the entry exists, then it's a lock on the entry, + // if it doesn't exist, insert an empty entry and take a lock on it + // - destroy the old namespace + // - create a new namespace and insert it in the held lock + let entry = self + .inner + .store + .get_with(namespace.clone(), async { Default::default() }) + .await; + let mut lock = entry.write().await; + if let Some(ns) = lock.take() { // FIXME: when destroying, we are waiting for all the tasks associated with the // allocation to finnish, which create a lot of contention on the lock. Need to use a // conccurent hashmap to deal with this issue. - // deallocate in-memory resources ns.destroy().await?; } - // destroy on-disk database self.inner .make_namespace @@ -438,7 +460,8 @@ impl NamespaceStore { self.make_reset_cb(), ) .await?; - lock.insert(namespace, ns); + + lock.replace(ns); Ok(()) } @@ -475,18 +498,22 @@ impl NamespaceStore { if self.inner.has_shutdown.load(Ordering::Relaxed) { return Err(Error::NamespaceStoreShutdown); } - let mut lock = self.inner.store.write().await; - if lock.contains_key(&to) { - return Err(crate::error::Error::NamespaceAlreadyExist( - to.as_str().to_string(), - )); + + let to_entry = self + .inner + .store + .get_with(to.clone(), async { Default::default() }) + .await; + let mut to_lock = to_entry.write().await; + if to_lock.is_some() { + return Err(crate::error::Error::NamespaceAlreadyExist(to.to_string())); } // check that the source namespace exists - let from_ns = match lock.entry(from.clone()) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => { - // we just want to load the namespace into memory, so we refuse creation. + let from_entry = self + .inner + .store + .try_get_with(from.clone(), async { let ns = self .inner .make_namespace @@ -498,16 +525,25 @@ impl NamespaceStore { self.make_reset_cb(), ) .await?; - e.insert(ns) - } + tracing::info!("loaded namespace: `{to}`"); + Ok::<_, crate::error::Error>(Arc::new(RwLock::new(Some(ns)))) + }) + .await + // FIXME: find how to deal with Arc + .unwrap(); + + let from_lock = from_entry.read().await; + let Some(from_ns) = &*from_lock else { + return Err(crate::error::Error::NamespaceDoesntExist(to.to_string())); }; - let forked = self + let to_ns = self .inner .make_namespace .fork(from_ns, to.clone(), timestamp) .await?; - lock.insert(to.clone(), forked); + + to_lock.replace(to_ns); Ok(()) } @@ -519,7 +555,7 @@ impl NamespaceStore { f: Fun, ) -> crate::Result where - Fun: FnOnce(&Namespace) -> R, + Fun: FnOnce(&Namespace) -> R + 'static, { if self.inner.has_shutdown.load(Ordering::Relaxed) { return Err(Error::NamespaceStoreShutdown); @@ -533,80 +569,143 @@ impl NamespaceStore { pub async fn with(&self, namespace: NamespaceName, f: Fun) -> crate::Result where - Fun: FnOnce(&Namespace) -> R, + Fun: FnOnce(&Namespace) -> R + 'static, { - if self.inner.has_shutdown.load(Ordering::Relaxed) { - return Err(Error::NamespaceStoreShutdown); - } - let before_load = Instant::now(); - let lock = self.inner.store.upgradable_read().await; - if let Some(ns) = lock.get(&namespace) { - Ok(f(ns)) - } else { - let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await; - let ns = self - .inner - .make_namespace - .create( - namespace.clone(), - RestoreOption::Latest, - NamespaceBottomlessDbId::NotProvided, - self.inner.allow_lazy_creation, - self.make_reset_cb(), - ) - .await?; - let ret = f(&ns); - tracing::info!("loaded namespace: `{namespace}`"); - lock.insert(namespace, ns); + let init = { + let namespace = namespace.clone(); + async move { + let ns = self + .inner + .make_namespace + .create( + namespace.clone(), + RestoreOption::Latest, + NamespaceBottomlessDbId::NotProvided, + self.inner.allow_lazy_creation, + self.make_reset_cb(), + ) + .await?; + tracing::info!("loaded namespace: `{namespace}`"); - NAMESPACE_LOAD_LATENCY.record(before_load.elapsed()); + Ok(Some(ns)) + } + }; - Ok(ret) - } + let f = { + let name = namespace.clone(); + move |ns: NamespaceEntry| async move { + let lock = ns.read().await; + match &*lock { + Some(ns) => Ok(f(ns)), + // the namespace was taken out of the entry + None => Err(Error::NamespaceDoesntExist(name.to_string())), + } + } + }; + + self.with_lock_or_init(namespace, f, init).await? } - pub async fn create( + async fn with_lock_or_init( &self, namespace: NamespaceName, - restore_option: RestoreOption, - bottomless_db_id: NamespaceBottomlessDbId, - ) -> crate::Result<()> { - if self.inner.has_shutdown.load(Ordering::Relaxed) { - return Err(Error::NamespaceStoreShutdown); - } - let lock = self.inner.store.upgradable_read().await; - if lock.contains_key(&namespace) { - return Err(crate::error::Error::NamespaceAlreadyExist( - namespace.as_str().to_owned(), - )); - } - + f: Fun, + init: Init, + ) -> crate::Result + where + Fun: FnOnce(NamespaceEntry) -> Fut, + Fut: Future, + Init: Future>>>, + { + let before_load = Instant::now(); let ns = self .inner - .make_namespace - .create( + .store + .try_get_with( namespace.clone(), - restore_option, - bottomless_db_id, - true, - self.make_reset_cb(), + init.map_ok(|ns| Arc::new(RwLock::new(ns))), ) .await?; + NAMESPACE_LOAD_LATENCY.record(before_load.elapsed()); + Ok(f(ns).await) + } - let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await; - tracing::info!("loaded namespace: `{namespace}`"); - lock.insert(namespace, ns); + pub async fn create( + &self, + namespace: NamespaceName, + restore_option: RestoreOption, + bottomless_db_id: NamespaceBottomlessDbId, + ) -> crate::Result<()> { + let name = namespace.clone(); + let bottomless_db_id_for_init = bottomless_db_id.clone(); + let init = async { + let ns = self + .inner + .make_namespace + .create( + name.clone(), + RestoreOption::Latest, + bottomless_db_id_for_init, + false, + self.make_reset_cb(), + ) + .await; + match ns { + // the namespace already exist, load it, and let the `f` function fail + Ok(ns) => { + tracing::info!("loaded namespace: `{name}`"); + Ok(Some(ns)) + } + // return an empty slot to put the new namespace in + Err(Error::NamespaceDoesntExist(_)) => Ok(None), + Err(e) => Err(e), + } + }; - Ok(()) + let f = { + let name = namespace.clone(); + move |ns: NamespaceEntry| { + let ns = ns.clone(); + let name = name.clone(); + async move { + let mut lock = ns.write().await; + if lock.is_some() { + return Err(Error::NamespaceAlreadyExist(name.to_string())); + } + let ns = self + .inner + .make_namespace + .create( + name.clone(), + restore_option, + bottomless_db_id, + true, + self.make_reset_cb(), + ) + .await?; + + tracing::info!("loaded namespace: `{name}`"); + + lock.replace(ns); + + Ok(()) + } + } + }; + + self.with_lock_or_init(namespace, f, init).await? } pub async fn shutdown(self) -> crate::Result<()> { self.inner.has_shutdown.store(true, Ordering::Relaxed); - let mut lock = self.inner.store.write().await; - for (name, ns) in lock.drain() { - ns.shutdown(self.inner.snapshot_at_shutdown).await?; - trace!("shutdown namespace: `{}`", name); + for (_name, entry) in self.inner.store.iter() { + let mut lock = entry.write().await; + if let Some(ns) = lock.take() { + ns.shutdown(self.inner.snapshot_at_shutdown).await?; + } } + self.inner.store.invalidate_all(); + self.inner.store.run_pending_tasks().await; Ok(()) }