
Commit 1182c4a

mayastor-bors and Abhinandan-Purkait committed
chore(bors): merge pull request #983
983: fix: allow scaling up of volumes of affinity group r=Abhinandan-Purkait a=Abhinandan-Purkait

### Issue
Volumes that are part of an affinity group apply the node restriction even when scaling up for the first time.

### Fix
The fix relaxes the restriction on the first scale up.

Co-authored-by: Abhinandan Purkait <[email protected]>
2 parents d05dff8 + 8f0e74a commit 1182c4a

File tree

2 files changed: +141 lines, -32 lines


control-plane/agents/src/bin/core/controller/scheduling/affinity_group.rs

Lines changed: 28 additions & 26 deletions
@@ -1,7 +1,7 @@
 use crate::controller::{registry::Registry, resources::ResourceUid};
 use std::collections::HashMap;
 use stor_port::types::v0::{
-    store::volume::{AffinityGroupSpec, VolumeSpec},
+    store::volume::{AffinityGroupSpec, VolumeOperation, VolumeSpec},
     transport::{NodeId, PoolId},
 };
 
@@ -12,32 +12,34 @@ pub(crate) fn get_restricted_nodes(
     affinity_group_spec: &AffinityGroupSpec,
     registry: &Registry,
 ) -> Vec<NodeId> {
-    let mut restricted_nodes: Vec<NodeId> = Vec::new();
-    if volume_spec.num_replicas == 1 {
-        let specs = registry.specs();
-        // Get the list of volumes being part of the Affinity Group.
-        let affinity_group_volumes = affinity_group_spec.volumes();
-        // Remove the current volume from the list, as it is under creation.
-        let affinity_group_volumes: Vec<_> = affinity_group_volumes
-            .iter()
-            .filter(|volume_id| *volume_id != volume_spec.uid())
-            .collect();
-        // List of restricted nodes, which already has replicas from the volumes of the
-        // Affinity Group.
-        for volume in &affinity_group_volumes {
-            // Fetch the list of nodes where the volume replicas are placed. If some
-            // volume doesn't exist yet, this list will be empty
-            // for that volume.
-            let node_ids = specs.volume_replica_nodes(volume);
-            // Add a new node in the list if it doesn't already exist.
-            let filtered_nodes: Vec<NodeId> = node_ids
-                .into_iter()
-                .filter(|id| !restricted_nodes.contains(id))
-                .collect();
-            restricted_nodes.extend(filtered_nodes);
-        }
+    // Since num_replicas in VolumeSpec is still equal to 1 when scaling up from 1 replica to 2
+    // replicas for a volume of an affinity group, if the number of nodes in the cluster is equal
+    // to the number of volumes in the affinity group we would exhaust nodes for scale up. This
+    // early exit relaxes the restriction for the first scale up (only 1 replica add at a time).
+    if matches!(
+        volume_spec
+            .operation
+            .as_ref()
+            .map(|op| op.operation.clone()),
+        Some(VolumeOperation::SetReplica(2))
+    ) || volume_spec.num_replicas != 1
+    {
+        return Vec::new();
     }
-    restricted_nodes
+
+    let specs = registry.specs();
+    // List of restricted nodes, which already host replicas from the volumes of the affinity group.
+    affinity_group_spec
+        .volumes()
+        .iter()
+        .filter(|&vid| vid != volume_spec.uid())
+        .flat_map(|vid| specs.volume_replica_nodes(vid))
+        .fold(Vec::new(), |mut acc, node_id| {
+            if !acc.contains(&node_id) {
+                acc.push(node_id);
+            }
+            acc
+        })
 }
 
 /// Get the map of pool to the number of the Affinity Group replica on the pool.
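
For context, the early exit added above can be read in isolation: if the volume is in the middle of its first scale up (a SetReplica(2) operation while num_replicas is still 1), or if it already has more than one replica, no nodes are restricted. Below is a minimal, self-contained sketch of that check using simplified stand-in types; the real VolumeSpec in stor_port wraps the pending operation in an operation-sequence struct, hence the .as_ref().map(|op| op.operation.clone()) in the diff.

// Illustration only: simplified stand-ins for the stor_port types.
enum VolumeOperation {
    SetReplica(u8),
}

struct VolumeSpec {
    num_replicas: u8,
    // Flattened: the real spec stores this inside an operation-sequence struct.
    operation: Option<VolumeOperation>,
}

// Mirrors the early exit added in the diff: relax the affinity-group restriction
// on the first scale up (1 -> 2 replicas) and for any volume that already has
// more than one replica.
fn restriction_relaxed(spec: &VolumeSpec) -> bool {
    matches!(spec.operation, Some(VolumeOperation::SetReplica(2))) || spec.num_replicas != 1
}

fn main() {
    // Single-replica volume being scaled up to 2 replicas: restriction relaxed.
    let scaling_up = VolumeSpec {
        num_replicas: 1,
        operation: Some(VolumeOperation::SetReplica(2)),
    };
    assert!(restriction_relaxed(&scaling_up));

    // Single-replica volume with no pending operation (e.g. at creation time):
    // the restriction still applies, as before the fix.
    let at_rest = VolumeSpec {
        num_replicas: 1,
        operation: None,
    };
    assert!(!restriction_relaxed(&at_rest));
}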

control-plane/agents/src/bin/core/tests/volume/affinity_group.rs

Lines changed: 113 additions & 6 deletions
@@ -1,21 +1,24 @@
 use deployer_cluster::{Cluster, ClusterBuilder};
 use grpc::operations::{registry::traits::RegistryOperations, volume::traits::VolumeOperations};
-use stor_port::types::v0::transport::{AffinityGroup, CreateVolume, GetSpecs, VolumeId};
+use stor_port::types::v0::transport::{
+    AffinityGroup, CreateVolume, DestroyVolume, GetSpecs, SetVolumeReplica, VolumeId,
+};
 use tracing::info;
 
 #[tokio::test]
 async fn affinity_group() {
     let cluster = ClusterBuilder::builder()
         .with_rest(false)
         .with_agents(vec!["core"])
-        .with_io_engines(2)
-        .with_pools(2)
+        .with_io_engines(3)
+        .with_pools(3)
         .with_cache_period("1s")
         .build()
         .await
         .unwrap();
 
     startup_test(&cluster).await;
+    scale_up_test(&cluster).await;
 }
 
 async fn startup_test(cluster: &Cluster) {
@@ -31,7 +34,7 @@ async fn startup_test(cluster: &Cluster) {
 
     // Create all the volumes.
     for &item in &vols {
-        let _ = volume_client
+        volume_client
             .create(
                 &CreateVolume {
                     uuid: VolumeId::try_from(item.1).unwrap(),
@@ -42,7 +45,8 @@ async fn startup_test(cluster: &Cluster) {
                 },
                 None,
             )
-            .await;
+            .await
+            .expect("Volume creation should succeed");
     }
 
     // Restart the core-agent.
@@ -61,7 +65,7 @@ async fn startup_test(cluster: &Cluster) {
     let specs = registry_client
         .get_specs(&GetSpecs {}, None)
         .await
-        .expect("should be able to fetch specs");
+        .expect("Should be able to fetch specs");
 
     info!("Affinity Group Specs: {:?}", specs.affinity_groups);
 
@@ -86,4 +90,107 @@ async fn startup_test(cluster: &Cluster) {
             _ => {}
         }
     }
+
+    // Destroy all the volumes created above.
+    for &item in &vols {
+        volume_client
+            .destroy(
+                &DestroyVolume {
+                    uuid: VolumeId::try_from(item.1).unwrap(),
+                },
+                None,
+            )
+            .await
+            .expect("Volume deletion should succeed");
+    }
+}
+
+async fn scale_up_test(cluster: &Cluster) {
+    let vols = vec![
+        (Some("ag1"), "eba487d9-0b57-407b-8b48-0b631a372183"),
+        (Some("ag1"), "359b7e1a-b724-443b-98b4-e6d97fabbb60"),
+        (Some("ag1"), "f2296d6a-77a6-401d-aad3-ccdc247b0a56"),
+    ];
+
+    let registry_client = cluster.grpc_client().registry();
+
+    // The Affinity Group specs should now have been loaded in memory.
+    // Fetch the specs.
+    let specs = registry_client
+        .get_specs(&GetSpecs {}, None)
+        .await
+        .expect("Should be able to fetch specs");
+
+    // Fail if there are affinity group specs from the previous test lingering around.
+    assert_eq!(specs.affinity_groups.len(), 0);
+
+    let volume_client = cluster.grpc_client().volume();
+
+    for &item in &vols {
+        volume_client
+            .create(
+                &CreateVolume {
+                    uuid: VolumeId::try_from(item.1).unwrap(),
+                    size: 5242880,
+                    replicas: 1,
+                    affinity_group: item.0.map(|val| AffinityGroup::new(val.to_string())),
+                    ..Default::default()
+                },
+                None,
+            )
+            .await
+            .expect("Volume creation should succeed");
+    }
+
+    for &item in &vols {
+        volume_client
+            .set_replica(
+                &SetVolumeReplica {
+                    uuid: VolumeId::try_from(item.1).unwrap(),
+                    replicas: 2,
+                },
+                None,
+            )
+            .await
+            .expect("Scale up should not fail");
+    }
+
+    for &item in &vols {
+        volume_client
+            .set_replica(
+                &SetVolumeReplica {
+                    uuid: VolumeId::try_from(item.1).unwrap(),
+                    replicas: 3,
+                },
+                None,
+            )
+            .await
+            .expect("Scale up should not fail");
+    }
+
+    for &item in &vols {
+        volume_client
+            .set_replica(
+                &SetVolumeReplica {
+                    uuid: VolumeId::try_from(item.1).unwrap(),
+                    replicas: 2,
+                },
+                None,
+            )
+            .await
+            .expect("Scale down should not fail");
+    }
+
+    for &item in &vols {
+        volume_client
+            .set_replica(
+                &SetVolumeReplica {
+                    uuid: VolumeId::try_from(item.1).unwrap(),
+                    replicas: 1,
+                },
+                None,
+            )
+            .await
+            .expect_err("Scale down should fail");
+    }
 }
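
The choice of three io-engines, three pools, and three single-replica volumes is exactly the shape that exhausted candidate nodes on the first scale up before this fix. A back-of-the-envelope sketch of that arithmetic, with hypothetical volume and node names rather than anything taken from the test above:

use std::collections::HashSet;

fn main() {
    // Hypothetical placement: each of the three affinity-group volumes starts
    // with a single replica on its own node.
    let placements = vec![
        ("vol-1", vec!["node-1"]),
        ("vol-2", vec!["node-2"]),
        ("vol-3", vec!["node-3"]),
    ];
    let all_nodes: HashSet<&str> = ["node-1", "node-2", "node-3"].into_iter().collect();

    // Old behaviour: scaling up "vol-1" restricted every node that hosts a
    // replica of any other volume in the group.
    let restricted: HashSet<&str> = placements
        .iter()
        .filter(|(vol, _)| *vol != "vol-1")
        .flat_map(|(_, nodes)| nodes.iter().copied())
        .collect();

    // Only "node-1" remains, and it already holds vol-1's replica, so there is
    // no node left for the second replica and the first scale up fails. The
    // early exit in the scheduling change above returns an empty restricted
    // list in this situation instead.
    let candidates: Vec<&str> = all_nodes.difference(&restricted).copied().collect();
    assert_eq!(candidates, vec!["node-1"]);
}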
