Skip to content

Commit 1f19535

Browse files
Merge pull request #52 from code0-tech/33-added-host-check
Added Host Check
2 parents e1d2be4 + 1057f07 commit 1f19535

File tree

12 files changed

+190
-97
lines changed

12 files changed

+190
-97
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,6 @@ syn = "2.0"
2323
quote = "1.0"
2424
proc-macro2 = "1.0"
2525
tokio = { version = "1.44.1", features = ["rt-multi-thread"] }
26-
redis = { version = "0.30.0", features = [
27-
"aio",
28-
"tokio-comp",
29-
"async-std-comp",
30-
"json",
31-
] }
3226
uuid = { version = "1.16.0", features = ["v4"] }
3327
tonic = "0.13.0"
3428

adapter/rest/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ config = { workspace = true }
99
validator = { workspace = true }
1010
code0-flow = { workspace = true }
1111
tokio = { workspace = true }
12-
redis = { workspace = true }
1312
tucana = { workspace = true }
1413
serde_json = { workspace = true }
1514
serde = { workspace = true }

adapter/rest/src/.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ PORT=8081
22
REDIS_URL=redis://localhost:6379
33
RABBITMQ_URL=amqp://localhost:5672
44
AQUILA_URL=http://localhost:8080
5+
IS_STATIC=true

adapter/rest/src/main.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ pub mod store;
33
mod types;
44

55
use code0_flow::{
6-
flow_queue::service::RabbitmqClient, flow_store::connection::create_flow_store_connection,
6+
flow_queue::service::RabbitmqClient,
7+
flow_store::{
8+
connection::create_flow_store_connection,
9+
service::{FlowStoreService, FlowStoreServiceBase},
10+
},
711
};
812
use config::FromEnv;
913
use http::{
@@ -13,19 +17,22 @@ use http::{
1317
};
1418
use queue::queue::handle_connection;
1519
use std::{future::Future, pin::Pin, sync::Arc};
20+
use tokio::sync::Mutex;
1621
use types::{get_data_types, get_flow_types};
1722

1823
pub struct FlowConnectionHandler {
19-
flow_store: code0_flow::flow_store::connection::FlowStore,
24+
flow_store: Arc<Mutex<FlowStoreService>>,
2025
rabbitmq_client: Arc<RabbitmqClient>,
2126
}
2227

2328
impl FlowConnectionHandler {
2429
pub async fn new(config: &Config) -> Self {
2530
let flow_store = create_flow_store_connection(config.redis_url.clone()).await;
31+
let flow_store_service = Arc::new(Mutex::new(FlowStoreServiceBase::new(flow_store).await));
32+
2633
let rabbitmq_client = Arc::new(RabbitmqClient::new(config.rabbitmq_url.as_str()).await);
2734
FlowConnectionHandler {
28-
flow_store,
35+
flow_store: flow_store_service,
2936
rabbitmq_client,
3037
}
3138
}

adapter/rest/src/queue/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ pub mod queue {
22

33
use code0_flow::{
44
flow_queue::service::{Message, RabbitmqClient},
5-
flow_store::connection::FlowStore,
5+
flow_store::service::FlowStoreService,
66
};
77
use http::{request::HttpRequest, response::HttpResponse};
88
use std::{collections::HashMap, sync::Arc, time::Duration};
9+
use tokio::sync::Mutex;
910
use tucana::shared::{Struct, Value};
1011
use validator::{resolver::flow_resolver::resolve_flow, verify_flow};
1112

@@ -30,11 +31,11 @@ pub mod queue {
3031

3132
pub async fn handle_connection(
3233
mut request: HttpRequest,
33-
flow_store: FlowStore,
34+
flow_store: Arc<Mutex<FlowStoreService>>,
3435
rabbitmq_client: Arc<RabbitmqClient>,
3536
) -> Option<HttpResponse> {
3637
// Check if a flow exists for the given settings, return none if not exsist for http handler
37-
let flow_exists = check_flow_exists(&flow_store, &request).await;
38+
let flow_exists = check_flow_exists(flow_store, &request).await;
3839

3940
let flow_result = match flow_exists {
4041
Some(flow) => flow,

adapter/rest/src/store/mod.rs

Lines changed: 46 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,110 +1,75 @@
11
pub mod store {
2-
use code0_flow::flow_store::connection::FlowStore;
2+
use std::sync::Arc;
3+
4+
use code0_flow::flow_store::service::{FlowStoreService, FlowStoreServiceBase};
35
use http::request::HttpRequest;
4-
use redis::{AsyncCommands, JsonAsyncCommands};
56
use regex::Regex;
6-
use tucana::shared::{value::Kind, Flow, Struct};
7+
use tokio::sync::Mutex;
8+
use tucana::shared::{value::Kind, Flow, FlowSetting};
79

810
//The regex is required for later purposes --> resolve the parameter of the url
911
pub struct FlowExistResult {
1012
pub flow: Flow,
1113
pub regex_pattern: Regex,
1214
}
1315

14-
pub async fn check_flow_exists(
15-
flow_store: &FlowStore,
16-
request: &HttpRequest,
17-
) -> Option<FlowExistResult> {
18-
let mut store = flow_store.lock().await;
19-
20-
// Get all keys from Redis
21-
let keys: Vec<String> = store.keys("*").await.unwrap_or_default();
22-
let mut result: Vec<Flow> = Vec::new();
23-
24-
// Retrieve JSON values for each key
25-
for key in keys {
26-
if let Ok(json_value) = store.json_get::<&String, &str, String>(&key, "$").await {
27-
let flow = match serde_json::from_str::<Vec<Flow>>(json_value.as_str()) {
28-
Ok(flow) => flow[0].clone(),
29-
Err(_) => continue,
30-
};
31-
32-
result.push(flow);
16+
fn extract_field(settings: &[FlowSetting], def_key: &str, field_name: &str) -> Option<String> {
17+
settings.iter().find_map(|setting| {
18+
let def = setting.definition.as_ref()?;
19+
if def.key != def_key {
20+
return None;
3321
}
34-
}
35-
36-
for flow in result {
37-
let mut correct_url = false;
38-
let mut correct_method = false;
39-
let mut flow_regex: Option<Regex> = None;
40-
41-
for setting in flow.settings.clone() {
42-
let definition = match setting.definition {
43-
Some(definition) => definition,
44-
None => continue,
45-
};
46-
47-
if definition.key == "HTTP_METHOD" {
48-
let object: Struct = match setting.object {
49-
Some(object) => object,
50-
None => continue,
51-
};
5222

53-
for field in object.fields {
54-
if field.0 == "method" {
55-
if let Some(Kind::StringValue(method)) = field.1.kind {
56-
if method == request.method.to_string() {
57-
correct_method = true;
58-
}
59-
}
60-
}
23+
let obj = setting.object.as_ref()?;
24+
obj.fields.iter().find_map(|(k, v)| {
25+
if k == field_name {
26+
if let Some(Kind::StringValue(s)) = &v.kind {
27+
return Some(s.clone());
6128
}
62-
63-
continue;
6429
}
30+
None
31+
})
32+
})
33+
}
6534

66-
if definition.key == "URL" {
67-
let object: Struct = match setting.object {
68-
Some(object) => object,
69-
None => continue,
70-
};
35+
pub async fn check_flow_exists(
36+
flow_store: Arc<Mutex<FlowStoreService>>,
37+
request: &HttpRequest,
38+
) -> Option<FlowExistResult> {
39+
let flows = {
40+
let mut store = flow_store.lock().await;
41+
let pattern = format!("*::*::{}::{}", request.host, request.method.to_string());
42+
let result = store.query_flows(pattern).await;
43+
44+
match result {
45+
Ok(flows) => flows.flows,
46+
Err(_) => return None,
47+
}
48+
};
7149

72-
for field in object.fields {
73-
if field.0 == "url" {
74-
if let Some(Kind::StringValue(regex_str)) = field.1.kind {
75-
let regex = match regex::Regex::new(&regex_str) {
76-
Ok(regex) => regex,
77-
Err(err) => {
78-
log::error!("Failed to compile regex: {}", err);
79-
continue;
80-
}
81-
};
50+
for flow in flows {
51+
let url = extract_field(&flow.settings, "HTTP_URL", "url");
8252

83-
if regex.is_match(&request.path) {
84-
correct_url = true;
85-
flow_regex = Some(regex);
86-
}
87-
}
88-
}
89-
}
53+
let regex_str = match url {
54+
Some(string) => string,
55+
None => continue,
56+
};
9057

58+
let regex = match regex::Regex::new(&regex_str) {
59+
Ok(regex) => regex,
60+
Err(err) => {
61+
log::error!("Failed to compile regex: {}", err);
9162
continue;
9263
}
93-
}
94-
95-
if correct_method && correct_url {
96-
let regex_pattern = match flow_regex {
97-
Some(regex) => regex.clone(),
98-
None => continue,
99-
};
64+
};
10065

66+
if regex.is_match(&request.path) {
10167
return Some(FlowExistResult {
10268
flow,
103-
regex_pattern,
69+
regex_pattern: regex,
10470
});
10571
}
10672
}
107-
10873
None
10974
}
11075
}

crates/http/src/request.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ pub struct HttpRequest {
106106
pub method: HttpOption,
107107
pub path: String,
108108
pub version: String,
109+
pub host: String,
109110
pub headers: HeaderMap,
110111

111112
/// The body of the request.
@@ -278,10 +279,23 @@ fn parse_request(
278279
None
279280
};
280281

282+
let host = {
283+
match header_map.get("host") {
284+
Some(host) => host.clone(),
285+
None => {
286+
return Err(HttpResponse::bad_request(
287+
"Missing Host in Headers!".to_string(),
288+
HashMap::new(),
289+
));
290+
}
291+
}
292+
};
293+
281294
Ok(HttpRequest {
282295
method,
283296
path: path.to_string(),
284297
version: version.to_string(),
298+
host,
285299
headers: header_map,
286300
body,
287301
})

helper/flow/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use code0_flow::flow_store::{
66
};
77
use tokio::sync::Mutex;
88
use tucana::shared::Flows;
9-
use typed_flows::add_flow::get_add_rest_flow;
9+
use typed_flows::{add_flow::get_add_rest_flow, mutiply_flow::get_multiply_rest_flow};
1010

1111
pub mod typed_data_types;
1212
pub mod typed_flows;
@@ -24,7 +24,7 @@ async fn main() {
2424
let mut client = flow_store_client.lock().await;
2525
let _ = client
2626
.insert_flows(Flows {
27-
flows: vec![get_add_rest_flow()],
27+
flows: vec![get_add_rest_flow(), get_multiply_rest_flow()],
2828
})
2929
.await;
3030
}

helper/flow/src/typed_flows/add_flow.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,19 @@ pub fn get_add_rest_flow() -> Flow {
3232
input_type_identifier: Some(String::from("HTTP_REQUEST")),
3333
return_type_identifier: Some(String::from("HTTP_RESPONSE")),
3434
settings: vec![
35+
FlowSetting {
36+
definition: Some(FlowSettingDefinition {
37+
id: String::from("14234234"),
38+
key: String::from("HTTP_HOST"),
39+
}),
40+
object: Some(Struct {
41+
fields: {
42+
let mut map = HashMap::new();
43+
map.insert(String::from("host"), get_string_value("localhost:8081"));
44+
map
45+
},
46+
}),
47+
},
3548
FlowSetting {
3649
definition: Some(FlowSettingDefinition {
3750
id: String::from("1424525"),

helper/flow/src/typed_flows/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod add_flow;
2+
pub mod mutiply_flow;

0 commit comments

Comments
 (0)