Skip to content

Commit e44fee4

Browse files
committed
Add and use EndpointAddr for URI structs
Signed-off-by: methylDragon <methylDragon@intrinsic.ai>
1 parent e9a47c1 commit e44fee4

File tree

22 files changed

+539
-254
lines changed

22 files changed

+539
-254
lines changed

crates/store/re_data_source/src/data_source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ impl LogDataSource {
252252
let uri_clone = uri.clone();
253253
let stream_partition = async move {
254254
let client = connection_registry
255-
.client(uri_clone.origin.clone())
255+
.client(uri_clone.endpoint_addr.origin.clone())
256256
.await
257257
.map_err(|err| ApiError::connection(err, "failed to connect to server"))?;
258258
re_redap_client::stream_blueprint_and_partition_from_server(

crates/store/re_grpc_client/src/read.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ async fn stream_async(
3939
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
4040
) -> Result<(), StreamError> {
4141
let mut client = {
42-
let url = uri.origin.as_url();
42+
let url = uri.endpoint_addr.origin.as_url();
4343

4444
#[cfg(target_arch = "wasm32")]
4545
let tonic_client = {

crates/store/re_grpc_client/src/write.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ async fn message_proxy_client(
314314
compression: Compression,
315315
status: Arc<AtomicCell<ClientConnectionState>>,
316316
) {
317-
let endpoint = match Endpoint::from_shared(uri.origin.as_url()) {
317+
let endpoint = match Endpoint::from_shared(uri.endpoint_addr.origin.as_url()) {
318318
Ok(endpoint) => endpoint,
319319
Err(err) => {
320320
status.store(ClientConnectionState::Disconnected(Err(

crates/store/re_grpc_server/src/lib.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -360,10 +360,9 @@ pub fn spawn_with_recv(
360360
re_smart_channel::Receiver<re_log_types::DataSourceMessage>,
361361
crossbeam::channel::Receiver<re_log_types::TableMsg>,
362362
) {
363-
let uri = re_uri::ProxyUri::new(
363+
let uri = re_uri::ProxyUri::new(re_uri::EndpointAddr::new(
364364
re_uri::Origin::from_scheme_and_socket_addr(re_uri::Scheme::RerunHttp, addr),
365-
String::new(),
366-
);
365+
));
367366
let (channel_log_tx, channel_log_rx) = re_smart_channel::smart_channel(
368367
re_smart_channel::SmartMessageSource::MessageProxy(uri.clone()),
369368
re_smart_channel::SmartChannelSource::MessageProxy(uri),

crates/store/re_redap_client/src/grpc.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,7 @@ pub async fn stream_blueprint_and_partition_from_server(
350350
}
351351

352352
let re_uri::DatasetPartitionUri {
353-
origin: _,
354-
prefix: _,
353+
endpoint_addr: _,
355354
dataset_id,
356355
partition_id,
357356
time_range,

crates/top/re_sdk/src/grpc_server.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,12 @@ impl GrpcServerSink {
3434

3535
let grpc_server_addr = format!("{bind_ip}:{grpc_port}").parse()?;
3636

37-
let uri = re_uri::ProxyUri::new(
37+
let uri = re_uri::ProxyUri::new(re_uri::EndpointAddr::new(
3838
re_uri::Origin::from_scheme_and_socket_addr(
3939
re_uri::Scheme::RerunHttp,
4040
grpc_server_addr,
4141
),
42-
String::new(),
43-
);
42+
));
4443
let (channel_tx, channel_rx) = re_smart_channel::smart_channel::<re_log_types::LogMsg>(
4544
re_smart_channel::SmartMessageSource::MessageProxy(uri.clone()),
4645
re_smart_channel::SmartChannelSource::Sdk,

crates/top/re_sdk/src/web_viewer.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,12 @@ impl WebViewerSink {
4949
let (server_shutdown_signal, shutdown) = re_grpc_server::shutdown::shutdown();
5050

5151
let grpc_server_addr = format!("{bind_ip}:{grpc_port}").parse()?;
52-
let uri = re_uri::ProxyUri::new(
52+
let uri = re_uri::ProxyUri::new(re_uri::EndpointAddr::new(
5353
re_uri::Origin::from_scheme_and_socket_addr(
5454
re_uri::Scheme::RerunHttp,
5555
grpc_server_addr,
5656
),
57-
String::new(),
58-
);
57+
));
5958
let (channel_tx, channel_rx) = re_smart_channel::smart_channel::<re_log_types::LogMsg>(
6059
re_smart_channel::SmartMessageSource::MessageProxy(uri),
6160
re_smart_channel::SmartChannelSource::Sdk,
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use crate::Origin;
2+
3+
/// A Rerun endpoint address, consisting of an origin and an optional path prefix.
4+
///
5+
/// Example: `https://rerun.io:443/custom/prefix`
6+
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
7+
pub struct EndpointAddr {
8+
pub origin: Origin,
9+
10+
/// An optional path prefix, e.g. `/my/prefix`.
11+
///
12+
/// The prefix is guaranteed to start with a slash if it is not empty,
13+
/// and guaranteed not to end with a slash.
14+
#[serde(default, skip_serializing_if = "Option::is_none")]
15+
pub path_prefix: Option<String>,
16+
}
17+
18+
impl EndpointAddr {
19+
/// Create a new [`EndpointAddr`] with the given origin and no path prefix.
20+
pub fn new(origin: Origin) -> Self {
21+
Self {
22+
origin,
23+
path_prefix: None,
24+
}
25+
}
26+
27+
/// Add a path prefix to the endpoint address.
28+
///
29+
/// The prefix is normalized:
30+
/// - It will be ensured to start with a `/` if not empty.
31+
/// - Trailing slashes will be removed.
32+
pub fn with_path_prefix(mut self, path_prefix: impl Into<String>) -> Self {
33+
let path_prefix = path_prefix.into();
34+
if path_prefix.is_empty() || path_prefix == "/" {
35+
self.path_prefix = None;
36+
return self;
37+
}
38+
39+
let mut path_prefix = path_prefix.trim_end_matches('/').to_owned();
40+
if !path_prefix.starts_with('/') {
41+
path_prefix.insert(0, '/');
42+
}
43+
44+
self.path_prefix = Some(path_prefix);
45+
self
46+
}
47+
}
48+
49+
impl std::fmt::Display for EndpointAddr {
50+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51+
let Self {
52+
origin,
53+
path_prefix,
54+
} = self;
55+
56+
write!(f, "{origin}")?;
57+
if let Some(prefix) = path_prefix {
58+
write!(f, "{prefix}")?;
59+
}
60+
Ok(())
61+
}
62+
}
63+
64+
#[cfg(test)]
65+
mod tests {
66+
use super::*;
67+
use crate::Scheme;
68+
69+
fn test_origin() -> Origin {
70+
Origin::from_scheme_and_socket_addr(Scheme::Rerun, "127.0.0.1:1234".parse().unwrap())
71+
}
72+
73+
#[test]
74+
fn test_serialization() {
75+
let origin = test_origin();
76+
let addr = EndpointAddr::new(origin.clone());
77+
assert_eq!(addr.path_prefix, None);
78+
assert_eq!(addr.to_string(), "rerun://127.0.0.1:1234");
79+
80+
let addr = EndpointAddr::new(origin.clone()).with_path_prefix("");
81+
assert_eq!(addr.path_prefix, None);
82+
83+
let addr = EndpointAddr::new(origin.clone()).with_path_prefix("/");
84+
assert_eq!(addr.path_prefix, None);
85+
86+
let addr = EndpointAddr::new(origin.clone()).with_path_prefix("foo");
87+
assert_eq!(addr.path_prefix.as_deref(), Some("/foo"));
88+
assert_eq!(addr.to_string(), "rerun://127.0.0.1:1234/foo");
89+
90+
let addr = EndpointAddr::new(origin.clone()).with_path_prefix("/foo");
91+
assert_eq!(addr.path_prefix.as_deref(), Some("/foo"));
92+
assert_eq!(addr.to_string(), "rerun://127.0.0.1:1234/foo");
93+
94+
let addr = EndpointAddr::new(origin.clone()).with_path_prefix("foo/");
95+
assert_eq!(addr.path_prefix.as_deref(), Some("/foo"));
96+
assert_eq!(addr.to_string(), "rerun://127.0.0.1:1234/foo");
97+
98+
let addr = EndpointAddr::new(origin.clone()).with_path_prefix("/foo/bar");
99+
assert_eq!(addr.path_prefix.as_deref(), Some("/foo/bar"));
100+
assert_eq!(addr.to_string(), "rerun://127.0.0.1:1234/foo/bar");
101+
}
102+
}

crates/utils/re_uri/src/endpoints/catalog.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
1-
use crate::{Origin, RedapUri};
1+
use crate::{EndpointAddr, RedapUri};
22

33
/// `scheme://hostname:port/catalog`
44
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
55
pub struct CatalogUri {
6-
pub origin: Origin,
7-
pub prefix: String,
6+
pub endpoint_addr: EndpointAddr,
87
}
98

109
impl std::fmt::Display for CatalogUri {
1110
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
12-
write!(f, "{}{}/catalog", self.origin, self.prefix)
11+
let Self { endpoint_addr } = self;
12+
write!(f, "{endpoint_addr}/catalog")
1313
}
1414
}
1515

1616
impl CatalogUri {
17-
pub fn new(origin: Origin, prefix: String) -> Self {
18-
Self { origin, prefix }
17+
pub fn new(endpoint_addr: EndpointAddr) -> Self {
18+
Self { endpoint_addr }
1919
}
2020
}
2121

crates/utils/re_uri/src/endpoints/dataset.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use re_log_types::StoreId;
22

3-
use crate::{Error, Fragment, Origin, RedapUri, TimeSelection};
3+
use crate::{EndpointAddr, Error, Fragment, RedapUri, TimeSelection};
44

55
/// URI pointing at the data underlying a dataset.
66
///
@@ -11,8 +11,7 @@ use crate::{Error, Fragment, Origin, RedapUri, TimeSelection};
1111
/// In the future we will add richer queries.
1212
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
1313
pub struct DatasetPartitionUri {
14-
pub origin: Origin,
15-
pub prefix: String,
14+
pub endpoint_addr: EndpointAddr,
1615
pub dataset_id: re_tuid::Tuid,
1716

1817
// Query parameters: these affect what data is returned.
@@ -26,22 +25,26 @@ pub struct DatasetPartitionUri {
2625

2726
impl std::fmt::Display for DatasetPartitionUri {
2827
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29-
write!(
30-
f,
31-
"{}{}/dataset/{}",
32-
self.origin, self.prefix, self.dataset_id
33-
)?;
28+
let Self {
29+
endpoint_addr,
30+
dataset_id,
31+
partition_id,
32+
time_range,
33+
fragment,
34+
} = self;
35+
36+
write!(f, "{endpoint_addr}/dataset/{dataset_id}")?;
3437

3538
// ?query:
3639
{
37-
write!(f, "?partition_id={}", self.partition_id)?;
40+
write!(f, "?partition_id={partition_id}")?;
3841
}
39-
if let Some(time_range) = &self.time_range {
42+
if let Some(time_range) = time_range {
4043
write!(f, "&time_range={time_range}")?;
4144
}
4245

4346
// #fragment:
44-
let fragment = self.fragment.to_string();
47+
let fragment = fragment.to_string();
4548
if !fragment.is_empty() {
4649
write!(f, "#{fragment}")?;
4750
}
@@ -52,8 +55,7 @@ impl std::fmt::Display for DatasetPartitionUri {
5255

5356
impl DatasetPartitionUri {
5457
pub fn new(
55-
origin: Origin,
56-
prefix: String,
58+
endpoint_addr: EndpointAddr,
5759
dataset_id: re_tuid::Tuid,
5860
url: &url::Url,
5961
) -> Result<Self, Error> {
@@ -87,8 +89,7 @@ impl DatasetPartitionUri {
8789
}
8890

8991
Ok(Self {
90-
origin,
91-
prefix,
92+
endpoint_addr,
9293
dataset_id,
9394
partition_id,
9495
time_range,
@@ -99,10 +100,9 @@ impl DatasetPartitionUri {
99100
/// Returns [`Self`] without any (optional) `?query` or `#fragment`.
100101
pub fn without_query_and_fragment(mut self) -> Self {
101102
let Self {
102-
origin: _, // Mandatory
103-
prefix: _, // Mandatory
104-
dataset_id: _, // Mandatory
105-
partition_id: _, // Mandatory
103+
endpoint_addr: _, // Mandatory
104+
dataset_id: _, // Mandatory
105+
partition_id: _, // Mandatory
106106
time_range,
107107
fragment,
108108
} = &mut self;
@@ -116,10 +116,9 @@ impl DatasetPartitionUri {
116116
/// Returns [`Self`] without any (optional) `#fragment`.
117117
pub fn without_fragment(mut self) -> Self {
118118
let Self {
119-
origin: _, // Mandatory
120-
prefix: _, // Mandatory
121-
dataset_id: _, // Mandatory
122-
partition_id: _, // Mandatory
119+
endpoint_addr: _, // Mandatory
120+
dataset_id: _, // Mandatory
121+
partition_id: _, // Mandatory
123122
time_range: _,
124123
fragment,
125124
} = &mut self;

0 commit comments

Comments
 (0)