Merged
35 commits
ea8e976 - stream to s3 boilerplate (diegoimbert, May 5, 2025)
f29110e - S3 works with new syntax (diegoimbert, May 5, 2025)
beba862 - snowflake s3 streaming support (diegoimbert, May 5, 2025)
f7e36bf - postgres s3 support (diegoimbert, May 5, 2025)
b6e9269 - fix postgres stream format (diegoimbert, May 5, 2025)
ed0a7a3 - mysql s3 streaming (diegoimbert, May 6, 2025)
62df7af - mssql s3 streaming (diegoimbert, May 6, 2025)
92c8925 - new s3 mode syntax (diegoimbert, May 6, 2025)
6ebaf1c - optional folder param (diegoimbert, May 6, 2025)
794de6b - rename folder to prefix (diegoimbert, May 6, 2025)
b86d51b - json_stream_arr_values (diegoimbert, May 7, 2025)
8990be9 - cargo toml rollback (diegoimbert, May 7, 2025)
d1d4fc0 - convert_ndjson with datafusion (diegoimbert, May 7, 2025)
b8c1243 - format conversion kinda works (diegoimbert, May 8, 2025)
dc8e041 - Fixed not finishing the datafusion writer (diegoimbert, May 8, 2025)
1d29f54 - support for pg and mssql (diegoimbert, May 8, 2025)
6b4e8bc - fix file ext (diegoimbert, May 8, 2025)
8bdf4dd - bigquery conversion and works with s3 streaming (diegoimbert, May 8, 2025)
a576f55 - fix s3 flag parser (diegoimbert, May 8, 2025)
7609263 - snowflake s3 streaming support (diegoimbert, May 8, 2025)
2895221 - factor out duplicate code (diegoimbert, May 8, 2025)
755b423 - remove anyhow (diegoimbert, May 8, 2025)
7440173 - Err case for parse s3 mode (diegoimbert, May 8, 2025)
967b839 - Send error to mpsc (diegoimbert, May 8, 2025)
1b261ce - bigquery s3 streaming fix for huge queries (diegoimbert, May 9, 2025)
3a58cb8 - remove extra stuff (diegoimbert, May 9, 2025)
8b87ae4 - snowflake s3 streaming support (diegoimbert, May 9, 2025)
d582557 - small regex mistake (diegoimbert, May 9, 2025)
83906ea - cfg(not(feature = "parquet")) (diegoimbert, May 9, 2025)
d834ca5 - Merge branch 'main' into di/stream-sql-to-s3 (diegoimbert, May 9, 2025)
af7468d - Merge branch 'main' into di/stream-sql-to-s3 (diegoimbert, May 12, 2025)
05283a8 - fix CI (unused import) (diegoimbert, May 12, 2025)
cde77fe - error handling fix (graphite) (diegoimbert, May 12, 2025)
bda882a - Merge branch 'main' into di/stream-sql-to-s3 (diegoimbert, May 12, 2025)
58d5bd8 - Merge branch 'main' into di/stream-sql-to-s3 (diegoimbert, May 12, 2025)
5 changes: 5 additions & 0 deletions backend/Cargo.lock


55 changes: 55 additions & 0 deletions backend/parsers/windmill-parser-sql/src/lib.rs
@@ -120,6 +120,60 @@ pub fn parse_db_resource(code: &str) -> Option<String> {
    cap.map(|x| x.get(1).map(|x| x.as_str().to_string()).unwrap())
}

#[derive(Clone, Copy, Debug)]
pub enum S3ModeFormat {
    Json,
    Csv,
    Parquet,
}
pub fn s3_mode_extension(format: S3ModeFormat) -> &'static str {
    match format {
        S3ModeFormat::Json => "json",
        S3ModeFormat::Csv => "csv",
        S3ModeFormat::Parquet => "parquet",
    }
}
pub struct S3ModeArgs {
    pub prefix: Option<String>,
    pub storage: Option<String>,
    pub format: S3ModeFormat,
}
pub fn parse_s3_mode(code: &str) -> anyhow::Result<Option<S3ModeArgs>> {
    let cap = match RE_S3_MODE.captures(code) {
        Some(x) => x,
        None => return Ok(None),
    };
    let args_str = cap
        .get(1)
        .map(|x| x.as_str().to_string())
        .unwrap_or_default();

    let mut prefix = None;
    let mut storage = None;
    let mut format = S3ModeFormat::Json;

    for kv in args_str.split(' ').map(|kv| kv.trim()) {
        if kv.is_empty() {
            continue;
        }
        let mut it = kv.split('=');
        let (Some(key), Some(value)) = (it.next(), it.next()) else {
            return Err(anyhow!("Invalid S3 mode argument: {}", kv));
        };
        match (key.trim(), value.trim()) {
            ("prefix", _) => prefix = Some(value.to_string()),
            ("storage", _) => storage = Some(value.to_string()),
            ("format", "json") => format = S3ModeFormat::Json,
            ("format", "parquet") => format = S3ModeFormat::Parquet,
            ("format", "csv") => format = S3ModeFormat::Csv,
            ("format", format) => return Err(anyhow!("Invalid S3 mode format: {}", format)),
            (_, _) => return Err(anyhow!("Invalid S3 mode argument: {}", kv)),
        }
    }

    Ok(Some(S3ModeArgs { prefix, storage, format }))
}

pub fn parse_sql_blocks(code: &str) -> Vec<&str> {
    let mut blocks = vec![];
    let mut last_idx = 0;
@@ -147,6 +201,7 @@ lazy_static::lazy_static! {
    static ref RE_NONEMPTY_SQL_BLOCK: Regex = Regex::new(r#"(?m)^\s*[^\s](?:[^-]|$)"#).unwrap();

    static ref RE_DB: Regex = Regex::new(r#"(?m)^-- database (\S+) *(?:\r|\n|$)"#).unwrap();
    static ref RE_S3_MODE: Regex = Regex::new(r#"(?m)^-- s3( (.+))? *(?:\r|\n|$)"#).unwrap();

    // -- $1 name (type) = default
    static ref RE_ARG_MYSQL: Regex = Regex::new(r#"(?m)^-- \? (\w+) \((\w+)\)(?: ?\= ?(.+))? *(?:\r|\n|$)"#).unwrap();
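For context, a minimal sketch of how the new `-- s3` annotation is expected to parse, based on the RE_S3_MODE regex and parse_s3_mode above. The query text and argument values are invented for illustration; unspecified keys keep their defaults (format = json, prefix and storage = None).

use windmill_parser_sql::{parse_s3_mode, s3_mode_extension, S3ModeFormat};

fn main() -> anyhow::Result<()> {
    // Hypothetical SQL script: the `-- s3` header is matched by RE_S3_MODE and
    // its space-separated key=value pairs are parsed by parse_s3_mode.
    let code = "-- s3 format=csv prefix=exports/users storage=secondary\nSELECT id, email FROM users;";

    let args = parse_s3_mode(code)?.expect("the -- s3 annotation is present");
    assert!(matches!(args.format, S3ModeFormat::Csv));
    assert_eq!(args.prefix.as_deref(), Some("exports/users"));
    assert_eq!(args.storage.as_deref(), Some("secondary"));
    // The extension helper picks the file suffix for the object written to S3.
    assert_eq!(s3_mode_extension(args.format), "csv");
    Ok(())
}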
6 changes: 5 additions & 1 deletion backend/windmill-common/Cargo.toml
@@ -12,7 +12,7 @@ tantivy = []
prometheus = ["dep:prometheus"]
loki = ["dep:tracing-loki"]
benchmark = []
parquet = ["dep:object_store", "dep:aws-config", "dep:aws-sdk-sts"]
parquet = ["dep:object_store", "dep:aws-config", "dep:aws-sdk-sts", "dep:datafusion"]
aws_auth = ["dep:aws-sdk-sts", "dep:aws-config"]
otel = ["dep:opentelemetry-semantic-conventions", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk",
"dep:opentelemetry", "dep:tracing-opentelemetry", "dep:opentelemetry-appender-tracing", "dep:tonic"]
@@ -44,6 +44,9 @@ tracing = { workspace = true }
axum = { workspace = true }
hyper = { workspace = true }
tokio = { workspace = true }
tokio-stream.workspace = true
tokio-util.workspace = true
datafusion = { workspace = true, optional = true}
reqwest = { workspace = true }
tracing-subscriber = { workspace = true }
lazy_static.workspace = true
@@ -67,6 +70,7 @@ async-stream.workspace = true
const_format.workspace = true
crc.workspace = true
windmill-macros.workspace = true
windmill-parser-sql.workspace = true
jsonwebtoken.workspace = true
backon.workspace = true

208 changes: 207 additions & 1 deletion backend/windmill-common/src/s3_helpers.rs
@@ -16,10 +16,35 @@ use object_store::{aws::AmazonS3Builder, ClientOptions};
use reqwest::header::HeaderMap;
use serde::{Deserialize, Serialize};
#[cfg(feature = "parquet")]
use std::sync::Arc;
use std::sync::{Arc, Mutex};
#[cfg(feature = "parquet")]
use tokio::sync::RwLock;

#[cfg(feature = "parquet")]
use crate::error::to_anyhow;
#[cfg(feature = "parquet")]
use crate::utils::rd_string;
#[cfg(feature = "parquet")]
use bytes::Bytes;
#[cfg(feature = "parquet")]
use datafusion::arrow::array::{RecordBatch, RecordBatchWriter};
#[cfg(feature = "parquet")]
use datafusion::arrow::error::ArrowError;
#[cfg(feature = "parquet")]
use datafusion::arrow::json::writer::JsonArray;
#[cfg(feature = "parquet")]
use datafusion::arrow::{csv, json};
#[cfg(feature = "parquet")]
use datafusion::parquet::arrow::ArrowWriter;
#[cfg(feature = "parquet")]
use futures::TryStreamExt;
#[cfg(feature = "parquet")]
use std::io::Write;
#[cfg(feature = "parquet")]
use tokio::task;
#[cfg(feature = "parquet")]
use windmill_parser_sql::S3ModeFormat;

#[cfg(feature = "parquet")]
lazy_static::lazy_static! {

@@ -480,3 +505,184 @@ pub fn bundle(w_id: &str, hash: &str) -> String {
pub fn raw_app(w_id: &str, version: &i64) -> String {
    format!("/home/rfiszel/raw_app/{}/{}", w_id, version)
}

// Originally used a Arc<Mutex<dyn RecordBatchWriter + Send>>
// But cannot call .close() on it because it moves the value and the object is not Sized
#[cfg(feature = "parquet")]
enum RecordBatchWriterEnum {
    Parquet(ArrowWriter<ChannelWriter>),
    Csv(csv::Writer<ChannelWriter>),
    Json(json::Writer<ChannelWriter, JsonArray>),
}

#[cfg(feature = "parquet")]
impl RecordBatchWriter for RecordBatchWriterEnum {
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        match self {
            RecordBatchWriterEnum::Parquet(w) => w.write(batch).map_err(|e| e.into()),
            RecordBatchWriterEnum::Csv(w) => w.write(batch),
            RecordBatchWriterEnum::Json(w) => w.write(batch),
        }
    }

    fn close(self) -> Result<(), ArrowError> {
        match self {
            RecordBatchWriterEnum::Parquet(w) => w.close().map_err(|e| e.into()).map(drop),
            RecordBatchWriterEnum::Csv(w) => w.close(),
            RecordBatchWriterEnum::Json(w) => w.close(),
        }
    }
}

#[cfg(feature = "parquet")]
struct ChannelWriter {
    sender: tokio::sync::mpsc::Sender<anyhow::Result<Bytes>>,
}

#[cfg(feature = "parquet")]
impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let data: Bytes = buf.to_vec().into();
        self.sender.blocking_send(Ok(data)).map_err(|e| {
            std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                format!("Channel send error: {}", e),
            )
        })?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

#[cfg(not(feature = "parquet"))]
pub async fn convert_json_line_stream<E: Into<anyhow::Error>>(
    mut _stream: impl futures::TryStreamExt<Item = Result<serde_json::Value, E>> + Unpin,
    _output_format: windmill_parser_sql::S3ModeFormat,
) -> anyhow::Result<impl futures::TryStreamExt<Item = anyhow::Result<bytes::Bytes>>> {
    Ok(async_stream::stream! {
        yield Err(anyhow::anyhow!("Parquet feature is not enabled. Cannot convert JSON line stream."));
    })
}

#[cfg(feature = "parquet")]
pub async fn convert_json_line_stream<E: Into<anyhow::Error>>(
    mut stream: impl TryStreamExt<Item = Result<serde_json::Value, E>> + Unpin,
    output_format: S3ModeFormat,
) -> anyhow::Result<impl TryStreamExt<Item = anyhow::Result<bytes::Bytes>>> {
    const MAX_MPSC_SIZE: usize = 1000;

    use datafusion::{execution::context::SessionContext, prelude::NdJsonReadOptions};
    use futures::StreamExt;
    use std::path::PathBuf;
    use tokio::io::AsyncWriteExt;

    let mut path = PathBuf::from(std::env::temp_dir());
    path.push(format!("{}.json", rd_string(8)));
    let path_str = path
        .to_str()
        .ok_or_else(|| anyhow::anyhow!("Invalid path"))?;

    // Write the stream to a temporary file
    let mut file: tokio::fs::File = tokio::fs::File::create(&path).await.map_err(to_anyhow)?;

    while let Some(chunk) = stream.next().await {
        match chunk {
            Ok(chunk) => {
                // Convert the chunk to bytes and write it to the file
                let b: bytes::Bytes = serde_json::to_string(&chunk)?.into();
                file.write_all(&b).await?;
                file.write_all(b"\n").await?;
            }
            Err(e) => {
                tokio::fs::remove_file(&path).await?;
                return Err(e.into());
            }
        }
    }

    file.flush().await?;
    file.sync_all().await?;
    drop(file);

    let ctx = SessionContext::new();
    ctx.register_json(
        "my_table",
        path_str,
        NdJsonReadOptions { ..Default::default() },
    )
    .await
    .map_err(to_anyhow)?;

    let df = ctx.sql("SELECT * FROM my_table").await.map_err(to_anyhow)?;
    let schema = df.schema().clone().into();
    let mut datafusion_stream = df.execute_stream().await.map_err(to_anyhow)?;

    let (tx, rx) = tokio::sync::mpsc::channel(MAX_MPSC_SIZE);
    let writer: Arc<Mutex<Option<RecordBatchWriterEnum>>> =
        Arc::new(Mutex::new(Some(match output_format {
            S3ModeFormat::Parquet => RecordBatchWriterEnum::Parquet(
                ArrowWriter::try_new(ChannelWriter { sender: tx.clone() }, Arc::new(schema), None)
                    .map_err(to_anyhow)?,
            ),

            S3ModeFormat::Csv => {
                RecordBatchWriterEnum::Csv(csv::Writer::new(ChannelWriter { sender: tx.clone() }))
            }
            S3ModeFormat::Json => {
                RecordBatchWriterEnum::Json(json::Writer::<_, JsonArray>::new(ChannelWriter {
                    sender: tx.clone(),
                }))
            }
        })));

    // This spawn is so that the data is sent in the background. Else the function would deadlock
    // when hitting the mpsc channel limit
    task::spawn(async move {
        while let Some(batch_result) = datafusion_stream.next().await {
            let batch: RecordBatch = match batch_result {
                Ok(batch) => batch,
                Err(e) => {
                    tracing::error!("Error in datafusion stream: {:?}", &e);
                    match tx.send(Err(e.into())).await {
                        Ok(_) => {}
                        Err(e) => tracing::error!("Failed to write error to channel: {:?}", &e),
                    }
                    break;
                }
            };
            let writer = writer.clone();
            // Writer calls blocking_send which would crash if called from the async context
            let write_result = task::spawn_blocking(move || {
                // SAFETY: We await so the code is actually sequential, lock unwrap cannot panic
                // Second unwrap is ok because we initialized the option with Some
                writer.lock().unwrap().as_mut().unwrap().write(&batch)
            })
            .await;
            match write_result {
                Ok(Ok(_)) => {}
                Ok(Err(e)) => {
                    tracing::error!("Error writing batch: {:?}", &e);
                    match tx.send(Err(e.into())).await {
                        Ok(_) => {}
                        Err(e) => tracing::error!("Failed to write error to channel: {:?}", &e),
                    }
                }
                Err(e) => tracing::error!("Error in blocking task: {:?}", &e),
            };
        }
        task::spawn_blocking(move || {
            writer.lock().unwrap().take().unwrap().close()?;
            drop(writer);
            Ok::<_, anyhow::Error>(())
        })
        .await??;
        drop(ctx);
        tokio::fs::remove_file(&path).await?;
        Ok::<_, anyhow::Error>(())
    });

    Ok(tokio_stream::wrappers::ReceiverStream::new(rx))
}
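To make the data flow concrete, here is a minimal, hypothetical consumer of convert_json_line_stream, assuming the parquet feature of windmill-common is enabled and a tokio runtime is available. The sample rows and the byte counting are purely illustrative; per the PR's commit messages, the workers forward this byte stream to workspace object storage rather than counting it locally.

use bytes::Bytes;
use futures::{pin_mut, stream, StreamExt};
use windmill_common::s3_helpers::convert_json_line_stream;
use windmill_parser_sql::S3ModeFormat;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Three rows, shaped like the serde_json::Value items a database executor
    // would yield while draining a query result set.
    let rows = stream::iter(vec![
        Ok::<_, anyhow::Error>(serde_json::json!({"id": 1, "email": "a@example.com"})),
        Ok(serde_json::json!({"id": 2, "email": "b@example.com"})),
        Ok(serde_json::json!({"id": 3, "email": "c@example.com"})),
    ]);

    // Convert the JSON-line row stream into CSV bytes via DataFusion.
    let csv_bytes = convert_json_line_stream(rows, S3ModeFormat::Csv).await?;
    pin_mut!(csv_bytes);

    // Drain the resulting byte chunks; errors from the background task arrive
    // through the same channel as Err items.
    let mut total = 0usize;
    while let Some(chunk) = csv_bytes.next().await {
        let chunk: Bytes = chunk?;
        total += chunk.len();
    }
    println!("converted rows into {total} bytes of CSV");
    Ok(())
}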
1 change: 1 addition & 0 deletions backend/windmill-worker/Cargo.toml
@@ -90,6 +90,7 @@ deno_tls = { workspace = true, optional = true }
deno_permissions = { workspace = true, optional = true }
deno_io = { workspace = true, optional = true }
deno_error = { workspace = true, optional = true }
async-stream.workspace = true

postgres-native-tls.workspace = true
native-tls.workspace = true