5
5
import time
6
6
import uuid
7
7
import posixpath
8
+ from pathlib import PosixPath
8
9
import threading
9
10
from collections import defaultdict
10
11
@@ -247,6 +248,14 @@ def append_locator_write_path(self, write_path: str):
247
248
locator_write_paths = self ["locator_write_paths" ] = []
248
249
locator_write_paths .append (write_path )
249
250
251
@metafile_write_paths.setter
def metafile_write_paths(self, write_paths: List[str]) -> None:
    """
    Replaces the metafile write paths recorded on this operation with the
    given list of paths (stored under the "metafile_write_paths" key).
    """
    self["metafile_write_paths"] = write_paths
254
+
255
@locator_write_paths.setter
def locator_write_paths(self, write_paths: List[str]) -> None:
    """
    Replaces the locator write paths recorded on this operation with the
    given list of paths (stored under the "locator_write_paths" key).
    """
    self["locator_write_paths"] = write_paths
258
+
250
259
251
260
class TransactionOperationList (List [TransactionOperation ]):
252
261
@staticmethod
@@ -436,7 +445,54 @@ def _mark_end_time(self, time_provider: TransactionTimeProvider) -> int:
436
445
end_time = self ["end_time" ] = time_provider .end_time ()
437
446
return end_time
438
447
439
- def to_serializable (self ) -> Transaction :
448
@staticmethod
def _abs_txn_meta_path_to_relative(root: str, target: str) -> str:
    """
    Takes a root directory path and a target path to relativize with
    respect to the root directory. Returns the target path relative to
    the root directory path. Paths are compared lexically using POSIX
    path semantics; the filesystem is never accessed.

    Raises a ValueError if the target path is not contained in the given
    root directory path, or if the target path is equal to the root
    directory path. Both paths are expected to be absolute, but this is
    not currently enforced (see the TODO below).
    """
    # Local import: only lexical path operations are needed here, and
    # unlike PosixPath, PurePosixPath can be instantiated on any host OS.
    from pathlib import PurePosixPath

    root_path = PurePosixPath(root)
    target_path = PurePosixPath(target)
    # TODO (martinezdavid): Check why is_absolute() fails for certain Delta
    #  paths, then reject non-absolute root/target paths with a ValueError.
    if root_path == target_path:
        raise ValueError(
            "Target and root are identical, but expected target to be a child of root."
        )
    try:
        relative_path = target_path.relative_to(root_path)
    except ValueError as err:
        # chain the original error so the offending paths stay visible
        raise ValueError("Expected target to be a child of root.") from err
    return str(relative_path)
472
+
473
def relativize_operation_paths(
    self, operation: TransactionOperation, catalog_root: str
) -> None:
    """
    Rewrites the given operation's metafile and locator write paths in
    place so that each path is expressed relative to the catalog root
    directory. A path list that is falsy (e.g. empty) is left untouched.
    """
    # metafile paths are handled first, then locator paths
    for paths_attr in ("metafile_write_paths", "locator_write_paths"):
        absolute_paths = getattr(operation, paths_attr)
        if not absolute_paths:
            continue
        setattr(
            operation,
            paths_attr,
            [
                Transaction._abs_txn_meta_path_to_relative(catalog_root, abs_path)
                for abs_path in absolute_paths
            ],
        )
494
+
495
+ def to_serializable (self , catalog_root ) -> Transaction :
440
496
"""
441
497
Prepare the object for serialization by converting any non-serializable
442
498
types to serializable types. May also run any required pre-write
@@ -459,6 +515,8 @@ def to_serializable(self) -> Transaction:
459
515
f"Transaction operation ${ operation } src metafile does "
460
516
f"not have ID: ${ operation .src_metafile } "
461
517
)
518
+ # relativize after checking that dest and src metafiles are valid
519
+ self .relativize_operation_paths (operation , catalog_root )
462
520
operation .dest_metafile = {
463
521
"id" : operation .dest_metafile .id ,
464
522
"locator" : operation .dest_metafile .locator ,
@@ -583,7 +641,7 @@ def _commit_write(
583
641
self .id ,
584
642
)
585
643
with filesystem .open_output_stream (running_txn_log_file_path ) as file :
586
- packed = msgpack .dumps (self .to_serializable ())
644
+ packed = msgpack .dumps (self .to_serializable (catalog_root_normalized ))
587
645
file .write (packed )
588
646
589
647
# write each metafile associated with the transaction
@@ -615,7 +673,7 @@ def _commit_write(
615
673
self .id ,
616
674
)
617
675
with filesystem .open_output_stream (failed_txn_log_file_path ) as file :
618
- packed = msgpack .dumps (self .to_serializable ())
676
+ packed = msgpack .dumps (self .to_serializable (catalog_root_normalized ))
619
677
file .write (packed )
620
678
621
679
###################################################################
@@ -656,7 +714,7 @@ def _commit_write(
656
714
str (end_time ),
657
715
)
658
716
with filesystem .open_output_stream (success_txn_log_file_path ) as file :
659
- packed = msgpack .dumps (self .to_serializable ())
717
+ packed = msgpack .dumps (self .to_serializable (catalog_root_normalized ))
660
718
file .write (packed )
661
719
try :
662
720
Transaction ._validate_txn_log_file (
0 commit comments