Skip to content

Commit 29aeec1

Browse files
fix: Update limiters on port modifications (#50)
* fix: Refill limiter with limit * fix: Update limiters on port modifications * Clear limiter map * Remove consumer from limiter on port update
1 parent 2a03215 commit 29aeec1

File tree

2 files changed

+76
-51
lines changed

2 files changed

+76
-51
lines changed

Diff for: proxy/src/auth.rs

+58-44
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,14 @@
11
use futures_util::TryStreamExt;
22
use operator::{
33
kube::{
4-
api::ListParams,
5-
runtime::{
6-
watcher::{self, Config},
7-
WatchStreamExt,
8-
},
4+
runtime::watcher::{self, Config, Event},
95
Api, Client, ResourceExt,
106
},
117
OgmiosPort,
128
};
139
use std::{collections::HashMap, sync::Arc};
1410
use tokio::pin;
15-
use tracing::{error, instrument};
11+
use tracing::{error, info, instrument};
1612

1713
use crate::{Consumer, State};
1814

@@ -24,49 +20,67 @@ pub fn start(state: Arc<State>) {
2420
.expect("failed to create kube client");
2521

2622
let api = Api::<OgmiosPort>::all(client.clone());
27-
update_auth(state.clone(), api.clone()).await;
28-
29-
let stream = watcher::watcher(api.clone(), Config::default()).touched_objects();
23+
let stream = watcher::watcher(api.clone(), Config::default());
3024
pin!(stream);
3125

3226
loop {
3327
let result = stream.try_next().await;
34-
if let Err(err) = result {
35-
error!(error = err.to_string(), "fail crd auth watcher");
36-
continue;
37-
}
28+
match result {
29+
// Stream restart, also run on startup.
30+
Ok(Some(Event::Restarted(crds))) => {
31+
info!("auth: Watcher restarted, reseting consumers");
32+
let consumers: HashMap<String, Consumer> = crds
33+
.iter()
34+
.map(|crd| {
35+
let consumer = Consumer::from(crd);
36+
(consumer.hash_key.clone(), consumer)
37+
})
38+
.collect();
39+
*state.consumers.write().await = consumers;
3840

39-
update_auth(state.clone(), api.clone()).await;
41+
// When the watcher is restarted, we reset the limiter because a user
42+
// could have changed the tier on the watcher restart.
43+
state.limiter.write().await.clear();
44+
}
45+
// New port created or updated.
46+
Ok(Some(Event::Applied(crd))) => match crd.status {
47+
Some(_) => {
48+
info!("auth: Adding new consumer: {}", crd.name_any());
49+
let consumer = Consumer::from(&crd);
50+
state.limiter.write().await.remove(&consumer.key);
51+
state
52+
.consumers
53+
.write()
54+
.await
55+
.insert(consumer.hash_key.clone(), consumer);
56+
}
57+
None => {
58+
// New ports are created without status. When the status is added, a new
59+
// Applied event is triggered.
60+
info!("auth: New port created: {}", crd.name_any());
61+
}
62+
},
63+
// Port deleted.
64+
Ok(Some(Event::Deleted(crd))) => {
65+
info!(
66+
"auth: Port deleted, removing from state: {}",
67+
crd.name_any()
68+
);
69+
let consumer = Consumer::from(&crd);
70+
state.consumers.write().await.remove(&consumer.hash_key);
71+
state.limiter.write().await.remove(&consumer.key);
72+
}
73+
// Empty response from stream. Should never happen.
74+
Ok(None) => {
75+
error!("auth: Empty response from watcher.");
76+
continue;
77+
}
78+
// Unexpected error when streaming CRDs.
79+
Err(err) => {
80+
error!(error = err.to_string(), "auth: Failed to update crds.");
81+
std::process::exit(1);
82+
}
83+
}
4084
}
4185
});
4286
}
43-
44-
async fn update_auth(state: Arc<State>, api: Api<OgmiosPort>) {
45-
let result = api.list(&ListParams::default()).await;
46-
if let Err(err) = result {
47-
error!(
48-
error = err.to_string(),
49-
"error to get crds while updating auth keys"
50-
);
51-
return;
52-
}
53-
54-
let mut consumers = HashMap::new();
55-
for crd in result.unwrap().items.iter() {
56-
if crd.status.is_some() {
57-
let network = crd.spec.network.to_string();
58-
let version = crd.spec.version;
59-
let tier = crd.spec.throughput_tier.to_string();
60-
let key = crd.status.as_ref().unwrap().auth_token.clone();
61-
let namespace = crd.metadata.namespace.as_ref().unwrap().clone();
62-
let port_name = crd.name_any();
63-
64-
let hash_key = format!("{}.{}.{}", network, version, key);
65-
let consumer = Consumer::new(namespace, port_name, tier, key);
66-
67-
consumers.insert(hash_key, consumer);
68-
}
69-
}
70-
71-
*state.consumers.write().await = consumers;
72-
}

Diff for: proxy/src/main.rs

+18-7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use config::Config;
22
use dotenv::dotenv;
33
use leaky_bucket::RateLimiter;
44
use metrics::Metrics;
5+
use operator::{kube::ResourceExt, OgmiosPort};
56
use prometheus::Registry;
67
use regex::Regex;
78
use std::collections::HashMap;
@@ -79,19 +80,29 @@ pub struct Consumer {
7980
port_name: String,
8081
tier: String,
8182
key: String,
83+
hash_key: String,
8284
}
83-
impl Consumer {
84-
pub fn new(namespace: String, port_name: String, tier: String, key: String) -> Self {
85+
impl Display for Consumer {
86+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87+
write!(f, "{}.{}", self.namespace, self.port_name)
88+
}
89+
}
90+
impl From<&OgmiosPort> for Consumer {
91+
fn from(value: &OgmiosPort) -> Self {
92+
let network = value.spec.network.to_string();
93+
let version = value.spec.version;
94+
let tier = value.spec.throughput_tier.to_string();
95+
let key = value.status.as_ref().unwrap().auth_token.clone();
96+
let namespace = value.metadata.namespace.as_ref().unwrap().clone();
97+
let port_name = value.name_any();
98+
99+
let hash_key = format!("{}.{}.{}", network, version, key);
85100
Self {
86101
namespace,
87102
port_name,
88103
tier,
89104
key,
105+
hash_key,
90106
}
91107
}
92108
}
93-
impl Display for Consumer {
94-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95-
write!(f, "{}.{}", self.namespace, self.port_name)
96-
}
97-
}

0 commit comments

Comments
 (0)