Skip to content

Commit 8b87ae4

Browse files
committed
snowflake s3 streaming support
1 parent 3a58cb8 commit 8b87ae4

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

backend/windmill-worker/src/snowflake_executor.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use serde_json::{json, value::RawValue, Value};
99
use sha2::{Digest, Sha256};
1010
use std::collections::HashMap;
1111
use windmill_common::error::to_anyhow;
12+
use windmill_common::s3_helpers::convert_json_line_stream;
1213
use windmill_common::worker::Connection;
1314

1415
use windmill_common::{error::Error, worker::to_raw_value};
@@ -198,6 +199,13 @@ fn do_snowflake_inner<'a>(
198199
);
199200
}
200201

202+
// Clones are because, in s3 mode, reqwest::Body::wrap_stream requires the stream to be
203+
// 'static even though it doesn't make sense to be in our case since the request is
204+
// awaited and the stream is fully read before the function returns.
205+
// Turns out it is a real pain to trick the compiler, even using unsafe
206+
let cloned_account_identifier: String = account_identifier.to_string();
207+
let cloned_token = token.to_string();
208+
201209
let rows_stream = async_stream::stream! {
202210
for row in response.data {
203211
yield Ok::<Vec<Value>, windmill_common::error::Error>(row);
@@ -207,12 +215,12 @@ fn do_snowflake_inner<'a>(
207215
for idx in 1..response.resultSetMetaData.partitionInfo.len() {
208216
let url = format!(
209217
"https://{}.snowflakecomputing.com/api/v2/statements/{}",
210-
account_identifier.to_uppercase(),
218+
cloned_account_identifier.to_uppercase(),
211219
response.statementHandle
212220
);
213221
let mut request = HTTP_CLIENT
214222
.get(url)
215-
.bearer_auth(token)
223+
.bearer_auth(cloned_token.as_str())
216224
.query(&[("partition", idx.to_string())]);
217225

218226
if token_is_keypair {
@@ -233,7 +241,7 @@ fn do_snowflake_inner<'a>(
233241
}
234242
};
235243

236-
let rows_stream = rows_stream.map_ok(|row| {
244+
let rows_stream = rows_stream.map_ok(move |row| {
237245
let mut row_map = serde_json::Map::new();
238246
row.iter()
239247
.zip(response.resultSetMetaData.rowType.iter())
@@ -244,11 +252,10 @@ fn do_snowflake_inner<'a>(
244252
});
245253

246254
if let Some(s3) = s3 {
247-
// let rows_stream =
248-
// rows_stream.map(|r| serde_json::value::to_value(&r?).map_err(to_anyhow));
249-
// let stream = convert_json_line_stream(rows_stream.boxed(), s3.format).await?;
250-
// TODO fix this
251-
// s3.upload(stream.boxed()).await?;
255+
let rows_stream =
256+
rows_stream.map(|r| serde_json::value::to_value(&r?).map_err(to_anyhow));
257+
let stream = convert_json_line_stream(rows_stream.boxed(), s3.format).await?;
258+
s3.upload(stream.boxed()).await?;
252259
Ok(to_raw_value(&s3.object_key))
253260
} else {
254261
let rows = rows_stream

0 commit comments

Comments
 (0)