Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/store/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl LogDataSource {
let uri_clone = uri.clone();
let stream_partition = async move {
let client = connection_registry
.client(uri_clone.origin.clone())
.client(uri_clone.endpoint_addr.origin.clone())
.await
.map_err(|err| ApiError::connection(err, "failed to connect to server"))?;
re_redap_client::stream_blueprint_and_partition_from_server(
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_grpc_client/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn stream_async(
on_msg: Option<Box<dyn Fn() + Send + Sync>>,
) -> Result<(), StreamError> {
let mut client = {
let url = uri.origin.as_url();
let url = uri.endpoint_addr.origin.as_url();

#[cfg(target_arch = "wasm32")]
let tonic_client = {
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_grpc_client/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ async fn message_proxy_client(
compression: Compression,
status: Arc<AtomicCell<ClientConnectionState>>,
) {
let endpoint = match Endpoint::from_shared(uri.origin.as_url()) {
let endpoint = match Endpoint::from_shared(uri.endpoint_addr.origin.as_url()) {
Ok(endpoint) => endpoint,
Err(err) => {
status.store(ClientConnectionState::Disconnected(Err(
Expand Down
5 changes: 2 additions & 3 deletions crates/store/re_grpc_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,8 @@ pub fn spawn_with_recv(
re_smart_channel::Receiver<re_log_types::DataSourceMessage>,
crossbeam::channel::Receiver<re_log_types::TableMsg>,
) {
let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
re_uri::Scheme::RerunHttp,
addr,
let uri = re_uri::ProxyUri::new(re_uri::EndpointAddr::new(
re_uri::Origin::from_scheme_and_socket_addr(re_uri::Scheme::RerunHttp, addr),
));
let (channel_log_tx, channel_log_rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::MessageProxy(uri.clone()),
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_redap_client/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ pub async fn stream_blueprint_and_partition_from_server(
}

let re_uri::DatasetPartitionUri {
origin: _,
endpoint_addr: _,
dataset_id,
partition_id,
time_range,
Expand Down
8 changes: 5 additions & 3 deletions crates/top/re_sdk/src/grpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ impl GrpcServerSink {

let grpc_server_addr = format!("{bind_ip}:{grpc_port}").parse()?;

let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
re_uri::Scheme::RerunHttp,
grpc_server_addr,
let uri = re_uri::ProxyUri::new(re_uri::EndpointAddr::new(
re_uri::Origin::from_scheme_and_socket_addr(
re_uri::Scheme::RerunHttp,
grpc_server_addr,
),
));
let (channel_tx, channel_rx) = re_smart_channel::smart_channel::<re_log_types::LogMsg>(
re_smart_channel::SmartMessageSource::MessageProxy(uri.clone()),
Expand Down
8 changes: 5 additions & 3 deletions crates/top/re_sdk/src/web_viewer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ impl WebViewerSink {
let (server_shutdown_signal, shutdown) = re_grpc_server::shutdown::shutdown();

let grpc_server_addr = format!("{bind_ip}:{grpc_port}").parse()?;
let uri = re_uri::ProxyUri::new(re_uri::Origin::from_scheme_and_socket_addr(
re_uri::Scheme::RerunHttp,
grpc_server_addr,
let uri = re_uri::ProxyUri::new(re_uri::EndpointAddr::new(
re_uri::Origin::from_scheme_and_socket_addr(
re_uri::Scheme::RerunHttp,
grpc_server_addr,
),
));
let (channel_tx, channel_rx) = re_smart_channel::smart_channel::<re_log_types::LogMsg>(
re_smart_channel::SmartMessageSource::MessageProxy(uri),
Expand Down
102 changes: 102 additions & 0 deletions crates/utils/re_uri/src/endpoint_addr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use crate::Origin;

/// A Rerun endpoint address, consisting of an origin and an optional path prefix.
///
/// Example: `https://rerun.io:443/custom/prefix`
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct EndpointAddr {
pub origin: Origin,

/// An optional path prefix, e.g. `/my/prefix`.
///
/// The prefix is guaranteed to start with a slash if it is not empty,
/// and guaranteed not to end with a slash.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub path_prefix: Option<String>,
}

impl EndpointAddr {
/// Create a new [`EndpointAddr`] with the given origin and no path prefix.
pub fn new(origin: Origin) -> Self {
Self {
origin,
path_prefix: None,
}
}

/// Add a path prefix to the endpoint address.
///
/// The prefix is normalized:
/// - It will be ensured to start with a `/` if not empty.
/// - Trailing slashes will be removed.
pub fn with_path_prefix(mut self, path_prefix: impl Into<String>) -> Self {
let path_prefix = path_prefix.into();
if path_prefix.is_empty() || path_prefix == "/" {
self.path_prefix = None;
return self;
}

let mut path_prefix = path_prefix.trim_end_matches('/').to_owned();
if !path_prefix.starts_with('/') {
path_prefix.insert(0, '/');
}

self.path_prefix = Some(path_prefix);
self
}
}

impl std::fmt::Display for EndpointAddr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
origin,
path_prefix,
} = self;

write!(f, "{origin}")?;
if let Some(prefix) = path_prefix {
write!(f, "{prefix}")?;
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::Scheme;

fn test_origin() -> Origin {
Origin::from_scheme_and_socket_addr(Scheme::Rerun, "127.0.0.1:1234".parse().unwrap())
}

#[test]
fn test_serialization() {
let origin = test_origin();
let addr = EndpointAddr::new(origin.clone());
assert_eq!(addr.path_prefix, None);
assert_eq!(addr.to_string(), "rerun://127.0.0.1:1234");

let addr = EndpointAddr::new(origin.clone()).with_path_prefix("");
assert_eq!(addr.path_prefix, None);

let addr = EndpointAddr::new(origin.clone()).with_path_prefix("/");
assert_eq!(addr.path_prefix, None);

let addr = EndpointAddr::new(origin.clone()).with_path_prefix("foo");
assert_eq!(addr.path_prefix.as_deref(), Some("/foo"));
assert_eq!(addr.to_string(), "rerun://127.0.0.1:1234/foo");

let addr = EndpointAddr::new(origin.clone()).with_path_prefix("/foo");
assert_eq!(addr.path_prefix.as_deref(), Some("/foo"));
assert_eq!(addr.to_string(), "rerun://127.0.0.1:1234/foo");

let addr = EndpointAddr::new(origin.clone()).with_path_prefix("foo/");
assert_eq!(addr.path_prefix.as_deref(), Some("/foo"));
assert_eq!(addr.to_string(), "rerun://127.0.0.1:1234/foo");

let addr = EndpointAddr::new(origin.clone()).with_path_prefix("/foo/bar");
assert_eq!(addr.path_prefix.as_deref(), Some("/foo/bar"));
assert_eq!(addr.to_string(), "rerun://127.0.0.1:1234/foo/bar");
}
}
11 changes: 6 additions & 5 deletions crates/utils/re_uri/src/endpoints/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
use crate::{Origin, RedapUri};
use crate::{EndpointAddr, RedapUri};

/// `scheme://hostname:port/catalog`
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct CatalogUri {
pub origin: Origin,
pub endpoint_addr: EndpointAddr,
}

impl std::fmt::Display for CatalogUri {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/catalog", self.origin)
let Self { endpoint_addr } = self;
write!(f, "{endpoint_addr}/catalog")
}
}

impl CatalogUri {
pub fn new(origin: Origin) -> Self {
Self { origin }
pub fn new(endpoint_addr: EndpointAddr) -> Self {
Self { endpoint_addr }
}
}

Expand Down
28 changes: 16 additions & 12 deletions crates/utils/re_uri/src/endpoints/dataset.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use re_log_types::StoreId;

use crate::{Error, Fragment, Origin, RedapUri, TimeSelection};
use crate::{EndpointAddr, Error, Fragment, RedapUri, TimeSelection};

/// URI pointing at the data underlying a dataset.
///
Expand All @@ -11,7 +11,7 @@ use crate::{Error, Fragment, Origin, RedapUri, TimeSelection};
/// In the future we will add richer queries.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct DatasetPartitionUri {
pub origin: Origin,
pub endpoint_addr: EndpointAddr,
pub dataset_id: re_tuid::Tuid,

// Query parameters: these affect what data is returned.
Expand All @@ -26,14 +26,14 @@ pub struct DatasetPartitionUri {
impl std::fmt::Display for DatasetPartitionUri {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
origin,
endpoint_addr,
dataset_id,
partition_id,
time_range,
fragment,
} = self;

write!(f, "{origin}/dataset/{dataset_id}")?;
write!(f, "{endpoint_addr}/dataset/{dataset_id}")?;

// ?query:
{
Expand All @@ -54,7 +54,11 @@ impl std::fmt::Display for DatasetPartitionUri {
}

impl DatasetPartitionUri {
pub fn new(origin: Origin, dataset_id: re_tuid::Tuid, url: &url::Url) -> Result<Self, Error> {
pub fn new(
endpoint_addr: EndpointAddr,
dataset_id: re_tuid::Tuid,
url: &url::Url,
) -> Result<Self, Error> {
let mut partition_id = None;
let mut time_range = None;

Expand Down Expand Up @@ -85,7 +89,7 @@ impl DatasetPartitionUri {
}

Ok(Self {
origin,
endpoint_addr,
dataset_id,
partition_id,
time_range,
Expand All @@ -96,9 +100,9 @@ impl DatasetPartitionUri {
/// Returns [`Self`] without any (optional) `?query` or `#fragment`.
pub fn without_query_and_fragment(mut self) -> Self {
let Self {
origin: _, // Mandatory
dataset_id: _, // Mandatory
partition_id: _, // Mandatory
endpoint_addr: _, // Mandatory
dataset_id: _, // Mandatory
partition_id: _, // Mandatory
time_range,
fragment,
} = &mut self;
Expand All @@ -112,9 +116,9 @@ impl DatasetPartitionUri {
/// Returns [`Self`] without any (optional) `#fragment`.
pub fn without_fragment(mut self) -> Self {
let Self {
origin: _, // Mandatory
dataset_id: _, // Mandatory
partition_id: _, // Mandatory
endpoint_addr: _, // Mandatory
dataset_id: _, // Mandatory
partition_id: _, // Mandatory
time_range: _,
fragment,
} = &mut self;
Expand Down
18 changes: 12 additions & 6 deletions crates/utils/re_uri/src/endpoints/entry.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
use re_log_types::EntryId;

use crate::{Error, Origin, RedapUri};
use crate::{EndpointAddr, Error, RedapUri};

/// URI for a remote entry.
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)]
pub struct EntryUri {
pub origin: Origin,
pub endpoint_addr: EndpointAddr,
pub entry_id: EntryId,
}

impl std::fmt::Display for EntryUri {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self { origin, entry_id } = self;
write!(f, "{origin}/entry/{entry_id}")
let Self {
endpoint_addr,
entry_id,
} = self;
write!(f, "{endpoint_addr}/entry/{entry_id}")
}
}

impl EntryUri {
pub fn new(origin: Origin, entry_id: EntryId) -> Self {
Self { origin, entry_id }
pub fn new(endpoint_addr: EndpointAddr, entry_id: EntryId) -> Self {
Self {
endpoint_addr,
entry_id,
}
}
}

Expand Down
11 changes: 6 additions & 5 deletions crates/utils/re_uri/src/endpoints/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use crate::{Origin, RedapUri};
use crate::{EndpointAddr, RedapUri};

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ProxyUri {
pub origin: Origin,
pub endpoint_addr: EndpointAddr,
}

impl std::fmt::Display for ProxyUri {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/proxy", self.origin)
let Self { endpoint_addr } = self;
write!(f, "{endpoint_addr}/proxy")
}
}

impl ProxyUri {
pub fn new(origin: Origin) -> Self {
Self { origin }
pub fn new(endpoint_addr: EndpointAddr) -> Self {
Self { endpoint_addr }
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/utils/re_uri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
//!
//! ```

mod endpoint_addr;
mod endpoints;
mod error;
mod fragment;
Expand All @@ -42,6 +43,7 @@ mod scheme;
mod time_selection;

pub use self::{
endpoint_addr::EndpointAddr,
endpoints::{
catalog::CatalogUri, dataset::DatasetPartitionUri, entry::EntryUri, proxy::ProxyUri,
},
Expand Down
Loading
Loading