Skip to content

Commit 8ba3eb8

Browse files
committed
Add CBOR support for etcd encoding
Up until now we would write CBOR values back as etcd. After this commit we preserve the original encoding, so CBOR values are written back as CBOR. This required tracking the encoding type in the in-memory etcd cache and modifying the encode function to handle CBOR.
1 parent dec8fd7 commit 8ba3eb8

File tree

9 files changed

+167
-21
lines changed

9 files changed

+167
-21
lines changed

src/cluster_crypto/cert_key_pair.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,8 @@ impl CertKeyPair {
357357
.as_bytes()
358358
.to_vec(),
359359
)
360-
.await;
360+
.await
361+
.context("putting in etcd")?;
361362

362363
Ok(())
363364
}

src/cluster_crypto/distributed_jwt.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ impl DistributedJwt {
8686
&k8slocation.resource_location.as_etcd_key(),
8787
serde_json::to_string(&resource)?.as_bytes().to_vec(),
8888
)
89-
.await;
89+
.await
90+
.context("putting in etcd")?;
9091

9192
Ok(())
9293
}

src/cluster_crypto/distributed_private_key.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ impl DistributedPrivateKey {
9696
.as_bytes()
9797
.to_vec(),
9898
)
99-
.await;
99+
.await
100+
.context("putting in etcd")?;
100101

101102
Ok(())
102103
}

src/cluster_crypto/distributed_public_key.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ impl DistributedPublicKey {
129129
.as_bytes()
130130
.to_vec(),
131131
)
132-
.await;
132+
.await
133+
.context("putting in etcd")?;
133134

134135
Ok(())
135136
}

src/etcd_encoding.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,26 @@ k8s_type!(ValidatingWebhookConfigurationWithMeta, ValidatingWebhookConfiguration
6363
k8s_type!(MutatingWebhookConfigurationWithMeta, MutatingWebhookConfiguration);
6464
k8s_type!(OAuthClientWithMeta, OAuthClient);
6565

66-
pub(crate) async fn decode(data: &[u8]) -> Result<Vec<u8>> {
66+
mod k8s_cbor;
67+
68+
/// Wire format an etcd value was found in, as detected by `decode`.
///
/// It is carried next to the decoded bytes so that `encode` can write the
/// value back in the format it was read in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Encoding {
    /// The value started with the `k8s\x00` prefix.
    Protobuf,
    /// The value started with the bytes of the self-describing CBOR tag 55799.
    Cbor,
    /// Neither of the above; the bytes are passed through as (probably) raw JSON.
    Json,
}
74+
75+
pub(crate) async fn decode(data: &[u8]) -> Result<(Vec<u8>, Encoding)> {
6776
if !data.starts_with("k8s\x00".as_bytes()) {
68-
return Ok(data.to_vec());
77+
// k8s uses CBOR with the self-describing tag 55799, we can use its bytes to detect CBOR
78+
if data.starts_with([0xd9, 0xd9, 0xf7].as_ref()) {
79+
// It's CBOR, just convert to JSON
80+
let json_value = k8s_cbor::k8s_cbor_bytes_to_json(data).context("converting CBOR to JSON")?;
81+
return Ok((serde_json::to_vec(&json_value)?, Encoding::Cbor));
82+
}
83+
84+
// Not CBOR, not protobuf, it's probably just raw JSON, return as-is
85+
return Ok((data.to_vec(), Encoding::Json));
6986
}
7087

7188
let data = &data[4..];
@@ -79,7 +96,7 @@ pub(crate) async fn decode(data: &[u8]) -> Result<Vec<u8>> {
7996
.context("missing kind")?
8097
.as_str();
8198

82-
Ok(match kind {
99+
let decoded_data = match kind {
83100
"Route" => serde_json::to_vec(&RouteWithMeta::try_from(unknown)?)?,
84101
"Deployment" => serde_json::to_vec(&DeploymentWithMeta::try_from(unknown)?)?,
85102
"ControllerRevision" => serde_json::to_vec(&ControllerRevisionWithMeta::try_from(unknown)?)?,
@@ -95,11 +112,20 @@ pub(crate) async fn decode(data: &[u8]) -> Result<Vec<u8>> {
95112
"MutatingWebhookConfiguration" => serde_json::to_vec(&MutatingWebhookConfigurationWithMeta::try_from(unknown)?)?,
96113
"OAuthClient" => serde_json::to_vec(&OAuthClientWithMeta::try_from(unknown)?)?,
97114
_ => bail!("unknown kind {}", kind),
98-
})
115+
};
116+
117+
Ok((decoded_data, Encoding::Protobuf))
99118
}
100119

101-
pub(crate) async fn encode(data: &[u8]) -> Result<Vec<u8>> {
120+
pub(crate) async fn encode(data: &[u8], encoding: Encoding) -> Result<Vec<u8>> {
102121
let value: Value = serde_json::from_slice(data)?;
122+
123+
if matches!(encoding, Encoding::Cbor) {
124+
return k8s_cbor::json_to_k8s_cbor_bytes(value).context("converting JSON to CBOR");
125+
}
126+
127+
// If kind is a known protobuf kind, write it back as protobuf, otherwise return raw JSON
128+
// TODO: Just look at the new encoding param?
103129
let kind = value
104130
.pointer("/kind")
105131
.context("missing kind")?

src/etcd_encoding/k8s_cbor.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
use anyhow::{bail, Context, Result};
2+
use ciborium::value::Value as CborValue;
3+
use serde_json::{value::Number as JsonNumber, Value as JsonValue};
4+
5+
const SELF_DESCRIBING_CBOR_TAG: u64 = 55799;
6+
7+
fn cbor_to_json(cbor: CborValue) -> Result<JsonValue> {
8+
Ok(match cbor {
9+
CborValue::Null => JsonValue::Null,
10+
CborValue::Bool(boolean) => JsonValue::Bool(boolean),
11+
CborValue::Text(string) => JsonValue::String(string),
12+
CborValue::Integer(int) => JsonValue::Number({
13+
let int: i128 = int.into();
14+
if let Ok(int) = u64::try_from(int) {
15+
JsonNumber::from(int)
16+
} else if let Ok(int) = i64::try_from(int) {
17+
JsonNumber::from(int)
18+
} else {
19+
JsonNumber::from_f64(int as f64).context("Integer not JSON compatible")?
20+
}
21+
}),
22+
CborValue::Float(float) => JsonValue::Number(JsonNumber::from_f64(float).context("Float not JSON compatible")?),
23+
CborValue::Array(vec) => JsonValue::Array(vec.into_iter().map(cbor_to_json).collect::<Result<Vec<_>>>()?),
24+
CborValue::Map(map) => JsonValue::Object(serde_json::Map::from_iter(
25+
map.into_iter()
26+
.map(|(k, v)| {
27+
let key_str = match k {
28+
CborValue::Bytes(bytes) => String::from_utf8(bytes).context("Invalid UTF-8 in CBOR map key")?,
29+
CborValue::Text(text) => text,
30+
_ => bail!("Unsupported CBOR map key type {:?}", k),
31+
};
32+
Ok((key_str, cbor_to_json(v)?))
33+
})
34+
.collect::<Result<Vec<(String, JsonValue)>>>()?,
35+
)),
36+
// TODO: Handle proposed-encoding tags for CBOR bytes? https://github.com/kubernetes/kubernetes/pull/125419
37+
// It seems that in a typical k8s cluster these are not used anywhere (secrets are
38+
// protobuf, and they're pretty much the only place where raw bytes are used in
39+
// values), so I don't have an example to test that implementation on. For now we will
40+
// crash on unhandled tags below to be safe.
41+
CborValue::Bytes(vec) => JsonValue::String(String::from_utf8(vec).context("Invalid UTF-8 in CBOR bytes")?),
42+
CborValue::Tag(value, _tag) => unimplemented!("Unsupported CBOR tag {:?}", value),
43+
_ => unimplemented!("Unsupported CBOR type {:?}", cbor),
44+
})
45+
}
46+
47+
fn json_to_cbor(json: JsonValue) -> Result<CborValue> {
48+
Ok(match json {
49+
JsonValue::Null => CborValue::Null,
50+
JsonValue::Bool(boolean) => CborValue::Bool(boolean),
51+
JsonValue::String(string) => CborValue::Bytes(string.into_bytes()),
52+
JsonValue::Number(number) => {
53+
if let Some(int) = number.as_i64() {
54+
CborValue::Integer(int.into())
55+
} else if let Some(uint) = number.as_u64() {
56+
CborValue::Integer(uint.into())
57+
} else if let Some(float) = number.as_f64() {
58+
CborValue::Float(float)
59+
} else {
60+
bail!("Unsupported number type")
61+
}
62+
}
63+
JsonValue::Array(arr) => CborValue::Array(arr.into_iter().map(json_to_cbor).collect::<Result<Vec<_>>>()?),
64+
JsonValue::Object(map) => {
65+
// Fallback for regular JSON objects (shouldn't happen in our flow)
66+
let map_entries: Vec<(CborValue, CborValue)> = map
67+
.into_iter()
68+
.map(|(k, v)| Ok((CborValue::Bytes(k.into_bytes()), json_to_cbor(v)?)))
69+
.collect::<Result<Vec<_>>>()?;
70+
CborValue::Map(map_entries)
71+
}
72+
})
73+
}
74+
75+
pub(crate) fn k8s_cbor_bytes_to_json(cbor_bytes: &[u8]) -> Result<JsonValue> {
76+
let v: CborValue = ciborium::de::from_reader(cbor_bytes)?;
77+
78+
let (v, had_self_describing_tag) = match v {
79+
CborValue::Tag(value, contents) => match value {
80+
SELF_DESCRIBING_CBOR_TAG => {
81+
// Self-describing CBOR tag, unwrap the contents
82+
(*contents, true)
83+
}
84+
_ => panic!("Unsupported CBOR tag {}", value),
85+
},
86+
// We expected a self-describing CBOR tag at the root. Of course we could just proceed
87+
// as is (since it's just raw CBOR) but it's a bit fishy, so just bail
88+
_ => bail!("CBOR data that does not start with self-describing tag is not supported"),
89+
};
90+
91+
cbor_to_json(v)
92+
}
93+
94+
/// Serializes a JSON value as CBOR bytes wrapped in the self-describing tag
/// (55799).
///
/// # Errors
///
/// Returns an error if the JSON value cannot be converted to CBOR or if CBOR
/// serialization fails.
pub(crate) fn json_to_k8s_cbor_bytes(json: JsonValue) -> Result<Vec<u8>> {
    let payload = Box::new(json_to_cbor(json)?);

    // Re-wrap the payload in the self-describing tag before serializing
    let wrapped = CborValue::Tag(SELF_DESCRIBING_CBOR_TAG, payload);

    let mut encoded = Vec::new();
    ciborium::ser::into_writer(&wrapped, &mut encoded)?;
    Ok(encoded)
}

src/k8s_etcd.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::encrypt::ResourceTransformers;
33
use crate::etcd_encoding;
44
use anyhow::{bail, ensure, Context, Result};
55
use etcd_client::{Client as EtcdClient, GetOptions};
6+
use etcd_encoding::Encoding;
67
use futures_util::future::join_all;
78
use serde_json::Value;
89
use std::collections::{HashMap, HashSet};
@@ -22,7 +23,7 @@ pub(crate) struct EtcdResult {
2223
/// have to go through etcd for every single edit.
2324
pub(crate) struct InMemoryK8sEtcd {
2425
pub(crate) etcd_client: Option<Arc<EtcdClient>>,
25-
etcd_keyvalue_hashmap: Mutex<HashMap<String, Vec<u8>>>,
26+
etcd_keyvalue_hashmap: Mutex<HashMap<String, (Encoding, Vec<u8>)>>,
2627
edited: Mutex<HashMap<String, Vec<u8>>>,
2728
deleted_keys: Mutex<HashSet<String>>,
2829
decrypt_resource_transformers: Option<ResourceTransformers>,
@@ -105,10 +106,10 @@ impl InMemoryK8sEtcd {
105106
continue;
106107
}
107108
let key = key.clone();
108-
let value = value.clone();
109+
let (encoding, value) = value.clone();
109110
let etcd_client = Arc::clone(etcd_client);
110111

111-
let mut value = etcd_encoding::encode(value.as_slice()).await.context("encoding value")?;
112+
let mut value = etcd_encoding::encode(value.as_slice(), encoding).await.context("encoding value")?;
112113

113114
if let Some(resource_transformers) = &self.encrypt_resource_transformers {
114115
// https://github.com/kubernetes/apiserver/blob/3423727e46efe7dfa40dcdb1a9c5c5027b07303d/pkg/storage/value/transformer.go#L172
@@ -184,7 +185,7 @@ impl InMemoryK8sEtcd {
184185

185186
{
186187
let hashmap = self.etcd_keyvalue_hashmap.lock().await;
187-
if let Some(value) = hashmap.get(&key) {
188+
if let Some((_encoding, value)) = hashmap.get(&key) {
188189
result.value.clone_from(value);
189190
return Ok(Some(result));
190191
}
@@ -195,7 +196,7 @@ impl InMemoryK8sEtcd {
195196
if let Some(value) = get_result.kvs().first() {
196197
let raw_etcd_value = value.value();
197198

198-
let mut decoded_value = etcd_encoding::decode(raw_etcd_value).await.context("decoding value")?;
199+
let (mut decoded_value, mut encoding) = etcd_encoding::decode(raw_etcd_value).await.context("decoding value")?;
199200

200201
if let Some(resource_transformers) = &self.decrypt_resource_transformers {
201202
// https://github.com/kubernetes/apiserver/blob/3423727e46efe7dfa40dcdb1a9c5c5027b07303d/pkg/storage/value/transformer.go#L110
@@ -209,7 +210,7 @@ impl InMemoryK8sEtcd {
209210
.decrypt(key.to_string(), raw_etcd_value.to_vec())
210211
.await
211212
.context("decrypting etcd value")?;
212-
decoded_value = etcd_encoding::decode(&plaintext_value).await.context("decoding value")?;
213+
(decoded_value, encoding) = etcd_encoding::decode(&plaintext_value).await.context("decoding value")?;
213214
break;
214215
}
215216
}
@@ -219,7 +220,7 @@ impl InMemoryK8sEtcd {
219220
self.etcd_keyvalue_hashmap
220221
.lock()
221222
.await
222-
.insert(key.to_string(), decoded_value.clone());
223+
.insert(key.to_string(), (encoding, decoded_value.clone()));
223224

224225
result.value = decoded_value;
225226
return Ok(Some(result));
@@ -228,10 +229,18 @@ impl InMemoryK8sEtcd {
228229
Ok(None)
229230
}
230231

231-
pub(crate) async fn put(&self, key: &str, value: Vec<u8>) {
232-
self.etcd_keyvalue_hashmap.lock().await.insert(key.to_string(), value.clone());
232+
pub(crate) async fn put(&self, key: &str, value: Vec<u8>) -> Result<()> {
233+
let mut hashmap = self.etcd_keyvalue_hashmap.lock().await;
234+
235+
// Only put if the key already exists in the cache, preserving the encoding
236+
let (encoding, _) = hashmap.get(key).context(format!("key '{}' not found in cache", key))?;
237+
let encoding = encoding.clone(); // Clone the encoding
238+
hashmap.insert(key.to_string(), (encoding, value.clone()));
239+
drop(hashmap); // Release the lock early
240+
233241
self.deleted_keys.lock().await.remove(key);
234242
self.edited.lock().await.insert(key.to_string(), value);
243+
Ok(())
235244
}
236245

237246
pub(crate) async fn list_keys(&self, resource_kind: &str) -> Result<Vec<String>> {
@@ -337,6 +346,7 @@ pub(crate) async fn get_etcd_json(client: &InMemoryK8sEtcd, k8slocation: &K8sRes
337346
pub(crate) async fn put_etcd_yaml(client: &InMemoryK8sEtcd, k8slocation: &K8sResourceLocation, value: Value) -> Result<()> {
338347
client
339348
.put(&k8slocation.as_etcd_key(), serde_json::to_string(&value)?.as_bytes().into())
340-
.await;
349+
.await
350+
.context("putting in etcd")?;
341351
Ok(())
342352
}

src/ocp_postprocess/encryption_config/etcd_rename.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ async fn update_encryption_key(component: &str, etcd_client: &Arc<InMemoryK8sEtc
177177
)),
178178
serde_json::to_string(&secret).context("serializing value")?.as_bytes().to_vec(),
179179
)
180-
.await;
180+
.await
181+
.context("putting in etcd")?;
181182
etcd_client.delete(&key).await.context(format!("deleting {}", key))?;
182183
}
183184
}

src/ocp_postprocess/hostname_rename/etcd_rename.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,8 @@ pub(crate) async fn fix_etcd_secrets(etcd_client: &Arc<InMemoryK8sEtcd>, origina
227227
&(format!("/kubernetes.io/secrets/openshift-etcd/{new_secret_name}")),
228228
serde_json::to_string(&etcd_value).context("serializing value")?.as_bytes().to_vec(),
229229
)
230-
.await;
230+
.await
231+
.context("putting in etcd")?;
231232

232233
etcd_client.delete(&key).await.context(format!("deleting {}", key))?;
233234

0 commit comments

Comments
 (0)