Skip to content

Commit 9726376

Browse files
committed
feat: adjustemnts to latest tucana version
2 parents ea8962c + 1f19535 commit 9726376

File tree

16 files changed

+124
-368
lines changed

16 files changed

+124
-368
lines changed

Cargo.toml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,6 @@ syn = "2.0"
2323
quote = "1.0"
2424
proc-macro2 = "1.0"
2525
tokio = { version = "1.44.1", features = ["rt-multi-thread"] }
26-
redis = { version = "0.30.0", features = [
27-
"aio",
28-
"tokio-comp",
29-
"async-std-comp",
30-
"json",
31-
] }
3226
uuid = { version = "1.16.0", features = ["v4"] }
3327
tonic = "0.13.0"
3428

adapter/rest/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ config = { workspace = true }
99
validator = { workspace = true }
1010
code0-flow = { workspace = true }
1111
tokio = { workspace = true }
12-
redis = { workspace = true }
1312
tucana = { workspace = true }
1413
serde_json = { workspace = true }
1514
serde = { workspace = true }

adapter/rest/src/.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ PORT=8081
22
REDIS_URL=redis://localhost:6379
33
RABBITMQ_URL=amqp://localhost:5672
44
AQUILA_URL=http://localhost:8080
5+
IS_STATIC=true

adapter/rest/src/main.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ pub mod store;
33
mod types;
44

55
use code0_flow::{
6-
flow_queue::service::RabbitmqClient, flow_store::connection::create_flow_store_connection,
6+
flow_queue::service::RabbitmqClient,
7+
flow_store::{
8+
connection::create_flow_store_connection,
9+
service::{FlowStoreService, FlowStoreServiceBase},
10+
},
711
};
812
use config::FromEnv;
913
use http::{
@@ -13,19 +17,22 @@ use http::{
1317
};
1418
use queue::queue::handle_connection;
1519
use std::{future::Future, pin::Pin, sync::Arc};
20+
use tokio::sync::Mutex;
1621
use types::{get_data_types, get_flow_types};
1722

1823
pub struct FlowConnectionHandler {
19-
flow_store: code0_flow::flow_store::connection::FlowStore,
24+
flow_store: Arc<Mutex<FlowStoreService>>,
2025
rabbitmq_client: Arc<RabbitmqClient>,
2126
}
2227

2328
impl FlowConnectionHandler {
2429
pub async fn new(config: &Config) -> Self {
2530
let flow_store = create_flow_store_connection(config.redis_url.clone()).await;
31+
let flow_store_service = Arc::new(Mutex::new(FlowStoreServiceBase::new(flow_store).await));
32+
2633
let rabbitmq_client = Arc::new(RabbitmqClient::new(config.rabbitmq_url.as_str()).await);
2734
FlowConnectionHandler {
28-
flow_store,
35+
flow_store: flow_store_service,
2936
rabbitmq_client,
3037
}
3138
}

adapter/rest/src/queue/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ pub mod queue {
22

33
use code0_flow::{
44
flow_queue::service::{Message, RabbitmqClient},
5-
flow_store::connection::FlowStore,
5+
flow_store::service::FlowStoreService,
66
};
77
use http::{request::HttpRequest, response::HttpResponse};
88
use std::{collections::HashMap, sync::Arc, time::Duration};
9+
use tokio::sync::Mutex;
910
use tucana::shared::{Struct, Value};
1011
use validator::{resolver::flow_resolver::resolve_flow, verify_flow};
1112

@@ -30,11 +31,11 @@ pub mod queue {
3031

3132
pub async fn handle_connection(
3233
mut request: HttpRequest,
33-
flow_store: FlowStore,
34+
flow_store: Arc<Mutex<FlowStoreService>>,
3435
rabbitmq_client: Arc<RabbitmqClient>,
3536
) -> Option<HttpResponse> {
3637
// Check if a flow exists for the given settings, return none if not exsist for http handler
37-
let flow_exists = check_flow_exists(&flow_store, &request).await;
38+
let flow_exists = check_flow_exists(flow_store, &request).await;
3839

3940
let flow_result = match flow_exists {
4041
Some(flow) => flow,

adapter/rest/src/store/mod.rs

Lines changed: 46 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,110 +1,75 @@
11
pub mod store {
2-
use code0_flow::flow_store::connection::FlowStore;
2+
use std::sync::Arc;
3+
4+
use code0_flow::flow_store::service::{FlowStoreService, FlowStoreServiceBase};
35
use http::request::HttpRequest;
4-
use redis::{AsyncCommands, JsonAsyncCommands};
56
use regex::Regex;
6-
use tucana::shared::{value::Kind, Flow, Struct};
7+
use tokio::sync::Mutex;
8+
use tucana::shared::{value::Kind, Flow, FlowSetting};
79

810
//The regex is required for later purposes --> resolve the parameter of the url
911
pub struct FlowExistResult {
1012
pub flow: Flow,
1113
pub regex_pattern: Regex,
1214
}
1315

14-
pub async fn check_flow_exists(
15-
flow_store: &FlowStore,
16-
request: &HttpRequest,
17-
) -> Option<FlowExistResult> {
18-
let mut store = flow_store.lock().await;
19-
20-
// Get all keys from Redis
21-
let keys: Vec<String> = store.keys("*").await.unwrap_or_default();
22-
let mut result: Vec<Flow> = Vec::new();
23-
24-
// Retrieve JSON values for each key
25-
for key in keys {
26-
if let Ok(json_value) = store.json_get::<&String, &str, String>(&key, "$").await {
27-
let flow = match serde_json::from_str::<Vec<Flow>>(json_value.as_str()) {
28-
Ok(flow) => flow[0].clone(),
29-
Err(_) => continue,
30-
};
31-
32-
result.push(flow);
16+
fn extract_field(settings: &[FlowSetting], def_key: &str, field_name: &str) -> Option<String> {
17+
settings.iter().find_map(|setting| {
18+
let def = setting.definition.as_ref()?;
19+
if def.key != def_key {
20+
return None;
3321
}
34-
}
35-
36-
for flow in result {
37-
let mut correct_url = false;
38-
let mut correct_method = false;
39-
let mut flow_regex: Option<Regex> = None;
40-
41-
for setting in flow.settings.clone() {
42-
let definition = match setting.definition {
43-
Some(definition) => definition,
44-
None => continue,
45-
};
46-
47-
if definition.key == "HTTP_METHOD" {
48-
let object: Struct = match setting.object {
49-
Some(object) => object,
50-
None => continue,
51-
};
5222

53-
for field in object.fields {
54-
if field.0 == "method" {
55-
if let Some(Kind::StringValue(method)) = field.1.kind {
56-
if method == request.method.to_string() {
57-
correct_method = true;
58-
}
59-
}
60-
}
23+
let obj = setting.object.as_ref()?;
24+
obj.fields.iter().find_map(|(k, v)| {
25+
if k == field_name {
26+
if let Some(Kind::StringValue(s)) = &v.kind {
27+
return Some(s.clone());
6128
}
62-
63-
continue;
6429
}
30+
None
31+
})
32+
})
33+
}
6534

66-
if definition.key == "URL" {
67-
let object: Struct = match setting.object {
68-
Some(object) => object,
69-
None => continue,
70-
};
35+
pub async fn check_flow_exists(
36+
flow_store: Arc<Mutex<FlowStoreService>>,
37+
request: &HttpRequest,
38+
) -> Option<FlowExistResult> {
39+
let flows = {
40+
let mut store = flow_store.lock().await;
41+
let pattern = format!("*::*::{}::{}", request.host, request.method.to_string());
42+
let result = store.query_flows(pattern).await;
43+
44+
match result {
45+
Ok(flows) => flows.flows,
46+
Err(_) => return None,
47+
}
48+
};
7149

72-
for field in object.fields {
73-
if field.0 == "url" {
74-
if let Some(Kind::StringValue(regex_str)) = field.1.kind {
75-
let regex = match regex::Regex::new(&regex_str) {
76-
Ok(regex) => regex,
77-
Err(err) => {
78-
log::error!("Failed to compile regex: {}", err);
79-
continue;
80-
}
81-
};
50+
for flow in flows {
51+
let url = extract_field(&flow.settings, "HTTP_URL", "url");
8252

83-
if regex.is_match(&request.path) {
84-
correct_url = true;
85-
flow_regex = Some(regex);
86-
}
87-
}
88-
}
89-
}
53+
let regex_str = match url {
54+
Some(string) => string,
55+
None => continue,
56+
};
9057

58+
let regex = match regex::Regex::new(&regex_str) {
59+
Ok(regex) => regex,
60+
Err(err) => {
61+
log::error!("Failed to compile regex: {}", err);
9162
continue;
9263
}
93-
}
94-
95-
if correct_method && correct_url {
96-
let regex_pattern = match flow_regex {
97-
Some(regex) => regex.clone(),
98-
None => continue,
99-
};
64+
};
10065

66+
if regex.is_match(&request.path) {
10167
return Some(FlowExistResult {
10268
flow,
103-
regex_pattern,
69+
regex_pattern: regex,
10470
});
10571
}
10672
}
107-
10873
None
10974
}
11075
}

0 commit comments

Comments
 (0)