diff --git a/rust/foxglove/src/channel.rs b/rust/foxglove/src/channel.rs index e041815bc..84617ac95 100644 --- a/rust/foxglove/src/channel.rs +++ b/rust/foxglove/src/channel.rs @@ -15,7 +15,7 @@ mod raw_channel; pub use channel_descriptor::ChannelDescriptor; pub use lazy_channel::{LazyChannel, LazyRawChannel}; -pub use raw_channel::RawChannel; +pub use raw_channel::{RawChannel, TransportKind}; /// Stack buffer size to use for encoding messages. const STACK_BUFFER_SIZE: usize = 256 * 1024; diff --git a/rust/foxglove/src/channel/raw_channel.rs b/rust/foxglove/src/channel/raw_channel.rs index fa4d1a4b3..f77ce83cd 100644 --- a/rust/foxglove/src/channel/raw_channel.rs +++ b/rust/foxglove/src/channel/raw_channel.rs @@ -18,6 +18,28 @@ use crate::{nanoseconds_since_epoch, Context, Metadata, PartialMetadata, Schema, /// Interval for throttled warnings. static WARN_THROTTLER_INTERVAL: Duration = Duration::from_secs(10); +/// Options that control how data for the channel is sent over the network with applicable sinks. +#[derive(Debug, Clone, Copy, Default)] +#[repr(u8)] +pub enum TransportKind { + /// Delivery of messages is best effort and may not be retried. This is the default. + /// This only affects [`CloudSink`](crate::CloudSink) which supports UDP, + /// not currently [`WebSocketServer`](crate::WebSocketServer) or [`McapWriter`](crate::McapWriter). + /// Messages are still delivered to the Foxglove in-order. + /// + /// Note: this only applies to messages under 15kb, larger messages will use Reliable transport. + #[default] + Lossy, + /// Delivery of messages is reliable, retrying if need need be. + /// This causes head-of-line blocking, and if there isn't enough bandwidth available + /// buffers and queues will grow longer. Be very sparing with this over poor quality networks. + /// + /// Doesn't affect [`McapWriter`](crate::McapWriter). + /// Note: delivery is still not guaranteed, because buffers and queues are not infinite. + /// Messages may still be dropped, or the OOM killer may reap the process. + Reliable, +} + /// A log channel that can be used to log binary messages. /// /// A "channel" is conceptually the same as a [MCAP channel]: it is a stream of messages which all @@ -38,6 +60,8 @@ pub struct RawChannel { context: Weak, sinks: LogSinkSet, closed: AtomicBool, + #[allow(dead_code)] + transport_kind: TransportKind, warn_throttler: Mutex, } @@ -48,6 +72,7 @@ impl RawChannel { message_encoding: String, schema: Option, metadata: BTreeMap, + transport_kind: TransportKind, ) -> Arc { Arc::new(Self { descriptor: ChannelDescriptor::new( @@ -60,6 +85,7 @@ impl RawChannel { context: Arc::downgrade(context), sinks: LogSinkSet::new(), closed: AtomicBool::new(false), + transport_kind, warn_throttler: Mutex::new(Throttler::new(WARN_THROTTLER_INTERVAL)), }) } diff --git a/rust/foxglove/src/channel_builder.rs b/rust/foxglove/src/channel_builder.rs index ffaefa124..5bfc4546e 100644 --- a/rust/foxglove/src/channel_builder.rs +++ b/rust/foxglove/src/channel_builder.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use std::sync::Arc; -use crate::{Channel, Context, Encode, FoxgloveError, RawChannel, Schema}; +use crate::{Channel, Context, Encode, FoxgloveError, RawChannel, Schema, TransportKind}; /// A builder for creating a [`Channel`] or [`RawChannel`]. #[must_use] @@ -12,6 +12,7 @@ pub struct ChannelBuilder { schema: Option, metadata: BTreeMap, context: Arc, + transport_kind: TransportKind, } impl ChannelBuilder { @@ -25,6 +26,7 @@ impl ChannelBuilder { schema: None, metadata: BTreeMap::new(), context: Context::get_default(), + transport_kind: TransportKind::Lossy, } } @@ -65,6 +67,13 @@ impl ChannelBuilder { self } + /// Sets the [`TransportKind`](crate::TransportKind) for the channel. + /// It controls how data for the channel is sent over the network with applicable sinks. + pub fn transport_kind(mut self, transport_kind: TransportKind) -> Self { + self.transport_kind = transport_kind; + self + } + /// Builds a [`RawChannel`]. /// /// Returns [`FoxgloveError::MessageEncodingRequired`] if no message encoding was specified. @@ -76,6 +85,7 @@ impl ChannelBuilder { .ok_or_else(|| FoxgloveError::MessageEncodingRequired)?, self.schema, self.metadata, + self.transport_kind, ); channel = self.context.add_channel(channel); Ok(channel) diff --git a/rust/foxglove/src/lib.rs b/rust/foxglove/src/lib.rs index 197d8f5ab..80a73c2be 100644 --- a/rust/foxglove/src/lib.rs +++ b/rust/foxglove/src/lib.rs @@ -339,7 +339,9 @@ pub mod stream; pub use app_url::AppUrl; // Re-export bytes crate for convenience when implementing the `Encode` trait pub use bytes; -pub use channel::{Channel, ChannelDescriptor, ChannelId, LazyChannel, LazyRawChannel, RawChannel}; +pub use channel::{ + Channel, ChannelDescriptor, ChannelId, LazyChannel, LazyRawChannel, RawChannel, TransportKind, +}; pub use channel_builder::ChannelBuilder; pub use context::{Context, LazyContext}; #[doc(hidden)]