Skip to content

Commit ed0a7a3

Browse files
committed
mysql s3 streaming
1 parent b6e9269 commit ed0a7a3

File tree

3 files changed

+61
-4
lines changed

3 files changed

+61
-4
lines changed

backend/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/windmill-worker/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ deno_tls = { workspace = true, optional = true }
9090
deno_permissions = { workspace = true, optional = true }
9191
deno_io = { workspace = true, optional = true }
9292
deno_error = { workspace = true, optional = true }
93+
async-stream.workspace = true
9394

9495
postgres-native-tls.workspace = true
9596
native-tls.workspace = true

backend/windmill-worker/src/mysql_executor.rs

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::{collections::HashMap, sync::Arc};
22

3+
use anyhow::anyhow;
34
use base64::Engine;
4-
use futures::{future::BoxFuture, FutureExt};
5+
use futures::{future::BoxFuture, FutureExt, StreamExt};
56
use itertools::Itertools;
67
use mysql_async::{
78
consts::ColumnType, prelude::*, FromValueError, OptsBuilder, Params, Row, SslOpts,
@@ -16,8 +17,8 @@ use windmill_common::{
1617
worker::{to_raw_value, Connection},
1718
};
1819
use windmill_parser_sql::{
19-
parse_db_resource, parse_mysql_sig, parse_sql_blocks, parse_sql_statement_named_params,
20-
RE_ARG_MYSQL_NAMED,
20+
parse_db_resource, parse_mysql_sig, parse_s3_mode, parse_sql_blocks,
21+
parse_sql_statement_named_params, RE_ARG_MYSQL_NAMED,
2122
};
2223
use windmill_queue::CanceledBy;
2324
use windmill_queue::MiniPulledJob;
@@ -39,12 +40,21 @@ struct MysqlDatabase {
3940
ssl: Option<bool>,
4041
}
4142

42-
pub fn do_mysql_inner<'a>(
43+
#[derive(Clone)]
44+
struct S3Mode {
45+
client: AuthedClient,
46+
object_key: String,
47+
storage: Option<String>,
48+
workspace_id: String,
49+
}
50+
51+
fn do_mysql_inner<'a>(
4352
query: &'a str,
4453
all_statement_values: &Params,
4554
conn: Arc<Mutex<mysql_async::Conn>>,
4655
column_order: Option<&'a mut Option<Vec<String>>>,
4756
skip_collect: bool,
57+
s3: Option<S3Mode>,
4858
) -> windmill_common::error::Result<BoxFuture<'a, anyhow::Result<Box<RawValue>>>> {
4959
let param_names = parse_sql_statement_named_params(query, ':')
5060
.into_iter()
@@ -71,6 +81,43 @@ pub fn do_mysql_inner<'a>(
7181
.map_err(to_anyhow)?;
7282

7383
Ok(to_raw_value(&Value::Array(vec![])))
84+
} else if let Some(ref s3) = s3 {
85+
let query = query.to_string();
86+
let rows_stream = async_stream::stream! {
87+
let mut conn = conn.lock().await;
88+
match conn.exec_iter(query, statement_values).await.map_err(to_anyhow) {
89+
Ok(mut result) => {
90+
while let Some(row) = result.next().await? {
91+
let json = serde_json::to_string(&convert_row_to_value(row)).map_err(to_anyhow)?;
92+
yield Ok(json);
93+
}
94+
},
95+
Err(e) => {
96+
yield Err(anyhow!("Error executing query: {:?}", e));
97+
}
98+
};
99+
};
100+
let rows_stream = rows_stream.enumerate().map(|(i, row)| {
101+
if i == 0 {
102+
row
103+
} else {
104+
Ok(format!(",\n{}", row?))
105+
}
106+
});
107+
let start_bracket = futures::stream::once(async { Ok("{ rows: [\n".to_string()) });
108+
let end_bracket = futures::stream::once(async { Ok("\n]}".to_string()) });
109+
let rows_stream = start_bracket.chain(rows_stream).chain(end_bracket);
110+
111+
s3.client
112+
.upload_s3_file(
113+
s3.workspace_id.as_str(),
114+
s3.object_key.clone(),
115+
s3.storage.clone(),
116+
rows_stream,
117+
)
118+
.await?;
119+
120+
Ok(serde_json::value::to_raw_value(&s3.object_key)?)
74121
} else {
75122
let rows: Vec<Row> = conn
76123
.lock()
@@ -118,6 +165,12 @@ pub async fn do_mysql(
118165
let job_args = build_args_values(job, client, conn).await?;
119166

120167
let inline_db_res_path = parse_db_resource(&query);
168+
let s3 = parse_s3_mode(&query).map(|s3_mode| S3Mode {
169+
client: client.clone(),
170+
storage: s3_mode.storage,
171+
object_key: format!("{}/{}.txt", s3_mode.folder_key, job.id),
172+
workspace_id: job.workspace_id.clone(),
173+
});
121174

122175
let db_arg = if let Some(inline_db_res_path) = inline_db_res_path {
123176
Some(
@@ -252,6 +305,7 @@ pub async fn do_mysql(
252305
conn_a.clone(),
253306
None,
254307
annotations.return_last_result && i < queries.len() - 1,
308+
s3.clone(),
255309
)
256310
})
257311
.collect::<windmill_common::error::Result<Vec<_>>>()?;
@@ -277,6 +331,7 @@ pub async fn do_mysql(
277331
conn_a.clone(),
278332
Some(column_order),
279333
false,
334+
s3,
280335
)?
281336
};
282337

0 commit comments

Comments
 (0)