Skip to content

Commit a6e2834

Browse files
Add support for namespace to the remote connection builder
1 parent 1e6af39 commit a6e2834

File tree

6 files changed

+95
-11
lines changed

6 files changed

+95
-11
lines changed

libsql-server/src/http/user/db_factory.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ pub fn namespace_from_headers(
5050

5151
if let Some(from_metadata) = headers.get(NAMESPACE_METADATA_KEY) {
5252
try_namespace_from_metadata(from_metadata)
53+
} else if let Some(from_ns_header) = headers.get("x-namespace") {
54+
try_namespace_from_header(from_ns_header)
5355
} else if let Some(from_host) = headers.get("host") {
5456
try_namespace_from_host(from_host, disable_default_namespace)
5557
} else if !disable_default_namespace {
@@ -59,6 +61,11 @@ pub fn namespace_from_headers(
5961
}
6062
}
6163

64+
fn try_namespace_from_header(header: &axum::http::HeaderValue) -> Result<NamespaceName, Error> {
65+
NamespaceName::from_bytes(header.as_bytes().to_vec().into())
66+
.map_err(|_| Error::InvalidNamespace)
67+
}
68+
6269
fn try_namespace_from_host(
6370
from_host: &axum::http::HeaderValue,
6471
disable_default_namespace: bool,

libsql-server/tests/embedded_replica/mod.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1696,3 +1696,43 @@ fn schema_db() {
16961696

16971697
sim.run().unwrap();
16981698
}
1699+
1700+
#[test]
1701+
fn remote_namespace_header_support() {
1702+
let tmp_host = tempdir().unwrap();
1703+
let tmp_host_path = tmp_host.path().to_owned();
1704+
1705+
let mut sim = Builder::new()
1706+
.simulation_duration(Duration::from_secs(1000))
1707+
.build();
1708+
1709+
make_primary(&mut sim, tmp_host_path.clone());
1710+
1711+
sim.client("client", async move {
1712+
let client = Client::new();
1713+
1714+
client
1715+
.post("http://primary:9090/v1/namespaces/foo/create", json!({}))
1716+
.await?;
1717+
1718+
let db_url = "http://primary:8080";
1719+
1720+
let remote = libsql::Builder::new_remote(db_url.to_string(), String::new())
1721+
.namespace("foo")
1722+
.connector(TurmoilConnector)
1723+
.build()
1724+
.await
1725+
.unwrap();
1726+
1727+
let conn = remote.connect().unwrap();
1728+
1729+
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
1730+
.await?;
1731+
1732+
conn.execute("INSERT into user(id) values (1);", ()).await?;
1733+
1734+
Ok(())
1735+
});
1736+
1737+
sim.run().unwrap();
1738+
}

libsql/src/database.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ enum DbType {
9595
auth_token: String,
9696
connector: crate::util::ConnectorService,
9797
version: Option<String>,
98+
namespace: Option<String>,
9899
},
99100
}
100101

@@ -238,6 +239,7 @@ cfg_replication! {
238239
OpenFlags::default(),
239240
encryption_config.clone(),
240241
None,
242+
None,
241243
).await?;
242244

243245
Ok(Database {
@@ -522,6 +524,7 @@ cfg_remote! {
522524
auth_token: auth_token.into(),
523525
connector: crate::util::ConnectorService::new(svc),
524526
version,
527+
namespace: None,
525528
},
526529
max_write_replication_index: Default::default(),
527530
})
@@ -672,7 +675,7 @@ impl Database {
672675
remote: HttpConnection::new(
673676
url.clone(),
674677
auth_token.clone(),
675-
HttpSender::new(connector.clone(), None),
678+
HttpSender::new(connector.clone(), None, None),
676679
),
677680
read_your_writes: *read_your_writes,
678681
context: db.sync_ctx.clone().unwrap(),
@@ -693,13 +696,15 @@ impl Database {
693696
auth_token,
694697
connector,
695698
version,
699+
namespace,
696700
} => {
697701
let conn = std::sync::Arc::new(
698702
crate::hrana::connection::HttpConnection::new_with_connector(
699703
url,
700704
auth_token,
701705
connector.clone(),
702706
version.as_ref().map(|s| s.as_str()),
707+
namespace.as_ref().map(|s| s.as_str()),
703708
),
704709
);
705710

libsql/src/database/builder.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,12 @@ impl Builder<()> {
6060
auth_token,
6161
connector: None,
6262
version: None,
63+
namespace: None,
6364
},
6465
encryption_config: None,
6566
read_your_writes: true,
6667
sync_interval: None,
6768
http_request_callback: None,
68-
namespace: None,
6969
skip_safety_assert: false,
7070
},
7171
}
@@ -101,6 +101,7 @@ impl Builder<()> {
101101
auth_token,
102102
connector: None,
103103
version: None,
104+
namespace: None,
104105
},
105106
connector: None,
106107
read_your_writes: true,
@@ -119,6 +120,7 @@ impl Builder<()> {
119120
auth_token,
120121
connector: None,
121122
version: None,
123+
namespace: None,
122124
},
123125
}
124126
}
@@ -132,6 +134,7 @@ cfg_replication_or_remote_or_sync! {
132134
auth_token: String,
133135
connector: Option<crate::util::ConnectorService>,
134136
version: Option<String>,
137+
namespace: Option<String>,
135138
}
136139
}
137140

@@ -220,7 +223,6 @@ cfg_replication! {
220223
read_your_writes: bool,
221224
sync_interval: Option<std::time::Duration>,
222225
http_request_callback: Option<crate::util::HttpRequestCallback>,
223-
namespace: Option<String>,
224226
skip_safety_assert: bool,
225227
}
226228

@@ -286,7 +288,7 @@ cfg_replication! {
286288
/// Set the namespace that will be communicated to remote replica in the http header.
287289
pub fn namespace(mut self, namespace: impl Into<String>) -> Builder<RemoteReplica>
288290
{
289-
self.inner.namespace = Some(namespace.into());
291+
self.inner.remote.namespace = Some(namespace.into());
290292
self
291293
}
292294

@@ -320,12 +322,12 @@ cfg_replication! {
320322
auth_token,
321323
connector,
322324
version,
325+
namespace,
323326
},
324327
encryption_config,
325328
read_your_writes,
326329
sync_interval,
327330
http_request_callback,
328-
namespace,
329331
skip_safety_assert
330332
} = self.inner;
331333

@@ -420,6 +422,7 @@ cfg_replication! {
420422
auth_token,
421423
connector,
422424
version,
425+
namespace,
423426
}) = remote
424427
{
425428
let connector = if let Some(connector) = connector {
@@ -444,6 +447,7 @@ cfg_replication! {
444447
flags,
445448
encryption_config.clone(),
446449
http_request_callback,
450+
namespace,
447451
)
448452
.await?
449453
} else {
@@ -509,6 +513,7 @@ cfg_sync! {
509513
auth_token,
510514
connector: _,
511515
version: _,
516+
namespace: _,
512517
},
513518
connector,
514519
remote_writes,
@@ -574,13 +579,21 @@ cfg_remote! {
574579
self
575580
}
576581

582+
/// Set the namespace that will be communicated to the remote in the http header.
583+
pub fn namespace(mut self, namespace: impl Into<String>) -> Builder<Remote>
584+
{
585+
self.inner.namespace = Some(namespace.into());
586+
self
587+
}
588+
577589
/// Build the remote database client.
578590
pub async fn build(self) -> Result<Database> {
579591
let Remote {
580592
url,
581593
auth_token,
582594
connector,
583595
version,
596+
namespace,
584597
} = self.inner;
585598

586599
let connector = if let Some(connector) = connector {
@@ -602,6 +615,7 @@ cfg_remote! {
602615
auth_token,
603616
connector,
604617
version,
618+
namespace,
605619
},
606620
max_write_replication_index: Default::default(),
607621
})

libsql/src/hrana/hyper.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,27 @@ pub type ByteStream = Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Syn
2525
pub struct HttpSender {
2626
inner: hyper::Client<ConnectorService, hyper::Body>,
2727
version: HeaderValue,
28+
namespace: Option<HeaderValue>,
2829
}
2930

3031
impl HttpSender {
31-
pub fn new(connector: ConnectorService, version: Option<&str>) -> Self {
32+
pub fn new(
33+
connector: ConnectorService,
34+
version: Option<&str>,
35+
namespace: Option<&str>,
36+
) -> Self {
3237
let ver = version.unwrap_or(env!("CARGO_PKG_VERSION"));
3338

3439
let version = HeaderValue::try_from(format!("libsql-remote-{ver}")).unwrap();
40+
let namespace = namespace.map(|v| HeaderValue::try_from(v).unwrap());
3541

3642
let inner = hyper::Client::builder().build(connector);
3743

38-
Self { inner, version }
44+
Self {
45+
inner,
46+
version,
47+
namespace,
48+
}
3949
}
4050

4151
async fn send(
@@ -44,9 +54,15 @@ impl HttpSender {
4454
auth: Arc<str>,
4555
body: String,
4656
) -> Result<super::HttpBody<ByteStream>> {
47-
let req = hyper::Request::post(url.as_ref())
57+
let mut req_builder = hyper::Request::post(url.as_ref())
4858
.header(AUTHORIZATION, auth.as_ref())
49-
.header("x-libsql-client-version", self.version.clone())
59+
.header("x-libsql-client-version", self.version.clone());
60+
61+
if let Some(namespace) = self.namespace {
62+
req_builder = req_builder.header("x-namespace", namespace);
63+
}
64+
65+
let req = req_builder
5066
.body(hyper::Body::from(body))
5167
.map_err(|err| HranaError::Http(format!("{:?}", err)))?;
5268

@@ -108,8 +124,9 @@ impl HttpConnection<HttpSender> {
108124
token: impl Into<String>,
109125
connector: ConnectorService,
110126
version: Option<&str>,
127+
namespace: Option<&str>,
111128
) -> Self {
112-
let inner = HttpSender::new(connector, version);
129+
let inner = HttpSender::new(connector, version, namespace);
113130
Self::new(url.into(), token.into(), inner)
114131
}
115132
}

libsql/src/local/database.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ impl Database {
266266
flags: OpenFlags,
267267
encryption_config: Option<EncryptionConfig>,
268268
http_request_callback: Option<crate::util::HttpRequestCallback>,
269+
namespace: Option<String>,
269270
) -> Result<Database> {
270271
use std::path::PathBuf;
271272

@@ -284,7 +285,7 @@ impl Database {
284285
auth_token,
285286
version.as_deref(),
286287
http_request_callback,
287-
None,
288+
namespace,
288289
)
289290
.map_err(|e| crate::Error::Replication(e.into()))?;
290291

0 commit comments

Comments
 (0)