Skip to content

Commit ad6917f

Browse files
mayastor-borstiagolobocastro
andcommitted
Merge #1904
1904: fix: exit with error if gRPC port is busy r=tiagolobocastro a=tiagolobocastro By default, the panic behaviour is to unwind, meaning we don't exit. In case we fail to even start gRPC we should exit, as there's nothing to do. In this commit, we now internally raise a signal SIGUSR1 which then triggers the shutdown of the reactor et all. todo: We should also consider catching any panic and do the same, or is there any case where we'd want to unwind and remain running? Co-authored-by: Tiago Castro <[email protected]>
2 parents 02e79b6 + ebd1a91 commit ad6917f

File tree

5 files changed

+60
-39
lines changed

5 files changed

+60
-39
lines changed

Jenkinsfile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ def dockerId() {
5959

6060
// TODO: Use multiple choices
6161
run_linter = true
62-
rust_test = true
63-
grpc_test = true
64-
pytest_test = true
62+
rust_test = false
63+
grpc_test = false
64+
pytest_test = false
6565
// WA https://issues.jenkins.io/browse/JENKINS-41929
6666
// on the first run of new parameters, they are set to null.
6767
run_tests = params.run_tests == null ? true : params.run_tests

io-engine/src/bin/io-engine.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,12 @@ fn start_tokio_runtime(args: &MayastorCliArgs) {
158158
futures.push(Registration::run().boxed());
159159
}
160160

161-
futures::future::try_join_all(futures)
162-
.await
163-
.expect("runtime exited in the normal state");
161+
if let Err(error) = futures::future::try_join_all(futures).await {
162+
error!(error, "tokio runtime exited unexpectedly");
163+
// todo: force process abort?
164+
signal_hook::low_level::raise(signal_hook::consts::SIGUSR1)
165+
.expect("failed to raise internal error");
166+
};
164167
});
165168
});
166169
}
@@ -296,5 +299,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
296299

297300
ms.fini();
298301
ms.event(EventAction::Start).generate();
302+
303+
if MayastorEnvironment::internal_aborted() {
304+
tracing::error!("Exitting with error due to an internal abort request");
305+
std::process::exit(1);
306+
}
307+
299308
Ok(())
300309
}

io-engine/src/core/env.rs

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
pin::Pin,
77
str::FromStr,
88
sync::{
9-
atomic::{AtomicBool, Ordering::SeqCst},
9+
atomic::{AtomicI32, Ordering::SeqCst},
1010
Arc,
1111
Mutex,
1212
},
@@ -16,7 +16,7 @@ use std::{
1616
use byte_unit::{Byte, ByteUnit};
1717
use clap::Parser;
1818
use events_api::event::EventAction;
19-
use futures::{channel::oneshot, future};
19+
use futures::{channel::oneshot, future, TryFutureExt};
2020
use http::Uri;
2121
use once_cell::sync::{Lazy, OnceCell};
2222
use snafu::Snafu;
@@ -342,8 +342,7 @@ pub static GLOBAL_RC: Lazy<Arc<Mutex<i32>>> =
342342
Lazy::new(|| Arc::new(Mutex::new(-1)));
343343

344344
/// keep track if we have received a signal already
345-
pub static SIG_RECEIVED: Lazy<AtomicBool> =
346-
Lazy::new(|| AtomicBool::new(false));
345+
pub static SIG_RECEIVED: Lazy<AtomicI32> = Lazy::new(|| AtomicI32::new(0));
347346

348347
// FFI functions that are needed to initialize the environment
349348
extern "C" {
@@ -553,12 +552,12 @@ unsafe extern "C" fn signal_trampoline(_: *mut c_void) {
553552

554553
/// called on SIGINT and SIGTERM
555554
extern "C" fn mayastor_signal_handler(signo: i32) {
556-
if SIG_RECEIVED.load(SeqCst) {
555+
if SIG_RECEIVED.load(SeqCst) > 0 {
557556
return;
558557
}
559558

560-
warn!("Received SIGNO: {}", signo);
561-
SIG_RECEIVED.store(true, SeqCst);
559+
warn!("Received SIGNO: {signo}");
560+
SIG_RECEIVED.store(signo, SeqCst);
562561
unsafe {
563562
if let Some(mth) = Mthread::primary_safe() {
564563
spdk_thread_send_critical_msg(
@@ -650,21 +649,18 @@ impl MayastorEnvironment {
650649

651650
/// configure signal handling
652651
fn install_signal_handlers(&self) {
653-
unsafe {
654-
signal_hook::low_level::register(
655-
signal_hook::consts::SIGTERM,
656-
|| mayastor_signal_handler(1),
657-
)
658-
}
659-
.unwrap();
660-
661-
unsafe {
662-
signal_hook::low_level::register(
663-
signal_hook::consts::SIGINT,
664-
|| mayastor_signal_handler(1),
665-
)
652+
for signal in [
653+
signal_hook::consts::SIGTERM,
654+
signal_hook::consts::SIGINT,
655+
signal_hook::consts::SIGUSR1,
656+
] {
657+
unsafe {
658+
signal_hook::low_level::register(signal, move || {
659+
mayastor_signal_handler(signal)
660+
})
661+
}
662+
.expect("Failed to install handler for signal {signal}");
666663
}
667-
.unwrap();
668664
}
669665

670666
/// construct an array of options to be passed to EAL and start it
@@ -1093,7 +1089,8 @@ impl MayastorEnvironment {
10931089
where
10941090
F: FnOnce() + 'static,
10951091
{
1096-
type FutureResult = Result<(), ()>;
1092+
type FutureResult =
1093+
Result<(), Box<dyn std::error::Error + Send + Sync>>;
10971094
let node_name = self.node_name.clone();
10981095
let node_nqn = self.node_nqn.clone();
10991096

@@ -1133,7 +1130,9 @@ impl MayastorEnvironment {
11331130
)));
11341131
}
11351132
futures.push(Box::pin(subsys::Registration::run()));
1136-
futures.push(Box::pin(master));
1133+
futures.push(Box::pin(master.map_err(|_| {
1134+
std::io::Error::new(std::io::ErrorKind::Other, "").into()
1135+
})));
11371136
let _out = future::try_join_all(futures).await;
11381137
info!("reactors stopped");
11391138
ms.fini();
@@ -1146,6 +1145,11 @@ impl MayastorEnvironment {
11461145
pub fn make_hostnqn(&self) -> Option<String> {
11471146
self.node_nqn.clone()
11481147
}
1148+
1149+
/// Check if we had to internally aborted.
1150+
pub fn internal_aborted() -> bool {
1151+
SIG_RECEIVED.load(SeqCst) == signal_hook::consts::SIGUSR1
1152+
}
11491153
}
11501154

11511155
fn make_hostnqn(node_name: Option<&String>) -> Option<String> {

io-engine/src/grpc/server.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ impl MayastorGrpcServer {
6666
endpoint: std::net::SocketAddr,
6767
rpc_addr: String,
6868
api_versions: Vec<ApiVersion>,
69-
) -> Result<(), ()> {
70-
let mut rcv_chan = Self::get_or_init().rcv_chan.clone();
69+
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
70+
let mut rcv_chan = Box::pin(Self::get_or_init().rcv_chan.clone());
7171

7272
let address = Cow::from(rpc_addr);
7373

@@ -76,10 +76,18 @@ impl MayastorGrpcServer {
7676

7777
let enable_v0 = api_versions.contains(&ApiVersion::V0).then_some(true);
7878
let enable_v1 = api_versions.contains(&ApiVersion::V1).then_some(true);
79-
info!(
80-
"{:?} gRPC server configured at address {}",
81-
api_versions, endpoint
82-
);
79+
80+
let incoming =
81+
tonic::transport::server::TcpIncoming::new(endpoint, true, None)
82+
.map_err(|e| {
83+
std::io::Error::new(
84+
std::io::ErrorKind::AddrInUse,
85+
format!(
86+
"Failed to bind gRPC socket to {endpoint}: {e}"
87+
),
88+
)
89+
})?;
90+
info!("{api_versions:?} gRPC server configured at address {endpoint}");
8391
let svc = Server::builder()
8492
.add_optional_service(
8593
enable_v1
@@ -141,7 +149,7 @@ impl MayastorGrpcServer {
141149
.add_optional_service(
142150
enable_v0.map(|_| BdevRpcServer::new(BdevSvc::new())),
143151
)
144-
.serve(endpoint);
152+
.serve_with_incoming(incoming);
145153

146154
select! {
147155
result = svc.fuse() => {
@@ -151,8 +159,8 @@ impl MayastorGrpcServer {
151159
Ok(())
152160
}
153161
Err(e) => {
154-
error!("gRPC server failed with error: {}", e);
155-
Err(())
162+
error!("gRPC server failed with error: {e:#?}");
163+
Err(e.into())
156164
}
157165
}
158166
},

io-engine/src/subsys/registration/registration_grpc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ impl Registration {
197197

198198
/// runner responsible for registering and
199199
/// de-registering the mayastor instance on shutdown
200-
pub async fn run() -> Result<(), ()> {
200+
pub async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
201201
if let Some(registration) = GRPC_REGISTRATION.get() {
202202
registration.clone().run_loop().await;
203203
}

0 commit comments

Comments
 (0)