Skip to content

Commit f8d6a82

Browse files
committed
Merge branch 'main' into di/stream-sql-to-s3
2 parents d834ca5 + 9e2cdad commit f8d6a82

File tree

16 files changed

+203
-135
lines changed

16 files changed

+203
-135
lines changed

.cursor/rules/rust-best-practices.mdc

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,6 @@ description:
33
globs: backend/**/*.rs
44
alwaysApply: false
55
---
6-
---
7-
description: Rust best practices for the Windmill backend, covering code organization, error handling, performance optimizations, and common patterns to follow when adding new code.
8-
globs: **/*.rs
9-
---
106
# Windmill Backend - Rust Best Practices
117

128
## Project Structure

backend/src/monitor.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ use windmill_api::{
2828
SCIM_TOKEN,
2929
};
3030

31-
#[cfg(feature = "enterprise")]
32-
use windmill_common::ee::{jobs_waiting_alerts, worker_groups_alerts};
3331
#[cfg(feature = "enterprise")]
3432
use windmill_common::ee::low_disk_alerts;
33+
#[cfg(feature = "enterprise")]
34+
use windmill_common::ee::{jobs_waiting_alerts, worker_groups_alerts};
3535

3636
#[cfg(feature = "oauth2")]
3737
use windmill_common::global_settings::OAUTH_SETTING;
@@ -60,13 +60,9 @@ use windmill_common::{
6060
server::load_smtp_config,
6161
tracing_init::JSON_FMT,
6262
users::truncate_token,
63-
utils::empty_string_as_none,
64-
utils::{now_from_db, rd_string, report_critical_error, Mode},
63+
utils::{empty_string_as_none, now_from_db, rd_string, report_critical_error, Mode},
6564
worker::{
66-
load_worker_config, reload_custom_tags_setting, store_pull_query,
67-
store_suspended_pull_query, update_min_version, Connection, DEFAULT_TAGS_PER_WORKSPACE,
68-
DEFAULT_TAGS_WORKSPACES, INDEXER_CONFIG, SCRIPT_TOKEN_EXPIRY, SMTP_CONFIG, TMP_DIR,
69-
WORKER_CONFIG, WORKER_GROUP,
65+
load_env_vars, load_init_bash_from_env, load_whitelist_env_vars_from_env, load_worker_config, reload_custom_tags_setting, store_pull_query, store_suspended_pull_query, update_min_version, Connection, WorkerConfig, DEFAULT_TAGS_PER_WORKSPACE, DEFAULT_TAGS_WORKSPACES, INDEXER_CONFIG, SCRIPT_TOKEN_EXPIRY, SMTP_CONFIG, TMP_DIR, WORKER_CONFIG, WORKER_GROUP
7066
},
7167
KillpillSender, BASE_URL, CRITICAL_ALERTS_ON_DB_OVERSIZE, CRITICAL_ALERT_MUTE_UI_ENABLED,
7268
CRITICAL_ERROR_CHANNELS, DB, DEFAULT_HUB_BASE_URL, HUB_BASE_URL, JOB_RETENTION_SECS,
@@ -203,10 +199,23 @@ pub async fn initial_load(
203199
}
204200
Connection::Http(_) => {
205201
// TODO: reload worker config from http
206-
WORKER_CONFIG.write().await.worker_tags = DECODED_AGENT_TOKEN
207-
.as_ref()
208-
.map(|x| x.tags.clone())
209-
.unwrap_or_default();
202+
let mut config = WORKER_CONFIG.write().await;
203+
*config = WorkerConfig {
204+
worker_tags: DECODED_AGENT_TOKEN
205+
.as_ref()
206+
.map(|x| x.tags.clone())
207+
.unwrap_or_default(),
208+
env_vars: load_env_vars(
209+
load_whitelist_env_vars_from_env(),
210+
&std::collections::HashMap::new(),
211+
),
212+
priority_tags_sorted: vec![],
213+
dedicated_worker: None,
214+
init_bash: load_init_bash_from_env(),
215+
cache_clear: None,
216+
additional_python_paths: None,
217+
pip_local_dependencies: None,
218+
};
210219
}
211220
}
212221
}

backend/windmill-api/src/args.rs

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ where
355355
let uri = request.uri();
356356
let request_query = Query::<RequestQuery>::try_from_uri(uri).unwrap().0;
357357
let headers = build_headers(&headers_map, request_query.include_header, is_http_trigger);
358-
let query_decode = DecodeQueries::from_uri(uri);
358+
let query_decode = DecodeQueries::from_uri(uri, is_http_trigger);
359359
let mut query = HashMap::new();
360360
if let Some(DecodeQueries(queries)) = query_decode {
361361
query.extend(queries);
@@ -500,37 +500,49 @@ where
500500
type Rejection = Response;
501501

502502
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
503-
Ok(DecodeQueries::from_uri(&parts.uri).unwrap_or_else(|| DecodeQueries(HashMap::new())))
503+
Ok(DecodeQueries::from_uri(&parts.uri, false)
504+
.unwrap_or_else(|| DecodeQueries(HashMap::new())))
504505
}
505506
}
506507

507508
impl DecodeQueries {
508-
pub fn from_uri(uri: &Uri) -> Option<Self> {
509+
pub fn from_uri(uri: &Uri, is_http_trigger: bool) -> Option<Self> {
509510
let query = uri.query();
510511
if query.is_none() {
511512
return None;
512513
}
513514
let query = query.unwrap();
514-
let include_query = serde_urlencoded::from_str::<IncludeQuery>(query)
515-
.map(|x| x.include_query)
516-
.ok()
517-
.flatten()
518-
.unwrap_or_default();
519-
let parse_query_args = include_query
520-
.split(",")
521-
.map(|s| s.to_string())
522-
.collect::<Vec<_>>();
523-
let mut args = HashMap::new();
524-
if !parse_query_args.is_empty() {
515+
if is_http_trigger {
525516
let queries =
526517
serde_urlencoded::from_str::<HashMap<String, String>>(query).unwrap_or_default();
527-
parse_query_args.iter().for_each(|h| {
528-
if let Some(v) = queries.get(h) {
529-
args.insert(h.to_string(), to_raw_value(v));
530-
}
531-
});
518+
Some(DecodeQueries(
519+
queries
520+
.into_iter()
521+
.map(|(k, v)| (k, to_raw_value(&v)))
522+
.collect(),
523+
))
524+
} else {
525+
let include_query = serde_urlencoded::from_str::<IncludeQuery>(query)
526+
.map(|x| x.include_query)
527+
.ok()
528+
.flatten()
529+
.unwrap_or_default();
530+
let parse_query_args = include_query
531+
.split(",")
532+
.map(|s| s.to_string())
533+
.collect::<Vec<_>>();
534+
let mut args = HashMap::new();
535+
if !parse_query_args.is_empty() {
536+
let queries = serde_urlencoded::from_str::<HashMap<String, String>>(query)
537+
.unwrap_or_default();
538+
parse_query_args.iter().for_each(|h| {
539+
if let Some(v) = queries.get(h) {
540+
args.insert(h.to_string(), to_raw_value(v));
541+
}
542+
});
543+
}
544+
Some(DecodeQueries(args))
532545
}
533-
Some(DecodeQueries(args))
534546
}
535547
}
536548

backend/windmill-api/src/http_trigger_args.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,10 @@ impl HttpTriggerArgs {
158158
wrap_body: bool,
159159
) -> Result<PushArgsOwned, Error> {
160160
let mut extra = HashMap::new();
161-
162-
extra.insert(
163-
"wm_trigger".to_string(),
161+
let mut wm_trigger = HashMap::new();
162+
wm_trigger.insert("kind".to_string(), to_raw_value(&"http".to_string()));
163+
wm_trigger.insert(
164+
"http".to_string(),
164165
to_raw_value(&HttpTriggerWmTrigger {
165166
route: route_path,
166167
path: called_path,
@@ -170,6 +171,7 @@ impl HttpTriggerArgs {
170171
headers: &self.0.metadata.headers,
171172
}),
172173
);
174+
extra.insert("wm_trigger".to_string(), to_raw_value(&wm_trigger));
173175

174176
let mut args = self.to_main_args(wrap_body)?;
175177

backend/windmill-common/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ pub enum Error {
6666
DatabaseMigration(#[from] MigrateError),
6767
#[error("Non-zero exit status for {0}: {1}")]
6868
ExitStatus(String, i32),
69+
#[error("ExecutionRawError: {0}")]
70+
ExecutionRawError(Box<serde_json::value::RawValue>),
6971
#[error("Error: {error:#} @{location:#}")]
7072
Anyhow { error: anyhow::Error, location: String },
7173
#[error("Error: {0:#?}")]

backend/windmill-common/src/worker.rs

Lines changed: 62 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -382,10 +382,7 @@ fn normalize_path(path: &Path) -> PathBuf {
382382
ret
383383
}
384384

385-
pub fn is_allowed_file_location(
386-
job_dir: &str,
387-
user_defined_path: &str,
388-
) -> error::Result<PathBuf> {
385+
pub fn is_allowed_file_location(job_dir: &str, user_defined_path: &str) -> error::Result<PathBuf> {
389386
let job_dir = Path::new(job_dir);
390387
let user_path = PathBuf::from(user_defined_path);
391388

@@ -1398,18 +1395,66 @@ pub async fn load_worker_config(
13981395
tracing::debug!("Custom tags priority set: {:?}", priority_tags_sorted);
13991396

14001397
let env_vars_static = config.env_vars_static.unwrap_or_default().clone();
1401-
let resolved_env_vars: HashMap<String, String> = env_vars_static
1402-
.keys()
1403-
.map(|x| x.to_string())
1404-
.chain(config.env_vars_allowlist.unwrap_or_default())
1405-
.chain(
1406-
std::env::var("WHITELIST_ENVS")
1407-
.ok()
1408-
.map(|x| x.split(',').map(|x| x.to_string()).collect_vec())
1409-
.unwrap_or_default()
1410-
.into_iter(),
1411-
)
1412-
.sorted()
1398+
let resolved_env_vars: HashMap<String, String> = load_env_vars(
1399+
config
1400+
.env_vars_allowlist
1401+
.unwrap_or_default()
1402+
.into_iter()
1403+
.chain(load_whitelist_env_vars_from_env())
1404+
.chain(env_vars_static.keys().map(|x| x.to_string())),
1405+
&env_vars_static,
1406+
);
1407+
1408+
Ok(WorkerConfig {
1409+
worker_tags,
1410+
priority_tags_sorted,
1411+
dedicated_worker,
1412+
init_bash: config
1413+
.init_bash
1414+
.or_else(|| load_init_bash_from_env())
1415+
.and_then(|x| if x.is_empty() { None } else { Some(x) }),
1416+
cache_clear: config.cache_clear,
1417+
pip_local_dependencies: config
1418+
.pip_local_dependencies
1419+
.or_else(|| load_pip_local_dependencies_from_env()),
1420+
additional_python_paths: config
1421+
.additional_python_paths
1422+
.or_else(|| load_additional_python_paths_from_env()),
1423+
env_vars: resolved_env_vars,
1424+
})
1425+
}
1426+
1427+
pub fn load_init_bash_from_env() -> Option<String> {
1428+
std::env::var("INIT_SCRIPT")
1429+
.ok()
1430+
.and_then(|x| if x.is_empty() { None } else { Some(x) })
1431+
}
1432+
1433+
pub fn load_pip_local_dependencies_from_env() -> Option<Vec<String>> {
1434+
std::env::var("PIP_LOCAL_DEPENDENCIES")
1435+
.ok()
1436+
.map(|x| x.split(',').map(|x| x.to_string()).collect_vec())
1437+
}
1438+
1439+
pub fn load_additional_python_paths_from_env() -> Option<Vec<String>> {
1440+
std::env::var("ADDITIONAL_PYTHON_PATHS")
1441+
.ok()
1442+
.map(|x| x.split(':').map(|x| x.to_string()).collect_vec())
1443+
}
1444+
1445+
pub fn load_whitelist_env_vars_from_env() -> std::vec::IntoIter<String> {
1446+
std::env::var("WHITELIST_ENVS")
1447+
.ok()
1448+
.map(|x| x.split(',').map(|x| x.to_string()).collect_vec())
1449+
.unwrap_or_default()
1450+
.into_iter()
1451+
}
1452+
1453+
pub fn load_env_vars(
1454+
iter: impl Iterator<Item = String>,
1455+
env_vars_static: &HashMap<String, String>,
1456+
) -> HashMap<String, String> {
1457+
iter.sorted()
14131458
.unique()
14141459
.map(|envvar_name| {
14151460
(
@@ -1422,34 +1467,7 @@ pub async fn load_worker_config(
14221467
}),
14231468
)
14241469
})
1425-
.collect();
1426-
1427-
Ok(WorkerConfig {
1428-
worker_tags,
1429-
priority_tags_sorted,
1430-
dedicated_worker,
1431-
init_bash: config
1432-
.init_bash
1433-
.or_else(|| std::env::var("INIT_SCRIPT").ok())
1434-
.and_then(|x| if x.is_empty() { None } else { Some(x) }),
1435-
cache_clear: config.cache_clear,
1436-
pip_local_dependencies: config.pip_local_dependencies.or_else(|| {
1437-
let pip_local_dependencies = std::env::var("PIP_LOCAL_DEPENDENCIES")
1438-
.ok()
1439-
.map(|x| x.split(',').map(|x| x.to_string()).collect());
1440-
if pip_local_dependencies == Some(vec!["".to_string()]) {
1441-
None
1442-
} else {
1443-
pip_local_dependencies
1444-
}
1445-
}),
1446-
additional_python_paths: config.additional_python_paths.or_else(|| {
1447-
std::env::var("ADDITIONAL_PYTHON_PATHS")
1448-
.ok()
1449-
.map(|x| x.split(':').map(|x| x.to_string()).collect())
1450-
}),
1451-
env_vars: resolved_env_vars,
1452-
})
1470+
.collect()
14531471
}
14541472

14551473
#[derive(Clone, PartialEq, Debug)]

backend/windmill-worker/src/bigquery_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ pub async fn do_bigquery(
472472
conn,
473473
mem_peak,
474474
canceled_by,
475-
result_f.map_err(to_anyhow),
475+
result_f,
476476
worker_name,
477477
&job.workspace_id,
478478
&mut Some(occupancy_metrics),

backend/windmill-worker/src/graphql_executor.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::collections::HashMap;
22

3-
use anyhow::anyhow;
43
use futures::{stream, TryStreamExt};
54
use serde_json::{json, value::RawValue};
65
use sqlx::types::Json;
@@ -134,11 +133,13 @@ pub async fn do_graphql(
134133
.map_err(|e| Error::ExecutionErr(e.to_string()))?;
135134

136135
if let Some(errors) = result.errors {
137-
return Err(anyhow!(errors
138-
.into_iter()
139-
.map(|x| x.message)
140-
.collect::<Vec<_>>()
141-
.join("\n"),));
136+
return Err(Error::ExecutionErr(
137+
errors
138+
.into_iter()
139+
.map(|x| x.message)
140+
.collect::<Vec<_>>()
141+
.join("\n"),
142+
));
142143
}
143144

144145
// And then check that we got back the same string we sent over.

backend/windmill-worker/src/handle_child.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ pub async fn run_future_with_polling_update_job_poller<Fut, T, S>(
526526
get_mem: S,
527527
) -> error::Result<T>
528528
where
529-
Fut: Future<Output = anyhow::Result<T>>,
529+
Fut: Future<Output = windmill_common::error::Result<T>>,
530530
S: stream::Stream<Item = i32> + Unpin,
531531
{
532532
let (tx, rx) = broadcast::channel::<()>(3);

0 commit comments

Comments
 (0)