Skip to content

WIP: multi-threaded ccp #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
77 changes: 68 additions & 9 deletions src/algs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ pub fn ipc_valid(v: String) -> std::result::Result<(), String> {
/// }
///
/// fn main() {
/// portus::start!("unix", None, MyCongestionControlAlgorithm(Default::default()));
/// let handle = portus::spawn!("unix", None, MyCongestionControlAlgorithm(Default::default()), 4u32);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation for start! should include a use of start!, not spawn!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So before this doctest worked because install datapath programs would fail and so the function would exit. But after I added the code to wait if the datapath is not available, this doctest will hang forever. So I think our only options are (a) don't run the doctest, which is a little unfortunate because it's a nice test of the entire API with a sample congestion control algorithm or (b) use the spawn api and have it start ccp and then kill it with the handle. Do you have any other suggestions for being able to still test start too?

/// std::thread::sleep(std::time::Duration::from_secs(2));
/// handle.kill();
/// handle.wait();
/// }
/// ```
#[macro_export]
Expand All @@ -111,32 +114,88 @@ macro_rules! start {
$crate::start!($ipc, $log, $alg, Blocking)
}};
($ipc:expr, $log:expr, $alg: expr, $blk: ty) => {{
use $crate::ipc::BackendBuilder;
use $crate::ipc::SingleBackendBuilder;
match $ipc {
"unix" => {
use $crate::ipc::unix::Socket;
let b = Socket::<$blk>::new("in", "out")
.map(|sk| BackendBuilder { sock: sk })
let b = Socket::<$blk>::new(0, "in", "out")
.map(|sk| SingleBackendBuilder { sock: sk })
.expect("ipc initialization");
$crate::run::<_, _>(b, $crate::Config { logger: $log }, $alg)
$crate::run::<_, _, SingleBackendBuilder<_>>(
b,
$crate::Config { logger: $log },
$alg,
)
}
#[cfg(all(target_os = "linux"))]
"netlink" => {
use $crate::ipc::netlink::Socket;
let b = Socket::<$blk>::new()
.map(|sk| BackendBuilder { sock: sk })
.map(|sk| SingleBackendBuilder { sock: sk })
.expect("ipc initialization");
$crate::run::<_, _>(b, $crate::Config { logger: $log }, $alg)
$crate::run::<_, _, SingleBackendBuilder<_>>(
b,
$crate::Config { logger: $log },
$alg,
)
}
#[cfg(all(target_os = "linux"))]
"char" => {
use $crate::ipc::kp::Socket;
let b = Socket::<$blk>::new()
.map(|sk| BackendBuilder { sock: sk })
.map(|sk| SingleBackendBuilder { sock: sk })
.expect("ipc initialization");
$crate::run::<_, _>(b, $crate::Config { logger: $log }, $alg)
$crate::run::<_, _, SingleBackendBuilder<_>>(
b,
$crate::Config { logger: $log },
$alg,
)
}
_ => unreachable!(),
}
}};
($ipc:expr, $log:expr, $alg:expr, $blk:ty, $nthreads: expr) => {{
use std::convert::TryInto;
use $crate::ipc::MultiBackendBuilder;
match $ipc {
"unix" => {
use $crate::ipc::unix::Socket;
let mut v = vec![];
for i in 0..$nthreads {
v.push(Socket::<$blk>::new(i.try_into().unwrap(), "in", "out").unwrap())
}
let b = MultiBackendBuilder { socks: v };
$crate::run::<_, _, MultiBackendBuilder<_>>(
b,
$crate::Config { logger: $log },
$alg,
)
}
_ => unimplemented!(),
}
}};
}
#[macro_export]
macro_rules! spawn {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

documentation? what is this used for?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added initially because of comment above about not being able to test start because it hangs but wanting to still test everything else -- but it seems like it makes sense to have a spawn equivalent of start! regardless since the interface is pretty much the same

($ipc:expr, $log:expr, $alg:expr, $nthreads: expr) => {{
use std::convert::TryInto;
use $crate::ipc::Blocking;
use $crate::ipc::MultiBackendBuilder;
match $ipc {
"unix" => {
use $crate::ipc::unix::Socket;
let mut v = vec![];
for i in 0..$nthreads {
v.push(Socket::<Blocking>::new(i.try_into().unwrap(), "in", "out").unwrap())
}
let b = MultiBackendBuilder { socks: v };
$crate::spawn::<_, _, MultiBackendBuilder<_>>(
b,
$crate::Config { logger: $log },
$alg,
)
}
_ => unimplemented!(),
}
}};
}
77 changes: 34 additions & 43 deletions src/bin/ipc_latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ use std::sync::{atomic, Arc};
use time::Duration;

#[derive(Debug)]
struct TimeMsg(time::Timespec);
pub struct TimeMsg(time::Timespec);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this need to be pub - it's in a binary

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed to be pub because of the separate deserialize_timemsg method

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's not pub, compiler says TimeMsg decleared as private, can't leak private type

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then, why does deserialize_timemsg need to be pub?


use std::io::prelude::*;

impl portus::serialize::AsRawMsg for TimeMsg {
fn get_hdr(&self) -> (u8, u32, u32) {
(0xff, portus::serialize::HDR_LENGTH + 8 + 4, 0)
Expand All @@ -38,16 +39,19 @@ impl portus::serialize::AsRawMsg for TimeMsg {
Ok(())
}

fn from_raw_msg(msg: portus::serialize::RawMsg) -> portus::Result<Self> {
let b = msg.get_bytes()?;
let sec = LittleEndian::read_i64(&b[0..8]);
let nsec = LittleEndian::read_i32(&b[8..12]);
Ok(TimeMsg(time::Timespec::new(sec, nsec)))
fn from_raw_msg(_msg: portus::serialize::RawMsg) -> portus::Result<Self> {
unimplemented!()
}
}
pub fn deserialize_timemsg(msg: portus::serialize::other::Msg) -> portus::Result<TimeMsg> {
let b = msg.get_raw_bytes();
let sec = LittleEndian::read_i64(&b[0..8]);
let nsec = LittleEndian::read_i32(&b[8..12]);
Ok(TimeMsg(time::Timespec::new(sec, nsec)))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why has from_raw_msg been moved to this separate function and replaced with unimplemented. Also similarly why is this pub

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see commit message


#[derive(Debug)]
struct NlTimeMsg {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comments as above with TimeMsg

pub struct NlTimeMsg {
kern_rt: time::Timespec,
kern_st: time::Timespec,
}
Expand All @@ -74,29 +78,32 @@ impl portus::serialize::AsRawMsg for NlTimeMsg {
Ok(())
}

fn from_raw_msg(msg: portus::serialize::RawMsg) -> portus::Result<Self> {
let b = msg.get_bytes()?;
let up_sec = LittleEndian::read_i64(&b[0..8]);
let up_nsec = LittleEndian::read_i32(&b[8..12]);
let down_sec = LittleEndian::read_i64(&b[12..20]);
let down_nsec = LittleEndian::read_i32(&b[20..24]);
Ok(NlTimeMsg {
kern_rt: time::Timespec::new(up_sec, up_nsec),
kern_st: time::Timespec::new(down_sec, down_nsec),
})
fn from_raw_msg(_msg: portus::serialize::RawMsg) -> portus::Result<Self> {
unimplemented!()
}
}
pub fn deserialize_nltimemsg(msg: portus::serialize::other::Msg) -> portus::Result<NlTimeMsg> {
let b = msg.get_raw_bytes();
let up_sec = LittleEndian::read_i64(&b[0..8]);
let up_nsec = LittleEndian::read_i32(&b[8..12]);
let down_sec = LittleEndian::read_i64(&b[12..20]);
let down_nsec = LittleEndian::read_i32(&b[20..24]);
Ok(NlTimeMsg {
kern_rt: time::Timespec::new(up_sec, up_nsec),
kern_st: time::Timespec::new(down_sec, down_nsec),
})
}

use portus::serialize::AsRawMsg;
use portus::ipc::SingleBackend;
use std::sync::mpsc;
fn bench<T: Ipc>(b: BackendSender<T>, mut l: Backend<T>, iter: u32) -> Vec<Duration> {
fn bench<T: Ipc>(b: BackendSender<T>, mut l: SingleBackend<T>, iter: u32) -> Vec<Duration> {
(0..iter)
.map(|_| {
let then = time::get_time();
let msg = portus::serialize::serialize(&TimeMsg(then)).expect("serialize");
b.send_msg(&msg[..]).expect("send ts");
if let portus::serialize::Msg::Other(raw) = l.next().expect("receive echo") {
let then = TimeMsg::from_raw_msg(raw).expect("get time from raw");
let then = deserialize_timemsg(raw).expect("get time from raw");
time::get_time() - then.0
} else {
panic!("wrong type");
Expand Down Expand Up @@ -134,11 +141,8 @@ macro_rules! netlink_bench {

// listen
let c1 = thread::spawn(move || {
let mut buf = [0u8; 1024];
let mut nl = portus::ipc::netlink::Socket::<$mode>::new()
.map(|sk| {
Backend::new(sk, Arc::new(atomic::AtomicBool::new(true)), &mut buf[..])
})
.map(|sk| SingleBackend::new(sk, Arc::new(atomic::AtomicBool::new(true))))
.expect("nl ipc initialization");
tx.send(vec![]).expect("ok to insmod");
nl.next().expect("receive echo");
Expand All @@ -153,7 +157,7 @@ macro_rules! netlink_bench {
if let portus::serialize::Msg::Other(raw) = nl.next().expect("recv echo") {
let portus_rt = time::get_time();
let kern_recv_msg =
NlTimeMsg::from_raw_msg(raw).expect("get time from raw");
deserialize_nltimemsg(raw).expect("get time from raw");
return NlDuration(
portus_rt - portus_send_time,
kern_recv_msg.kern_rt - portus_send_time,
Expand Down Expand Up @@ -229,15 +233,8 @@ macro_rules! kp_bench {
.expect("load failed");

let c1 = thread::spawn(move || {
let mut receive_buf = [0u8; 1024];
let kp = portus::ipc::kp::Socket::<$mode>::new()
.map(|sk| {
Backend::new(
sk,
Arc::new(atomic::AtomicBool::new(true)),
&mut receive_buf[..],
)
})
.map(|sk| SingleBackend::new(sk, Arc::new(atomic::AtomicBool::new(true))))
.expect("kp ipc initialization");
tx.send(bench(kp.sender(), kp, iter)).expect("report rtts");
});
Expand Down Expand Up @@ -269,15 +266,8 @@ macro_rules! unix_bench {

// listen
let c1 = thread::spawn(move || {
let mut receive_buf = [0u8; 1024];
let unix = portus::ipc::unix::Socket::<$mode>::new("in", "out")
.map(|sk| {
Backend::new(
sk,
Arc::new(atomic::AtomicBool::new(true)),
&mut receive_buf[..],
)
})
let unix = portus::ipc::unix::Socket::<$mode>::new(1, "in", "out")
.map(|sk| SingleBackend::new(sk, Arc::new(atomic::AtomicBool::new(true))))
.expect("unix ipc initialization");
ready_rx.recv().expect("sync");
tx.send(bench(unix.sender(), unix, iter))
Expand All @@ -286,7 +276,8 @@ macro_rules! unix_bench {

// echo-er
let c2 = thread::spawn(move || {
let sk = portus::ipc::unix::Socket::<Blocking>::new("out", "in").expect("sk init");
let sk =
portus::ipc::unix::Socket::<Blocking>::new(1, "out", "in").expect("sk init");
let mut buf = [0u8; 1024];
ready_tx.send(true).expect("sync");
for _ in 0..iter {
Expand Down
Loading