Skip to content

Commit ea8e976

Browse files
committed
stream to s3 boilerplate
1 parent 4fd0561 commit ea8e976

File tree

3 files changed

+31
-2
lines changed

3 files changed

+31
-2
lines changed

backend/Cargo.lock

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/parsers/windmill-parser-sql/src/lib.rs

+12
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,17 @@ pub fn parse_db_resource(code: &str) -> Option<String> {
120120
cap.map(|x| x.get(1).map(|x| x.as_str().to_string()).unwrap())
121121
}
122122

123+
pub struct S3ModeArgs {
124+
pub object_key: String,
125+
pub storage: Option<String>,
126+
}
127+
pub fn parse_s3_mode(code: &str) -> Option<S3ModeArgs> {
128+
let cap = RE_S3_MODE.captures(code)?;
129+
let arg1 = cap.get(1).map(|x| x.as_str().to_string())?;
130+
let arg2 = cap.get(2).map(|x| x.as_str().to_string());
131+
Some(S3ModeArgs { object_key: arg1, storage: arg2 })
132+
}
133+
123134
pub fn parse_sql_blocks(code: &str) -> Vec<&str> {
124135
let mut blocks = vec![];
125136
let mut last_idx = 0;
@@ -147,6 +158,7 @@ lazy_static::lazy_static! {
147158
static ref RE_NONEMPTY_SQL_BLOCK: Regex = Regex::new(r#"(?m)^\s*[^\s](?:[^-]|$)"#).unwrap();
148159

149160
static ref RE_DB: Regex = Regex::new(r#"(?m)^-- database (\S+) *(?:\r|\n|$)"#).unwrap();
161+
static ref RE_S3_MODE: Regex = Regex::new(r#"(?m)^-- s3 (\S+)( +(\S+))? *(?:\r|\n|$)"#).unwrap();
150162

151163
// -- $1 name (type) = default
152164
static ref RE_ARG_MYSQL: Regex = Regex::new(r#"(?m)^-- \? (\w+) \((\w+)\)(?: ?\= ?(.+))? *(?:\r|\n|$)"#).unwrap();

backend/windmill-worker/src/bigquery_executor.rs

+18-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ use windmill_common::error::to_anyhow;
88
use windmill_common::worker::Connection;
99
use windmill_common::{error::Error, worker::to_raw_value};
1010
use windmill_parser_sql::{
11-
parse_bigquery_sig, parse_db_resource, parse_sql_blocks, parse_sql_statement_named_params,
11+
parse_bigquery_sig, parse_db_resource, parse_s3_mode, parse_sql_blocks,
12+
parse_sql_statement_named_params,
1213
};
1314
use windmill_queue::CanceledBy;
1415

@@ -65,6 +66,12 @@ struct BigqueryError {
6566
message: String,
6667
}
6768

69+
struct S3Mode<'a> {
70+
client: &'a AuthedClient,
71+
object_key: String,
72+
storage: Option<String>,
73+
}
74+
6875
fn do_bigquery_inner<'a>(
6976
query: &'a str,
7077
all_statement_values: &'a HashMap<String, Value>,
@@ -74,6 +81,7 @@ fn do_bigquery_inner<'a>(
7481
column_order: Option<&'a mut Option<Vec<String>>>,
7582
skip_collect: bool,
7683
http_client: &'a Client,
84+
s3: &Option<S3Mode>,
7785
) -> windmill_common::error::Result<BoxFuture<'a, windmill_common::error::Result<Box<RawValue>>>> {
7886
let param_names = parse_sql_statement_named_params(query, '@');
7987

@@ -113,6 +121,8 @@ fn do_bigquery_inner<'a>(
113121
Ok(_) => {
114122
if skip_collect {
115123
return Ok(to_raw_value(&Value::Array(vec![])));
124+
} else if let Some(ref s3) = s3 {
125+
// s3.client.get("", query)
116126
} else {
117127
let result = response.json::<BigqueryResponse>().await.map_err(|e| {
118128
Error::ExecutionErr(format!(
@@ -220,6 +230,11 @@ pub async fn do_bigquery(
220230
let bigquery_args = build_args_values(job, client, conn).await?;
221231

222232
let inline_db_res_path = parse_db_resource(&query);
233+
let s3 = parse_s3_mode(&query).map(|s3_mode| S3Mode {
234+
client: &client,
235+
storage: s3_mode.storage,
236+
object_key: s3_mode.object_key,
237+
});
223238

224239
let db_arg = if let Some(inline_db_res_path) = inline_db_res_path {
225240
Some(
@@ -332,6 +347,7 @@ pub async fn do_bigquery(
332347
None,
333348
annotations.return_last_result && i < queries.len() - 1,
334349
&http_client,
350+
&s3,
335351
)
336352
})
337353
.collect::<windmill_common::error::Result<Vec<_>>>()?;
@@ -361,6 +377,7 @@ pub async fn do_bigquery(
361377
Some(column_order),
362378
false,
363379
&http_client,
380+
&s3,
364381
)?
365382
};
366383

0 commit comments

Comments
 (0)