-
Notifications
You must be signed in to change notification settings - Fork 72
Add TransportKind option to channels and channel builder #820
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,28 @@ use crate::{nanoseconds_since_epoch, Context, Metadata, PartialMetadata, Schema, | |
| /// Interval for throttled warnings. | ||
| static WARN_THROTTLER_INTERVAL: Duration = Duration::from_secs(10); | ||
|
|
||
| /// Options that control how data for the channel is sent over the network with applicable sinks. | ||
| #[derive(Debug, Clone, Copy, Default)] | ||
| #[repr(u8)] | ||
| pub enum TransportKind { | ||
| /// Delivery of messages is best effort and may not be retried. This is the default. | ||
| /// This only affects [`CloudSink`](crate::CloudSink) which supports UDP, | ||
| /// not currently [`WebSocketServer`](crate::WebSocketServer) or [`McapWriter`](crate::McapWriter). | ||
| /// Messages are still delivered to the Foxglove in-order. | ||
| /// | ||
| /// Note: this only applies to messages under 15kb, larger messages will use Reliable transport. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we planning on dispatching based on message size? Or is this more saying "you must use Reliable transport if message size exceeds 15K"?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can do either. I think dynamic dispatch on message size is the best user experience (at least when not limited by network bandwidth), so I lean that way. |
||
| #[default] | ||
| Lossy, | ||
| /// Delivery of messages is reliable, retrying if need need be. | ||
| /// This causes head-of-line blocking, and if there isn't enough bandwidth available | ||
| /// buffers and queues will grow longer. Be very sparing with this over poor quality networks. | ||
| /// | ||
| /// Doesn't affect [`McapWriter`](crate::McapWriter). | ||
| /// Note: delivery is still not guaranteed, because buffers and queues are not infinite. | ||
| /// Messages may still be dropped, or the OOM killer may reap the process. | ||
| Reliable, | ||
| } | ||
|
|
||
| /// A log channel that can be used to log binary messages. | ||
| /// | ||
| /// A "channel" is conceptually the same as a [MCAP channel]: it is a stream of messages which all | ||
|
|
@@ -38,6 +60,8 @@ pub struct RawChannel { | |
| context: Weak<Context>, | ||
| sinks: LogSinkSet, | ||
| closed: AtomicBool, | ||
| #[allow(dead_code)] | ||
| transport_kind: TransportKind, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of exposing this at the top-level of a Channel, I wonder if we nest this in a TransportOptions struct, so the Channel API doesn't need to change as we introduce new QoS and transport configuration settings. Thoughts?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that's a good idea
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this belongs on the channel at all. Transport options are particular to a (channel, sink) pair. I'd prefer that we rework this along the lines of sink channel filters, but of course only for those sinks that can offer differentiated QoS.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with that, it's specific to the (channel, sink) pair. But, it's imperative that it be efficient to access. If it requires a hash table lookup on each log call, I think it's too much. Given that, I think the Channel gives a logical and efficient place to store the setting, and sinks are free to ignore it unless they need it. Can you think of another way we could implement this efficiently?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not really worried about a hashmap lookup on the fast path. If this becomes a measurable bottleneck, and we want to optimize, we could potentially store it during channel subscription, as part of the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keep in mind it's not just a hashmap lookup, but taking a read() lock on a RwLock, doing the lookup, and releasing the lock. About ~400 cycles as a rough estimate, depending on caching and architecture. At 10k log messages/sec it'd be around 4 million cycles / sec just for checking a flag, which could have been a pointer indirection. If we don't make this part of the public interface, having it on the channel as an implementation detail seems ok to me. Less code to maintain too. We can always refactor it if we want to later.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, so then let's see what it would take to configure this via the sink, and store it in the |
||
| warn_throttler: Mutex<Throttler>, | ||
| } | ||
|
|
||
|
|
@@ -48,6 +72,7 @@ impl RawChannel { | |
| message_encoding: String, | ||
| schema: Option<Schema>, | ||
| metadata: BTreeMap<String, String>, | ||
| transport_kind: TransportKind, | ||
| ) -> Arc<Self> { | ||
| Arc::new(Self { | ||
| descriptor: ChannelDescriptor::new( | ||
|
|
@@ -60,6 +85,7 @@ impl RawChannel { | |
| context: Arc::downgrade(context), | ||
| sinks: LogSinkSet::new(), | ||
| closed: AtomicBool::new(false), | ||
| transport_kind, | ||
| warn_throttler: Mutex::new(Throttler::new(WARN_THROTTLER_INTERVAL)), | ||
| }) | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming: TransportReliability?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or TransportDelivery? Naming things...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I lean towards
TransportReliability.TransportKindandTransportDeliveryare a bit too generic for my taste; that could encapsulate things like History and Durability, which are orthogonal to this axes of delivery.