Skip to content

Commit 5df443c

Browse files
authored
refactor: upgrade and catch up server structure (#28)
Signed-off-by: tison <wander4096@gmail.com>
1 parent a9a23e5 commit 5df443c

File tree

9 files changed

+85
-54
lines changed

9 files changed

+85
-54
lines changed

Cargo.lock

Lines changed: 14 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,12 @@ build-data = { version = "0.2.3" }
5050
clap = { version = "4.5.35", features = ["derive"] }
5151
const_format = { version = "0.2.34" }
5252
criterion = { version = "0.5.1", features = ["async_tokio"] }
53+
ctrlc = { version = "3.4", features = ["termination"] }
5354
error-stack = { version = "0.5", default-features = false, features = [
5455
"std",
5556
"serde",
5657
] }
57-
fastimer = { version = "0.8.0" }
58+
fastimer = { version = "0.9.0" }
5859
fastrace = { version = "0.7.9" }
5960
fastrace-opentelemetry = { version = "0.10.0" }
6061
foyer = { version = "0.16.0", features = ["nightly"] }

cmd/percas/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ release = false
2929

3030
[dependencies]
3131
clap = { workspace = true }
32+
ctrlc = { workspace = true }
3233
error-stack = { workspace = true }
3334
fastimer = { workspace = true }
3435
log = { workspace = true }

cmd/percas/src/start.rs

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,16 @@
1414

1515
use std::path::PathBuf;
1616
use std::sync::Arc;
17-
use std::time::Duration;
1817

1918
use clap::ValueHint;
2019
use error_stack::Result;
2120
use error_stack::ResultExt;
22-
use fastimer::schedule::SimpleActionExt;
2321
use percas_core::Config;
2422
use percas_core::FoyerEngine;
2523
use percas_core::num_cpus;
2624
use percas_server::PercasContext;
2725
use percas_server::runtime::Runtime;
2826
use percas_server::runtime::make_runtime;
29-
use percas_server::runtime::timer;
30-
use percas_server::scheduled::ReportMetricsAction;
3127
use percas_server::telemetry;
3228

3329
use crate::Error;
@@ -51,6 +47,7 @@ impl CommandStart {
5147
let mut drop_guards =
5248
telemetry::init(&telemetry_runtime, "percas", config.telemetry.clone());
5349
drop_guards.push(Box::new(telemetry_runtime));
50+
log::info!("Percas is starting with loaded config: {config:#?}");
5451

5552
let server_runtime = make_server_runtime();
5653
server_runtime.block_on(run_server(&server_runtime, config))
@@ -67,34 +64,24 @@ fn make_server_runtime() -> Runtime {
6764
}
6865

6966
async fn run_server(rt: &Runtime, config: Config) -> Result<(), Error> {
70-
let make_error = || Error("failed to start server".to_string());
71-
7267
let engine = FoyerEngine::try_new(
7368
&config.storage.data_dir,
7469
config.storage.memory_capacity,
7570
config.storage.disk_capacity,
7671
)
7772
.await
78-
.change_context_lazy(make_error)?;
73+
.change_context_lazy(|| Error("failed to start server".to_string()))?;
7974

8075
let ctx = Arc::new(PercasContext { engine });
76+
let (server, shutdown_tx) = percas_server::server::start_server(rt, &config.server, ctx)
77+
.await
78+
.change_context_lazy(|| {
79+
Error("A fatal error has occurred in server process.".to_string())
80+
})?;
8181

82-
// Scheduled actions
83-
ReportMetricsAction::new(ctx.clone()).schedule_with_fixed_delay(
84-
rt,
85-
timer(),
86-
None,
87-
Duration::from_secs(60),
88-
);
89-
90-
log::info!("config: {config:#?}");
82+
ctrlc::set_handler(move || shutdown_tx.shutdown())
83+
.change_context_lazy(|| Error("failed to setup ctrl-c signal handle".to_string()))?;
9184

92-
let server = percas_server::server::start_server(&config.server, ctx)
93-
.await
94-
.inspect_err(|err| {
95-
log::error!("server stopped: {}", err);
96-
})
97-
.change_context_lazy(make_error)?;
9885
server.await_shutdown().await;
9986
Ok(())
10087
}

crates/server/src/scheduled.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,11 @@ impl ReportMetricsAction {
3737
}
3838
}
3939

40-
impl fastimer::schedule::BaseAction for ReportMetricsAction {
40+
impl fastimer::schedule::SimpleAction for ReportMetricsAction {
4141
fn name(&self) -> &str {
4242
"report_metrics"
4343
}
44-
}
4544

46-
impl fastimer::schedule::SimpleAction for ReportMetricsAction {
4745
async fn run(&mut self) {
4846
self.do_report().await;
4947
}

crates/server/src/server.rs

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use std::net::SocketAddr;
1717
use std::sync::Arc;
1818
use std::time::Duration;
1919

20-
use mea::latch::Latch;
20+
use fastimer::schedule::SimpleActionExt;
21+
use mea::shutdown::ShutdownRecv;
22+
use mea::shutdown::ShutdownSend;
2123
use mea::waitgroup::WaitGroup;
2224
use percas_core::ServerConfig;
2325
use percas_metrics::GlobalMetrics;
@@ -40,6 +42,9 @@ use poem::web::Path;
4042
use poem::web::headers::ContentType;
4143

4244
use crate::PercasContext;
45+
use crate::runtime::Runtime;
46+
use crate::runtime::timer;
47+
use crate::scheduled::ReportMetricsAction;
4348

4449
struct LoggerMiddleware;
4550

@@ -83,25 +88,28 @@ pub(crate) type ServerFuture<T> = tokio::task::JoinHandle<Result<T, io::Error>>;
8388
pub struct ServerState {
8489
server_advertise_addr: SocketAddr,
8590
server_fut: ServerFuture<()>,
86-
shutdown: Arc<Latch>,
91+
92+
shutdown_rx_server: ShutdownRecv,
93+
shutdown_tx_actions: Vec<ShutdownSend>,
8794
}
8895

8996
impl ServerState {
9097
pub fn server_advertise_addr(&self) -> SocketAddr {
9198
self.server_advertise_addr
9299
}
93100

94-
pub fn shutdown_handle(&self) -> impl Fn() {
95-
let shutdown = self.shutdown.clone();
96-
move || shutdown.count_down()
97-
}
101+
pub async fn await_shutdown(self) {
102+
self.shutdown_rx_server.is_shutdown().await;
98103

99-
pub fn shutdown(&self) {
100-
self.shutdown_handle()();
101-
}
104+
log::info!("percas server is shutting down");
102105

103-
pub async fn await_shutdown(self) {
104-
self.shutdown.wait().await;
106+
for shutdown in self.shutdown_tx_actions.iter() {
107+
shutdown.shutdown();
108+
}
109+
for shutdown in self.shutdown_tx_actions {
110+
shutdown.await_shutdown().await;
111+
}
112+
log::info!("percas actions shutdown");
105113

106114
match self.server_fut.await {
107115
Ok(_) => log::info!("percas server stopped."),
@@ -111,10 +119,12 @@ impl ServerState {
111119
}
112120

113121
pub async fn start_server(
122+
rt: &Runtime,
114123
config: &ServerConfig,
115124
ctx: Arc<PercasContext>,
116-
) -> Result<ServerState, io::Error> {
117-
let shutdown = Arc::new(Latch::new(1));
125+
) -> Result<(ServerState, ShutdownSend), io::Error> {
126+
let (shutdown_tx_server, shutdown_rx_server) = mea::shutdown::new_pair();
127+
118128
let wg = WaitGroup::new();
119129

120130
log::info!("listening on {}", config.listen_addr);
@@ -130,18 +140,18 @@ pub async fn start_server(
130140
resolve_advertise_addr(listen_addr, config.advertise_addr.as_deref())?;
131141

132142
let server_fut = {
133-
let shutdown_clone = shutdown.clone();
143+
let shutdown_clone = shutdown_rx_server.clone();
134144
let wg_clone = wg.clone();
135145

136146
let route = Route::new()
137147
.at("/*key", poem::get(get).put(put).delete(delete))
138-
.data(ctx)
148+
.data(ctx.clone())
139149
.with(LoggerMiddleware);
140150
let signal = async move {
141151
log::info!("server has started on [{listen_addr}]");
142152
drop(wg_clone);
143153

144-
shutdown_clone.wait().await;
154+
shutdown_clone.is_shutdown().await;
145155
log::info!("server is closing");
146156
};
147157

@@ -153,11 +163,26 @@ pub async fn start_server(
153163
};
154164

155165
wg.await;
156-
Ok(ServerState {
166+
167+
// Scheduled actions
168+
let mut shutdown_tx_actions = vec![];
169+
let (shutdown_tx, shutdown_rx) = mea::shutdown::new_pair();
170+
ReportMetricsAction::new(ctx.clone()).schedule_with_fixed_delay(
171+
async move { shutdown_rx.is_shutdown().await },
172+
rt,
173+
timer(),
174+
None,
175+
Duration::from_secs(60),
176+
);
177+
shutdown_tx_actions.push(shutdown_tx);
178+
179+
let state = ServerState {
157180
server_advertise_addr,
158181
server_fut,
159-
shutdown,
160-
})
182+
shutdown_rx_server,
183+
shutdown_tx_actions,
184+
};
185+
Ok((state, shutdown_tx_server))
161186
}
162187

163188
fn resolve_advertise_addr(

tests/behavior/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ where
4141

4242
let exit_code = test(Testkit { client }).await.report();
4343

44-
state.server_state.shutdown();
45-
state.server_state.await_shutdown().await;
44+
state.shutdown().await;
4645
exit_code
4746
})
4847
}

tests/toolkit/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ release = false
2727

2828
[dependencies]
2929
local-ip-address = { workspace = true }
30+
mea = { workspace = true }
3031
percas-core = { workspace = true }
3132
percas-server = { workspace = true }
3233
regex = { workspace = true }
3334
tempfile = { workspace = true }
34-
tokio = { workspace = true, features = ["full"] }
3535

3636
[lints]
3737
workspace = true

tests/toolkit/src/lib.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::any::Any;
1616
use std::net::SocketAddr;
1717
use std::sync::Arc;
1818

19+
use mea::shutdown::ShutdownSend;
1920
use percas_core::Config;
2021
use percas_core::FoyerEngine;
2122
use percas_core::LogsConfig;
@@ -40,9 +41,17 @@ type DropGuard = Box<dyn Any>;
4041
#[derive(Debug)]
4142
pub struct TestServerState {
4243
pub server_state: ServerState,
44+
shutdown_tx_server: ShutdownSend,
4345
_drop_guards: Vec<DropGuard>,
4446
}
4547

48+
impl TestServerState {
49+
pub async fn shutdown(self) {
50+
self.shutdown_tx_server.shutdown();
51+
self.server_state.await_shutdown().await;
52+
}
53+
}
54+
4655
pub fn start_test_server(_test_name: &str, rt: &Runtime) -> Option<TestServerState> {
4756
let mut drop_guard = Vec::<DropGuard>::new();
4857
drop_guard.extend(
@@ -81,7 +90,7 @@ pub fn start_test_server(_test_name: &str, rt: &Runtime) -> Option<TestServerSta
8190
},
8291
};
8392

84-
let server_state = rt.block_on(async move {
93+
let (server_state, shutdown_tx_server) = rt.block_on(async move {
8594
let engine = FoyerEngine::try_new(
8695
&config.storage.data_dir,
8796
config.storage.memory_capacity,
@@ -90,14 +99,15 @@ pub fn start_test_server(_test_name: &str, rt: &Runtime) -> Option<TestServerSta
9099
.await
91100
.unwrap();
92101
let ctx = Arc::new(percas_server::PercasContext { engine });
93-
percas_server::server::start_server(&config.server, ctx)
102+
percas_server::server::start_server(rt, &config.server, ctx)
94103
.await
95104
.unwrap()
96105
});
97106

98107
drop_guard.push(Box::new(temp_dir));
99108
Some(TestServerState {
100109
server_state,
110+
shutdown_tx_server,
101111
_drop_guards: drop_guard,
102112
})
103113
}

0 commit comments

Comments
 (0)