-
Notifications
You must be signed in to change notification settings - Fork 12
WIP: multi-threaded ccp #78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
The Other variant of Msg was previously just a wrapper around RawMsg, which must have the same lifetime as receive buf, because it has a direct slice into it. All of the other message variants copy the contents of RawMsg into their own struct, allowing them to own the data, meaning they don't need lifetime specifiers. So the only reason we have a lifetime specifier is for the other type, which itself is only used in tests and benchmarks. We can get rid of this by creating a wrapper Other message type that basically mimics RawMsg, but copies the contents. This is actually more realistic for the benchmark anyway, because it now more closely mimics the behavior of the real message types we use (i.e. it makes its own copy of the message).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Might be good to run the IPC benchmark to see if the performance has changed (use of
Arc
vsRc
among other changes). - I haven't yet looked at split of the overall diff into individual commits, we can look at that once we're happy with the overall structure
- Also pls run rustfmt, that is why travis is sad.
@@ -101,7 +101,10 @@ pub fn ipc_valid(v: String) -> std::result::Result<(), String> { | |||
/// } | |||
/// | |||
/// fn main() { | |||
/// portus::start!("unix", None, MyCongestionControlAlgorithm(Default::default())); | |||
/// let handle = portus::spawn!("unix", None, MyCongestionControlAlgorithm(Default::default()), 4u32); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation for start!
should include a use of start!
, not spawn!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So before this doctest worked because install datapath programs would fail and so the function would exit. But after I added the code to wait if the datapath is not available, this doctest will hang forever. So I think our only options are (a) don't run the doctest, which is a little unfortunate because it's a nice test of the entire API with a sample congestion control algorithm or (b) use the spawn api and have it start ccp and then kill it with the handle. Do you have any other suggestions for being able to still test start too?
}}; | ||
} | ||
#[macro_export] | ||
macro_rules! spawn { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
documentation? what is this used for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added initially because of comment above about not being able to test start because it hangs but wanting to still test everything else -- but it seems like it makes sense to have a spawn equivalent of start! regardless since the interface is pretty much the same
@@ -14,9 +14,10 @@ use std::sync::{atomic, Arc}; | |||
use time::Duration; | |||
|
|||
#[derive(Debug)] | |||
struct TimeMsg(time::Timespec); | |||
pub struct TimeMsg(time::Timespec); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does this need to be pub
- it's in a binary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needed to be pub because of the separate deserialize_timemsg
method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it's not pub, compiler says TimeMsg decleared as private, can't leak private type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then, why does deserialize_timemsg
need to be pub?
let sec = LittleEndian::read_i64(&b[0..8]); | ||
let nsec = LittleEndian::read_i32(&b[8..12]); | ||
Ok(TimeMsg(time::Timespec::new(sec, nsec))) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why has from_raw_msg
been moved to this separate function and replaced with unimplemented
. Also similarly why is this pub
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see commit message
/// Backend will yield incoming IPC messages forever via `next()`. | ||
/// It owns the socket; `BackendSender` holds weak references. | ||
/// The atomic bool is a way to stop iterating. | ||
pub struct SingleBackend<T: Ipc> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving this up above MultiBackend
would clean up the diff
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
fn drop(&mut self) { | ||
Rc::get_mut(&mut self.sock) | ||
Arc::get_mut(&mut self.sock) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does this have to be an Arc
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because backend is moved inside the sending thread (around line 128) so it has to be send which means all of its members must be send
}; | ||
} | ||
|
||
pub struct Socket<T> { | ||
sk: UnixDatagram, | ||
dest: String, | ||
_phantom: PhantomData<T>, | ||
_id: u8 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please move above _phantom
- PhantomData
fields are supposed to be last
@@ -549,7 +587,7 @@ where | |||
let f = alg.new_flow( | |||
Datapath { | |||
sock_id: c.sid, | |||
sender: backend.clone(), | |||
sender: b.sender(), // backend.clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pls remove old comments
@@ -0,0 +1,41 @@ | |||
//! Message sent from datapath to CCP when a new flow starts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this called "other" if it's actually a "DatapathInitMsg" or whatever
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whoops, looks like i accidentally copied that comment, it's not correct :)
other is just a wrapper around RawMsg that owns the bytes to remove the lifetime specifier
…ltibackend The idea here is to allow CCP to listen on multiple sockets, allowing datapaths (such as mTCP or mvfst) to internally maintain a separate connection with ccp per-thread, without changing the api or main implementation loop much. Aside from adding backend traits to the function signatures, the main run_inner loop does not change
417ebce
to
071296b
Compare
This is a WIP, not ready to be merged yet.
This PR contains a few commits to enable an easier integration with mvfst.
The only change to the interface is an additional typeparameter to run allowing for different backendbuilders to be passed in. The traditional backend and backendbuilder were renamed to singlebackend and singlebackendbuilder.
Once this new interface is in place, it's possible to implement multi-threaded ccp without changing the api or internal run_inner implementation by implementing a multibackend (and multibackendbuilder).