11use base64:: { engine:: general_purpose, Engine as _} ;
22use chrono:: { DateTime , NaiveDate , NaiveDateTime , NaiveTime , Utc } ;
3+ use futures:: StreamExt ;
34use regex:: Regex ;
45use serde:: Deserialize ;
56use serde_json:: value:: RawValue ;
@@ -13,7 +14,7 @@ use windmill_common::{
1314 utils:: empty_string_as_none,
1415 worker:: { to_raw_value, Connection } ,
1516} ;
16- use windmill_parser_sql:: { parse_db_resource, parse_mssql_sig} ;
17+ use windmill_parser_sql:: { parse_db_resource, parse_mssql_sig, parse_s3_mode } ;
1718use windmill_queue:: MiniPulledJob ;
1819use windmill_queue:: { append_logs, CanceledBy } ;
1920
@@ -49,6 +50,14 @@ lazy_static::lazy_static! {
4950 static ref RE_MSSQL_READONLY_INTENT : Regex = Regex :: new( r#"(?mi)^-- ApplicationIntent=ReadOnly *(?:\r|\n|$)"# ) . unwrap( ) ;
5051}
5152
53+ #[ derive( Clone ) ]
54+ struct S3Mode {
55+ client : AuthedClient ,
56+ object_key : String ,
57+ storage : Option < String > ,
58+ workspace_id : String ,
59+ }
60+
5261pub async fn do_mssql (
5362 job : & MiniPulledJob ,
5463 client : & AuthedClient ,
@@ -63,6 +72,12 @@ pub async fn do_mssql(
6372 let mssql_args = build_args_values ( job, client, conn) . await ?;
6473
6574 let inline_db_res_path = parse_db_resource ( & query) ;
75+ let s3 = parse_s3_mode ( & query) . map ( |s3_mode| S3Mode {
76+ client : client. clone ( ) ,
77+ storage : s3_mode. storage ,
78+ object_key : format ! ( "{}/{}.json" , s3_mode. folder_key, job. id) ,
79+ workspace_id : job. workspace_id . clone ( ) ,
80+ } ) ;
6681
6782 let db_arg = if let Some ( inline_db_res_path) = inline_db_res_path {
6883 Some (
@@ -197,27 +212,61 @@ pub async fn do_mssql(
197212 // A response to a query is a stream of data, that must be
198213 // polled to the end before querying again. Using streams allows
199214 // fetching data in an asynchronous manner, if needed.
200- let stream = prepared_query. query ( & mut client) . await . map_err ( to_anyhow) ?;
201-
202- let results = stream. into_results ( ) . await . map_err ( to_anyhow) ?;
203- let len = results. len ( ) ;
204- let mut json_results = vec ! [ ] ;
205- for ( i, statement_result) in results. into_iter ( ) . enumerate ( ) {
206- if annotations. return_last_result && i < len - 1 {
207- continue ;
208- }
209- let mut json_rows = vec ! [ ] ;
210- for row in statement_result {
211- let row = row_to_json ( row) ?;
212- json_rows. push ( row) ;
213- }
214- json_results. push ( json_rows) ;
215- }
216215
217- if annotations. return_last_result && json_results. len ( ) > 0 {
218- Ok ( to_raw_value ( & json_results. pop ( ) . unwrap ( ) ) )
216+ if let Some ( s3) = s3 {
217+ let rows_stream = async_stream:: stream! {
218+ let mut stream = prepared_query. query( & mut client) . await . map_err( to_anyhow) ?. into_row_stream( ) . map( |row| {
219+ serde_json:: to_string(
220+ & row_to_json( row. map_err( to_anyhow) ?) . map_err( to_anyhow) ?,
221+ )
222+ . map_err( to_anyhow)
223+ } ) ;
224+ while let Some ( row) = stream. next( ) . await {
225+ yield row;
226+ }
227+ } ;
228+ let rows_stream = rows_stream. enumerate ( ) . map ( |( i, row) | {
229+ if i == 0 {
230+ row
231+ } else {
232+ Ok ( format ! ( ",\n {}" , row?) )
233+ }
234+ } ) ;
235+ let start_bracket = futures:: stream:: once ( async { Ok ( "{ rows: [\n " . to_string ( ) ) } ) ;
236+ let end_bracket = futures:: stream:: once ( async { Ok ( "\n ]}" . to_string ( ) ) } ) ;
237+ let rows_stream = start_bracket. chain ( rows_stream) . chain ( end_bracket) ;
238+
239+ s3. client
240+ . upload_s3_file (
241+ s3. workspace_id . as_str ( ) ,
242+ s3. object_key . clone ( ) ,
243+ s3. storage . clone ( ) ,
244+ rows_stream,
245+ )
246+ . await ?;
247+
248+ Ok ( serde_json:: value:: to_raw_value ( & s3. object_key ) ?)
219249 } else {
220- Ok ( to_raw_value ( & json_results) )
250+ let stream = prepared_query. query ( & mut client) . await . map_err ( to_anyhow) ?;
251+ let results = stream. into_results ( ) . await . map_err ( to_anyhow) ?;
252+ let len = results. len ( ) ;
253+ let mut json_results = vec ! [ ] ;
254+ for ( i, statement_result) in results. into_iter ( ) . enumerate ( ) {
255+ if annotations. return_last_result && i < len - 1 {
256+ continue ;
257+ }
258+ let mut json_rows = vec ! [ ] ;
259+ for row in statement_result {
260+ let row = row_to_json ( row) ?;
261+ json_rows. push ( row) ;
262+ }
263+ json_results. push ( json_rows) ;
264+ }
265+ if annotations. return_last_result && json_results. len ( ) > 0 {
266+ Ok ( to_raw_value ( & json_results. pop ( ) . unwrap ( ) ) )
267+ } else {
268+ Ok ( to_raw_value ( & json_results) )
269+ }
221270 }
222271 } ;
223272
0 commit comments