Skip to content

libsql: attach databases from other namespaces as readonly #784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 14, 2024
1 change: 1 addition & 0 deletions libsql-replication/proto/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ message DatabaseConfig {
optional string bottomless_db_id = 6;
optional string jwt_key = 7;
optional uint64 txn_timeout_s = 8;
bool allow_attach = 9;
}
2 changes: 2 additions & 0 deletions libsql-replication/src/generated/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ pub struct DatabaseConfig {
pub jwt_key: ::core::option::Option<::prost::alloc::string::String>,
#[prost(uint64, optional, tag = "8")]
pub txn_timeout_s: ::core::option::Option<u64>,
#[prost(bool, tag = "9")]
pub allow_attach: bool,
}
4 changes: 4 additions & 0 deletions libsql-server/src/connection/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct DatabaseConfig {
pub bottomless_db_id: Option<String>,
pub jwt_key: Option<String>,
pub txn_timeout: Option<Duration>,
pub allow_attach: bool,
}

const fn default_max_size() -> u64 {
Expand All @@ -35,6 +36,7 @@ impl Default for DatabaseConfig {
bottomless_db_id: None,
jwt_key: None,
txn_timeout: Some(TXN_TIMEOUT),
allow_attach: false,
}
}
}
Expand All @@ -50,6 +52,7 @@ impl From<&metadata::DatabaseConfig> for DatabaseConfig {
bottomless_db_id: value.bottomless_db_id.clone(),
jwt_key: value.jwt_key.clone(),
txn_timeout: value.txn_timeout_s.map(Duration::from_secs),
allow_attach: value.allow_attach,
}
}
}
Expand All @@ -65,6 +68,7 @@ impl From<&DatabaseConfig> for metadata::DatabaseConfig {
bottomless_db_id: value.bottomless_db_id.clone(),
jwt_key: value.jwt_key.clone(),
txn_timeout_s: value.txn_timeout.map(|d| d.as_secs()),
allow_attach: value.allow_attach,
}
}
}
44 changes: 43 additions & 1 deletion libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,31 @@ impl<W: Wal> Connection<W> {
Ok(enabled)
}

fn prepare_attach_query(&self, attached: &str, attached_alias: &str) -> Result<String> {
let attached = attached.strip_prefix('"').unwrap_or(attached);
let attached = attached.strip_suffix('"').unwrap_or(attached);
if attached.contains('/') {
return Err(Error::Internal(format!(
"Invalid attached database name: {:?}",
attached
)));
}
let path = PathBuf::from(self.conn.path().unwrap_or("."));
let dbs_path = path
.parent()
.unwrap_or_else(|| std::path::Path::new(".."))
.parent()
.unwrap_or_else(|| std::path::Path::new(".."))
.canonicalize()
.unwrap_or_else(|_| std::path::PathBuf::from(".."));
let query = format!(
"ATTACH DATABASE 'file:{}?mode=ro' AS \"{attached_alias}\"",
dbs_path.join(attached).join("data").display()
);
tracing::trace!("ATTACH rewritten to: {query}");
Ok(query)
}

fn execute_query(
&self,
query: &Query,
Expand All @@ -764,12 +789,29 @@ impl<W: Wal> Connection<W> {
StmtKind::Read | StmtKind::TxnBegin | StmtKind::Other => config.block_reads,
StmtKind::Write => config.block_reads || config.block_writes,
StmtKind::TxnEnd | StmtKind::Release | StmtKind::Savepoint => false,
StmtKind::Attach | StmtKind::Detach => !config.allow_attach,
};
if blocked {
return Err(Error::Blocked(config.block_reason.clone()));
}

let mut stmt = self.conn.prepare(&query.stmt.stmt)?;
let mut stmt = if matches!(query.stmt.kind, StmtKind::Attach) {
match &query.stmt.attach_info {
Some((attached, attached_alias)) => {
let query = self.prepare_attach_query(attached, attached_alias)?;
self.conn.prepare(&query)?
}
None => {
return Err(Error::Internal(format!(
"Failed to ATTACH: {:?}",
query.stmt.attach_info
)))
}
}
} else {
self.conn.prepare(&query.stmt.stmt)?
};

if stmt.readonly() {
READ_QUERY_COUNT.increment(1);
} else {
Expand Down
4 changes: 4 additions & 0 deletions libsql-server/src/http/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ async fn handle_get_config<M: MakeNamespace, C: Connector>(
max_db_size: Some(max_db_size),
heartbeat_url: config.heartbeat_url.clone().map(|u| u.into()),
jwt_key: config.jwt_key.clone(),
allow_attach: config.allow_attach,
};

Ok(Json(resp))
Expand Down Expand Up @@ -236,6 +237,8 @@ struct HttpDatabaseConfig {
heartbeat_url: Option<String>,
#[serde(default)]
jwt_key: Option<String>,
#[serde(default)]
allow_attach: bool,
}

async fn handle_post_config<M: MakeNamespace, C>(
Expand All @@ -255,6 +258,7 @@ async fn handle_post_config<M: MakeNamespace, C>(
config.block_reads = req.block_reads;
config.block_writes = req.block_writes;
config.block_reason = req.block_reason;
config.allow_attach = req.allow_attach;
if let Some(size) = req.max_db_size {
config.max_db_pages = size.as_u64() / LIBSQL_PAGE_SIZE;
}
Expand Down
19 changes: 18 additions & 1 deletion libsql-server/src/query_analysis.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use fallible_iterator::FallibleIterator;
use sqlite3_parser::ast::{Cmd, PragmaBody, QualifiedName, Stmt};
use sqlite3_parser::ast::{Cmd, Expr, Id, PragmaBody, QualifiedName, Stmt};
use sqlite3_parser::lexer::sql::{Parser, ParserError};

/// A group of statements to be executed together.
Expand All @@ -11,6 +11,8 @@ pub struct Statement {
/// Is the statement an INSERT, UPDATE or DELETE?
pub is_iud: bool,
pub is_insert: bool,
// Optional id and alias associated with the statement (used for attach/detach)
pub attach_info: Option<(String, String)>,
}

impl Default for Statement {
Expand All @@ -30,6 +32,8 @@ pub enum StmtKind {
Write,
Savepoint,
Release,
Attach,
Detach,
Other,
}

Expand Down Expand Up @@ -113,6 +117,8 @@ impl StmtKind {
savepoint_name: Some(_),
..
}) => Some(Self::Release),
Cmd::Stmt(Stmt::Attach { .. }) => Some(Self::Attach),
Cmd::Stmt(Stmt::Detach(_)) => Some(Self::Detach),
_ => None,
}
}
Expand Down Expand Up @@ -236,6 +242,7 @@ impl Statement {
kind: StmtKind::Read,
is_iud: false,
is_insert: false,
attach_info: None,
}
}

Expand All @@ -257,6 +264,7 @@ impl Statement {
kind,
is_iud: false,
is_insert: false,
attach_info: None,
});
}
}
Expand All @@ -267,11 +275,20 @@ impl Statement {
);
let is_insert = matches!(c, Cmd::Stmt(Stmt::Insert { .. }));

let attach_info = match &c {
Cmd::Stmt(Stmt::Attach {
expr: Expr::Id(Id(expr)),
db_name: Expr::Id(Id(name)),
..
}) => Some((expr.clone(), name.clone())),
_ => None,
};
Ok(Statement {
stmt: c.to_string(),
kind,
is_iud,
is_insert,
attach_info,
})
}
// The parser needs to be boxed because it's large, and you don't want it on the stack.
Expand Down
74 changes: 74 additions & 0 deletions libsql-server/tests/namespaces/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,77 @@ fn meta_store() {

sim.run().unwrap();
}

#[test]
fn meta_attach() {
let mut sim = Builder::new().build();
let tmp = tempdir().unwrap();
make_primary(&mut sim, tmp.path().to_path_buf());

sim.client("client", async {
let client = Client::new();

// STEP 1: create namespace and check that it can be read from
client
.post(
"http://primary:9090/v1/namespaces/foo/create",
json!({
"max_db_size": "5mb"
}),
)
.await?;

{
let foo = Database::open_remote_with_connector(
"http://foo.primary:8080",
"",
TurmoilConnector,
)?;
let foo_conn = foo.connect()?;

foo_conn.execute("select 1", ()).await.unwrap();
}

// STEP 2: try attaching a database
{
let foo = Database::open_remote_with_connector(
"http://foo.primary:8080",
"",
TurmoilConnector,
)?;
let foo_conn = foo.connect()?;

foo_conn.execute("attach foo as foo", ()).await.unwrap_err();
}

// STEP 3: update config to allow attaching databases
client
.post(
"http://primary:9090/v1/namespaces/foo/config",
json!({
"block_reads": false,
"block_writes": false,
"allow_attach": true,
}),
)
.await?;

{
let foo = Database::open_remote_with_connector(
"http://foo.primary:8080",
"",
TurmoilConnector,
)?;
let foo_conn = foo.connect()?;

foo_conn
.execute_batch("attach foo as foo; select * from foo.sqlite_master")
.await
.unwrap();
}

Ok(())
});

sim.run().unwrap();
}
10 changes: 9 additions & 1 deletion libsql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use crate::{Error, Result};
use fallible_iterator::FallibleIterator;
use sqlite3_parser::ast::{Cmd, PragmaBody, QualifiedName, Stmt, TransactionType};
use sqlite3_parser::ast::{Cmd, PragmaBody, QualifiedName, Stmt, TransactionType, Expr, Id};
use sqlite3_parser::lexer::sql::{Parser, ParserError};

/// A group of statements to be executed together.
Expand Down Expand Up @@ -30,6 +30,8 @@ pub enum StmtKind {
Write,
Savepoint,
Release,
Attach,
Detach,
Other,
}

Expand Down Expand Up @@ -116,6 +118,12 @@ impl StmtKind {
savepoint_name: Some(_),
..
}) => Some(Self::Release),
Cmd::Stmt(Stmt::Attach {
expr: Expr::Id(Id(expr)),
db_name: Expr::Id(Id(name)),
..
}) if expr == name => Some(Self::Attach),
Cmd::Stmt(Stmt::Detach(_)) => Some(Self::Detach),
_ => None,
}
}
Expand Down
2 changes: 1 addition & 1 deletion libsql/src/replication/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl State {
(State::Txn, StmtKind::Release) => State::Txn,
(_, StmtKind::Release) => State::Invalid,

(state, StmtKind::Other | StmtKind::Write | StmtKind::Read) => state,
(state, StmtKind::Other | StmtKind::Write | StmtKind::Read | StmtKind::Attach | StmtKind::Detach) => state,
(State::Invalid, _) => State::Invalid,

(State::Init, StmtKind::TxnBegin) => State::Txn,
Expand Down