Skip to content

Commit f29110e

Browse files
committed
S3 works with new syntax
1 parent ea8e976 commit f29110e

File tree

3 files changed

+61
-11
lines changed

3 files changed

+61
-11
lines changed

backend/parsers/windmill-parser-sql/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,14 @@ pub fn parse_db_resource(code: &str) -> Option<String> {
121121
}
122122

123123
pub struct S3ModeArgs {
124-
pub object_key: String,
124+
pub folder_key: String,
125125
pub storage: Option<String>,
126126
}
127127
pub fn parse_s3_mode(code: &str) -> Option<S3ModeArgs> {
128128
let cap = RE_S3_MODE.captures(code)?;
129129
let arg1 = cap.get(1).map(|x| x.as_str().to_string())?;
130130
let arg2 = cap.get(2).map(|x| x.as_str().to_string());
131-
Some(S3ModeArgs { object_key: arg1, storage: arg2 })
131+
Some(S3ModeArgs { folder_key: arg1, storage: arg2 })
132132
}
133133

134134
pub fn parse_sql_blocks(code: &str) -> Vec<&str> {

backend/windmill-worker/src/bigquery_executor.rs

+20-8
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,12 @@ struct BigqueryError {
6666
message: String,
6767
}
6868

69-
struct S3Mode<'a> {
70-
client: &'a AuthedClient,
69+
#[derive(Clone)]
70+
struct S3Mode {
71+
client: AuthedClient,
7172
object_key: String,
7273
storage: Option<String>,
74+
workspace_id: String,
7375
}
7476

7577
fn do_bigquery_inner<'a>(
@@ -81,7 +83,7 @@ fn do_bigquery_inner<'a>(
8183
column_order: Option<&'a mut Option<Vec<String>>>,
8284
skip_collect: bool,
8385
http_client: &'a Client,
84-
s3: &Option<S3Mode>,
86+
s3: Option<S3Mode>,
8587
) -> windmill_common::error::Result<BoxFuture<'a, windmill_common::error::Result<Box<RawValue>>>> {
8688
let param_names = parse_sql_statement_named_params(query, '@');
8789

@@ -122,7 +124,16 @@ fn do_bigquery_inner<'a>(
122124
if skip_collect {
123125
return Ok(to_raw_value(&Value::Array(vec![])));
124126
} else if let Some(ref s3) = s3 {
125-
// s3.client.get("", query)
127+
s3.client
128+
.upload_s3_file(
129+
s3.workspace_id.as_str(),
130+
s3.object_key.clone(),
131+
s3.storage.clone(),
132+
response.bytes_stream(),
133+
)
134+
.await?;
135+
136+
Ok(serde_json::value::to_raw_value(&s3.object_key)?)
126137
} else {
127138
let result = response.json::<BigqueryResponse>().await.map_err(|e| {
128139
Error::ExecutionErr(format!(
@@ -231,9 +242,10 @@ pub async fn do_bigquery(
231242

232243
let inline_db_res_path = parse_db_resource(&query);
233244
let s3 = parse_s3_mode(&query).map(|s3_mode| S3Mode {
234-
client: &client,
245+
client: client.clone(),
235246
storage: s3_mode.storage,
236-
object_key: s3_mode.object_key,
247+
object_key: format!("{}/{}.txt", s3_mode.folder_key, job.id),
248+
workspace_id: job.workspace_id.clone(),
237249
});
238250

239251
let db_arg = if let Some(inline_db_res_path) = inline_db_res_path {
@@ -347,7 +359,7 @@ pub async fn do_bigquery(
347359
None,
348360
annotations.return_last_result && i < queries.len() - 1,
349361
&http_client,
350-
&s3,
362+
s3.clone(),
351363
)
352364
})
353365
.collect::<windmill_common::error::Result<Vec<_>>>()?;
@@ -377,7 +389,7 @@ pub async fn do_bigquery(
377389
Some(column_order),
378390
false,
379391
&http_client,
380-
&s3,
392+
s3,
381393
)?
382394
};
383395

backend/windmill-worker/src/worker.rs

+39-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use windmill_common::METRICS_DEBUG_ENABLED;
3939
#[cfg(feature = "prometheus")]
4040
use windmill_common::METRICS_ENABLED;
4141

42-
use reqwest::Response;
42+
use reqwest::{Body, Response};
4343
use serde::{de::DeserializeOwned, Deserialize, Serialize};
4444
use sqlx::types::Json;
4545
use std::{
@@ -520,6 +520,44 @@ impl AuthedClient {
520520
_ => Err(anyhow::anyhow!(response.text().await.unwrap_or_default())),
521521
}
522522
}
523+
524+
pub async fn upload_s3_file<S>(
525+
&self,
526+
workspace_id: &str,
527+
object_key: String,
528+
storage: Option<String>,
529+
body: S,
530+
) -> anyhow::Result<Response>
531+
where
532+
S: futures::stream::TryStream + Send + 'static,
533+
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
534+
bytes::Bytes: From<S::Ok>,
535+
{
536+
let mut query = vec![("file_key", object_key)];
537+
if let Some(storage) = storage {
538+
query.push(("storage", storage.clone()));
539+
}
540+
self.force_client
541+
.as_ref()
542+
.unwrap_or(&HTTP_CLIENT)
543+
.post(format!(
544+
"{}/api/w/{}/job_helpers/upload_s3_file",
545+
self.base_internal_url, workspace_id
546+
))
547+
.query(&query)
548+
.header(
549+
reqwest::header::ACCEPT,
550+
reqwest::header::HeaderValue::from_static("application/json"),
551+
)
552+
.header(
553+
reqwest::header::AUTHORIZATION,
554+
reqwest::header::HeaderValue::from_str(&format!("Bearer {}", self.token))?,
555+
)
556+
.body(Body::wrap_stream(body))
557+
.send()
558+
.await
559+
.context(format!("Sent upload_s3_file request",))
560+
}
523561
}
524562

525563
#[derive(Clone)]

0 commit comments

Comments
 (0)