Skip to content

Commit f7e36bf

Browse files
committed
postgres s3 support
1 parent beba862 commit f7e36bf

File tree

1 file changed

+43
-2
lines changed

1 file changed

+43
-2
lines changed

backend/windmill-worker/src/pg_executor.rs

+43-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use anyhow::Context;
88
use base64::{engine, Engine as _};
99
use chrono::Utc;
1010
use futures::future::BoxFuture;
11-
use futures::{FutureExt, TryStreamExt};
11+
use futures::{FutureExt, StreamExt, TryStreamExt};
1212
use itertools::Itertools;
1313
use native_tls::{Certificate, TlsConnector};
1414
use postgres_native_tls::MakeTlsConnector;
@@ -30,7 +30,8 @@ use windmill_common::error::{self, Error};
3030
use windmill_common::worker::{to_raw_value, Connection, CLOUD_HOSTED};
3131
use windmill_parser::{Arg, Typ};
3232
use windmill_parser_sql::{
33-
parse_db_resource, parse_pg_statement_arg_indices, parse_pgsql_sig, parse_sql_blocks,
33+
parse_db_resource, parse_pg_statement_arg_indices, parse_pgsql_sig, parse_s3_mode,
34+
parse_sql_blocks,
3435
};
3536
use windmill_queue::{CanceledBy, MiniPulledJob};
3637

@@ -53,6 +54,14 @@ struct PgDatabase {
5354
root_certificate_pem: Option<String>,
5455
}
5556

57+
#[derive(Clone)]
58+
struct S3Mode {
59+
client: AuthedClient,
60+
object_key: String,
61+
storage: Option<String>,
62+
workspace_id: String,
63+
}
64+
5665
lazy_static! {
5766
pub static ref CONNECTION_CACHE: Arc<Mutex<Option<(String, tokio_postgres::Client)>>> =
5867
Arc::new(Mutex::new(None));
@@ -68,6 +77,7 @@ fn do_postgresql_inner<'a>(
6877
column_order: Option<&'a mut Option<Vec<String>>>,
6978
siz: &'a AtomicUsize,
7079
skip_collect: bool,
80+
s3: Option<S3Mode>,
7181
) -> error::Result<BoxFuture<'a, anyhow::Result<Box<RawValue>>>> {
7282
let mut query_params = vec![];
7383

@@ -106,6 +116,29 @@ fn do_postgresql_inner<'a>(
106116
.execute_raw(&query, query_params)
107117
.await
108118
.map_err(to_anyhow)?;
119+
} else if let Some(ref s3) = s3 {
120+
let rows = client
121+
.query_raw(&query, query_params)
122+
.await?
123+
.map_err(to_anyhow)
124+
.map(|row_result| {
125+
row_result.and_then(|row| {
126+
postgres_row_to_json_value(row)
127+
.map_err(to_anyhow)
128+
.and_then(|ref v| serde_json::to_string(v).map_err(to_anyhow))
129+
})
130+
});
131+
132+
s3.client
133+
.upload_s3_file(
134+
s3.workspace_id.as_str(),
135+
s3.object_key.clone(),
136+
s3.storage.clone(),
137+
rows,
138+
)
139+
.await?;
140+
141+
return Ok(serde_json::value::to_raw_value(&s3.object_key)?);
109142
} else {
110143
let rows = client
111144
.query_raw(&query, query_params)
@@ -171,6 +204,12 @@ pub async fn do_postgresql(
171204
let pg_args = build_args_values(job, client, conn).await?;
172205

173206
let inline_db_res_path = parse_db_resource(&query);
207+
let s3 = parse_s3_mode(&query).map(|s3_mode| S3Mode {
208+
client: client.clone(),
209+
storage: s3_mode.storage,
210+
object_key: format!("{}/{}.txt", s3_mode.folder_key, job.id),
211+
workspace_id: job.workspace_id.clone(),
212+
});
174213

175214
let db_arg = if let Some(inline_db_res_path) = inline_db_res_path {
176215
Some(
@@ -321,6 +360,7 @@ pub async fn do_postgresql(
321360
None,
322361
&size,
323362
annotations.return_last_result && i < queries.len() - 1,
363+
s3.clone(),
324364
)
325365
})
326366
.collect::<error::Result<Vec<_>>>()?;
@@ -347,6 +387,7 @@ pub async fn do_postgresql(
347387
Some(column_order),
348388
&size,
349389
false,
390+
s3,
350391
)?
351392
};
352393

0 commit comments

Comments
 (0)