Skip to content

Commit c94de54

Browse files
mayastor-borstiagolobocastro
andcommitted
Merge #1901
1901: fix: exit with error if gRPC port is busy r=tiagolobocastro a=tiagolobocastro By default, the panic behaviour is to unwind, meaning we don't exit. In case we fail to even start gRPC we should exit, as there's nothing to do. In this commit, we now internally raise a signal SIGUSR1 which then triggers the shutdown of the reactor et all. todo: We should also consider catching any panic and do the same, or is there any case where we'd want to unwind and remain running? Co-authored-by: Tiago Castro <[email protected]>
2 parents 2ef980b + b80aaa9 commit c94de54

File tree

4 files changed

+50
-32
lines changed

4 files changed

+50
-32
lines changed

io-engine/src/bin/io-engine.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,12 @@ fn start_tokio_runtime(args: &MayastorCliArgs) {
167167
futures.push(Registration::run().boxed());
168168
}
169169

170-
futures::future::try_join_all(futures)
171-
.await
172-
.expect("runtime exited in the normal state");
170+
if let Err(error) = futures::future::try_join_all(futures).await {
171+
error!(error, "tokio runtime exited unexpectedly");
172+
// todo: force process abort?
173+
signal_hook::low_level::raise(signal_hook::consts::SIGUSR1)
174+
.expect("failed to raise internal error");
175+
};
173176
});
174177
});
175178
}
@@ -313,5 +316,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
313316

314317
ms.fini();
315318
ms.event(EventAction::Start).generate();
319+
320+
if MayastorEnvironment::internal_aborted() {
321+
tracing::error!("Exitting with error due to an internal abort request");
322+
std::process::exit(1);
323+
}
324+
316325
Ok(())
317326
}

io-engine/src/core/env.rs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{
55
pin::Pin,
66
str::FromStr,
77
sync::{
8-
atomic::{AtomicBool, Ordering::SeqCst},
8+
atomic::{AtomicI32, Ordering::SeqCst},
99
Arc, Mutex,
1010
},
1111
time::Duration,
@@ -14,7 +14,7 @@ use std::{
1414
use byte_unit::{Byte, Unit};
1515
use clap::Parser;
1616
use events_api::event::EventAction;
17-
use futures::{channel::oneshot, future};
17+
use futures::{channel::oneshot, future, TryFutureExt};
1818
use http::Uri;
1919
use once_cell::sync::{Lazy, OnceCell};
2020
use snafu::Snafu;
@@ -381,7 +381,7 @@ impl MayastorCliArgs {
381381
pub static GLOBAL_RC: Lazy<Arc<Mutex<i32>>> = Lazy::new(|| Arc::new(Mutex::new(-1)));
382382

383383
/// keep track if we have received a signal already
384-
pub static SIG_RECEIVED: Lazy<AtomicBool> = Lazy::new(|| AtomicBool::new(false));
384+
pub static SIG_RECEIVED: Lazy<AtomicI32> = Lazy::new(|| AtomicI32::new(0));
385385

386386
#[derive(Debug, Snafu)]
387387
pub enum EnvError {
@@ -575,12 +575,12 @@ unsafe extern "C" fn signal_trampoline(_: *mut c_void) {
575575

576576
/// called on SIGINT and SIGTERM
577577
extern "C" fn mayastor_signal_handler(signo: i32) {
578-
if SIG_RECEIVED.load(SeqCst) {
578+
if SIG_RECEIVED.load(SeqCst) > 0 {
579579
return;
580580
}
581581

582-
warn!("Received SIGNO: {}", signo);
583-
SIG_RECEIVED.store(true, SeqCst);
582+
warn!("Received SIGNO: {signo}");
583+
SIG_RECEIVED.store(signo, SeqCst);
584584
unsafe {
585585
if let Some(mth) = Mthread::primary_safe() {
586586
spdk_thread_send_critical_msg(mth.as_ptr(), Some(signal_trampoline));
@@ -669,19 +669,16 @@ impl MayastorEnvironment {
669669

670670
/// configure signal handling
671671
fn install_signal_handlers(&self) {
672-
unsafe {
673-
signal_hook::low_level::register(signal_hook::consts::SIGTERM, || {
674-
mayastor_signal_handler(1)
675-
})
676-
}
677-
.unwrap();
678-
679-
unsafe {
680-
signal_hook::low_level::register(signal_hook::consts::SIGINT, || {
681-
mayastor_signal_handler(1)
682-
})
672+
for signal in [
673+
signal_hook::consts::SIGTERM,
674+
signal_hook::consts::SIGINT,
675+
signal_hook::consts::SIGUSR1,
676+
] {
677+
unsafe {
678+
signal_hook::low_level::register(signal, move || mayastor_signal_handler(signal))
679+
}
680+
.expect("Failed to install handler for signal {signal}");
683681
}
684-
.unwrap();
685682
}
686683

687684
/// construct an array of options to be passed to EAL and start it
@@ -1128,7 +1125,7 @@ impl MayastorEnvironment {
11281125
where
11291126
F: FnOnce() + 'static,
11301127
{
1131-
type FutureResult = Result<(), ()>;
1128+
type FutureResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
11321129
let node_name = self.node_name.clone();
11331130
let node_nqn = self.node_nqn.clone();
11341131

@@ -1166,7 +1163,9 @@ impl MayastorEnvironment {
11661163
)));
11671164
}
11681165
futures.push(Box::pin(subsys::Registration::run()));
1169-
futures.push(Box::pin(master));
1166+
futures.push(Box::pin(
1167+
master.map_err(|_| std::io::Error::other("").into()),
1168+
));
11701169
let _out = future::try_join_all(futures).await;
11711170
info!("reactors stopped");
11721171
ms.fini();
@@ -1179,6 +1178,11 @@ impl MayastorEnvironment {
11791178
pub fn make_hostnqn(&self) -> Option<String> {
11801179
self.node_nqn.clone()
11811180
}
1181+
1182+
/// Check if we had to internally aborted.
1183+
pub fn internal_aborted() -> bool {
1184+
SIG_RECEIVED.load(SeqCst) == signal_hook::consts::SIGUSR1
1185+
}
11821186
}
11831187

11841188
fn make_hostnqn(node_name: Option<&String>) -> Option<String> {

io-engine/src/grpc/server.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl MayastorGrpcServer {
5454
endpoint: std::net::SocketAddr,
5555
rpc_addr: String,
5656
api_versions: Vec<ApiVersion>,
57-
) -> Result<(), ()> {
57+
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
5858
let mut rcv_chan = Box::pin(Self::get_or_init().rcv_chan.clone());
5959

6060
let address = Cow::from(rpc_addr);
@@ -64,10 +64,15 @@ impl MayastorGrpcServer {
6464

6565
let enable_v0 = api_versions.contains(&ApiVersion::V0).then_some(true);
6666
let enable_v1 = api_versions.contains(&ApiVersion::V1).then_some(true);
67-
info!(
68-
"{:?} gRPC server configured at address {}",
69-
api_versions, endpoint
70-
);
67+
68+
let incoming =
69+
tonic::transport::server::TcpIncoming::new(endpoint, true, None).map_err(|e| {
70+
std::io::Error::new(
71+
std::io::ErrorKind::AddrInUse,
72+
format!("Failed to bind gRPC socket to {endpoint}: {e}"),
73+
)
74+
})?;
75+
info!("{api_versions:?} gRPC server configured at address {endpoint}");
7176
let svc = Server::builder()
7277
.add_optional_service(
7378
enable_v1.map(|_| v1::bdev::BdevRpcServer::new(BdevService::new())),
@@ -115,7 +120,7 @@ impl MayastorGrpcServer {
115120
enable_v0.map(|_| JsonRpcServer::new(JsonRpcSvc::new(address.clone()))),
116121
)
117122
.add_optional_service(enable_v0.map(|_| BdevRpcServer::new(BdevSvc::new())))
118-
.serve(endpoint);
123+
.serve_with_incoming(incoming);
119124

120125
select! {
121126
result = svc.fuse() => {
@@ -125,8 +130,8 @@ impl MayastorGrpcServer {
125130
Ok(())
126131
}
127132
Err(e) => {
128-
error!("gRPC server failed with error: {}", e);
129-
Err(())
133+
error!("gRPC server failed with error: {e:#?}");
134+
Err(e.into())
130135
}
131136
}
132137
},

io-engine/src/subsys/registration/registration_grpc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ impl Registration {
189189

190190
/// runner responsible for registering and
191191
/// de-registering the mayastor instance on shutdown
192-
pub async fn run() -> Result<(), ()> {
192+
pub async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
193193
if let Some(registration) = GRPC_REGISTRATION.get() {
194194
registration.clone().run_loop().await;
195195
}

0 commit comments

Comments
 (0)