Skip to content
Draft
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ members = [

[workspace.dependencies]
ahash = "0.8.0"
arrayvec = "0.7.2"
backtrace = "0.3.66"
bitvec = "1.0.1"
blake3 = "1.3.1"
Expand All @@ -48,6 +49,7 @@ clap = "2.33.3"
crossbeam-channel = "0.5.6"
crossbeam-queue = "0.3.5"
foreign-types-shared = "0.3.1"
jemallocator = "0.5.0"
libc = "0.2.134"
log = "0.4.17"
memmap2 = "0.2.2"
Expand Down
6 changes: 3 additions & 3 deletions src/core/admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,16 +316,16 @@ impl Admin {
match request {
AdminRequest::FlushAll => {
let _ = self.signal_queue_tx.try_send_all(Signal::FlushAll);
session.send(AdminResponse::Ok)?;
session.send(&AdminResponse::Ok)?;
}
AdminRequest::Quit => {
return Err(Error::new(ErrorKind::Other, "should hangup"));
}
AdminRequest::Stats => {
session.send(AdminResponse::Stats)?;
session.send(&AdminResponse::Stats)?;
}
AdminRequest::Version => {
session.send(AdminResponse::version(self.version.clone()))?;
session.send(&AdminResponse::version(self.version.clone()))?;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/proxy/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,10 @@ where
{
if let Some(session) = self.sessions.get_mut(token.0) {
if response.should_hangup() {
let _ = session.send(FrontendResponse::from(response));
let _ = session.send(&FrontendResponse::from(response));
self.close(token);
continue;
} else if session.send(FrontendResponse::from(response)).is_err() {
} else if session.send(&FrontendResponse::from(response)).is_err() {
self.close(token);
continue;
} else if session.write_pending() > 0 {
Expand Down
5 changes: 2 additions & 3 deletions src/core/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,17 @@ use admin::AdminBuilder;
use common::signal::Signal;
use common::ssl::tls_acceptor;
use config::*;
use core::marker::PhantomData;
use core::time::Duration;
use crossbeam_channel::{bounded, Sender};
use entrystore::EntryStore;
use logger::{Drain, Klog};
use protocol_common::{Compose, Execute, Parse};
use protocol_common::{Compose, Execute, IntoBuffers, Parse};
use queues::Queues;
use rustcommon_metrics::*;
use session::{Buf, ServerSession, Session};
use slab::Slab;
use std::io::{Error, ErrorKind, Result};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use waker::Waker;

mod listener;
Expand Down
2 changes: 1 addition & 1 deletion src/core/server/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl<Parser, Request, Response, Storage> ProcessBuilder<Parser, Request, Respons
where
Parser: 'static + Parse<Request> + Clone + Send,
Request: 'static + Klog + Klog<Response = Response> + Send,
Response: 'static + Compose + Send,
Response: 'static + Compose + IntoBuffers + Send,
Storage: 'static + Execute<Request, Response> + EntryStore + Send,
{
pub fn new<T: AdminConfig + ServerConfig + TlsConfig + WorkerConfig>(
Expand Down
68 changes: 19 additions & 49 deletions src/core/server/src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ use std::thread::JoinHandle;

mod multi;
mod single;
mod storage;

use multi::*;
use single::*;
use storage::*;

heatmap!(
WORKER_EVENT_DEPTH,
Expand Down Expand Up @@ -44,16 +42,15 @@ pub enum Workers<Parser, Request, Response, Storage> {
worker: SingleWorker<Parser, Request, Response, Storage>,
},
Multi {
workers: Vec<MultiWorker<Parser, Request, Response>>,
storage: StorageWorker<Request, Response, Storage, Token>,
workers: Vec<MultiWorker<Parser, Request, Response, Storage>>,
},
}

impl<Parser, Request, Response, Storage> Workers<Parser, Request, Response, Storage>
where
Parser: 'static + Parse<Request> + Clone + Send,
Request: 'static + Klog + Klog<Response = Response> + Send,
Response: 'static + Compose + Send,
Response: 'static + Compose + IntoBuffers + Send,
Storage: 'static + EntryStore + Execute<Request, Response> + Send,
{
pub fn spawn(self) -> Vec<JoinHandle<()>> {
Expand All @@ -64,14 +61,8 @@ where
.spawn(move || worker.run())
.unwrap()]
}
Self::Multi {
mut workers,
mut storage,
} => {
let mut join_handles = vec![std::thread::Builder::new()
.name(format!("{}_storage", THREAD_PREFIX))
.spawn(move || storage.run())
.unwrap()];
Self::Multi { mut workers } => {
let mut join_handles = vec![];

for (id, mut worker) in workers.drain(..).enumerate() {
join_handles.push(
Expand All @@ -93,8 +84,7 @@ pub enum WorkersBuilder<Parser, Request, Response, Storage> {
worker: SingleWorkerBuilder<Parser, Request, Response, Storage>,
},
Multi {
workers: Vec<MultiWorkerBuilder<Parser, Request, Response>>,
storage: StorageWorkerBuilder<Request, Response, Storage>,
workers: Vec<MultiWorkerBuilder<Parser, Request, Response, Storage>>,
},
}

Expand All @@ -108,15 +98,18 @@ where
let threads = config.worker().threads();

if threads > 1 {
let storage = Arc::new(Mutex::new(storage));

let mut workers = vec![];
for _ in 0..threads {
workers.push(MultiWorkerBuilder::new(config, parser.clone())?)
workers.push(MultiWorkerBuilder::new(
config,
parser.clone(),
storage.clone(),
)?)
}

Ok(Self::Multi {
workers,
storage: StorageWorkerBuilder::new(config, storage)?,
})
Ok(Self::Multi { workers })
} else {
Ok(Self::Single {
worker: SingleWorkerBuilder::new(config, parser, storage)?,
Expand All @@ -129,10 +122,7 @@ where
Self::Single { worker } => {
vec![worker.waker()]
}
Self::Multi {
workers,
storage: _,
} => workers.iter().map(|w| w.waker()).collect(),
Self::Multi { workers } => workers.iter().map(|w| w.waker()).collect(),
}
}

Expand All @@ -141,8 +131,8 @@ where
Self::Single { worker } => {
vec![worker.waker()]
}
Self::Multi { workers, storage } => {
let mut wakers = vec![storage.waker()];
Self::Multi { workers } => {
let mut wakers = vec![];
for worker in workers {
wakers.push(worker.waker());
}
Expand All @@ -160,35 +150,15 @@ where
let mut session_queues = session_queues;
match self {
Self::Multi {
storage,
// storage,
mut workers,
} => {
let storage_wakers = vec![storage.waker()];
let worker_wakers: Vec<Arc<Waker>> = workers.iter().map(|v| v.waker()).collect();
let (mut worker_data_queues, mut storage_data_queues) =
Queues::new(worker_wakers, storage_wakers, QUEUE_CAPACITY);

// The storage thread precedes the worker threads in the set of
// wakers, so its signal queue is the first element of
// `signal_queues`. Its request queue is also the first (and
// only) element of `request_queues`. We remove these and build
// the storage so we can loop through the remaining signal
// queues when launching the worker threads.
let s = storage.build(storage_data_queues.remove(0), signal_queues.remove(0));

let mut w = Vec::new();
for worker_builder in workers.drain(..) {
w.push(worker_builder.build(
worker_data_queues.remove(0),
session_queues.remove(0),
signal_queues.remove(0),
));
w.push(worker_builder.build(session_queues.remove(0), signal_queues.remove(0)));
}

Workers::Multi {
storage: s,
workers: w,
}
Workers::Multi { workers: w }
}
Self::Single { worker } => Workers::Single {
worker: worker.build(session_queues.remove(0), signal_queues.remove(0)),
Expand Down
Loading