Skip to content

Commit 8845d45

Browse files
authored
Merge pull request #744 from tursodatabase/marinsport
libsql-server: use LRU cache to store active namespaces
2 parents 804efc1 + 59ee522 commit 8845d45

File tree

9 files changed

+260
-154
lines changed

9 files changed

+260
-154
lines changed

bottomless/src/replicator.rs

+27
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,33 @@ impl Replicator {
376376
})
377377
}
378378

379+
/// Checks if there exists any backup of given database
380+
pub async fn has_backup_of(db_name: impl AsRef<str>, options: &Options) -> Result<bool> {
381+
let prefix = match &options.db_id {
382+
Some(db_id) => format!("{db_id}-"),
383+
None => format!("ns-:{}-", db_name.as_ref()),
384+
};
385+
let config = options.client_config().await?;
386+
let client = Client::from_conf(config);
387+
let bucket = options.bucket_name.clone();
388+
389+
match client.head_bucket().bucket(&bucket).send().await {
390+
Ok(_) => tracing::trace!("Bucket {bucket} exists and is accessible"),
391+
Err(e) => {
392+
tracing::trace!("Bucket checking error: {e}");
393+
return Err(e.into());
394+
}
395+
}
396+
397+
let mut last_frame = 0;
398+
let list_objects = client.list_objects().bucket(&bucket).prefix(&prefix);
399+
let response = list_objects.send().await?;
400+
let _ = Self::try_get_last_frame_no(response, &mut last_frame);
401+
tracing::trace!("Last frame of {prefix}: {last_frame}");
402+
403+
Ok(last_frame > 0)
404+
}
405+
379406
pub async fn shutdown_gracefully(&mut self) -> Result<()> {
380407
tracing::info!("bottomless replicator: shutting down...");
381408
// 1. wait for all committed WAL frames to be committed locally

libsql-server/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ metrics = "0.21.1"
4141
metrics-util = "0.15"
4242
metrics-exporter-prometheus = "0.12.2"
4343
mimalloc = { version = "0.1.36", default-features = false }
44+
moka = { version = "0.12.1", features = ["future"] }
4445
nix = { version = "0.26.2", features = ["fs"] }
4546
once_cell = "1.17.0"
4647
parking_lot = "0.12.1"

libsql-server/src/error.rs

+12-2
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ pub enum Error {
9696
NamespaceStoreShutdown,
9797
#[error("Unable to update metastore: {0}")]
9898
MetaStoreUpdateFailure(Box<dyn std::error::Error + Send + Sync>),
99+
// This is for errors returned by moka
100+
#[error(transparent)]
101+
Ref(#[from] std::sync::Arc<Self>),
99102
}
100103

101104
trait ResponseError: std::error::Error {
@@ -109,6 +112,12 @@ trait ResponseError: std::error::Error {
109112
impl ResponseError for Error {}
110113

111114
impl IntoResponse for Error {
115+
fn into_response(self) -> axum::response::Response {
116+
(&self).into_response()
117+
}
118+
}
119+
120+
impl IntoResponse for &Error {
112121
fn into_response(self) -> axum::response::Response {
113122
use Error::*;
114123

@@ -156,6 +165,7 @@ impl IntoResponse for Error {
156165
UrlParseError(_) => self.format_err(StatusCode::BAD_REQUEST),
157166
NamespaceStoreShutdown => self.format_err(StatusCode::SERVICE_UNAVAILABLE),
158167
MetaStoreUpdateFailure(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
168+
Ref(this) => this.as_ref().into_response(),
159169
}
160170
}
161171
}
@@ -230,7 +240,7 @@ pub enum LoadDumpError {
230240

231241
impl ResponseError for LoadDumpError {}
232242

233-
impl IntoResponse for LoadDumpError {
243+
impl IntoResponse for &LoadDumpError {
234244
fn into_response(self) -> axum::response::Response {
235245
use LoadDumpError::*;
236246

@@ -250,7 +260,7 @@ impl IntoResponse for LoadDumpError {
250260

251261
impl ResponseError for ForkError {}
252262

253-
impl IntoResponse for ForkError {
263+
impl IntoResponse for &ForkError {
254264
fn into_response(self) -> axum::response::Response {
255265
match self {
256266
ForkError::Internal(_)

libsql-server/src/lib.rs

+15-2
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ pub struct Server<C = HttpConnector, A = AddrIncoming, D = HttpsConnector<HttpCo
100100
pub heartbeat_config: Option<HeartbeatConfig>,
101101
pub disable_namespaces: bool,
102102
pub shutdown: Arc<Notify>,
103+
pub max_active_namespaces: usize,
103104
}
104105

105106
impl<C, A, D> Default for Server<C, A, D> {
@@ -117,6 +118,7 @@ impl<C, A, D> Default for Server<C, A, D> {
117118
heartbeat_config: Default::default(),
118119
disable_namespaces: true,
119120
shutdown: Default::default(),
121+
max_active_namespaces: 100,
120122
}
121123
}
122124
}
@@ -384,6 +386,7 @@ where
384386
db_config: self.db_config.clone(),
385387
base_path: self.path.clone(),
386388
auth: auth.clone(),
389+
max_active_namespaces: self.max_active_namespaces,
387390
};
388391
let (namespaces, proxy_service, replication_service) = replica.configure().await?;
389392
self.rpc_client_config = None;
@@ -422,6 +425,7 @@ where
422425
extensions,
423426
base_path: self.path.clone(),
424427
disable_namespaces: self.disable_namespaces,
428+
max_active_namespaces: self.max_active_namespaces,
425429
join_set: &mut join_set,
426430
auth: auth.clone(),
427431
};
@@ -487,6 +491,7 @@ struct Primary<'a, A> {
487491
extensions: Arc<[PathBuf]>,
488492
base_path: Arc<Path>,
489493
disable_namespaces: bool,
494+
max_active_namespaces: usize,
490495
auth: Arc<Auth>,
491496
join_set: &'a mut JoinSet<anyhow::Result<()>>,
492497
}
@@ -520,12 +525,12 @@ where
520525
let meta_store_path = conf.base_path.join("metastore");
521526

522527
let factory = PrimaryNamespaceMaker::new(conf);
523-
524528
let namespaces = NamespaceStore::new(
525529
factory,
526530
false,
527531
self.db_config.snapshot_at_shutdown,
528532
meta_store_path,
533+
self.max_active_namespaces,
529534
)
530535
.await?;
531536

@@ -602,6 +607,7 @@ struct Replica<C> {
602607
db_config: DbConfig,
603608
base_path: Arc<Path>,
604609
auth: Arc<Auth>,
610+
max_active_namespaces: usize,
605611
}
606612

607613
impl<C: Connector> Replica<C> {
@@ -627,7 +633,14 @@ impl<C: Connector> Replica<C> {
627633
let meta_store_path = conf.base_path.join("metastore");
628634

629635
let factory = ReplicaNamespaceMaker::new(conf);
630-
let namespaces = NamespaceStore::new(factory, true, false, meta_store_path).await?;
636+
let namespaces = NamespaceStore::new(
637+
factory,
638+
true,
639+
false,
640+
meta_store_path,
641+
self.max_active_namespaces,
642+
)
643+
.await?;
631644
let replication_service = ReplicationLogProxyService::new(channel.clone(), uri.clone());
632645
let proxy_service = ReplicaProxyService::new(channel, uri, self.auth.clone());
633646

libsql-server/src/main.rs

+5
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,10 @@ struct Cli {
195195
/// Enable snapshot at shutdown
196196
#[clap(long)]
197197
snapshot_at_shutdown: bool,
198+
199+
/// Max active namespaces kept in-memory
200+
#[clap(long, env = "SQLD_MAX_ACTIVE_NAMESPACES", default_value = "100")]
201+
max_active_namespaces: usize,
198202
}
199203

200204
#[derive(clap::Subcommand, Debug)]
@@ -506,6 +510,7 @@ async fn build_server(config: &Cli) -> anyhow::Result<Server> {
506510
disable_default_namespace: config.disable_default_namespace,
507511
disable_namespaces: !config.enable_namespaces,
508512
shutdown,
513+
max_active_namespaces: config.max_active_namespaces,
509514
})
510515
}
511516

libsql-server/src/namespace/fork.rs

-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ impl ForkTask<'_> {
109109
self.dest_namespace.clone(),
110110
RestoreOption::Latest,
111111
self.bottomless_db_id,
112-
true,
113112
// Forking works only on primary and
114113
// PrimaryNamespaceMaker::create ignores
115114
// reset_cb param

libsql-server/src/namespace/meta_store.rs

+7
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,13 @@ impl MetaStore {
177177
inner: HandleState::External(change_tx, rx),
178178
}
179179
}
180+
181+
// TODO: we need to either make sure that the metastore is restored
182+
// before we start accepting connections or we need to contact bottomless
183+
// here to check if a namespace exists. Preferably the former.
184+
pub fn exists(&self, namespace: &NamespaceName) -> bool {
185+
self.inner.lock().configs.contains_key(namespace)
186+
}
180187
}
181188

182189
impl MetaStoreHandle {

0 commit comments

Comments
 (0)