@@ -33,7 +33,7 @@ use parquet::basic::Compression;
3333use parquet:: file:: properties:: WriterProperties ;
3434use std:: collections:: HashMap ;
3535
36- use super :: { ReadResult , WriteResult } ;
36+ use super :: { BatchOperation , ReadResult , WriteResult } ;
3737
3838/// Unified S3 storage backend that implements both [`Source`] and [`Target`].
3939///
@@ -102,6 +102,87 @@ impl S3Storage {
102102 ObjectPath :: from ( format ! ( "{}/{table_name}/" , self . prefix) )
103103 }
104104 }
105+
106+ pub ( crate ) fn batch_metadata_object_path ( & self , table_name : & str , batch_id : u64 ) -> ObjectPath {
107+ if self . prefix . is_empty ( ) {
108+ ObjectPath :: from ( format ! ( "{table_name}/batch-{batch_id:06}.metadata.json" ) )
109+ } else {
110+ ObjectPath :: from ( format ! (
111+ "{}/{table_name}/batch-{batch_id:06}.metadata.json" ,
112+ self . prefix
113+ ) )
114+ }
115+ }
116+
117+ async fn read_batch_operation (
118+ & self ,
119+ table_name : & str ,
120+ batch_id : u64 ,
121+ ) -> anyhow:: Result < BatchOperation > {
122+ let metadata_path = self . batch_metadata_object_path ( table_name, batch_id) ;
123+ let get_result = match self . store . get ( & metadata_path) . await {
124+ Ok ( r) => r,
125+ Err ( object_store:: Error :: NotFound { .. } ) => return Ok ( BatchOperation :: Insert ) ,
126+ Err ( e) => return Err ( e. into ( ) ) ,
127+ } ;
128+
129+ let bytes = get_result. bytes ( ) . await ?;
130+ let json: serde_json:: Value = serde_json:: from_slice ( & bytes) ?;
131+
132+ let op = json
133+ . get ( "operation" )
134+ . and_then ( serde_json:: Value :: as_str)
135+ . or_else ( || json. get ( "op" ) . and_then ( serde_json:: Value :: as_str) )
136+ . unwrap_or ( "insert" )
137+ . to_ascii_lowercase ( ) ;
138+
139+ let parse_key_columns = || -> anyhow:: Result < Vec < String > > {
140+ let Some ( keys_value) = json. get ( "key_columns" ) else {
141+ anyhow:: bail!(
142+ "Missing 'key_columns' in metadata sidecar for {}" ,
143+ metadata_path
144+ ) ;
145+ } ;
146+ let Some ( keys) = keys_value. as_array ( ) else {
147+ anyhow:: bail!(
148+ "Invalid 'key_columns' (expected string array) in metadata sidecar for {}" ,
149+ metadata_path
150+ ) ;
151+ } ;
152+
153+ let parsed = keys
154+ . iter ( )
155+ . map ( |v| {
156+ v. as_str ( ) . map ( ToOwned :: to_owned) . ok_or_else ( || {
157+ anyhow:: anyhow!(
158+ "Invalid key column entry (expected string) in metadata sidecar for {}" ,
159+ metadata_path
160+ )
161+ } )
162+ } )
163+ . collect :: < anyhow:: Result < Vec < String > > > ( ) ?;
164+
165+ if parsed. is_empty ( ) {
166+ anyhow:: bail!( "'key_columns' cannot be empty in metadata sidecar for {}" , metadata_path) ;
167+ }
168+
169+ Ok ( parsed)
170+ } ;
171+
172+ match op. as_str ( ) {
173+ "insert" => Ok ( BatchOperation :: Insert ) ,
174+ "update" => Ok ( BatchOperation :: Update {
175+ key_columns : parse_key_columns ( ) ?,
176+ } ) ,
177+ "delete" => Ok ( BatchOperation :: Delete {
178+ key_columns : parse_key_columns ( ) ?,
179+ } ) ,
180+ other => anyhow:: bail!(
181+ "Unsupported operation '{other}' in metadata sidecar for {}" ,
182+ metadata_path
183+ ) ,
184+ }
185+ }
105186}
106187
107188#[ async_trait]
@@ -179,6 +260,33 @@ impl DataStorage for S3Storage {
179260 } )
180261 }
181262
263+ async fn write_batch_operation (
264+ & self ,
265+ table_name : & str ,
266+ batch_id : u64 ,
267+ operation : & BatchOperation ,
268+ ) -> anyhow:: Result < ( ) > {
269+ let path = self . batch_metadata_object_path ( table_name, batch_id) ;
270+
271+ let value = match operation {
272+ BatchOperation :: Insert => serde_json:: json!( {
273+ "operation" : "insert"
274+ } ) ,
275+ BatchOperation :: Update { key_columns } => serde_json:: json!( {
276+ "operation" : "update" ,
277+ "key_columns" : key_columns,
278+ } ) ,
279+ BatchOperation :: Delete { key_columns } => serde_json:: json!( {
280+ "operation" : "delete" ,
281+ "key_columns" : key_columns,
282+ } ) ,
283+ } ;
284+
285+ let bytes = serde_json:: to_vec ( & value) ?;
286+ self . store . put ( & path, PutPayload :: from ( bytes) ) . await ?;
287+ Ok ( ( ) )
288+ }
289+
182290 async fn list_batches ( & self , table_name : & str ) -> anyhow:: Result < Vec < String > > {
183291 let prefix = self . table_object_prefix ( table_name) ;
184292
@@ -218,10 +326,13 @@ impl DataStorage for S3Storage {
218326 batches. push ( batch) ;
219327 }
220328
329+ let operation = self . read_batch_operation ( table_name, batch_id) . await ?;
330+
221331 Ok ( Some ( ReadResult {
222332 batches,
223333 rows_read,
224334 bytes_read,
335+ operation,
225336 } ) )
226337 }
227338}
0 commit comments