@@ -33,7 +33,7 @@ use parquet::basic::Compression;
3333use parquet:: file:: properties:: WriterProperties ;
3434use std:: collections:: HashMap ;
3535
36- use super :: { ReadResult , WriteResult } ;
36+ use super :: { BatchOperation , ReadResult , WriteResult } ;
3737
3838/// Unified S3 storage backend that implements both [`Source`] and [`Target`].
3939///
@@ -102,6 +102,90 @@ impl S3Storage {
102102 ObjectPath :: from ( format ! ( "{}/{table_name}/" , self . prefix) )
103103 }
104104 }
105+
106+ pub ( crate ) fn batch_metadata_object_path ( & self , table_name : & str , batch_id : u64 ) -> ObjectPath {
107+ if self . prefix . is_empty ( ) {
108+ ObjectPath :: from ( format ! ( "{table_name}/batch-{batch_id:06}.metadata.json" ) )
109+ } else {
110+ ObjectPath :: from ( format ! (
111+ "{}/{table_name}/batch-{batch_id:06}.metadata.json" ,
112+ self . prefix
113+ ) )
114+ }
115+ }
116+
117+ async fn read_batch_operation (
118+ & self ,
119+ table_name : & str ,
120+ batch_id : u64 ,
121+ ) -> anyhow:: Result < BatchOperation > {
122+ let metadata_path = self . batch_metadata_object_path ( table_name, batch_id) ;
123+ let get_result = match self . store . get ( & metadata_path) . await {
124+ Ok ( r) => r,
125+ Err ( object_store:: Error :: NotFound { .. } ) => return Ok ( BatchOperation :: Insert ) ,
126+ Err ( e) => return Err ( e. into ( ) ) ,
127+ } ;
128+
129+ let bytes = get_result. bytes ( ) . await ?;
130+ let json: serde_json:: Value = serde_json:: from_slice ( & bytes) ?;
131+
132+ let op = json
133+ . get ( "operation" )
134+ . and_then ( serde_json:: Value :: as_str)
135+ . or_else ( || json. get ( "op" ) . and_then ( serde_json:: Value :: as_str) )
136+ . unwrap_or ( "insert" )
137+ . to_ascii_lowercase ( ) ;
138+
139+ let parse_key_columns = || -> anyhow:: Result < Vec < String > > {
140+ let Some ( keys_value) = json. get ( "key_columns" ) else {
141+ anyhow:: bail!(
142+ "Missing 'key_columns' in metadata sidecar for {}" ,
143+ metadata_path
144+ ) ;
145+ } ;
146+ let Some ( keys) = keys_value. as_array ( ) else {
147+ anyhow:: bail!(
148+ "Invalid 'key_columns' (expected string array) in metadata sidecar for {}" ,
149+ metadata_path
150+ ) ;
151+ } ;
152+
153+ let parsed = keys
154+ . iter ( )
155+ . map ( |v| {
156+ v. as_str ( ) . map ( ToOwned :: to_owned) . ok_or_else ( || {
157+ anyhow:: anyhow!(
158+ "Invalid key column entry (expected string) in metadata sidecar for {}" ,
159+ metadata_path
160+ )
161+ } )
162+ } )
163+ . collect :: < anyhow:: Result < Vec < String > > > ( ) ?;
164+
165+ if parsed. is_empty ( ) {
166+ anyhow:: bail!(
167+ "'key_columns' cannot be empty in metadata sidecar for {}" ,
168+ metadata_path
169+ ) ;
170+ }
171+
172+ Ok ( parsed)
173+ } ;
174+
175+ match op. as_str ( ) {
176+ "insert" => Ok ( BatchOperation :: Insert ) ,
177+ "update" => Ok ( BatchOperation :: Update {
178+ key_columns : parse_key_columns ( ) ?,
179+ } ) ,
180+ "delete" => Ok ( BatchOperation :: Delete {
181+ key_columns : parse_key_columns ( ) ?,
182+ } ) ,
183+ other => anyhow:: bail!(
184+ "Unsupported operation '{other}' in metadata sidecar for {}" ,
185+ metadata_path
186+ ) ,
187+ }
188+ }
105189}
106190
107191#[ async_trait]
@@ -179,6 +263,33 @@ impl DataStorage for S3Storage {
179263 } )
180264 }
181265
266+ async fn write_batch_operation (
267+ & self ,
268+ table_name : & str ,
269+ batch_id : u64 ,
270+ operation : & BatchOperation ,
271+ ) -> anyhow:: Result < ( ) > {
272+ let path = self . batch_metadata_object_path ( table_name, batch_id) ;
273+
274+ let value = match operation {
275+ BatchOperation :: Insert => serde_json:: json!( {
276+ "operation" : "insert"
277+ } ) ,
278+ BatchOperation :: Update { key_columns } => serde_json:: json!( {
279+ "operation" : "update" ,
280+ "key_columns" : key_columns,
281+ } ) ,
282+ BatchOperation :: Delete { key_columns } => serde_json:: json!( {
283+ "operation" : "delete" ,
284+ "key_columns" : key_columns,
285+ } ) ,
286+ } ;
287+
288+ let bytes = serde_json:: to_vec ( & value) ?;
289+ self . store . put ( & path, PutPayload :: from ( bytes) ) . await ?;
290+ Ok ( ( ) )
291+ }
292+
182293 async fn list_batches ( & self , table_name : & str ) -> anyhow:: Result < Vec < String > > {
183294 let prefix = self . table_object_prefix ( table_name) ;
184295
@@ -218,10 +329,13 @@ impl DataStorage for S3Storage {
218329 batches. push ( batch) ;
219330 }
220331
332+ let operation = self . read_batch_operation ( table_name, batch_id) . await ?;
333+
221334 Ok ( Some ( ReadResult {
222335 batches,
223336 rows_read,
224337 bytes_read,
338+ operation,
225339 } ) )
226340 }
227341}
0 commit comments