Skip to content

Commit d8e478e

Browse files
chore: Handle forwarding with consumer instead of regex
1 parent f401fa7 commit d8e478e

File tree

5 files changed

+37
-66
lines changed

5 files changed

+37
-66
lines changed

proxy/src/auth.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub fn start(state: Arc<State>) {
3333
.iter()
3434
.map(|crd| {
3535
let consumer = Consumer::from(crd);
36-
(consumer.hash_key.clone(), consumer)
36+
(consumer.key.clone(), consumer)
3737
})
3838
.collect();
3939
*state.consumers.write().await = consumers;
@@ -52,7 +52,7 @@ pub fn start(state: Arc<State>) {
5252
.consumers
5353
.write()
5454
.await
55-
.insert(consumer.hash_key.clone(), consumer);
55+
.insert(consumer.key.clone(), consumer);
5656
}
5757
None => {
5858
// New ports are created without status. When the status is added, a new
@@ -67,7 +67,7 @@ pub fn start(state: Arc<State>) {
6767
crd.name_any()
6868
);
6969
let consumer = Consumer::from(&crd);
70-
state.consumers.write().await.remove(&consumer.hash_key);
70+
state.consumers.write().await.remove(&consumer.key);
7171
state.limiter.write().await.remove(&consumer.key);
7272
}
7373
// Empty response from stream. Should never happen.

proxy/src/limiter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async fn add_limiter(state: &State, consumer: &Consumer, tier: &Tier) {
5050
pub async fn limiter(state: Arc<State>, consumer: &Consumer) -> Result<(), LimiterError> {
5151
if !has_limiter(&state, consumer).await {
5252
let consumers = state.consumers.read().await.clone();
53-
let refreshed_consumer = match consumers.get(&consumer.hash_key) {
53+
let refreshed_consumer = match consumers.get(&consumer.key) {
5454
Some(consumer) => consumer,
5555
None => return Err(LimiterError::PortDeleted),
5656
};

proxy/src/main.rs

+16-9
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ impl State {
5252
pub fn try_new() -> Result<Self, Box<dyn Error>> {
5353
let config = Config::new();
5454
let metrics = Metrics::try_new(Registry::default())?;
55-
let host_regex = Regex::new(r"(dmtr_[\w\d-]+)?\.?([\w-]+)-v([\d]).+")?;
55+
let host_regex = Regex::new(r"(dmtr_[\w\d-]+)?\.?.+")?;
5656
let consumers = Default::default();
5757
let tiers = Default::default();
5858
let limiter = Default::default();
@@ -67,10 +67,8 @@ impl State {
6767
})
6868
}
6969

70-
pub async fn get_consumer(&self, network: &str, version: &str, key: &str) -> Option<Consumer> {
71-
let consumers = self.consumers.read().await.clone();
72-
let hash_key = format!("{}.{}.{}", network, version, key);
73-
consumers.get(&hash_key).cloned()
70+
pub async fn get_consumer(&self, key: &str) -> Option<Consumer> {
71+
self.consumers.read().await.clone().get(key).cloned()
7472
}
7573
}
7674

@@ -80,7 +78,16 @@ pub struct Consumer {
8078
port_name: String,
8179
tier: String,
8280
key: String,
83-
hash_key: String,
81+
network: String,
82+
version: String,
83+
}
84+
impl Consumer {
85+
pub fn instance(&self, state: State) -> String {
86+
format!(
87+
"ogmios-{}-{}.{}:{}",
88+
self.network, self.version, state.config.ogmios_dns, state.config.ogmios_port
89+
)
90+
}
8491
}
8592
impl Display for Consumer {
8693
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -90,19 +97,19 @@ impl Display for Consumer {
9097
impl From<&OgmiosPort> for Consumer {
9198
fn from(value: &OgmiosPort) -> Self {
9299
let network = value.spec.network.to_string();
93-
let version = value.spec.version;
100+
let version = value.spec.version.to_string();
94101
let tier = value.spec.throughput_tier.to_string();
95102
let key = value.status.as_ref().unwrap().auth_token.clone();
96103
let namespace = value.metadata.namespace.as_ref().unwrap().clone();
97104
let port_name = value.name_any();
98105

99-
let hash_key = format!("{}.{}.{}", network, version, key);
100106
Self {
101107
namespace,
102108
port_name,
103109
tier,
104110
key,
105-
hash_key,
111+
network,
112+
version,
106113
}
107114
}
108115
}

proxy/src/metrics.rs

+9-33
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use tracing::{error, info, instrument};
1111

1212
use crate::proxy::ProxyRequest;
1313
use crate::utils::{full, ProxyResponse};
14-
use crate::{Consumer, State};
14+
use crate::State;
1515

1616
#[derive(Debug, Clone)]
1717
pub struct Metrics {
@@ -69,75 +69,51 @@ impl Metrics {
6969
}
7070

7171
pub fn count_ws_total_frame(&self, proxy_req: &ProxyRequest) {
72-
let consumer = proxy_req
73-
.consumer
74-
.as_ref()
75-
.unwrap_or(&Consumer::default())
76-
.clone();
77-
7872
self.ws_total_frame
7973
.with_label_values(&[
8074
&proxy_req.namespace,
8175
&proxy_req.instance,
8276
&proxy_req.host,
83-
&consumer.to_string(),
84-
&consumer.tier,
77+
&proxy_req.consumer.to_string(),
78+
&proxy_req.consumer.tier,
8579
])
8680
.inc()
8781
}
8882

8983
pub fn inc_ws_total_connection(&self, proxy_req: &ProxyRequest) {
90-
let consumer = proxy_req
91-
.consumer
92-
.as_ref()
93-
.unwrap_or(&Consumer::default())
94-
.clone();
95-
9684
self.ws_total_connection
9785
.with_label_values(&[
9886
&proxy_req.namespace,
9987
&proxy_req.instance,
10088
&proxy_req.host,
101-
&consumer.to_string(),
102-
&consumer.tier,
89+
&proxy_req.consumer.to_string(),
90+
&proxy_req.consumer.tier,
10391
])
10492
.inc()
10593
}
10694

10795
pub fn dec_ws_total_connection(&self, proxy_req: &ProxyRequest) {
108-
let consumer = proxy_req
109-
.consumer
110-
.as_ref()
111-
.unwrap_or(&Consumer::default())
112-
.clone();
113-
11496
self.ws_total_connection
11597
.with_label_values(&[
11698
&proxy_req.namespace,
11799
&proxy_req.instance,
118100
&proxy_req.host,
119-
&consumer.to_string(),
120-
&consumer.tier,
101+
&proxy_req.consumer.to_string(),
102+
&proxy_req.consumer.tier,
121103
])
122104
.dec()
123105
}
124106

125107
pub fn count_http_total_request(&self, proxy_req: &ProxyRequest, status_code: StatusCode) {
126-
let consumer = proxy_req
127-
.consumer
128-
.as_ref()
129-
.unwrap_or(&Consumer::default())
130-
.clone();
131-
132108
self.http_total_request
133109
.with_label_values(&[
134110
&proxy_req.namespace,
135111
&proxy_req.instance,
136112
&proxy_req.host,
137113
&status_code.as_u16().to_string(),
138114
&proxy_req.protocol.to_string(),
139-
&consumer.to_string(),
140-
&consumer.tier,
115+
&proxy_req.consumer.to_string(),
116+
&proxy_req.consumer.tier,
141117
])
142118
.inc()
143119
}

proxy/src/proxy.rs

+8-20
Original file line numberDiff line numberDiff line change
@@ -101,20 +101,13 @@ async fn handle(
101101
_ => {
102102
let proxy_req_result = ProxyRequest::new(&mut hyper_req, &state).await;
103103
if proxy_req_result.is_none() {
104-
return Ok(Response::builder()
105-
.status(StatusCode::BAD_GATEWAY)
106-
.body(full("Invalid hostname"))
107-
.unwrap());
108-
}
109-
110-
let proxy_req = proxy_req_result.unwrap();
111-
if proxy_req.consumer.is_none() {
112104
return Ok(Response::builder()
113105
.status(StatusCode::UNAUTHORIZED)
114106
.body(full("Unauthorized"))
115107
.unwrap());
116108
}
117109

110+
let proxy_req = proxy_req_result.unwrap();
118111
let response_result = match proxy_req.protocol {
119112
Protocol::Http => handle_http(hyper_req, &proxy_req).await,
120113
Protocol::Websocket => handle_websocket(hyper_req, &proxy_req, state.clone()).await,
@@ -199,11 +192,7 @@ async fn handle_websocket(
199192
while let Some(result) = client_incoming.next().await {
200193
match result {
201194
Ok(data) => {
202-
if let Err(err) = limiter(
203-
state.clone(),
204-
proxy_req.consumer.clone().as_ref().unwrap(),
205-
)
206-
.await
195+
if let Err(err) = limiter(state.clone(), &proxy_req.consumer).await
207196
{
208197
error!(error = err.to_string(), "Failed to run limiter");
209198
break;
@@ -276,7 +265,7 @@ pub struct ProxyRequest {
276265
pub namespace: String,
277266
pub host: String,
278267
pub instance: String,
279-
pub consumer: Option<Consumer>,
268+
pub consumer: Consumer,
280269
pub protocol: Protocol,
281270
}
282271
impl ProxyRequest {
@@ -288,11 +277,6 @@ impl ProxyRequest {
288277
let network = captures.get(2)?.as_str().to_string();
289278
let version = captures.get(3)?.as_str().to_string();
290279

291-
let instance = format!(
292-
"ogmios-{network}-{version}.{}:{}",
293-
state.config.ogmios_dns, state.config.ogmios_port
294-
);
295-
296280
let namespace = state.config.proxy_namespace.clone();
297281

298282
let protocol = get_header(hyper_req, UPGRADE.as_str())
@@ -314,7 +298,11 @@ impl ProxyRequest {
314298
}
315299

316300
let token = get_header(hyper_req, DMTR_API_KEY).unwrap_or_default();
317-
let consumer = state.get_consumer(&network, &version, &token).await;
301+
let consumer = state.get_consumer(&token).await?;
302+
let instance = format!(
303+
"ogmios-{network}-{version}.{}:{}",
304+
state.config.ogmios_dns, state.config.ogmios_port
305+
);
318306

319307
Some(Self {
320308
namespace,

0 commit comments

Comments
 (0)