Skip to content

WIP: Add session support #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
31 changes: 20 additions & 11 deletions examples/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
extern crate mux;
extern crate time;

use mux::session::*;

extern crate byteorder;
extern crate rand;

use mux::Rmsg;

use std::cmp::max;
use std::net::TcpStream;
use std::sync::Arc;

use std::thread;
use std::time::Duration;

fn test_session(socket: TcpStream) {
/*
println!("Testing mux client session.");

let session = Arc::new(MuxSession::new(socket).unwrap());

let res = session.ping().unwrap();
println!("Ping time: {:?}", res);
let iters = 10_000;
let threadc = 50;

let startt = time::get_time();

let threads: Vec<thread::JoinHandle<Duration>> = (0..50).map(|id| {
let threads: Vec<thread::JoinHandle<Duration>> = (0..threadc).map(|id| {
let session = session.clone();

thread::spawn(move || {
let mut ping_time = Duration::new(0, 0);
let iters = 10_000;
let mut pingc = 0;
for _ in 0..iters {
if rand::random::<u8>() > 64 {
let b = format!("Hello, world: {}", id).into_bytes();
Expand All @@ -38,9 +44,11 @@ fn test_session(socket: TcpStream) {
}
} else {
ping_time = ping_time + session.ping().unwrap();
pingc += 1;
}
}
ping_time/(iters as u32)

ping_time/max(1, pingc)
})
}).collect();

Expand All @@ -50,15 +58,16 @@ fn test_session(socket: TcpStream) {
total_ping = total_ping + t.join().unwrap();
}

println!("Finished. Average ping: {:?}", total_ping/threadc);
*/
let rps = {
let elapsed = time::get_time() - startt;
((iters*threadc) as f32/(elapsed.num_milliseconds() as f32)) * 1e3
};

println!("Finished. Rps: {}. Mean Ping: {:?}", rps, total_ping/threadc);
}

fn main() {
let socket = TcpStream::connect(("localhost", 9000)).unwrap();

println!("Testing TRequest frame.");
//test_trequest(&mut socket);
test_session(socket);
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ extern crate byteorder;

mod dtab;
pub mod codec;
pub mod session;
pub mod types;

pub use dtab::*;
Expand Down
35 changes: 35 additions & 0 deletions src/session/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
mod sessionimpl;

use std::io;
use std::net::TcpStream;
use std::sync::Arc;
use std::time::Duration;

use std::marker;

use self::sessionimpl::MuxSessionImpl;
use super::{Tdispatch, Rdispatch};

pub struct MuxSession {
inner: Arc<MuxSessionImpl>
}

unsafe impl marker::Send for MuxSession {}
unsafe impl marker::Sync for MuxSession {}


impl MuxSession {
pub fn new(socket: TcpStream) -> io::Result<MuxSession> {
let inner = Arc::new(try!(MuxSessionImpl::new(socket)));
Ok(MuxSession { inner: inner })
}

pub fn dispatch(&self, msg: &Tdispatch) -> io::Result<Rdispatch> {
self.inner.dispatch(msg)
}

#[inline]
pub fn ping(&self) -> io::Result<Duration> {
self.inner.ping()
}
}
Loading