Skip to content

Commit 82c5a41

Browse files
chore: Handle forwarding with consumer instead of regex (#56)
* chore: Handle forwarding with consumer instead of regex * Fix instance
1 parent f401fa7 commit 82c5a41

File tree

5 files changed

+29
-68
lines changed

5 files changed

+29
-68
lines changed

Diff for: proxy/src/auth.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub fn start(state: Arc<State>) {
3333
.iter()
3434
.map(|crd| {
3535
let consumer = Consumer::from(crd);
36-
(consumer.hash_key.clone(), consumer)
36+
(consumer.key.clone(), consumer)
3737
})
3838
.collect();
3939
*state.consumers.write().await = consumers;
@@ -52,7 +52,7 @@ pub fn start(state: Arc<State>) {
5252
.consumers
5353
.write()
5454
.await
55-
.insert(consumer.hash_key.clone(), consumer);
55+
.insert(consumer.key.clone(), consumer);
5656
}
5757
None => {
5858
// New ports are created without status. When the status is added, a new
@@ -67,7 +67,7 @@ pub fn start(state: Arc<State>) {
6767
crd.name_any()
6868
);
6969
let consumer = Consumer::from(&crd);
70-
state.consumers.write().await.remove(&consumer.hash_key);
70+
state.consumers.write().await.remove(&consumer.key);
7171
state.limiter.write().await.remove(&consumer.key);
7272
}
7373
// Empty response from stream. Should never happen.

Diff for: proxy/src/limiter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async fn add_limiter(state: &State, consumer: &Consumer, tier: &Tier) {
5050
pub async fn limiter(state: Arc<State>, consumer: &Consumer) -> Result<(), LimiterError> {
5151
if !has_limiter(&state, consumer).await {
5252
let consumers = state.consumers.read().await.clone();
53-
let refreshed_consumer = match consumers.get(&consumer.hash_key) {
53+
let refreshed_consumer = match consumers.get(&consumer.key) {
5454
Some(consumer) => consumer,
5555
None => return Err(LimiterError::PortDeleted),
5656
};

Diff for: proxy/src/main.rs

+8-9
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ impl State {
5252
pub fn try_new() -> Result<Self, Box<dyn Error>> {
5353
let config = Config::new();
5454
let metrics = Metrics::try_new(Registry::default())?;
55-
let host_regex = Regex::new(r"(dmtr_[\w\d-]+)?\.?([\w-]+)-v([\d]).+")?;
55+
let host_regex = Regex::new(r"(dmtr_[\w\d-]+)?\.?.+")?;
5656
let consumers = Default::default();
5757
let tiers = Default::default();
5858
let limiter = Default::default();
@@ -67,10 +67,8 @@ impl State {
6767
})
6868
}
6969

70-
pub async fn get_consumer(&self, network: &str, version: &str, key: &str) -> Option<Consumer> {
71-
let consumers = self.consumers.read().await.clone();
72-
let hash_key = format!("{}.{}.{}", network, version, key);
73-
consumers.get(&hash_key).cloned()
70+
pub async fn get_consumer(&self, key: &str) -> Option<Consumer> {
71+
self.consumers.read().await.clone().get(key).cloned()
7472
}
7573
}
7674

@@ -80,7 +78,8 @@ pub struct Consumer {
8078
port_name: String,
8179
tier: String,
8280
key: String,
83-
hash_key: String,
81+
network: String,
82+
version: String,
8483
}
8584
impl Display for Consumer {
8685
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -90,19 +89,19 @@ impl Display for Consumer {
9089
impl From<&OgmiosPort> for Consumer {
9190
fn from(value: &OgmiosPort) -> Self {
9291
let network = value.spec.network.to_string();
93-
let version = value.spec.version;
92+
let version = value.spec.version.to_string();
9493
let tier = value.spec.throughput_tier.to_string();
9594
let key = value.status.as_ref().unwrap().auth_token.clone();
9695
let namespace = value.metadata.namespace.as_ref().unwrap().clone();
9796
let port_name = value.name_any();
9897

99-
let hash_key = format!("{}.{}.{}", network, version, key);
10098
Self {
10199
namespace,
102100
port_name,
103101
tier,
104102
key,
105-
hash_key,
103+
network,
104+
version,
106105
}
107106
}
108107
}

Diff for: proxy/src/metrics.rs

+9-33
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use tracing::{error, info, instrument};
1111

1212
use crate::proxy::ProxyRequest;
1313
use crate::utils::{full, ProxyResponse};
14-
use crate::{Consumer, State};
14+
use crate::State;
1515

1616
#[derive(Debug, Clone)]
1717
pub struct Metrics {
@@ -69,75 +69,51 @@ impl Metrics {
6969
}
7070

7171
pub fn count_ws_total_frame(&self, proxy_req: &ProxyRequest) {
72-
let consumer = proxy_req
73-
.consumer
74-
.as_ref()
75-
.unwrap_or(&Consumer::default())
76-
.clone();
77-
7872
self.ws_total_frame
7973
.with_label_values(&[
8074
&proxy_req.namespace,
8175
&proxy_req.instance,
8276
&proxy_req.host,
83-
&consumer.to_string(),
84-
&consumer.tier,
77+
&proxy_req.consumer.to_string(),
78+
&proxy_req.consumer.tier,
8579
])
8680
.inc()
8781
}
8882

8983
pub fn inc_ws_total_connection(&self, proxy_req: &ProxyRequest) {
90-
let consumer = proxy_req
91-
.consumer
92-
.as_ref()
93-
.unwrap_or(&Consumer::default())
94-
.clone();
95-
9684
self.ws_total_connection
9785
.with_label_values(&[
9886
&proxy_req.namespace,
9987
&proxy_req.instance,
10088
&proxy_req.host,
101-
&consumer.to_string(),
102-
&consumer.tier,
89+
&proxy_req.consumer.to_string(),
90+
&proxy_req.consumer.tier,
10391
])
10492
.inc()
10593
}
10694

10795
pub fn dec_ws_total_connection(&self, proxy_req: &ProxyRequest) {
108-
let consumer = proxy_req
109-
.consumer
110-
.as_ref()
111-
.unwrap_or(&Consumer::default())
112-
.clone();
113-
11496
self.ws_total_connection
11597
.with_label_values(&[
11698
&proxy_req.namespace,
11799
&proxy_req.instance,
118100
&proxy_req.host,
119-
&consumer.to_string(),
120-
&consumer.tier,
101+
&proxy_req.consumer.to_string(),
102+
&proxy_req.consumer.tier,
121103
])
122104
.dec()
123105
}
124106

125107
pub fn count_http_total_request(&self, proxy_req: &ProxyRequest, status_code: StatusCode) {
126-
let consumer = proxy_req
127-
.consumer
128-
.as_ref()
129-
.unwrap_or(&Consumer::default())
130-
.clone();
131-
132108
self.http_total_request
133109
.with_label_values(&[
134110
&proxy_req.namespace,
135111
&proxy_req.instance,
136112
&proxy_req.host,
137113
&status_code.as_u16().to_string(),
138114
&proxy_req.protocol.to_string(),
139-
&consumer.to_string(),
140-
&consumer.tier,
115+
&proxy_req.consumer.to_string(),
116+
&proxy_req.consumer.tier,
141117
])
142118
.inc()
143119
}

Diff for: proxy/src/proxy.rs

+8-22
Original file line numberDiff line numberDiff line change
@@ -101,20 +101,13 @@ async fn handle(
101101
_ => {
102102
let proxy_req_result = ProxyRequest::new(&mut hyper_req, &state).await;
103103
if proxy_req_result.is_none() {
104-
return Ok(Response::builder()
105-
.status(StatusCode::BAD_GATEWAY)
106-
.body(full("Invalid hostname"))
107-
.unwrap());
108-
}
109-
110-
let proxy_req = proxy_req_result.unwrap();
111-
if proxy_req.consumer.is_none() {
112104
return Ok(Response::builder()
113105
.status(StatusCode::UNAUTHORIZED)
114106
.body(full("Unauthorized"))
115107
.unwrap());
116108
}
117109

110+
let proxy_req = proxy_req_result.unwrap();
118111
let response_result = match proxy_req.protocol {
119112
Protocol::Http => handle_http(hyper_req, &proxy_req).await,
120113
Protocol::Websocket => handle_websocket(hyper_req, &proxy_req, state.clone()).await,
@@ -199,11 +192,7 @@ async fn handle_websocket(
199192
while let Some(result) = client_incoming.next().await {
200193
match result {
201194
Ok(data) => {
202-
if let Err(err) = limiter(
203-
state.clone(),
204-
proxy_req.consumer.clone().as_ref().unwrap(),
205-
)
206-
.await
195+
if let Err(err) = limiter(state.clone(), &proxy_req.consumer).await
207196
{
208197
error!(error = err.to_string(), "Failed to run limiter");
209198
break;
@@ -276,7 +265,7 @@ pub struct ProxyRequest {
276265
pub namespace: String,
277266
pub host: String,
278267
pub instance: String,
279-
pub consumer: Option<Consumer>,
268+
pub consumer: Consumer,
280269
pub protocol: Protocol,
281270
}
282271
impl ProxyRequest {
@@ -285,13 +274,6 @@ impl ProxyRequest {
285274
let host_regex = host.clone();
286275

287276
let captures = state.host_regex.captures(&host_regex)?;
288-
let network = captures.get(2)?.as_str().to_string();
289-
let version = captures.get(3)?.as_str().to_string();
290-
291-
let instance = format!(
292-
"ogmios-{network}-{version}.{}:{}",
293-
state.config.ogmios_dns, state.config.ogmios_port
294-
);
295277

296278
let namespace = state.config.proxy_namespace.clone();
297279

@@ -314,7 +296,11 @@ impl ProxyRequest {
314296
}
315297

316298
let token = get_header(hyper_req, DMTR_API_KEY).unwrap_or_default();
317-
let consumer = state.get_consumer(&network, &version, &token).await;
299+
let consumer = state.get_consumer(&token).await?;
300+
let instance = format!(
301+
"ogmios-{}-{}.{}:{}",
302+
consumer.network, consumer.version, state.config.ogmios_dns, state.config.ogmios_port
303+
);
318304

319305
Some(Self {
320306
namespace,

0 commit comments

Comments
 (0)