Skip to content

Commit 4ff95eb

Browse files
mayastor-borsdsharma-dc
andcommitted
Merge #1879
1879: feat(persistence): update replica health condition as atomic operation r=dsharma-dc a=dsharma-dc Fail the transaction operation and shutdown the nexus to avoid succeeding IO via remaining replicas. This ensures data integrity in case the replica being marked here has been picked up as source of truth elsewhere for the volume. Co-authored-by: Diwakar Sharma <[email protected]>
2 parents 2857f5f + 45e3d9e commit 4ff95eb

File tree

7 files changed

+216
-73
lines changed

7 files changed

+216
-73
lines changed

io-engine/src/bdev/nexus/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub use nexus_iter::{
4848
pub(crate) use nexus_module::{NexusModule, NEXUS_MODULE_NAME};
4949
pub(crate) use nexus_nbd::{NbdDisk, NbdError};
5050
pub(crate) use nexus_persistence::PersistOp;
51-
pub use nexus_persistence::{ChildInfo, NexusInfo};
51+
pub use nexus_persistence::{ChildInfo, NexusInfo, NexusInfoTxn, PersistentNexusInfo};
5252
pub(crate) use nexus_share::NexusPtpl;
5353

5454
pub use nexus_bdev_snapshot::{

io-engine/src/bdev/nexus/nexus_bdev_error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ pub enum Error {
176176
InvalidReservation { reservation: u8 },
177177
#[snafu(display("failed to update share properties {}", name))]
178178
UpdateShareProperties { source: CoreError, name: String },
179-
#[snafu(display("failed to save nexus state {}", name))]
179+
#[snafu(display("failed to save nexus state {name}, {source}"))]
180180
SaveStateFailed { source: StoreError, name: String },
181181
}
182182

io-engine/src/bdev/nexus/nexus_persistence.rs

Lines changed: 124 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
use super::{IoMode, Nexus, NexusChild};
2-
use crate::{persistent_store::PersistentStore, sleep::mayastor_sleep};
2+
use crate::{
3+
persistent_store::{to_json_byte_vec, PersistentStore},
4+
sleep::mayastor_sleep,
5+
store::store_defs::StoreError,
6+
};
7+
use etcd_client::Error as EtcdErr;
38
use serde::{Deserialize, Serialize};
49
use std::time::Duration;
510

@@ -8,10 +13,10 @@ use super::Error;
813
/// Information associated with the persisted NexusInfo structure.
914
pub struct PersistentNexusInfo {
1015
/// Structure that is written to the persistent store.
11-
inner: NexusInfo,
16+
pub inner: NexusInfo,
1217
/// Key to use to persist the NexusInfo structure.
1318
/// If `Some` the key has been supplied by the control plane.
14-
key: Option<String>,
19+
pub key: Option<String>,
1520
}
1621

1722
impl PersistentNexusInfo {
@@ -31,13 +36,20 @@ impl PersistentNexusInfo {
3136

3237
/// Definition of the nexus information that gets saved in the persistent
3338
/// store.
34-
#[derive(Serialize, Deserialize, Debug, Default)]
39+
#[derive(Clone, Serialize, Deserialize, Debug, Default)]
3540
pub struct NexusInfo {
3641
/// Nexus destroyed successfully.
3742
pub clean_shutdown: bool,
43+
/// Nexus needs to be shutdown.
44+
pub do_self_shutdown: bool,
3845
/// Information about children.
3946
pub children: Vec<ChildInfo>,
4047
}
48+
/// Input for a compare-and-swap style update of the persisted nexus info
/// (consumed by `save_txn`): the value to write together with the value the
/// store is expected to currently hold for the key.
pub struct NexusInfoTxn<'a> {
49+
// Persistent nexus info whose `inner` is the new value to write and whose
// optional `key` selects the store key.
key_info: &'a mut PersistentNexusInfo,
50+
// Expected value for the key.
51+
expected: NexusInfo,
52+
}
4153

4254
/// Definition of the child information that gets saved in the persistent
4355
/// store.
@@ -96,6 +108,7 @@ impl<'n> Nexus<'n> {
96108
// expect the NexusInfo structure to contain default values.
97109
assert!(nexus_info.children.is_empty());
98110
assert!(!nexus_info.clean_shutdown);
111+
assert!(!nexus_info.do_self_shutdown);
99112
self.children_iter().for_each(|c| {
100113
let child_info = ChildInfo {
101114
uuid: NexusChild::uuid(c.uri()).expect("Failed to get child UUID."),
@@ -164,11 +177,40 @@ impl<'n> Nexus<'n> {
164177

165178
let uuid = NexusChild::uuid(child_uri).expect("Failed to get child UUID.");
166179

180+
let expected_value = nexus_info.clone();
167181
nexus_info.children.iter_mut().for_each(|c| {
168182
if c.uuid == uuid {
169183
c.healthy = *healthy;
170184
}
171185
});
186+
187+
let mut txn = NexusInfoTxn {
188+
key_info: &mut persistent_nexus_info,
189+
expected: expected_value,
190+
};
191+
192+
// Try executing the transaction. If the nexus info key's value isn't what we
193+
// expect, shutdown this nexus. This can only happen if do_self_shutdown is
194+
// found true in etcd for this key, which can only be set by core-agent's
195+
// republish path. In this situation, the newly published nexus could've
196+
// picked up the replica that we are trying to mark unhealthy here. Shutting
197+
// down this nexus here ensures we don't let this IO succeed to other replicas
198+
// after marking this one as unhealthy.
199+
match self.save_txn(&mut txn).await {
200+
Ok(_) => {
201+
self.set_nexus_io_mode(IoMode::Normal).await;
202+
return Ok(());
203+
}
204+
Err(e) => {
205+
error!(
206+
"{self:?}: failed to update persistent store txn, \
207+
will shutdown the nexus: {e}"
208+
);
209+
self.try_self_shutdown();
210+
211+
return Err(e);
212+
}
213+
}
172214
}
173215
PersistOp::Shutdown => {
174216
// Only update the clean shutdown variable. Do not update the
@@ -243,4 +285,82 @@ impl<'n> Nexus<'n> {
243285
}
244286
}
245287
}
288+
289+
async fn save_txn(&self, info: &mut NexusInfoTxn<'_>) -> Result<(), Error> {
290+
// If a key has been provided, use it to store the NexusInfo; use the
291+
// nexus uuid as the key otherwise.
292+
let key = match &info.key_info.key {
293+
Some(k) => k.clone(),
294+
None => self.uuid().to_string(),
295+
};
296+
297+
let mut retry = PersistentStore::retries();
298+
299+
let new_value = to_json_byte_vec(&info.key_info.inner);
300+
let expected_value = to_json_byte_vec(&info.expected);
301+
let mut logged = false;
302+
303+
loop {
304+
match PersistentStore::txn_create_execute(&key, &new_value, &expected_value).await {
305+
Ok(txn_resp) => {
306+
if let Some(current_value) = txn_resp {
307+
let val = serde_json::from_slice::<NexusInfo>(&current_value).unwrap();
308+
309+
// The server had likely received the transaction and executed it, but
310+
// client here saw a timeout. So if the current value is same as what we intended
311+
// to set, then consider success. Don't trust any other value and shutdown.
312+
if current_value == new_value {
313+
info!("value for key {key} already updated: {val:?}");
314+
return Ok(());
315+
}
316+
317+
warn!("current state found: key - {key}, value - {val:?}");
318+
319+
// This nexus won't be used again but let's still update this field with what's
320+
// found in etcd.
321+
info.key_info.inner_mut().do_self_shutdown = val.do_self_shutdown;
322+
323+
return Err(Error::SaveStateFailed {
324+
source: StoreError::Txn {
325+
key: key.clone(),
326+
source: EtcdErr::IoError(std::io::Error::new(
327+
std::io::ErrorKind::Other,
328+
"Txn CompareOp failed",
329+
)),
330+
},
331+
name: self.name.clone(),
332+
});
333+
} else {
334+
// Don't need to check individual op responses.
335+
debug!(?key, "{self:?}: the state was saved successfully via txn");
336+
return Ok(());
337+
}
338+
}
339+
340+
Err(err) => {
341+
retry -= 1;
342+
if retry == 0 {
343+
return Err(Error::SaveStateFailed {
344+
source: err,
345+
name: self.name.clone(),
346+
});
347+
}
348+
349+
if !logged {
350+
error!(
351+
"{self:?}: failed to persist nexus info transaction: {err}\
352+
will silently retry ({retry} left): {err}"
353+
);
354+
logged = true;
355+
}
356+
357+
// Allow some time for the connection to the persistent
358+
// store to be re-established before retrying the operation.
359+
if mayastor_sleep(Duration::from_secs(1)).await.is_err() {
360+
error!("{self:?}: failed to wait for sleep");
361+
}
362+
}
363+
};
364+
}
365+
}
246366
}

io-engine/src/persistent_store.rs

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,18 @@
55
//! the etcd-client crate. This crate has a dependency on the tokio async
66
//! runtime.
77
use crate::{
8-
core,
9-
core::Reactor,
8+
core::{self, Reactor},
109
store::{
1110
etcd::Etcd,
1211
store_defs::{
1312
DeleteWait, GetWait, PutWait, Store, StoreError, StoreKey, StoreValue, TxnWait,
1413
},
1514
},
1615
};
17-
use etcd_client::{Compare, TxnOp, TxnResponse};
1816
use futures::channel::oneshot;
1917
use once_cell::sync::OnceCell;
2018
use parking_lot::Mutex;
19+
use serde::Serialize;
2120
use serde_json::Value;
2221
use snafu::ResultExt;
2322
use std::{future::Future, time::Duration};
@@ -196,18 +195,27 @@ impl PersistentStore {
196195
})?
197196
}
198197

199-
/// Executes a transaction for the given key.
200-
pub async fn txn(
198+
/// Executes a transaction with Put op for the given key.
199+
/// On failure, Get the key value.
200+
pub async fn txn_create_execute(
201201
key: &impl StoreKey,
202-
cmps: Vec<Compare>,
203-
ops_success: Vec<TxnOp>,
204-
ops_failure: Option<Vec<TxnOp>>,
205-
) -> Result<TxnResponse, StoreError> {
202+
new_value: &[u8],
203+
expected_value: &[u8],
204+
) -> Result<Option<Vec<u8>>, StoreError> {
206205
let key_string = key.to_string();
206+
207+
info!(
208+
"Executing transaction for key {}, value {}, expected value {}.",
209+
key_string,
210+
String::from_utf8_lossy(new_value),
211+
String::from_utf8_lossy(expected_value)
212+
);
213+
214+
let new_value_owned = new_value.to_owned();
215+
let expected_value_owned = expected_value.to_owned();
207216
let rx = Self::execute_store_op(async move {
208-
info!("Executing transaction for key {}.", key_string);
209217
Self::backing_store()
210-
.txn_kv(&key_string, cmps, ops_success, ops_failure)
218+
.put_kv_cas(&key_string, new_value_owned, expected_value_owned)
211219
.await
212220
});
213221

@@ -324,3 +332,8 @@ impl PersistentStore {
324332
persistent_store.lock().store = backing_store;
325333
}
326334
}
335+
336+
/// Serialises `input` to JSON bytes, going through `serde_json::Value` first.
///
/// NOTE(review): the detour through `Value` presumably makes the bytes match
/// how values are laid out when written to the store — confirm before
/// simplifying to a direct `serde_json::to_vec(input)`.
///
/// # Panics
///
/// Panics if either conversion step fails.
pub fn to_json_byte_vec<T: Serialize>(input: &T) -> Vec<u8> {
    serde_json::to_value(input)
        .map(|json| serde_json::to_vec(&json).expect("Failed conversion to serde_json bytes"))
        .expect("Failed conversion to serde_json value")
}

io-engine/src/store/etcd.rs

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::store::store_defs::{
55
StoreError::MissingEntry, StoreKey, StoreValue, Txn as TxnErr, ValueString,
66
};
77
use async_trait::async_trait;
8-
use etcd_client::{Client, Compare, Txn, TxnOp, TxnResponse};
8+
use etcd_client::{Client, Compare, CompareOp, Error, Txn, TxnOp, TxnOpResponse};
99
use serde_json::Value;
1010
use snafu::ResultExt;
1111

@@ -49,23 +49,50 @@ impl Store for Etcd {
4949
Ok(())
5050
}
5151

52-
/// Executes a transaction for the given key. If the compares succeed, then
53-
/// ops_success will be executed atomically, otherwise ops_failure will be
54-
/// executed atomically.
55-
async fn txn_kv<K: StoreKey>(
52+
/// Executes a transaction for the given key. If the compare succeed, then
53+
/// new_value `Put` will be executed atomically, otherwise the current value
54+
/// will be `Get` and returned.
55+
async fn put_kv_cas<K: StoreKey>(
5656
&mut self,
5757
key: &K,
58-
cmps: Vec<Compare>,
59-
ops_success: Vec<TxnOp>,
60-
ops_failure: Option<Vec<TxnOp>>,
61-
) -> Result<TxnResponse, StoreError> {
62-
let fops = ops_failure.map_or(vec![], |v| v);
63-
self.0
64-
.txn(Txn::new().when(cmps).and_then(ops_success).or_else(fops))
58+
new_value: Vec<u8>,
59+
expected_value: Vec<u8>,
60+
) -> Result<Option<Vec<u8>>, StoreError> {
61+
let cmp = Compare::value(key.to_string(), CompareOp::Equal, expected_value);
62+
let put_op = TxnOp::put(key.to_string(), new_value, None);
63+
let or_else_op = TxnOp::get(key.to_string(), None);
64+
65+
let txn_resp = self
66+
.0
67+
.txn(
68+
Txn::new()
69+
.when([cmp])
70+
.and_then([put_op])
71+
.or_else([or_else_op]),
72+
)
6573
.await
6674
.context(TxnErr {
6775
key: key.to_string(),
68-
})
76+
})?;
77+
78+
if !txn_resp.succeeded() {
79+
if let Some(TxnOpResponse::Get(g)) = &txn_resp.op_responses().first() {
80+
if let Some(kv) = g.kvs().first() {
81+
let current_value = kv.value();
82+
return Ok(Some(current_value.to_owned()));
83+
}
84+
} else {
85+
return Err(StoreError::Txn {
86+
key: key.to_string(),
87+
source: Error::IoError(std::io::Error::new(
88+
std::io::ErrorKind::Other,
89+
"Requested TxnOpResponse::Get not found",
90+
)),
91+
});
92+
}
93+
}
94+
95+
Ok(None)
6996
}
7097

7198
/// 'Get' the value for the given key from etcd.

io-engine/src/store/store_defs.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Definition of a trait for a key-value store together with its error codes.
22
33
use async_trait::async_trait;
4-
use etcd_client::{Compare, Error, TxnOp, TxnResponse};
4+
use etcd_client::Error;
55
use serde_json::{Error as SerdeError, Value};
66
use snafu::Snafu;
77

@@ -105,13 +105,13 @@ pub trait Store: Sync + Send + Clone {
105105
value: &V,
106106
) -> Result<(), StoreError>;
107107

108-
async fn txn_kv<K: StoreKey>(
108+
/// Put an entry by doing compare-and-swap with expected value.
109+
async fn put_kv_cas<K: StoreKey>(
109110
&mut self,
110111
key: &K,
111-
cmps: Vec<Compare>,
112-
ops_success: Vec<TxnOp>,
113-
ops_failure: Option<Vec<TxnOp>>,
114-
) -> Result<TxnResponse, StoreError>;
112+
new_value: Vec<u8>,
113+
expected_value: Vec<u8>,
114+
) -> Result<Option<Vec<u8>>, StoreError>;
115115

116116
/// Get an entry from the store.
117117
async fn get_kv<K: StoreKey>(&mut self, key: &K) -> Result<Value, StoreError>;

0 commit comments

Comments
 (0)