Skip to content

Commit 76ea540

Browse files
committed
s3 streaming works for mssql
1 parent 4df4f3c commit 76ea540

File tree

1 file changed

+69
-20
lines changed

1 file changed

+69
-20
lines changed

backend/windmill-worker/src/mssql_executor.rs

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use base64::{engine::general_purpose, Engine as _};
22
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
3+
use futures::StreamExt;
34
use regex::Regex;
45
use serde::Deserialize;
56
use serde_json::value::RawValue;
@@ -13,7 +14,7 @@ use windmill_common::{
1314
utils::empty_string_as_none,
1415
worker::{to_raw_value, Connection},
1516
};
16-
use windmill_parser_sql::{parse_db_resource, parse_mssql_sig};
17+
use windmill_parser_sql::{parse_db_resource, parse_mssql_sig, parse_s3_mode};
1718
use windmill_queue::MiniPulledJob;
1819
use windmill_queue::{append_logs, CanceledBy};
1920

@@ -49,6 +50,14 @@ lazy_static::lazy_static! {
4950
static ref RE_MSSQL_READONLY_INTENT: Regex = Regex::new(r#"(?mi)^-- ApplicationIntent=ReadOnly *(?:\r|\n|$)"#).unwrap();
5051
}
5152

53+
#[derive(Clone)]
54+
struct S3Mode {
55+
client: AuthedClient,
56+
object_key: String,
57+
storage: Option<String>,
58+
workspace_id: String,
59+
}
60+
5261
pub async fn do_mssql(
5362
job: &MiniPulledJob,
5463
client: &AuthedClient,
@@ -63,6 +72,12 @@ pub async fn do_mssql(
6372
let mssql_args = build_args_values(job, client, conn).await?;
6473

6574
let inline_db_res_path = parse_db_resource(&query);
75+
let s3 = parse_s3_mode(&query).map(|s3_mode| S3Mode {
76+
client: client.clone(),
77+
storage: s3_mode.storage,
78+
object_key: format!("{}/{}.txt", s3_mode.folder_key, job.id),
79+
workspace_id: job.workspace_id.clone(),
80+
});
6681

6782
let db_arg = if let Some(inline_db_res_path) = inline_db_res_path {
6883
Some(
@@ -197,27 +212,61 @@ pub async fn do_mssql(
197212
// A response to a query is a stream of data, that must be
198213
// polled to the end before querying again. Using streams allows
199214
// fetching data in an asynchronous manner, if needed.
200-
let stream = prepared_query.query(&mut client).await.map_err(to_anyhow)?;
201-
202-
let results = stream.into_results().await.map_err(to_anyhow)?;
203-
let len = results.len();
204-
let mut json_results = vec![];
205-
for (i, statement_result) in results.into_iter().enumerate() {
206-
if annotations.return_last_result && i < len - 1 {
207-
continue;
208-
}
209-
let mut json_rows = vec![];
210-
for row in statement_result {
211-
let row = row_to_json(row)?;
212-
json_rows.push(row);
213-
}
214-
json_results.push(json_rows);
215-
}
216215

217-
if annotations.return_last_result && json_results.len() > 0 {
218-
Ok(to_raw_value(&json_results.pop().unwrap()))
216+
if let Some(s3) = s3 {
217+
let rows_stream = async_stream::stream! {
218+
let mut stream = prepared_query.query(&mut client).await.map_err(to_anyhow)?.into_row_stream().map(|row| {
219+
serde_json::to_string(
220+
&row_to_json(row.map_err(to_anyhow)?).map_err(to_anyhow)?,
221+
)
222+
.map_err(to_anyhow)
223+
});
224+
while let Some(row) = stream.next().await {
225+
yield row;
226+
}
227+
};
228+
let rows_stream = rows_stream.enumerate().map(|(i, row)| {
229+
if i == 0 {
230+
row
231+
} else {
232+
Ok(format!(",\n{}", row?))
233+
}
234+
});
235+
let start_bracket = futures::stream::once(async { Ok("{ rows: [\n".to_string()) });
236+
let end_bracket = futures::stream::once(async { Ok("\n]}".to_string()) });
237+
let rows_stream = start_bracket.chain(rows_stream).chain(end_bracket);
238+
239+
s3.client
240+
.upload_s3_file(
241+
s3.workspace_id.as_str(),
242+
s3.object_key.clone(),
243+
s3.storage.clone(),
244+
rows_stream,
245+
)
246+
.await?;
247+
248+
Ok(serde_json::value::to_raw_value(&s3.object_key)?)
219249
} else {
220-
Ok(to_raw_value(&json_results))
250+
let stream = prepared_query.query(&mut client).await.map_err(to_anyhow)?;
251+
let results = stream.into_results().await.map_err(to_anyhow)?;
252+
let len = results.len();
253+
let mut json_results = vec![];
254+
for (i, statement_result) in results.into_iter().enumerate() {
255+
if annotations.return_last_result && i < len - 1 {
256+
continue;
257+
}
258+
let mut json_rows = vec![];
259+
for row in statement_result {
260+
let row = row_to_json(row)?;
261+
json_rows.push(row);
262+
}
263+
json_results.push(json_rows);
264+
}
265+
if annotations.return_last_result && json_results.len() > 0 {
266+
Ok(to_raw_value(&json_results.pop().unwrap()))
267+
} else {
268+
Ok(to_raw_value(&json_results))
269+
}
221270
}
222271
};
223272

0 commit comments

Comments
 (0)