Skip to content

Commit d05dff8

Browse files
author
mayastor-bors
committed
chore(bors): merge pull request #965
965: Add Volume Health information to release/2.7 r=tiagolobocastro a=tiagolobocastro See related PR's for more information. Co-authored-by: mayastor-bors <[email protected]>
2 parents 7446968 + ffe5e5a commit d05dff8

File tree

27 files changed

+1759
-177
lines changed

27 files changed

+1759
-177
lines changed

control-plane/agents/src/bin/core/controller/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
pub(crate) mod io_engine;
44
/// Various policies' definitions(e.g. rebuild policy)
55
pub(crate) mod policies;
6+
mod pstor_cache;
67
/// reconciliation logic
78
pub(crate) mod reconciler;
89
/// registry with node and all its resources
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use agents::errors::SvcError;
2+
use snafu::ResultExt;
3+
use std::collections::BTreeMap;
4+
use stor_port::{
5+
pstor,
6+
pstor::{error::DeserialiseValue, Error, ObjectKey, StorableObject, StoreWatchReceiver},
7+
};
8+
9+
/// A persistent store cache, which fetches all entries and them allows reading
10+
/// them via the `pstor::StoreObj` interface.
11+
#[derive(Clone)]
12+
pub(crate) struct PStorCache {
13+
entries: BTreeMap<String, serde_json::Value>,
14+
}
15+
16+
impl PStorCache {
17+
/// Create a new `PStorCache`.
18+
pub(crate) async fn new(
19+
pstor: &mut impl pstor::StoreKv,
20+
page_size: i64,
21+
prefix: &str,
22+
) -> Result<Self, SvcError> {
23+
let entries = pstor
24+
.get_values_paged_all(prefix, page_size)
25+
.await?
26+
.into_iter()
27+
.collect::<BTreeMap<_, _>>();
28+
29+
Ok(Self { entries })
30+
}
31+
}
32+
33+
#[async_trait::async_trait]
34+
impl pstor::StoreObj for PStorCache {
35+
async fn put_obj<O: StorableObject>(&mut self, _object: &O) -> Result<(), Error> {
36+
unimplemented!()
37+
}
38+
39+
async fn get_obj<O: StorableObject>(&mut self, key: &O::Key) -> Result<O, Error> {
40+
let key = key.key();
41+
match self.entries.get(&key) {
42+
Some(kv) => Ok(
43+
serde_json::from_value(kv.clone()).context(DeserialiseValue {
44+
value: kv.to_string(),
45+
})?,
46+
),
47+
None => Err(Error::MissingEntry { key }),
48+
}
49+
}
50+
51+
async fn watch_obj<K: ObjectKey>(&mut self, _key: &K) -> Result<StoreWatchReceiver, Error> {
52+
unimplemented!()
53+
}
54+
}

control-plane/agents/src/bin/core/controller/registry.rs

Lines changed: 88 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
use super::{resources::operations_helper::*, wrapper::NodeWrapper};
1717
use crate::{
1818
controller::{
19+
pstor_cache::PStorCache,
1920
reconciler::ReconcilerControl,
21+
resources::VolumeHealthWatcher,
2022
task_poller::{PollEvent, PollTriggerEvent},
2123
wrapper::InternalOps,
2224
},
@@ -36,7 +38,7 @@ use stor_port::{
3638
},
3739
types::v0::{
3840
store::{
39-
nexus_persistence::delete_all_v1_nexus_info,
41+
nexus_persistence::{delete_all_v1_nexus_info, NexusInfo},
4042
registry::{ControlPlaneService, CoreRegistryConfig, NodeRegistration},
4143
volume::InitiatorAC,
4244
},
@@ -74,6 +76,7 @@ pub(crate) struct RegistryInner<S: Store> {
7476
nodes: NodesMapLocked,
7577
/// spec (aka desired state) of the various resources.
7678
specs: ResourceSpecsLocked,
79+
health: Option<VolumeHealthWatcher>,
7780
/// period to refresh the cache.
7881
cache_period: std::time::Duration,
7982
store: Arc<Mutex<S>>,
@@ -136,6 +139,7 @@ impl Registry {
136139
thin_args: ThinArgs,
137140
ha_enabled: bool,
138141
etcd_max_page_size: i64,
142+
no_volume_health: bool,
139143
) -> Result<Self, SvcError> {
140144
let store_endpoint = Self::format_store_endpoint(&store_url);
141145
tracing::info!("Connecting to persistent store at {}", store_endpoint);
@@ -165,6 +169,11 @@ impl Registry {
165169
inner: Arc::new(RegistryInner {
166170
nodes: Default::default(),
167171
specs: ResourceSpecsLocked::new(),
172+
health: if no_volume_health {
173+
None
174+
} else {
175+
Some(VolumeHealthWatcher::new(&store))
176+
},
168177
cache_period,
169178
store: Arc::new(Mutex::new(store.clone())),
170179
store_timeout,
@@ -322,6 +331,19 @@ impl Registry {
322331
pub(crate) fn specs(&self) -> &ResourceSpecsLocked {
323332
&self.specs
324333
}
334+
/// Get a reference to the volume health watcher.
335+
pub(crate) fn health(&self) -> Option<&VolumeHealthWatcher> {
336+
self.health.as_ref()
337+
}
338+
/// Retain health info as per the given retain closure.
339+
pub(crate) fn health_retain<F: FnMut(&uuid::Uuid, &mut Arc<NexusInfo>) -> bool>(
340+
&self,
341+
retain: F,
342+
) {
343+
if let Some(h) = self.health() {
344+
h.retain(retain)
345+
}
346+
}
325347

326348
/// Serialized write to the persistent store.
327349
pub(crate) async fn store_obj<O: StorableObject>(&self, object: &O) -> Result<(), SvcError> {
@@ -386,6 +408,32 @@ impl Registry {
386408
}
387409
}
388410

411+
/// Serialized delete to the persistent store, with a prefix.
412+
pub(crate) async fn delete_kv_prefix<K: StoreKey>(&self, key: &K) -> Result<(), SvcError> {
413+
let store = self.store.clone();
414+
match tokio::time::timeout(self.store_timeout, async move {
415+
let mut store = store.lock().await;
416+
let key = key.to_string();
417+
Self::op_with_threshold(async move { store.delete_values_prefix(&key).await }).await
418+
})
419+
.await
420+
{
421+
Ok(result) => match result {
422+
Ok(_) => Ok(()),
423+
// already deleted, no problem
424+
Err(StoreError::MissingEntry { .. }) => {
425+
tracing::warn!("Entry with key {} missing from store.", key.to_string());
426+
Ok(())
427+
}
428+
Err(error) => Err(SvcError::from(error)),
429+
},
430+
Err(_) => Err(SvcError::from(StoreError::Timeout {
431+
operation: "Delete".to_string(),
432+
timeout: self.store_timeout,
433+
})),
434+
}
435+
}
436+
389437
async fn op_with_threshold<F, O>(future: F) -> O
390438
where
391439
F: Future<Output = O>,
@@ -437,14 +485,46 @@ impl Registry {
437485

438486
/// Initialise the registry with the content of the persistent store.
439487
async fn init(&self) -> Result<(), SvcError> {
488+
{
489+
let store = self.store.clone();
490+
let mut store = store.lock().await;
491+
self.specs
492+
.init(
493+
store.deref_mut(),
494+
self.legacy_prefix_present,
495+
self.etcd_max_page_size,
496+
)
497+
.await?;
498+
}
499+
self.init_health().await?;
500+
501+
Ok(())
502+
}
503+
504+
/// Initialise the volume health watcher and seed it from the persistent store.
/// Returns straight away if the registry holds no health watcher.
async fn init_health(&self) -> Result<(), SvcError> {
505+
let Some(health) = &self.health else {
506+
return Ok(());
507+
};
508+
// Start the watcher before the store entries are fetched below.
health.init().await?;
440509
let mut store = self.store.lock().await;
441-
self.specs
442-
.init(
443-
store.deref_mut(),
444-
self.legacy_prefix_present,
445-
self.etcd_max_page_size,
446-
)
447-
.await?;
510+
// Fetch every entry under the health key prefix into a local cache.
let mut pstor_cache = PStorCache::new(
511+
store.deref_mut(),
512+
self.etcd_max_page_size,
513+
health.key_prefix(),
514+
)
515+
.await?;
516+
for volume in self.specs.volumes_rsc() {
517+
// Volumes without a health info key are skipped.
let Some(nexus_info_key) = volume.immutable_ref().health_info_key() else {
518+
continue;
519+
};
520+
let nexus_info_key = nexus_info_key.nexus_info_key();
521+
// Entries missing from the cache, or which fail to deserialise, are skipped.
let Ok(mut info) = pstor_cache.get_obj::<NexusInfo>(&nexus_info_key).await else {
522+
continue;
523+
};
524+
info.uuid = nexus_info_key.nexus_id().clone();
525+
info.volume_uuid = Some(volume.uuid().clone());
526+
// Only inserted if the watcher holds no entry for this uuid yet.
health.if_empty_insert(info);
527+
}
448528
Ok(())
449529
}
450530

control-plane/agents/src/bin/core/controller/resources/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ pub(crate) mod operations_helper;
2323
/// Generic resources map.
2424
pub(crate) mod resource_map;
2525

26+
pub(crate) use volume::VolumeHealthWatcher;
27+
2628
impl<T: OperationSequencer + Sized, R> Drop for OperationGuard<T, R> {
2729
fn drop(&mut self) {
2830
self.unlock();

control-plane/agents/src/bin/core/controller/resources/operations_helper.rs

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -990,23 +990,10 @@ impl ResourceSpecsLocked {
990990
where
991991
T: DeserializeOwned,
992992
{
993-
let specs: Vec<Result<T, serde_json::Error>> = values
994-
.iter()
995-
.map(|v| serde_json::from_value(v.clone()))
996-
.collect();
997-
998-
let mut result = vec![];
999-
for spec in specs {
1000-
match spec {
1001-
Ok(s) => {
1002-
result.push(s);
1003-
}
1004-
Err(e) => {
1005-
return Err(e);
1006-
}
1007-
}
1008-
}
1009-
Ok(result)
993+
values
994+
.into_iter()
995+
.map(serde_json::from_value)
996+
.collect::<Result<Vec<_>, serde_json::Error>>()
1010997
}
1011998

1012999
/// Populate the resource specs with data from the persistent store.
@@ -1031,7 +1018,7 @@ impl ResourceSpecsLocked {
10311018
.map_err(|e| SpecError::StoreGet {
10321019
source: Box::new(e),
10331020
})?;
1034-
let store_values = store_entries.iter().map(|e| e.1.clone()).collect();
1021+
let store_values = store_entries.into_iter().map(|e| e.1).collect();
10351022

10361023
let mut resource_specs = self.0.write();
10371024
match spec_type {
@@ -1040,6 +1027,7 @@ impl ResourceSpecsLocked {
10401027
Self::deserialise_specs::<VolumeSpec>(store_values).context(Deserialise {
10411028
obj_type: StorableObjectType::VolumeSpec,
10421029
})?;
1030+
// Add the volume watch here for all the specs OR after this call.
10431031
let ag_specs = get_affinity_group_specs(&specs);
10441032
resource_specs.volumes.populate(specs);
10451033
// Load the ag specs in memory, ag specs are not persisted in memory so we don't

control-plane/agents/src/bin/core/controller/resources/volume/mod.rs

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
mod snapshot;
22

33
use super::{ResourceMutex, ResourceUid};
4-
use stor_port::types::v0::{
5-
store::volume::{AffinityGroupSpec, VolumeSpec},
6-
transport::VolumeId,
4+
use parking_lot::Mutex;
5+
use std::{collections::BTreeMap, sync::Arc};
6+
use stor_port::{
7+
pstor,
8+
types::v0::{
9+
store::{
10+
nexus_persistence::{NexusInfo, NexusInfoKey},
11+
volume::{AffinityGroupSpec, VolumeSpec},
12+
},
13+
transport::VolumeId,
14+
},
715
};
816

917
impl ResourceMutex<VolumeSpec> {
@@ -55,3 +63,96 @@ macro_rules! volume_span {
5563
};
5664
}
5765
crate::impl_trace_span!(volume_span, VolumeSpec);
66+
67+
use pstor::{StoreKv, StoreKvWatcher};
68+
use stor_port::types::v0::transport::NexusId;
69+
70+
pub(crate) struct VolumeHealthWatcher {
71+
watcher: Box<dyn StoreKvWatcher>,
72+
key_prefix: String,
73+
health: Arc<Mutex<BTreeMap<uuid::Uuid, Arc<NexusInfo>>>>,
74+
}
75+
impl std::fmt::Debug for VolumeHealthWatcher {
76+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77+
f.debug_struct("VolumeHealthWatcher").finish()
78+
}
79+
}
80+
81+
impl VolumeHealthWatcher {
82+
/// Create a new `Self`.
83+
pub(crate) fn new(store: &impl StoreKv) -> Self {
84+
let health = Arc::new(Mutex::new(BTreeMap::new()));
85+
86+
let health_cln = health.clone();
87+
let watcher = store.kv_watcher(move |arg| {
88+
let cnt = pstor::WatchResult::Continue;
89+
90+
if arg.value.is_empty() {
91+
match NexusInfoKey::parse_id(arg.updated_key) {
92+
Ok(id) => {
93+
tracing::debug!(key=%arg.updated_key, %id, "Removing key");
94+
health_cln.lock().remove(id.uuid());
95+
}
96+
Err(error) => {
97+
tracing::warn!(key=%arg.updated_key, error, "Received unexpected PStor Update");
98+
}
99+
}
100+
return cnt;
101+
}
102+
103+
let Ok(nexus_info) = serde_json::from_str::<NexusInfo>(arg.value) else {
104+
tracing::error!(
105+
key = arg.updated_key,
106+
value = arg.value,
107+
"Failed to parse health value information"
108+
);
109+
return cnt;
110+
};
111+
112+
match nexus_info.with_key(arg.updated_key) {
113+
Ok(Some(info)) => {
114+
tracing::debug!(?info, "Updating Health info");
115+
health_cln.lock().insert(*info.uuid, Arc::new(info));
116+
}
117+
Ok(None) => {
118+
tracing::warn!(key=%arg.updated_key, "Received unexpected PStor Update");
119+
}
120+
Err(error) => tracing::warn!(key=%arg.updated_key, %error, "Failed to parse uuids"),
121+
}
122+
123+
cnt
124+
});
125+
126+
Self {
127+
watcher,
128+
key_prefix: NexusInfoKey::key_prefix(),
129+
health,
130+
}
131+
}
132+
/// Get the health key prefix.
133+
pub(crate) fn key_prefix(&self) -> &str {
134+
&self.key_prefix
135+
}
136+
/// Start the watcher.
137+
/// All registered pstor key updates will be propagated via the callback.
138+
pub(crate) async fn init(&self) -> Result<(), agents::errors::SvcError> {
139+
self.watcher
140+
.watch(pstor::WatchKey::new(NexusInfoKey::key_prefix()), ())?;
141+
Ok(())
142+
}
143+
/// If the health info hasn't been added yet, insert it.
144+
pub(crate) fn if_empty_insert(&self, info: NexusInfo) {
145+
let mut health = self.health.lock();
146+
if health.get(&info.uuid).is_none() {
147+
health.insert(*info.uuid, Arc::new(info));
148+
}
149+
}
150+
/// Get the volume health info for the given target.
151+
pub(crate) fn health(&self, target: &NexusId) -> Option<Arc<NexusInfo>> {
152+
self.health.lock().get(target.uuid()).cloned()
153+
}
154+
/// Expose a retain interface, allowing clean up of objects.
155+
pub(crate) fn retain<F: FnMut(&uuid::Uuid, &mut Arc<NexusInfo>) -> bool>(&self, retain: F) {
156+
self.health.lock().retain(retain);
157+
}
158+
}

0 commit comments

Comments
 (0)