Skip to content

Commit 8f7bc51

Browse files
committed
Add CBOR support for etcd encoding
Up until now we would write CBOR values back as etcd, after this commit we preserve the original encoding, so CBOR values are written back as CBOR. This required tracking the encoding type in the in-memory etcd cache, and modifying the encode function to handle CBOR.
1 parent 728312c commit 8f7bc51

File tree

9 files changed

+59
-22
lines changed

9 files changed

+59
-22
lines changed

src/cluster_crypto/cert_key_pair.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ impl CertKeyPair {
357357
.as_bytes()
358358
.to_vec(),
359359
)
360-
.await;
360+
.await.context("putting in etcd")?;
361361

362362
Ok(())
363363
}

src/cluster_crypto/distributed_jwt.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl DistributedJwt {
8686
&k8slocation.resource_location.as_etcd_key(),
8787
serde_json::to_string(&resource)?.as_bytes().to_vec(),
8888
)
89-
.await;
89+
.await.context("putting in etcd")?;
9090

9191
Ok(())
9292
}

src/cluster_crypto/distributed_private_key.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl DistributedPrivateKey {
9696
.as_bytes()
9797
.to_vec(),
9898
)
99-
.await;
99+
.await.context("putting in etcd")?;
100100

101101
Ok(())
102102
}

src/cluster_crypto/distributed_public_key.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ impl DistributedPublicKey {
129129
.as_bytes()
130130
.to_vec(),
131131
)
132-
.await;
132+
.await.context("putting in etcd")?;
133133

134134
Ok(())
135135
}

src/etcd_encoding.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,24 @@ k8s_type!(OAuthClientWithMeta, OAuthClient);
6565

6666
mod k8s_cbor;
6767

68-
pub(crate) async fn decode(data: &[u8]) -> Result<Vec<u8>> {
68+
#[derive(Clone)]
69+
pub(crate) enum Encoding {
70+
Protobuf,
71+
Cbor,
72+
Json,
73+
}
74+
75+
pub(crate) async fn decode(data: &[u8]) -> Result<(Vec<u8>, Encoding)> {
6976
if !data.starts_with("k8s\x00".as_bytes()) {
7077
// k8s uses CBOR with the self-describing tag 55799, we can use its bytes to detect CBOR
7178
if data.starts_with([0xd9, 0xd9, 0xf7].as_ref()) {
7279
// It's CBOR, just convert to JSON
7380
let json_value = k8s_cbor::k8s_cbor_bytes_to_json(data).context("converting CBOR to JSON")?;
74-
return Ok(serde_json::to_vec(&json_value)?);
81+
return Ok((serde_json::to_vec(&json_value)?, Encoding::Cbor));
7582
}
7683

7784
// Not CBOR, not protobuf, it's probably just raw JSON, return as-is
78-
return Ok(data.to_vec());
85+
return Ok((data.to_vec(), Encoding::Json));
7986
}
8087

8188
let data = &data[4..];
@@ -89,7 +96,7 @@ pub(crate) async fn decode(data: &[u8]) -> Result<Vec<u8>> {
8996
.context("missing kind")?
9097
.as_str();
9198

92-
Ok(match kind {
99+
let decoded_data = match kind {
93100
"Route" => serde_json::to_vec(&RouteWithMeta::try_from(unknown)?)?,
94101
"Deployment" => serde_json::to_vec(&DeploymentWithMeta::try_from(unknown)?)?,
95102
"ControllerRevision" => serde_json::to_vec(&ControllerRevisionWithMeta::try_from(unknown)?)?,
@@ -105,11 +112,20 @@ pub(crate) async fn decode(data: &[u8]) -> Result<Vec<u8>> {
105112
"MutatingWebhookConfiguration" => serde_json::to_vec(&MutatingWebhookConfigurationWithMeta::try_from(unknown)?)?,
106113
"OAuthClient" => serde_json::to_vec(&OAuthClientWithMeta::try_from(unknown)?)?,
107114
_ => bail!("unknown kind {}", kind),
108-
})
115+
};
116+
117+
Ok((decoded_data, Encoding::Protobuf))
109118
}
110119

111-
pub(crate) async fn encode(data: &[u8]) -> Result<Vec<u8>> {
120+
pub(crate) async fn encode(data: &[u8], encoding: Encoding) -> Result<Vec<u8>> {
112121
let value: Value = serde_json::from_slice(data)?;
122+
123+
if matches!(encoding, Encoding::Cbor) {
124+
return k8s_cbor::json_to_k8s_cbor_bytes(value).context("converting JSON to CBOR");
125+
}
126+
127+
// If kind is a known protobuf kind, write it back as protobuf, otherwise return raw JSON
128+
// TODO: Just look at the new encoding param?
113129
let kind = value
114130
.pointer("/kind")
115131
.context("missing kind")?

src/etcd_encoding/k8s_cbor.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,15 @@ pub(crate) fn k8s_cbor_bytes_to_json(cbor_bytes: &[u8]) -> Result<JsonValue> {
9090

9191
cbor_to_json(v)
9292
}
93+
94+
pub(crate) fn json_to_k8s_cbor_bytes(json: JsonValue) -> Result<Vec<u8>> {
95+
let cbor = json_to_cbor(json)?;
96+
97+
// Put back the self-describing CBOR tag that we stripped
98+
let tagged_cbor = CborValue::Tag(SELF_DESCRIBING_CBOR_TAG, Box::new(cbor));
99+
100+
let mut bytes = Vec::new();
101+
ciborium::ser::into_writer(&tagged_cbor, &mut bytes)?;
102+
103+
Ok(bytes)
104+
}

src/k8s_etcd.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::encrypt::ResourceTransformers;
33
use crate::etcd_encoding;
44
use anyhow::{bail, ensure, Context, Result};
55
use etcd_client::{Client as EtcdClient, GetOptions};
6+
use etcd_encoding::Encoding;
67
use futures_util::future::join_all;
78
use serde_json::Value;
89
use std::collections::{HashMap, HashSet};
@@ -22,7 +23,7 @@ pub(crate) struct EtcdResult {
2223
/// have to go through etcd for every single edit.
2324
pub(crate) struct InMemoryK8sEtcd {
2425
pub(crate) etcd_client: Option<Arc<EtcdClient>>,
25-
etcd_keyvalue_hashmap: Mutex<HashMap<String, Vec<u8>>>,
26+
etcd_keyvalue_hashmap: Mutex<HashMap<String, (Encoding, Vec<u8>)>>,
2627
edited: Mutex<HashMap<String, Vec<u8>>>,
2728
deleted_keys: Mutex<HashSet<String>>,
2829
decrypt_resource_transformers: Option<ResourceTransformers>,
@@ -105,10 +106,10 @@ impl InMemoryK8sEtcd {
105106
continue;
106107
}
107108
let key = key.clone();
108-
let value = value.clone();
109+
let (encoding, value) = value.clone();
109110
let etcd_client = Arc::clone(etcd_client);
110111

111-
let mut value = etcd_encoding::encode(value.as_slice()).await.context("encoding value")?;
112+
let mut value = etcd_encoding::encode(value.as_slice(), encoding).await.context("encoding value")?;
112113

113114
if let Some(resource_transformers) = &self.encrypt_resource_transformers {
114115
// https://github.com/kubernetes/apiserver/blob/3423727e46efe7dfa40dcdb1a9c5c5027b07303d/pkg/storage/value/transformer.go#L172
@@ -184,7 +185,7 @@ impl InMemoryK8sEtcd {
184185

185186
{
186187
let hashmap = self.etcd_keyvalue_hashmap.lock().await;
187-
if let Some(value) = hashmap.get(&key) {
188+
if let Some((_encoding, value)) = hashmap.get(&key) {
188189
result.value.clone_from(value);
189190
return Ok(Some(result));
190191
}
@@ -195,7 +196,7 @@ impl InMemoryK8sEtcd {
195196
if let Some(value) = get_result.kvs().first() {
196197
let raw_etcd_value = value.value();
197198

198-
let mut decoded_value = etcd_encoding::decode(raw_etcd_value).await.context("decoding value")?;
199+
let (mut decoded_value, mut encoding) = etcd_encoding::decode(raw_etcd_value).await.context("decoding value")?;
199200

200201
if let Some(resource_transformers) = &self.decrypt_resource_transformers {
201202
// https://github.com/kubernetes/apiserver/blob/3423727e46efe7dfa40dcdb1a9c5c5027b07303d/pkg/storage/value/transformer.go#L110
@@ -209,7 +210,7 @@ impl InMemoryK8sEtcd {
209210
.decrypt(key.to_string(), raw_etcd_value.to_vec())
210211
.await
211212
.context("decrypting etcd value")?;
212-
decoded_value = etcd_encoding::decode(&plaintext_value).await.context("decoding value")?;
213+
(decoded_value, encoding) = etcd_encoding::decode(&plaintext_value).await.context("decoding value")?;
213214
break;
214215
}
215216
}
@@ -219,7 +220,7 @@ impl InMemoryK8sEtcd {
219220
self.etcd_keyvalue_hashmap
220221
.lock()
221222
.await
222-
.insert(key.to_string(), decoded_value.clone());
223+
.insert(key.to_string(), (encoding, decoded_value.clone()));
223224

224225
result.value = decoded_value;
225226
return Ok(Some(result));
@@ -228,10 +229,18 @@ impl InMemoryK8sEtcd {
228229
Ok(None)
229230
}
230231

231-
pub(crate) async fn put(&self, key: &str, value: Vec<u8>) {
232-
self.etcd_keyvalue_hashmap.lock().await.insert(key.to_string(), value.clone());
232+
pub(crate) async fn put(&self, key: &str, value: Vec<u8>) -> Result<()> {
233+
let mut hashmap = self.etcd_keyvalue_hashmap.lock().await;
234+
235+
// Only put if the key already exists in the cache, preserving the encoding
236+
let (encoding, _) = hashmap.get(key).context(format!("key '{}' not found in cache", key))?;
237+
let encoding = encoding.clone(); // Clone the encoding
238+
hashmap.insert(key.to_string(), (encoding, value.clone()));
239+
drop(hashmap); // Release the lock early
240+
233241
self.deleted_keys.lock().await.remove(key);
234242
self.edited.lock().await.insert(key.to_string(), value);
243+
Ok(())
235244
}
236245

237246
pub(crate) async fn list_keys(&self, resource_kind: &str) -> Result<Vec<String>> {
@@ -337,6 +346,6 @@ pub(crate) async fn get_etcd_json(client: &InMemoryK8sEtcd, k8slocation: &K8sRes
337346
pub(crate) async fn put_etcd_yaml(client: &InMemoryK8sEtcd, k8slocation: &K8sResourceLocation, value: Value) -> Result<()> {
338347
client
339348
.put(&k8slocation.as_etcd_key(), serde_json::to_string(&value)?.as_bytes().into())
340-
.await;
349+
.await.context("putting in etcd")?;
341350
Ok(())
342351
}

src/ocp_postprocess/encryption_config/etcd_rename.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ async fn update_encryption_key(component: &str, etcd_client: &Arc<InMemoryK8sEtc
177177
)),
178178
serde_json::to_string(&secret).context("serializing value")?.as_bytes().to_vec(),
179179
)
180-
.await;
180+
.await.context("putting in etcd")?;
181181
etcd_client.delete(&key).await.context(format!("deleting {}", key))?;
182182
}
183183
}

src/ocp_postprocess/hostname_rename/etcd_rename.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ pub(crate) async fn fix_etcd_secrets(etcd_client: &Arc<InMemoryK8sEtcd>, origina
227227
&(format!("/kubernetes.io/secrets/openshift-etcd/{new_secret_name}")),
228228
serde_json::to_string(&etcd_value).context("serializing value")?.as_bytes().to_vec(),
229229
)
230-
.await;
230+
.await.context("putting in etcd")?;
231231

232232
etcd_client.delete(&key).await.context(format!("deleting {}", key))?;
233233

0 commit comments

Comments
 (0)