@@ -16,10 +16,35 @@ use object_store::{aws::AmazonS3Builder, ClientOptions};
 use reqwest::header::HeaderMap;
 use serde::{Deserialize, Serialize};
 #[cfg(feature = "parquet")]
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 #[cfg(feature = "parquet")]
 use tokio::sync::RwLock;
 
+#[cfg(feature = "parquet")]
+use crate::error::to_anyhow;
+#[cfg(feature = "parquet")]
+use crate::utils::rd_string;
+#[cfg(feature = "parquet")]
+use bytes::Bytes;
+#[cfg(feature = "parquet")]
+use datafusion::arrow::array::{RecordBatch, RecordBatchWriter};
+#[cfg(feature = "parquet")]
+use datafusion::arrow::error::ArrowError;
+#[cfg(feature = "parquet")]
+use datafusion::arrow::json::writer::JsonArray;
+#[cfg(feature = "parquet")]
+use datafusion::arrow::{csv, json};
+#[cfg(feature = "parquet")]
+use datafusion::parquet::arrow::ArrowWriter;
+#[cfg(feature = "parquet")]
+use futures::TryStreamExt;
+#[cfg(feature = "parquet")]
+use std::io::Write;
+#[cfg(feature = "parquet")]
+use tokio::task;
+#[cfg(feature = "parquet")]
+use windmill_parser_sql::S3ModeFormat;
+
 #[cfg(feature = "parquet")]
 lazy_static::lazy_static! {
 
@@ -480,3 +505,184 @@ pub fn bundle(w_id: &str, hash: &str) -> String {
 pub fn raw_app(w_id: &str, version: &i64) -> String {
     format!("/home/rfiszel/raw_app/{}/{}", w_id, version)
 }
+
+// Originally used an Arc<Mutex<dyn RecordBatchWriter + Send>>, but .close()
+// cannot be called through the trait object: close() takes self by value and
+// the object is not Sized.
+#[cfg(feature = "parquet")]
+enum RecordBatchWriterEnum {
+    Parquet(ArrowWriter<ChannelWriter>),
+    Csv(csv::Writer<ChannelWriter>),
+    Json(json::Writer<ChannelWriter, JsonArray>),
+}
+
+#[cfg(feature = "parquet")]
+impl RecordBatchWriter for RecordBatchWriterEnum {
+    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
+        match self {
+            RecordBatchWriterEnum::Parquet(w) => w.write(batch).map_err(|e| e.into()),
+            RecordBatchWriterEnum::Csv(w) => w.write(batch),
+            RecordBatchWriterEnum::Json(w) => w.write(batch),
+        }
+    }
+
+    fn close(self) -> Result<(), ArrowError> {
+        match self {
+            RecordBatchWriterEnum::Parquet(w) => w.close().map_err(|e| e.into()).map(drop),
+            RecordBatchWriterEnum::Csv(w) => w.close(),
+            RecordBatchWriterEnum::Json(w) => w.close(),
+        }
+    }
+}
+
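+// ChannelWriter bridges the synchronous std::io::Write interface expected by
+// the Arrow writers to the async mpsc channel that feeds the returned stream.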
+#[cfg(feature = "parquet")]
+struct ChannelWriter {
+    sender: tokio::sync::mpsc::Sender<anyhow::Result<Bytes>>,
+}
+
+#[cfg(feature = "parquet")]
+impl Write for ChannelWriter {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        let data: Bytes = buf.to_vec().into();
+        self.sender.blocking_send(Ok(data)).map_err(|e| {
+            std::io::Error::new(
+                std::io::ErrorKind::BrokenPipe,
+                format!("Channel send error: {}", e),
+            )
+        })?;
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        Ok(())
+    }
+}
+
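+// Fallback when the "parquet" feature is disabled: the returned stream
+// immediately yields an error.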
+#[cfg(not(feature = "parquet"))]
+pub async fn convert_json_line_stream<E: Into<anyhow::Error>>(
+    mut _stream: impl futures::TryStreamExt<Item = Result<serde_json::Value, E>> + Unpin,
+    _output_format: windmill_parser_sql::S3ModeFormat,
+) -> anyhow::Result<impl futures::TryStreamExt<Item = anyhow::Result<bytes::Bytes>>> {
+    Ok(async_stream::stream! {
+        yield Err(anyhow::anyhow!("Parquet feature is not enabled. Cannot convert JSON line stream."));
+    })
+}
+
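+// Full implementation: spool the incoming JSON lines to a temporary NDJSON
+// file, register it with DataFusion, then stream record batches through the
+// selected writer into a bounded mpsc channel exposed as a byte stream.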
+#[cfg(feature = "parquet")]
+pub async fn convert_json_line_stream<E: Into<anyhow::Error>>(
+    mut stream: impl TryStreamExt<Item = Result<serde_json::Value, E>> + Unpin,
+    output_format: S3ModeFormat,
+) -> anyhow::Result<impl TryStreamExt<Item = anyhow::Result<bytes::Bytes>>> {
+    const MAX_MPSC_SIZE: usize = 1000;
+
+    use datafusion::{execution::context::SessionContext, prelude::NdJsonReadOptions};
+    use futures::StreamExt;
+    use std::path::PathBuf;
+    use tokio::io::AsyncWriteExt;
+
+    let mut path = PathBuf::from(std::env::temp_dir());
+    path.push(format!("{}.json", rd_string(8)));
+    let path_str = path
+        .to_str()
+        .ok_or_else(|| anyhow::anyhow!("Invalid path"))?;
+
+    // Write the stream to a temporary file
+    let mut file: tokio::fs::File = tokio::fs::File::create(&path).await.map_err(to_anyhow)?;
+
+    while let Some(chunk) = stream.next().await {
+        match chunk {
+            Ok(chunk) => {
+                // Convert the chunk to bytes and write it to the file
+                let b: bytes::Bytes = serde_json::to_string(&chunk)?.into();
+                file.write_all(&b).await?;
+                file.write_all(b"\n").await?;
+            }
+            Err(e) => {
+                tokio::fs::remove_file(&path).await?;
+                return Err(e.into());
+            }
+        }
+    }
+
+    file.flush().await?;
+    file.sync_all().await?;
+    drop(file);
+
+    let ctx = SessionContext::new();
+    ctx.register_json(
+        "my_table",
+        path_str,
+        NdJsonReadOptions { ..Default::default() },
+    )
+    .await
+    .map_err(to_anyhow)?;
+
+    let df = ctx.sql("SELECT * FROM my_table").await.map_err(to_anyhow)?;
+    let schema = df.schema().clone().into();
+    let mut datafusion_stream = df.execute_stream().await.map_err(to_anyhow)?;
+
+    let (tx, rx) = tokio::sync::mpsc::channel(MAX_MPSC_SIZE);
+    let writer: Arc<Mutex<Option<RecordBatchWriterEnum>>> =
+        Arc::new(Mutex::new(Some(match output_format {
+            S3ModeFormat::Parquet => RecordBatchWriterEnum::Parquet(
+                ArrowWriter::try_new(ChannelWriter { sender: tx.clone() }, Arc::new(schema), None)
+                    .map_err(to_anyhow)?,
+            ),
+            S3ModeFormat::Csv => {
+                RecordBatchWriterEnum::Csv(csv::Writer::new(ChannelWriter { sender: tx.clone() }))
+            }
+            S3ModeFormat::Json => {
+                RecordBatchWriterEnum::Json(json::Writer::<_, JsonArray>::new(ChannelWriter {
+                    sender: tx.clone(),
+                }))
+            }
+        })));
+
+    // Send the batches from a background task; otherwise this function would
+    // deadlock once the mpsc channel hits its capacity.
+    task::spawn(async move {
+        while let Some(batch_result) = datafusion_stream.next().await {
+            let batch: RecordBatch = match batch_result {
+                Ok(batch) => batch,
+                Err(e) => {
+                    tracing::error!("Error in datafusion stream: {:?}", &e);
+                    match tx.send(Err(e.into())).await {
+                        Ok(_) => {}
+                        Err(e) => tracing::error!("Failed to write error to channel: {:?}", &e),
+                    }
+                    break;
+                }
+            };
+            let writer = writer.clone();
+            // The writer calls blocking_send, which panics if called from an async context
+            let write_result = task::spawn_blocking(move || {
+                // SAFETY: we await each blocking task, so execution is sequential and
+                // locking cannot panic. The second unwrap is fine because the Option
+                // was initialized with Some.
+                writer.lock().unwrap().as_mut().unwrap().write(&batch)
+            })
+            .await;
+            match write_result {
+                Ok(Ok(_)) => {}
+                Ok(Err(e)) => {
+                    tracing::error!("Error writing batch: {:?}", &e);
+                    match tx.send(Err(e.into())).await {
+                        Ok(_) => {}
+                        Err(e) => tracing::error!("Failed to write error to channel: {:?}", &e),
+                    }
+                }
+                Err(e) => tracing::error!("Error in blocking task: {:?}", &e),
+            };
+        }
+        task::spawn_blocking(move || {
+            writer.lock().unwrap().take().unwrap().close()?;
+            drop(writer);
+            Ok::<_, anyhow::Error>(())
+        })
+        .await??;
+        drop(ctx);
+        tokio::fs::remove_file(&path).await?;
+        Ok::<_, anyhow::Error>(())
+    });
+
+    Ok(tokio_stream::wrappers::ReceiverStream::new(rx))
+}
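
A minimal usage sketch (hypothetical caller, not part of this diff): feed an in-memory stream of serde_json::Value rows through convert_json_line_stream and drain the converted bytes, assuming an async context that can propagate anyhow::Error. The row contents and the Csv format choice are illustrative only.

    use futures::StreamExt;

    // Hypothetical input: two JSON rows, wrapped in Result as the function expects.
    let rows = futures::stream::iter(vec![
        Ok::<_, anyhow::Error>(serde_json::json!({ "id": 1, "name": "a" })),
        Ok(serde_json::json!({ "id": 2, "name": "b" })),
    ]);

    let out = convert_json_line_stream(rows, S3ModeFormat::Csv).await?;
    futures::pin_mut!(out); // the opaque return type is not declared Unpin
    while let Some(chunk) = out.next().await {
        let chunk: bytes::Bytes = chunk?;
        // e.g. hand `chunk` to a multipart S3 upload
    }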