Skip to content

Commit 4a631f0

Browse files
authored
Basic version of RS (#241)
* Basic version of RS Signed-off-by: ichAB <[email protected]> * fix rkl test Signed-off-by: ichAB <[email protected]> --------- Signed-off-by: ichAB <[email protected]>
1 parent d194021 commit 4a631f0

File tree

11 files changed

+1103
-9
lines changed

11 files changed

+1103
-9
lines changed

project/common/src/lib.rs

Lines changed: 79 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ pub struct TypeMeta {
1414
pub kind: String,
1515
}
1616

17-
#[derive(Debug, Serialize, Deserialize, Clone)]
17+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
1818
pub struct ObjectMeta {
1919
pub name: String,
20+
#[serde(default)]
21+
pub uid: Option<String>,
2022
#[serde(default = "default_namespace")]
2123
pub namespace: String,
2224
#[serde(default)]
@@ -29,7 +31,7 @@ fn default_namespace() -> String {
2931
"default".to_string()
3032
}
3133

32-
#[derive(Debug, Serialize, Deserialize, Clone)]
34+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
3335
pub struct PodSpec {
3436
//if pod is distributed to a node ,then this field should be filled with node-id
3537
#[serde(default)]
@@ -42,18 +44,18 @@ pub struct PodSpec {
4244
pub tolerations: Vec<Toleration>,
4345
}
4446

45-
#[derive(Debug, Serialize, Deserialize, Clone)]
47+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
4648
pub struct ContainerRes {
4749
pub limits: Option<Resource>,
4850
}
4951

50-
#[derive(Debug, Serialize, Deserialize, Clone)]
52+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
5153
pub struct Resource {
5254
pub cpu: Option<String>,
5355
pub memory: Option<String>,
5456
}
5557

56-
#[derive(Debug, Serialize, Deserialize, Clone)]
58+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
5759
pub struct ContainerSpec {
5860
pub name: String,
5961

@@ -68,7 +70,7 @@ pub struct ContainerSpec {
6870
pub resources: Option<ContainerRes>,
6971
}
7072

71-
#[derive(Debug, Serialize, Deserialize, Clone)]
73+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
7274
pub struct Port {
7375
#[serde(rename = "containerPort")]
7476
pub container_port: i32,
@@ -96,13 +98,13 @@ pub struct PodTask {
9698
pub status: PodStatus,
9799
}
98100

99-
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
101+
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
100102
pub struct PodStatus {
101103
#[serde(rename = "podIP")]
102104
pub pod_ip: Option<String>,
103105
}
104106

105-
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
107+
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
106108
pub struct Toleration {
107109
/// Empty means match all taint keys.
108110
pub key: Option<TaintKey>,
@@ -131,7 +133,7 @@ impl Toleration {
131133
}
132134
}
133135
}
134-
#[derive(Debug, Serialize, Deserialize, Clone)]
136+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
135137
pub enum TolerationOperator {
136138
Exists,
137139
Equal,
@@ -322,3 +324,71 @@ pub struct ServiceTask {
322324
pub metadata: ObjectMeta,
323325
pub spec: ServiceSpec,
324326
}
327+
328+
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
329+
pub struct LabelSelector {
330+
#[serde(rename = "matchLabels", default)]
331+
pub match_labels: HashMap<String, String>,
332+
333+
#[serde(rename = "matchExpressions", default)]
334+
pub match_expressions: Vec<LabelSelectorRequirement>,
335+
}
336+
337+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
338+
pub struct LabelSelectorRequirement {
339+
pub key: String,
340+
pub operator: LabelSelectorOperator,
341+
#[serde(default)]
342+
pub values: Vec<String>,
343+
}
344+
345+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
346+
#[serde(rename_all = "PascalCase")]
347+
pub enum LabelSelectorOperator {
348+
In,
349+
NotIn,
350+
Exists,
351+
DoesNotExist,
352+
}
353+
354+
fn default_replicas() -> i32 {
355+
1
356+
}
357+
358+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
359+
pub struct PodTemplateSpec {
360+
pub metadata: ObjectMeta,
361+
pub spec: PodSpec,
362+
}
363+
364+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
365+
pub struct ReplicaSetSpec {
366+
#[serde(default = "default_replicas")]
367+
pub replicas: i32,
368+
pub selector: LabelSelector,
369+
pub template: PodTemplateSpec,
370+
}
371+
372+
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
373+
pub struct ReplicaSetStatus {
374+
#[serde(default)]
375+
pub replicas: i32,
376+
#[serde(rename = "fullyLabeledReplicas", default)]
377+
pub fully_labeled_replicas: i32,
378+
#[serde(rename = "readyReplicas", default)]
379+
pub ready_replicas: i32,
380+
#[serde(rename = "availableReplicas", default)]
381+
pub available_replicas: i32,
382+
}
383+
384+
#[derive(Debug, Serialize, Deserialize, Clone)]
385+
pub struct ReplicaSet {
386+
#[serde(rename = "apiVersion")]
387+
pub api_version: String,
388+
#[serde(rename = "kind")]
389+
pub kind: String,
390+
pub metadata: ObjectMeta,
391+
pub spec: ReplicaSetSpec,
392+
#[serde(default)]
393+
pub status: ReplicaSetStatus,
394+
}

project/libscheduler/tests/xline_test.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ fn create_test_node(name: &str, cpu: &str, memory: &str) -> Node {
7676
kind: "Node".to_string(),
7777
metadata: ObjectMeta {
7878
name: name.to_string(),
79+
uid: None,
7980
namespace: "".to_string(),
8081
labels: HashMap::new(),
8182
annotations: HashMap::new(),
@@ -123,6 +124,7 @@ fn create_test_pod(name: &str, cpu_limit: Option<&str>, memory_limit: Option<&st
123124
kind: "Pod".to_string(),
124125
metadata: ObjectMeta {
125126
name: name.to_string(),
127+
uid: None,
126128
namespace: "default".to_string(),
127129
labels: HashMap::new(),
128130
annotations: HashMap::new(),

project/rkl/src/daemon/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,7 @@ pub async fn generate_node(ext_iface: &ExternalInterface) -> Result<Node> {
526526
kind: "Node".to_string(),
527527
metadata: ObjectMeta {
528528
name: hostname,
529+
uid: None,
529530
namespace: "default".to_string(),
530531
labels: HashMap::new(),
531532
annotations: HashMap::new(),

project/rkl/tests/test_common.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ where
2828
kind: "Pod".to_string(),
2929
metadata: ObjectMeta {
3030
name,
31+
uid: None,
3132
labels: HashMap::from([
3233
("app".to_string(), "my-app".to_string()),
3334
("bundle".to_string(), bundles_path("pause")),

project/rks/src/api/xlinestore.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,4 +325,84 @@ impl XlineStore {
325325
let (watcher, stream) = client.watch(key_prefix, Some(opts)).await?;
326326
Ok((watcher, stream))
327327
}
328+
329+
/// Insert a replicaset YAML definition into xline.
330+
pub async fn insert_replicaset_yaml(&self, rs_name: &str, rs_yaml: &str) -> Result<()> {
331+
let key = format!("/registry/replicasets/{rs_name}");
332+
let mut client = self.client.write().await;
333+
client.put(key, rs_yaml, Some(PutOptions::new())).await?;
334+
Ok(())
335+
}
336+
337+
/// Get a replicaset YAML definition from xline.
338+
pub async fn get_replicaset_yaml(&self, rs_name: &str) -> Result<Option<String>> {
339+
let key = format!("/registry/replicasets/{rs_name}");
340+
let mut client = self.client.write().await;
341+
let resp = client.get(key, None).await?;
342+
Ok(resp
343+
.kvs()
344+
.first()
345+
.map(|kv| String::from_utf8_lossy(kv.value()).to_string()))
346+
}
347+
348+
/// Delete a replicaset from xline.
349+
pub async fn delete_replicaset(&self, rs_name: &str) -> Result<()> {
350+
let key = format!("/registry/replicasets/{rs_name}");
351+
let mut client = self.client.write().await;
352+
client.delete(key, None).await?;
353+
Ok(())
354+
}
355+
356+
/// List all replicaset YAMLs (deserialize values).
357+
pub async fn list_replicasets(&self) -> Result<Vec<ReplicaSet>> {
358+
let key = "/registry/replicasets/".to_string();
359+
let mut client = self.client.write().await;
360+
let resp = client
361+
.get(key.clone(), Some(GetOptions::new().with_prefix()))
362+
.await?;
363+
364+
let rss: Vec<ReplicaSet> = resp
365+
.kvs()
366+
.iter()
367+
.filter_map(|kv| {
368+
let yaml_str = String::from_utf8_lossy(kv.value());
369+
serde_yaml::from_str::<ReplicaSet>(&yaml_str).ok()
370+
})
371+
.collect();
372+
373+
Ok(rss)
374+
}
375+
376+
/// Take a snapshot of all replicasets and return them with the current revision.
377+
pub async fn replicasets_snapshot_with_rev(&self) -> Result<(Vec<(String, String)>, i64)> {
378+
let key_prefix = "/registry/replicasets/".to_string();
379+
let mut client = self.client.write().await;
380+
let resp = client
381+
.get(key_prefix.clone(), Some(GetOptions::new().with_prefix()))
382+
.await?;
383+
let rev = resp.header().map(|h| h.revision()).unwrap_or(0);
384+
let items: Vec<(String, String)> = resp
385+
.kvs()
386+
.iter()
387+
.map(|kv| {
388+
(
389+
String::from_utf8_lossy(kv.key()).replace("/registry/replicasets/", ""),
390+
String::from_utf8_lossy(kv.value()).to_string(),
391+
)
392+
})
393+
.collect();
394+
Ok((items, rev))
395+
}
396+
397+
/// Create a watch on all replicasets with prefix `/registry/replicasets/`, starting from a given revision.
398+
pub async fn watch_replicasets(&self, start_rev: i64) -> Result<(Watcher, WatchStream)> {
399+
let key_prefix = "/registry/replicasets/".to_string();
400+
let opts = WatchOptions::new()
401+
.with_prefix()
402+
.with_prev_key()
403+
.with_start_revision(start_rev);
404+
let mut client = self.client.write().await;
405+
let (watcher, stream) = client.watch(key_prefix, Some(opts)).await?;
406+
Ok((watcher, stream))
407+
}
328408
}

0 commit comments

Comments
 (0)