Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ build-data = { version = "0.2.3" }
clap = { version = "4.5.35", features = ["derive"] }
const_format = { version = "0.2.34" }
criterion = { version = "0.5.1", features = ["async_tokio"] }
ctrlc = { version = "3.4", features = ["termination"] }
error-stack = { version = "0.5", default-features = false, features = [
"std",
"serde",
] }
fastimer = { version = "0.8.0" }
fastimer = { version = "0.9.0" }
fastrace = { version = "0.7.9" }
fastrace-opentelemetry = { version = "0.10.0" }
foyer = { version = "0.16.0", features = ["nightly"] }
Expand Down
1 change: 1 addition & 0 deletions cmd/percas/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ release = false

[dependencies]
clap = { workspace = true }
ctrlc = { workspace = true }
error-stack = { workspace = true }
fastimer = { workspace = true }
log = { workspace = true }
Expand Down
31 changes: 9 additions & 22 deletions cmd/percas/src/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,16 @@

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use clap::ValueHint;
use error_stack::Result;
use error_stack::ResultExt;
use fastimer::schedule::SimpleActionExt;
use percas_core::Config;
use percas_core::FoyerEngine;
use percas_core::num_cpus;
use percas_server::PercasContext;
use percas_server::runtime::Runtime;
use percas_server::runtime::make_runtime;
use percas_server::runtime::timer;
use percas_server::scheduled::ReportMetricsAction;
use percas_server::telemetry;

use crate::Error;
Expand All @@ -51,6 +47,7 @@ impl CommandStart {
let mut drop_guards =
telemetry::init(&telemetry_runtime, "percas", config.telemetry.clone());
drop_guards.push(Box::new(telemetry_runtime));
log::info!("Percas is starting with loaded config: {config:#?}");

let server_runtime = make_server_runtime();
server_runtime.block_on(run_server(&server_runtime, config))
Expand All @@ -67,34 +64,24 @@ fn make_server_runtime() -> Runtime {
}

async fn run_server(rt: &Runtime, config: Config) -> Result<(), Error> {
let make_error = || Error("failed to start server".to_string());

let engine = FoyerEngine::try_new(
&config.storage.data_dir,
config.storage.memory_capacity,
config.storage.disk_capacity,
)
.await
.change_context_lazy(make_error)?;
.change_context_lazy(|| Error("failed to start server".to_string()))?;

let ctx = Arc::new(PercasContext { engine });
let (server, shutdown_tx) = percas_server::server::start_server(rt, &config.server, ctx)
.await
.change_context_lazy(|| {
Error("A fatal error has occurred in server process.".to_string())
})?;

// Scheduled actions
ReportMetricsAction::new(ctx.clone()).schedule_with_fixed_delay(
rt,
timer(),
None,
Duration::from_secs(60),
);

log::info!("config: {config:#?}");
ctrlc::set_handler(move || shutdown_tx.shutdown())
.change_context_lazy(|| Error("failed to setup ctrl-c signal handle".to_string()))?;

let server = percas_server::server::start_server(&config.server, ctx)
.await
.inspect_err(|err| {
log::error!("server stopped: {}", err);
})
.change_context_lazy(make_error)?;
server.await_shutdown().await;
Ok(())
}
4 changes: 1 addition & 3 deletions crates/server/src/scheduled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,11 @@ impl ReportMetricsAction {
}
}

impl fastimer::schedule::BaseAction for ReportMetricsAction {
impl fastimer::schedule::SimpleAction for ReportMetricsAction {
fn name(&self) -> &str {
"report_metrics"
}
}

impl fastimer::schedule::SimpleAction for ReportMetricsAction {
async fn run(&mut self) {
self.do_report().await;
}
Expand Down
63 changes: 44 additions & 19 deletions crates/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use mea::latch::Latch;
use fastimer::schedule::SimpleActionExt;
use mea::shutdown::ShutdownRecv;
use mea::shutdown::ShutdownSend;
use mea::waitgroup::WaitGroup;
use percas_core::ServerConfig;
use percas_metrics::GlobalMetrics;
Expand All @@ -40,6 +42,9 @@ use poem::web::Path;
use poem::web::headers::ContentType;

use crate::PercasContext;
use crate::runtime::Runtime;
use crate::runtime::timer;
use crate::scheduled::ReportMetricsAction;

struct LoggerMiddleware;

Expand Down Expand Up @@ -83,25 +88,28 @@ pub(crate) type ServerFuture<T> = tokio::task::JoinHandle<Result<T, io::Error>>;
pub struct ServerState {
server_advertise_addr: SocketAddr,
server_fut: ServerFuture<()>,
shutdown: Arc<Latch>,

shutdown_rx_server: ShutdownRecv,
shutdown_tx_actions: Vec<ShutdownSend>,
}

impl ServerState {
pub fn server_advertise_addr(&self) -> SocketAddr {
self.server_advertise_addr
}

pub fn shutdown_handle(&self) -> impl Fn() {
let shutdown = self.shutdown.clone();
move || shutdown.count_down()
}
pub async fn await_shutdown(self) {
self.shutdown_rx_server.is_shutdown().await;

pub fn shutdown(&self) {
self.shutdown_handle()();
}
log::info!("percas server is shutting down");

pub async fn await_shutdown(self) {
self.shutdown.wait().await;
for shutdown in self.shutdown_tx_actions.iter() {
shutdown.shutdown();
}
for shutdown in self.shutdown_tx_actions {
shutdown.await_shutdown().await;
}
log::info!("percas actions shutdown");

match self.server_fut.await {
Ok(_) => log::info!("percas server stopped."),
Expand All @@ -111,10 +119,12 @@ impl ServerState {
}

pub async fn start_server(
rt: &Runtime,
config: &ServerConfig,
ctx: Arc<PercasContext>,
) -> Result<ServerState, io::Error> {
let shutdown = Arc::new(Latch::new(1));
) -> Result<(ServerState, ShutdownSend), io::Error> {
let (shutdown_tx_server, shutdown_rx_server) = mea::shutdown::new_pair();

let wg = WaitGroup::new();

log::info!("listening on {}", config.listen_addr);
Expand All @@ -130,18 +140,18 @@ pub async fn start_server(
resolve_advertise_addr(listen_addr, config.advertise_addr.as_deref())?;

let server_fut = {
let shutdown_clone = shutdown.clone();
let shutdown_clone = shutdown_rx_server.clone();
let wg_clone = wg.clone();

let route = Route::new()
.at("/*key", poem::get(get).put(put).delete(delete))
.data(ctx)
.data(ctx.clone())
.with(LoggerMiddleware);
let signal = async move {
log::info!("server has started on [{listen_addr}]");
drop(wg_clone);

shutdown_clone.wait().await;
shutdown_clone.is_shutdown().await;
log::info!("server is closing");
};

Expand All @@ -153,11 +163,26 @@ pub async fn start_server(
};

wg.await;
Ok(ServerState {

// Scheduled actions
let mut shutdown_tx_actions = vec![];
let (shutdown_tx, shutdown_rx) = mea::shutdown::new_pair();
ReportMetricsAction::new(ctx.clone()).schedule_with_fixed_delay(
async move { shutdown_rx.is_shutdown().await },
rt,
timer(),
None,
Duration::from_secs(60),
);
shutdown_tx_actions.push(shutdown_tx);

let state = ServerState {
server_advertise_addr,
server_fut,
shutdown,
})
shutdown_rx_server,
shutdown_tx_actions,
};
Ok((state, shutdown_tx_server))
}

fn resolve_advertise_addr(
Expand Down
3 changes: 1 addition & 2 deletions tests/behavior/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ where

let exit_code = test(Testkit { client }).await.report();

state.server_state.shutdown();
state.server_state.await_shutdown().await;
state.shutdown().await;
exit_code
})
}
Expand Down
2 changes: 1 addition & 1 deletion tests/toolkit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ release = false

[dependencies]
local-ip-address = { workspace = true }
mea = { workspace = true }
percas-core = { workspace = true }
percas-server = { workspace = true }
regex = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["full"] }

[lints]
workspace = true
14 changes: 12 additions & 2 deletions tests/toolkit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::any::Any;
use std::net::SocketAddr;
use std::sync::Arc;

use mea::shutdown::ShutdownSend;
use percas_core::Config;
use percas_core::FoyerEngine;
use percas_core::LogsConfig;
Expand All @@ -40,9 +41,17 @@ type DropGuard = Box<dyn Any>;
#[derive(Debug)]
pub struct TestServerState {
pub server_state: ServerState,
shutdown_tx_server: ShutdownSend,
_drop_guards: Vec<DropGuard>,
}

impl TestServerState {
pub async fn shutdown(self) {
self.shutdown_tx_server.shutdown();
self.server_state.await_shutdown().await;
}
}

pub fn start_test_server(_test_name: &str, rt: &Runtime) -> Option<TestServerState> {
let mut drop_guard = Vec::<DropGuard>::new();
drop_guard.extend(
Expand Down Expand Up @@ -81,7 +90,7 @@ pub fn start_test_server(_test_name: &str, rt: &Runtime) -> Option<TestServerSta
},
};

let server_state = rt.block_on(async move {
let (server_state, shutdown_tx_server) = rt.block_on(async move {
let engine = FoyerEngine::try_new(
&config.storage.data_dir,
config.storage.memory_capacity,
Expand All @@ -90,14 +99,15 @@ pub fn start_test_server(_test_name: &str, rt: &Runtime) -> Option<TestServerSta
.await
.unwrap();
let ctx = Arc::new(percas_server::PercasContext { engine });
percas_server::server::start_server(&config.server, ctx)
percas_server::server::start_server(rt, &config.server, ctx)
.await
.unwrap()
});

drop_guard.push(Box::new(temp_dir));
Some(TestServerState {
server_state,
shutdown_tx_server,
_drop_guards: drop_guard,
})
}
Loading