Skip to content

Commit 04c2f2d

Browse files
mayastor-borsAbhinandan-Purkait
andcommitted
chore(bors): merge pull request #997
997: feat(stsAffinityGroup): honor stsAffinity on backup restores via external tools r=Abhinandan-Purkait a=Abhinandan-Purkait Cherry-pick #991 Co-authored-by: Abhinandan Purkait <[email protected]>
2 parents 712a70c + 4dc7eea commit 04c2f2d

File tree

2 files changed

+177
-58
lines changed

2 files changed

+177
-58
lines changed

control-plane/csi-driver/src/bin/controller/controller.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -394,12 +394,21 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
394394
debug!(
395395
"Volume {volume_uuid} already exists and is compatible with requested config"
396396
);
397+
let sts_affinity_group_name = context.sts_affinity_group().await?;
398+
if let Some(info) = sts_affinity_group_name {
399+
info.set_sts_affinity_annotation().await?;
400+
debug!(
401+
volume.uuid = volume_uuid,
402+
"Volume successfully patched with affinity group annotation"
403+
);
404+
}
397405
}
398406
// If the volume doesn't exist, create it.
399407
Err(ApiClientError::ResourceNotExists(_)) => {
400408
let volume_topology = context_into_topology(&context);
401409

402-
let sts_affinity_group_name = context.sts_affinity_group();
410+
let sts_affinity_group_info = context.sts_affinity_group().await?;
411+
let affinity_group_name = sts_affinity_group_info.as_ref().map(|info| info.name());
403412
let max_snapshots = context.max_snapshots();
404413
let encrypted = context.encrypted().unwrap_or_default();
405414

@@ -418,7 +427,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
418427
size,
419428
volume_topology,
420429
thin,
421-
sts_affinity_group_name.clone().map(AffinityGroup::new),
430+
affinity_group_name.map(AffinityGroup::new),
422431
max_snapshots,
423432
encrypted,
424433
)
@@ -432,7 +441,7 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
432441
size,
433442
volume_topology,
434443
thin,
435-
sts_affinity_group_name.clone().map(AffinityGroup::new),
444+
affinity_group_name.map(AffinityGroup::new),
436445
max_snapshots,
437446
encrypted,
438447
)
@@ -448,12 +457,13 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
448457
volume_context.insert("fsId".to_string(), volume_uuid.clone());
449458
}
450459

451-
if let Some(ag_name) = sts_affinity_group_name {
460+
if let Some(info) = sts_affinity_group_info {
452461
debug!(
453462
volume.uuid = volume_uuid,
454-
volume.affinity_group = ag_name,
463+
volume.affinity_group = info.name(),
455464
"Volume successfully created"
456465
);
466+
info.set_sts_affinity_annotation().await?;
457467
} else {
458468
debug!(volume.uuid = volume_uuid, "Volume successfully created");
459469
}

control-plane/csi-driver/src/context.rs

Lines changed: 162 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
use crate::filesystem::FileSystem;
2+
use k8s_openapi::api::core::v1::PersistentVolumeClaim;
3+
use kube::api::{Patch, PatchParams};
4+
use kube::{Api, Client};
25
use regex::Regex;
36
use std::{
47
collections::HashMap,
58
convert::AsRef,
69
num::ParseIntError,
710
str::{FromStr, ParseBoolError},
811
};
12+
use stor_port::platform;
913
use stor_port::types::v0::openapi::models::VolumeShareProtocol;
1014
use strum_macros::{AsRefStr, Display, EnumString};
1115
use tracing::log::warn;
16+
use tracing::{debug, trace};
1217
use utils::K8S_STS_PVC_NAMING_REGEX;
1318

1419
use uuid::{Error as UuidError, Uuid};
@@ -472,17 +477,21 @@ pub(crate) fn validate_topology_params(
472477
Ok(())
473478
}
474479

480+
const ANNOTATION_KEY: &str = "openebs.io/stsAffinityGroup";
481+
475482
/// Volume Creation parameters.
476483
#[allow(dead_code)]
477484
#[derive(Debug)]
478485
pub struct CreateParams {
479486
publish_params: PublishParams,
480487
share_protocol: VolumeShareProtocol,
481488
replica_count: u8,
482-
sts_affinity_group: Option<String>,
489+
sts_affinity_group: Option<bool>,
483490
clone_fs_id_as_volume_id: Option<bool>,
484491
max_snapshots: Option<u32>,
485492
encrypted: Option<bool>,
493+
pvc_name: Option<String>,
494+
pvc_namespace: Option<String>,
486495
}
487496
impl CreateParams {
488497
/// Get the `Parameters::PublishParams` value.
@@ -497,11 +506,6 @@ impl CreateParams {
497506
pub fn replica_count(&self) -> u8 {
498507
self.replica_count
499508
}
500-
/// Get the final affinity group name, using the `Parameters::PvcName, Parameters::PvcNamespace,
501-
/// Parameters::AffinityGroup` values.
502-
pub fn sts_affinity_group(&self) -> &Option<String> {
503-
&self.sts_affinity_group
504-
}
505509
/// Get the `Parameters::CloneFsIdAsVolumeId` value.
506510
pub fn clone_fs_id_as_volume_id(&self) -> &Option<bool> {
507511
&self.clone_fs_id_as_volume_id
@@ -514,7 +518,124 @@ impl CreateParams {
514518
pub fn encrypted(&self) -> Option<bool> {
515519
self.encrypted
516520
}
521+
/// Get the sts_affinity_group name from annotations if exists else generate it.
522+
pub async fn sts_affinity_group(&self) -> Result<Option<StsAffinityGroupInfo>, tonic::Status> {
523+
if !self.sts_affinity_group.unwrap_or(false) {
524+
return Ok(None);
525+
}
526+
527+
if platform::current_platform_type() != platform::PlatformType::K8s {
528+
debug!("Detected non-K8s platform, skipping annotation fetch for STS affinity group");
529+
return Ok(None);
530+
}
531+
532+
let pvc_name = self
533+
.pvc_name
534+
.as_ref()
535+
.ok_or_else(|| tonic::Status::invalid_argument("PVC name is missing"))?;
536+
let pvc_namespace = self
537+
.pvc_namespace
538+
.as_ref()
539+
.ok_or_else(|| tonic::Status::invalid_argument("PVC namespace is missing"))?;
540+
541+
let client = Client::try_default()
542+
.await
543+
.map_err(|e| tonic::Status::aborted(e.to_string()))?;
544+
let pvc_api: Api<PersistentVolumeClaim> = Api::namespaced(client, pvc_namespace);
545+
546+
let pvc = pvc_api
547+
.get(pvc_name)
548+
.await
549+
.map_err(|e| tonic::Status::aborted(e.to_string()))?;
550+
551+
let annotation_value = pvc
552+
.metadata
553+
.annotations
554+
.as_ref()
555+
.and_then(|a| a.get(ANNOTATION_KEY))
556+
.map(|val| val.to_string());
557+
558+
let generated_value = generate_sts_affinity_group_name(pvc_name, pvc_namespace);
559+
560+
// 1. If the annotation was set and the generated name is different, we should use the annotation value, this can happen at the point of restore, so we will just stick to the set value.
561+
// 2. If the annotation was set and the generated name is same, we should use the annotation value, this can happen at the point of import of the same volume.
562+
// 3. If the annotation was not set and the generated name is not None, we should use the generated value to patch, this would be at the first creation
563+
let (annotation_exists, final_name) = match (&annotation_value, &generated_value) {
564+
(Some(a_val), Some(gen_val)) => {
565+
if a_val != gen_val {
566+
warn!(
567+
"Conflicting STS affinity group names: annotation = {a_val}, generated = {gen_val}"
568+
);
569+
}
570+
(true, a_val)
571+
}
572+
(Some(a_val), None) => (true, a_val),
573+
(None, Some(gen_val)) => (false, gen_val),
574+
(None, None) => return Ok(None),
575+
};
576+
577+
Ok(Some(StsAffinityGroupInfo {
578+
pvc_name: pvc_name.to_string(),
579+
pvc_namespace: pvc_namespace.to_string(),
580+
name: final_name.to_string(),
581+
annotation_exists,
582+
}))
583+
}
584+
}
585+
586+
/// Information about the STS affinity group.
587+
pub struct StsAffinityGroupInfo {
588+
pvc_name: String,
589+
pvc_namespace: String,
590+
name: String,
591+
annotation_exists: bool,
517592
}
593+
594+
impl StsAffinityGroupInfo {
595+
/// If sts_affinity_group was requested we should patch pvc with it.
596+
pub async fn set_sts_affinity_annotation(&self) -> Result<(), tonic::Status> {
597+
if self.annotation_exists {
598+
trace!(pvc.name=%self.pvc_name, pvc.stsAffinityGroup=%self.name(), "Annotation for STS affinity group already present, skipping patch");
599+
return Ok(());
600+
}
601+
let client = Client::try_default()
602+
.await
603+
.map_err(|e| tonic::Status::aborted(e.to_string()))?;
604+
605+
let pvc_api: Api<PersistentVolumeClaim> = Api::namespaced(client, &self.pvc_namespace);
606+
607+
let patch = serde_json::json!({
608+
"metadata": {
609+
"annotations": {
610+
ANNOTATION_KEY: self.name()
611+
}
612+
}
613+
});
614+
615+
pvc_api
616+
.patch(
617+
&self.pvc_name,
618+
&PatchParams::default(),
619+
&Patch::Merge(&patch),
620+
)
621+
.await
622+
.map_err(|e| tonic::Status::internal(format!("Failed to patch PVC: {e}")))?;
623+
624+
trace!(
625+
"Patched annotation: {ANNOTATION_KEY}: {} on the pvc {} in namespace {}",
626+
self.name(),
627+
self.pvc_name,
628+
self.pvc_namespace
629+
);
630+
631+
Ok(())
632+
}
633+
634+
pub fn name(&self) -> &String {
635+
&self.name
636+
}
637+
}
638+
518639
impl TryFrom<&HashMap<String, String>> for CreateParams {
519640
type Error = tonic::Status;
520641

@@ -547,14 +668,14 @@ impl TryFrom<&HashMap<String, String>> for CreateParams {
547668
)
548669
})?;
549670

550-
let sts_affinity_group_name = if sts_affinity_group.unwrap_or(false) {
551-
generate_sts_affinity_group_name(
552-
&args.get(Parameters::PvcName.as_ref()).cloned(),
553-
&args.get(Parameters::PvcNamespace.as_ref()).cloned(),
554-
)
555-
} else {
556-
None
557-
};
671+
let pvc_name = args.get(Parameters::PvcName.as_ref()).cloned();
672+
let pvc_namespace = args.get(Parameters::PvcNamespace.as_ref()).cloned();
673+
674+
if sts_affinity_group == Some(true) && (pvc_name.is_none() || pvc_namespace.is_none()) {
675+
return Err(tonic::Status::invalid_argument(
676+
"`pvcName` and `pvcNamespace` must be present when `stsAffinityGroup` is true",
677+
));
678+
}
558679

559680
let clone_fs_id_as_volume_id = Parameters::clone_fs_id_as_volume_id(
560681
args.get(Parameters::CloneFsIdAsVolumeId.as_ref()),
@@ -575,41 +696,33 @@ impl TryFrom<&HashMap<String, String>> for CreateParams {
575696
publish_params,
576697
share_protocol,
577698
replica_count,
578-
sts_affinity_group: sts_affinity_group_name,
699+
sts_affinity_group,
579700
clone_fs_id_as_volume_id,
580701
max_snapshots,
581702
encrypted,
703+
pvc_name,
704+
pvc_namespace,
582705
})
583706
}
584707
}
585708

586709
// Generate a affinity group name from the parameters.
587710
// 1. Both pvc name and ns should be valid.
588711
// 2. Pvc name should follow the sts pvc naming convention.
589-
fn generate_sts_affinity_group_name(
590-
pvc_name: &Option<String>,
591-
pvc_ns: &Option<String>,
592-
) -> Option<String> {
593-
match (pvc_name, pvc_ns) {
594-
(Some(pvc_name), Some(pvc_ns)) => {
595-
let re = Regex::from_str(K8S_STS_PVC_NAMING_REGEX);
596-
if let Ok(regex) = re {
597-
if regex.is_match(pvc_name.as_str()) {
598-
if let Some(captures) = regex.captures(pvc_name.as_str()) {
599-
if let Some(common_binding) = captures.get(1) {
600-
return Some(format!("{pvc_ns}/{}", common_binding.as_str()));
601-
}
602-
}
712+
fn generate_sts_affinity_group_name(pvc_name: &String, pvc_ns: &String) -> Option<String> {
713+
let re = Regex::from_str(K8S_STS_PVC_NAMING_REGEX);
714+
if let Ok(regex) = re {
715+
if regex.is_match(pvc_name.as_str()) {
716+
if let Some(captures) = regex.captures(pvc_name.as_str()) {
717+
if let Some(common_binding) = captures.get(1) {
718+
let name = format!("{pvc_ns}/{}", common_binding.as_str());
719+
return Some(name);
603720
}
604721
}
605-
warn!("PVC Name: {pvc_name} is not a valid statefulset pvc naming format, not triggering statefulset volume replica anti-affinity");
606-
None
607-
}
608-
_ => {
609-
warn!("Invalid PVC Name: {pvc_name:?} or PVC Namespace: {pvc_ns:?}, not triggering statefulset volume replica anti-affinity");
610-
None
611722
}
612723
}
724+
warn!("PVC Name: {pvc_name} is not a valid statefulset pvc naming format, not triggering statefulset volume replica anti-affinity");
725+
None
613726
}
614727

615728
#[derive(EnumString, Clone, Debug, Eq, PartialEq)]
@@ -651,16 +764,16 @@ mod tests {
651764
use crate::context::generate_sts_affinity_group_name;
652765

653766
struct VolGrpTestEntry {
654-
pvc_name: Option<String>,
655-
pvc_namespace: Option<String>,
767+
pvc_name: String,
768+
pvc_namespace: String,
656769
result: Option<String>,
657770
}
658771

659772
impl VolGrpTestEntry {
660-
fn new(pvc_name: Option<&str>, pvc_namespace: Option<&str>, result: Option<&str>) -> Self {
773+
fn new(pvc_name: &str, pvc_namespace: &str, result: Option<&str>) -> Self {
661774
Self {
662-
pvc_name: pvc_name.map(|s| s.to_string()),
663-
pvc_namespace: pvc_namespace.map(|s| s.to_string()),
775+
pvc_name: pvc_name.to_string(),
776+
pvc_namespace: pvc_namespace.to_string(),
664777
result: result.map(|s| s.to_string()),
665778
}
666779
}
@@ -669,25 +782,21 @@ mod tests {
669782
#[test]
670783
fn ag_name_generator() {
671784
let vol_grp_test_entries: Vec<VolGrpTestEntry> = vec![
785+
VolGrpTestEntry::new("mongo-db-0", "default", Some("default/mongo-db")),
786+
VolGrpTestEntry::new("", "default", None),
787+
VolGrpTestEntry::new("mongo-db-0", "", Some("/mongo-db")),
788+
VolGrpTestEntry::new("", "", None),
672789
VolGrpTestEntry::new(
673-
Some("mongo-db-0"),
674-
Some("default"),
675-
Some("default/mongo-db"),
676-
),
677-
VolGrpTestEntry::new(None, Some("default"), None),
678-
VolGrpTestEntry::new(Some("mongo-db-0"), None, None),
679-
VolGrpTestEntry::new(None, None, None),
680-
VolGrpTestEntry::new(
681-
Some("mongo-db-2424"),
682-
Some("mayastor-123"),
790+
"mongo-db-2424",
791+
"mayastor-123",
683792
Some("mayastor-123/mongo-db"),
684793
),
685794
VolGrpTestEntry::new(
686-
Some("mongo-db-123-abcd-2"),
687-
Some("default"),
795+
"mongo-db-123-abcd-2",
796+
"default",
688797
Some("default/mongo-db-123-abcd"),
689798
),
690-
VolGrpTestEntry::new(Some("mongo-db-123-abcd"), Some("xyz-12"), None),
799+
VolGrpTestEntry::new("mongo-db-123-abcd", "xyz-12", None),
691800
];
692801

693802
for test_entry in vol_grp_test_entries {

0 commit comments

Comments
 (0)