Skip to content

Added Host Check #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ syn = "2.0"
quote = "1.0"
proc-macro2 = "1.0"
tokio = { version = "1.44.1", features = ["rt-multi-thread"] }
redis = { version = "0.30.0", features = [
"aio",
"tokio-comp",
"async-std-comp",
"json",
] }
uuid = { version = "1.16.0", features = ["v4"] }
tonic = "0.13.0"

Expand Down
1 change: 0 additions & 1 deletion adapter/rest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ config = { workspace = true }
validator = { workspace = true }
code0-flow = { workspace = true }
tokio = { workspace = true }
redis = { workspace = true }
tucana = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions adapter/rest/src/.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ PORT=8081
REDIS_URL=redis://localhost:6379
RABBITMQ_URL=amqp://localhost:5672
AQUILA_URL=http://localhost:8080
IS_STATIC=true
13 changes: 10 additions & 3 deletions adapter/rest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ pub mod store;
mod types;

use code0_flow::{
flow_queue::service::RabbitmqClient, flow_store::connection::create_flow_store_connection,
flow_queue::service::RabbitmqClient,
flow_store::{
connection::create_flow_store_connection,
service::{FlowStoreService, FlowStoreServiceBase},
},
};
use config::FromEnv;
use http::{
Expand All @@ -13,19 +17,22 @@ use http::{
};
use queue::queue::handle_connection;
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::sync::Mutex;
use types::{get_data_types, get_flow_types};

pub struct FlowConnectionHandler {
flow_store: code0_flow::flow_store::connection::FlowStore,
flow_store: Arc<Mutex<FlowStoreService>>,
rabbitmq_client: Arc<RabbitmqClient>,
}

impl FlowConnectionHandler {
pub async fn new(config: &Config) -> Self {
let flow_store = create_flow_store_connection(config.redis_url.clone()).await;
let flow_store_service = Arc::new(Mutex::new(FlowStoreServiceBase::new(flow_store).await));

let rabbitmq_client = Arc::new(RabbitmqClient::new(config.rabbitmq_url.as_str()).await);
FlowConnectionHandler {
flow_store,
flow_store: flow_store_service,
rabbitmq_client,
}
}
Expand Down
7 changes: 4 additions & 3 deletions adapter/rest/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ pub mod queue {

use code0_flow::{
flow_queue::service::{Message, RabbitmqClient},
flow_store::connection::FlowStore,
flow_store::service::FlowStoreService,
};
use http::{request::HttpRequest, response::HttpResponse};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tucana::shared::{Struct, Value};
use validator::{resolver::flow_resolver::resolve_flow, verify_flow};

Expand All @@ -30,11 +31,11 @@ pub mod queue {

pub async fn handle_connection(
mut request: HttpRequest,
flow_store: FlowStore,
flow_store: Arc<Mutex<FlowStoreService>>,
rabbitmq_client: Arc<RabbitmqClient>,
) -> Option<HttpResponse> {
// Check if a flow exists for the given settings, return none if not exsist for http handler
let flow_exists = check_flow_exists(&flow_store, &request).await;
let flow_exists = check_flow_exists(flow_store, &request).await;

let flow_result = match flow_exists {
Some(flow) => flow,
Expand Down
127 changes: 46 additions & 81 deletions adapter/rest/src/store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,110 +1,75 @@
pub mod store {
use code0_flow::flow_store::connection::FlowStore;
use std::sync::Arc;

use code0_flow::flow_store::service::{FlowStoreService, FlowStoreServiceBase};
use http::request::HttpRequest;
use redis::{AsyncCommands, JsonAsyncCommands};
use regex::Regex;
use tucana::shared::{value::Kind, Flow, Struct};
use tokio::sync::Mutex;
use tucana::shared::{value::Kind, Flow, FlowSetting};

//The regex is required for later purposes --> resolve the parameter of the url
pub struct FlowExistResult {
pub flow: Flow,
pub regex_pattern: Regex,
}

pub async fn check_flow_exists(
flow_store: &FlowStore,
request: &HttpRequest,
) -> Option<FlowExistResult> {
let mut store = flow_store.lock().await;

// Get all keys from Redis
let keys: Vec<String> = store.keys("*").await.unwrap_or_default();
let mut result: Vec<Flow> = Vec::new();

// Retrieve JSON values for each key
for key in keys {
if let Ok(json_value) = store.json_get::<&String, &str, String>(&key, "$").await {
let flow = match serde_json::from_str::<Vec<Flow>>(json_value.as_str()) {
Ok(flow) => flow[0].clone(),
Err(_) => continue,
};

result.push(flow);
fn extract_field(settings: &[FlowSetting], def_key: &str, field_name: &str) -> Option<String> {
settings.iter().find_map(|setting| {
let def = setting.definition.as_ref()?;
if def.key != def_key {
return None;
}
}

for flow in result {
let mut correct_url = false;
let mut correct_method = false;
let mut flow_regex: Option<Regex> = None;

for setting in flow.settings.clone() {
let definition = match setting.definition {
Some(definition) => definition,
None => continue,
};

if definition.key == "HTTP_METHOD" {
let object: Struct = match setting.object {
Some(object) => object,
None => continue,
};

for field in object.fields {
if field.0 == "method" {
if let Some(Kind::StringValue(method)) = field.1.kind {
if method == request.method.to_string() {
correct_method = true;
}
}
}
let obj = setting.object.as_ref()?;
obj.fields.iter().find_map(|(k, v)| {
if k == field_name {
if let Some(Kind::StringValue(s)) = &v.kind {
return Some(s.clone());
}

continue;
}
None
})
})
}

if definition.key == "URL" {
let object: Struct = match setting.object {
Some(object) => object,
None => continue,
};
pub async fn check_flow_exists(
flow_store: Arc<Mutex<FlowStoreService>>,
request: &HttpRequest,
) -> Option<FlowExistResult> {
let flows = {
let mut store = flow_store.lock().await;
let pattern = format!("*::*::{}::{}", request.host, request.method.to_string());
let result = store.query_flows(pattern).await;

match result {
Ok(flows) => flows.flows,
Err(_) => return None,
}
};

for field in object.fields {
if field.0 == "url" {
if let Some(Kind::StringValue(regex_str)) = field.1.kind {
let regex = match regex::Regex::new(&regex_str) {
Ok(regex) => regex,
Err(err) => {
log::error!("Failed to compile regex: {}", err);
continue;
}
};
for flow in flows {
let url = extract_field(&flow.settings, "HTTP_URL", "url");

if regex.is_match(&request.path) {
correct_url = true;
flow_regex = Some(regex);
}
}
}
}
let regex_str = match url {
Some(string) => string,
None => continue,
};

let regex = match regex::Regex::new(&regex_str) {
Ok(regex) => regex,
Err(err) => {
log::error!("Failed to compile regex: {}", err);
continue;
}
}

if correct_method && correct_url {
let regex_pattern = match flow_regex {
Some(regex) => regex.clone(),
None => continue,
};
};

if regex.is_match(&request.path) {
return Some(FlowExistResult {
flow,
regex_pattern,
regex_pattern: regex,
});
}
}

None
}
}
14 changes: 14 additions & 0 deletions crates/http/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub struct HttpRequest {
pub method: HttpOption,
pub path: String,
pub version: String,
pub host: String,
pub headers: HeaderMap,

/// The body of the request.
Expand Down Expand Up @@ -278,10 +279,23 @@ fn parse_request(
None
};

let host = {
match header_map.get("host") {
Some(host) => host.clone(),
None => {
return Err(HttpResponse::bad_request(
"Missing Host in Headers!".to_string(),
HashMap::new(),
));
}
}
};

Ok(HttpRequest {
method,
path: path.to_string(),
version: version.to_string(),
host,
headers: header_map,
body,
})
Expand Down
4 changes: 2 additions & 2 deletions helper/flow/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use code0_flow::flow_store::{
};
use tokio::sync::Mutex;
use tucana::shared::Flows;
use typed_flows::add_flow::get_add_rest_flow;
use typed_flows::{add_flow::get_add_rest_flow, mutiply_flow::get_multiply_rest_flow};

pub mod typed_data_types;
pub mod typed_flows;
Expand All @@ -24,7 +24,7 @@ async fn main() {
let mut client = flow_store_client.lock().await;
let _ = client
.insert_flows(Flows {
flows: vec![get_add_rest_flow()],
flows: vec![get_add_rest_flow(), get_multiply_rest_flow()],
})
.await;
}
13 changes: 13 additions & 0 deletions helper/flow/src/typed_flows/add_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ pub fn get_add_rest_flow() -> Flow {
input_type_identifier: Some(String::from("HTTP_REQUEST")),
return_type_identifier: Some(String::from("HTTP_RESPONSE")),
settings: vec![
FlowSetting {
definition: Some(FlowSettingDefinition {
id: String::from("14234234"),
key: String::from("HTTP_HOST"),
}),
object: Some(Struct {
fields: {
let mut map = HashMap::new();
map.insert(String::from("host"), get_string_value("localhost:8081"));
map
},
}),
},
FlowSetting {
definition: Some(FlowSettingDefinition {
id: String::from("1424525"),
Expand Down
1 change: 1 addition & 0 deletions helper/flow/src/typed_flows/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod add_flow;
pub mod mutiply_flow;
Loading