@@ -21,6 +21,19 @@ use object_store::aws::AmazonS3Builder;
2121use object_store:: path:: Path as ObjectPath ;
2222
2323use crate :: config:: TargetConfig ;
24+ use crate :: storage:: DataStorage ;
25+
26+ use arrow:: array:: RecordBatch ;
27+ use async_trait:: async_trait;
28+ use futures:: TryStreamExt ;
29+ use object_store:: PutPayload ;
30+ use parquet:: arrow:: ArrowWriter ;
31+ use parquet:: arrow:: arrow_reader:: ParquetRecordBatchReaderBuilder ;
32+ use parquet:: basic:: Compression ;
33+ use parquet:: file:: properties:: WriterProperties ;
34+ use std:: collections:: HashMap ;
35+
36+ use super :: { ReadResult , WriteResult } ;
2437
2538/// Unified S3 storage backend that implements both [`Source`] and [`Target`].
2639///
@@ -90,3 +103,125 @@ impl S3Storage {
90103 }
91104 }
92105}
106+
107+ #[ async_trait]
108+ impl DataStorage for S3Storage {
109+ fn expected_files ( & self , table_name : & str , batch_ids : & [ u64 ] ) -> Vec < String > {
110+ batch_ids
111+ . iter ( )
112+ . map ( |id| {
113+ if self . prefix . is_empty ( ) {
114+ format ! ( "s3://{}/{table_name}/batch-{id:06}.parquet" , self . bucket)
115+ } else {
116+ format ! (
117+ "s3://{}/{}/{table_name}/batch-{id:06}.parquet" ,
118+ self . bucket, self . prefix
119+ )
120+ }
121+ } )
122+ . collect ( )
123+ }
124+
125+ fn table_params ( & self , table_name : & str ) -> HashMap < String , serde_json:: Value > {
126+ let mut params = HashMap :: new ( ) ;
127+ params. insert (
128+ "connector" . to_string ( ) ,
129+ serde_json:: Value :: String ( "s3" . to_string ( ) ) ,
130+ ) ;
131+ params. insert (
132+ "from" . to_string ( ) ,
133+ serde_json:: Value :: String ( self . table_s3_path ( table_name) ) ,
134+ ) ;
135+ params. insert (
136+ "file_format" . to_string ( ) ,
137+ serde_json:: Value :: String ( "parquet" . to_string ( ) ) ,
138+ ) ;
139+
140+ if let Some ( region) = & self . region {
141+ params. insert (
142+ "s3_region" . to_string ( ) ,
143+ serde_json:: Value :: String ( region. clone ( ) ) ,
144+ ) ;
145+ }
146+
147+ params
148+ }
149+
150+ async fn write (
151+ & self ,
152+ table_name : & str ,
153+ batch_id : u64 ,
154+ batch : RecordBatch ,
155+ ) -> anyhow:: Result < WriteResult > {
156+ let rows = batch. num_rows ( ) as u64 ;
157+ let schema = batch. schema ( ) ;
158+
159+ // Serialize RecordBatch to Parquet bytes in memory
160+ let props = WriterProperties :: builder ( )
161+ . set_compression ( Compression :: SNAPPY )
162+ . build ( ) ;
163+
164+ let mut buf = Vec :: new ( ) ;
165+ let mut writer = ArrowWriter :: try_new ( & mut buf, schema, Some ( props) ) ?;
166+ writer. write ( & batch) ?;
167+ writer. close ( ) ?;
168+
169+ let bytes_written = buf. len ( ) as u64 ;
170+
171+ // Upload to S3 with per-table directory structure
172+ let path = self . batch_object_path ( table_name, batch_id) ;
173+
174+ self . store . put ( & path, PutPayload :: from ( buf) ) . await ?;
175+
176+ Ok ( WriteResult {
177+ rows_written : rows,
178+ bytes_written,
179+ } )
180+ }
181+
182+ async fn list_batches ( & self , table_name : & str ) -> anyhow:: Result < Vec < String > > {
183+ let prefix = self . table_object_prefix ( table_name) ;
184+
185+ let objects: Vec < _ > = self . store . list ( Some ( & prefix) ) . try_collect ( ) . await ?;
186+
187+ let paths: Vec < String > = objects
188+ . into_iter ( )
189+ . filter ( |meta| meta. location . as_ref ( ) . ends_with ( ".parquet" ) )
190+ . map ( |meta| meta. location . to_string ( ) )
191+ . collect ( ) ;
192+
193+ Ok ( paths)
194+ }
195+
196+ async fn read_batch (
197+ & self ,
198+ table_name : & str ,
199+ batch_id : u64 ,
200+ ) -> anyhow:: Result < Option < ReadResult > > {
201+ let location = self . batch_object_path ( table_name, batch_id) ;
202+
203+ let get_result = match self . store . get ( & location) . await {
204+ Ok ( r) => r,
205+ Err ( object_store:: Error :: NotFound { .. } ) => return Ok ( None ) ,
206+ Err ( e) => return Err ( e. into ( ) ) ,
207+ } ;
208+ let bytes = get_result. bytes ( ) . await ?;
209+ let bytes_read = bytes. len ( ) as u64 ;
210+
211+ let reader = ParquetRecordBatchReaderBuilder :: try_new ( bytes) ?. build ( ) ?;
212+
213+ let mut batches = Vec :: new ( ) ;
214+ let mut rows_read = 0u64 ;
215+ for batch in reader {
216+ let batch = batch?;
217+ rows_read += batch. num_rows ( ) as u64 ;
218+ batches. push ( batch) ;
219+ }
220+
221+ Ok ( Some ( ReadResult {
222+ batches,
223+ rows_read,
224+ bytes_read,
225+ } ) )
226+ }
227+ }
0 commit comments