Skip to content

Commit 74dd8c7

Browse files
chore: Limit by connection (#57)
* chore: Limit by connection * Update error message and proxy config * Reinstate logger message * Increase max connections for tier 3 * Fix 429 metrics
1 parent 82c5a41 commit 74dd8c7

File tree

4 files changed

+86
-26
lines changed

4 files changed

+86
-26
lines changed

Diff for: bootstrap/proxy/config.tf

+12-24
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,42 @@
11
locals {
22
tiers = [
33
{
4-
"name" = "0",
4+
"name" = "0",
5+
"max_connections" = 2
56
"rates" = [
67
{
78
"interval" = "1m",
8-
"limit" = floor(60 / var.replicas)
9-
},
10-
{
11-
"interval" = "1d",
12-
"limit" = floor(60 * 60 * 24 / var.replicas)
9+
"limit" = 500
1310
}
1411
]
1512
},
1613
{
17-
"name" = "1",
14+
"name" = "1",
15+
"max_connections" = 5
1816
"rates" = [
1917
{
2018
"interval" = "1m",
21-
"limit" = floor(300 / var.replicas)
22-
},
23-
{
24-
"interval" = "1d",
25-
"limit" = floor(300 * 60 * 24 / var.replicas)
19+
"limit" = 500
2620
}
2721
]
2822
},
2923
{
30-
"name" = "2",
24+
"name" = "2",
25+
"max_connections" = 250
3126
"rates" = [
3227
{
3328
"interval" = "1m",
34-
"limit" = floor(2400 / var.replicas)
35-
},
36-
{
37-
"interval" = "1d",
38-
"limit" = floor(2400 * 60 * 24 / var.replicas)
29+
"limit" = 500
3930
}
4031
]
4132
},
4233
{
43-
"name" = "3",
34+
"name" = "3",
35+
"max_connections" = 250
4436
"rates" = [
4537
{
4638
"interval" = "1m",
47-
"limit" = floor(4800 / var.replicas)
48-
},
49-
{
50-
"interval" = "1d",
51-
"limit" = floor(4800 * 60 * 24 / var.replicas)
39+
"limit" = 500
5240
}
5341
]
5442
}

Diff for: proxy/src/main.rs

+29
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ pub struct Consumer {
8080
key: String,
8181
network: String,
8282
version: String,
83+
active_connections: usize,
8384
}
8485
impl Display for Consumer {
8586
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -102,6 +103,34 @@ impl From<&OgmiosPort> for Consumer {
102103
key,
103104
network,
104105
version,
106+
active_connections: 0,
105107
}
106108
}
107109
}
110+
impl Consumer {
111+
pub async fn inc_connections(&self, state: Arc<State>) {
112+
state
113+
.consumers
114+
.write()
115+
.await
116+
.entry(self.key.clone())
117+
.and_modify(|consumer| consumer.active_connections += 1);
118+
}
119+
pub async fn dec_connections(&self, state: Arc<State>) {
120+
state
121+
.consumers
122+
.write()
123+
.await
124+
.entry(self.key.clone())
125+
.and_modify(|consumer| consumer.active_connections -= 1);
126+
}
127+
pub async fn get_active_connections(&self, state: Arc<State>) -> usize {
128+
state
129+
.consumers
130+
.read()
131+
.await
132+
.get(&self.key)
133+
.map(|consumer| consumer.active_connections)
134+
.unwrap_or_default()
135+
}
136+
}

Diff for: proxy/src/proxy.rs

+44-2
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,29 @@ async fn handle(
110110
let proxy_req = proxy_req_result.unwrap();
111111
let response_result = match proxy_req.protocol {
112112
Protocol::Http => handle_http(hyper_req, &proxy_req).await,
113-
Protocol::Websocket => handle_websocket(hyper_req, &proxy_req, state.clone()).await,
113+
Protocol::Websocket => {
114+
// Before handling the websocket connection, check if consumer has available
115+
// connections.
116+
let tiers = state.tiers.read().await.clone();
117+
match tiers.get(&proxy_req.consumer.tier) {
118+
Some(tier) => {
119+
if proxy_req.consumer.active_connections >= tier.max_connections {
120+
Ok(Response::builder()
121+
.status(StatusCode::TOO_MANY_REQUESTS)
122+
.body(full("Connection limit exceeded"))
123+
.unwrap())
124+
} else {
125+
handle_websocket(hyper_req, &proxy_req, state.clone()).await
126+
}
127+
}
128+
None => Ok(Response::builder()
129+
.status(StatusCode::INTERNAL_SERVER_ERROR)
130+
.body(full(
131+
"Invalid tier value. Contact support team for more information.",
132+
))
133+
.unwrap()),
134+
}
135+
}
114136
};
115137

116138
match &response_result {
@@ -187,14 +209,24 @@ async fn handle_websocket(
187209
let (mut instance_outgoing, instance_incoming) = instance_stream.split();
188210

189211
state.metrics.inc_ws_total_connection(&proxy_req);
212+
proxy_req.consumer.inc_connections(state.clone()).await;
213+
214+
let active_connections = proxy_req
215+
.consumer
216+
.get_active_connections(state.clone())
217+
.await;
218+
info!(
219+
consumer = proxy_req.consumer.to_string(),
220+
active_connections, "client connected"
221+
);
190222

191223
let client_in = async {
192224
while let Some(result) = client_incoming.next().await {
193225
match result {
194226
Ok(data) => {
195227
if let Err(err) = limiter(state.clone(), &proxy_req.consumer).await
196228
{
197-
error!(error = err.to_string(), "Failed to run limiter");
229+
error!(error = err.to_string(), "Failed to run limiter.");
198230
break;
199231
};
200232
if let Err(err) = instance_outgoing.send(data).await {
@@ -221,6 +253,16 @@ async fn handle_websocket(
221253
select(client_in, instance_in).await;
222254

223255
state.metrics.dec_ws_total_connection(&proxy_req);
256+
proxy_req.consumer.dec_connections(state.clone()).await;
257+
258+
let active_connections = proxy_req
259+
.consumer
260+
.get_active_connections(state.clone())
261+
.await;
262+
info!(
263+
consumer = proxy_req.consumer.to_string(),
264+
active_connections, "client disconnected"
265+
);
224266
}
225267
Err(err) => {
226268
error!(error = err.to_string(), "upgrade error");

Diff for: proxy/src/tiers.rs

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::State;
1212
pub struct Tier {
1313
pub name: String,
1414
pub rates: Vec<TierRate>,
15+
pub max_connections: usize,
1516
}
1617
#[derive(Debug, Clone, Deserialize)]
1718
pub struct TierRate {

0 commit comments

Comments
 (0)