5454import org .apache .paimon .operation .commit .SuccessCommitResult ;
5555import org .apache .paimon .operation .metrics .CommitMetrics ;
5656import org .apache .paimon .operation .metrics .CommitStats ;
57- import org .apache .paimon .options .MemorySize ;
5857import org .apache .paimon .partition .PartitionPredicate ;
5958import org .apache .paimon .partition .PartitionStatistics ;
6059import org .apache .paimon .predicate .Predicate ;
9190import java .util .Objects ;
9291import java .util .Optional ;
9392import java .util .Set ;
93+ import java .util .function .Supplier ;
9494import java .util .stream .Collectors ;
9595
9696import static java .util .Collections .emptyList ;
@@ -135,31 +135,19 @@ public class FileStoreCommitImpl implements FileStoreCommit {
135135 private final String commitUser ;
136136 private final RowType partitionType ;
137137 private final CoreOptions options ;
138- private final String partitionDefaultName ;
139138 private final FileStorePathFactory pathFactory ;
140139 private final SnapshotManager snapshotManager ;
141140 private final ManifestFile manifestFile ;
142141 private final ManifestList manifestList ;
143142 private final IndexManifestFile indexManifestFile ;
144143 @ Nullable private final CommitRollback rollback ;
145144 private final CommitScanner scanner ;
146- private final int numBucket ;
147- private final MemorySize manifestTargetSize ;
148- private final MemorySize manifestFullCompactionSize ;
149- private final int manifestMergeMinCount ;
150- private final boolean dynamicPartitionOverwrite ;
151- private final String branchName ;
152- @ Nullable private final Integer manifestReadParallelism ;
153145 private final List <CommitPreCallback > commitPreCallbacks ;
154146 private final List <CommitCallback > commitCallbacks ;
155147 private final StatsFileHandler statsFileHandler ;
156148 private final BucketMode bucketMode ;
157- private final long commitTimeout ;
158149 private final RetryWaiter retryWaiter ;
159- private final int commitMaxRetries ;
160150 private final InternalRowPartitionComputer partitionComputer ;
161- private final boolean rowTrackingEnabled ;
162- private final boolean discardDuplicateFiles ;
163151 @ Nullable private final StrictModeChecker strictModeChecker ;
164152 private final ConflictDetection conflictDetection ;
165153 private final CommitCleaner commitCleaner ;
@@ -176,32 +164,17 @@ public FileStoreCommitImpl(
176164 String commitUser ,
177165 RowType partitionType ,
178166 CoreOptions options ,
179- String partitionDefaultName ,
180167 FileStorePathFactory pathFactory ,
181168 SnapshotManager snapshotManager ,
182169 ManifestFile .Factory manifestFileFactory ,
183170 ManifestList .Factory manifestListFactory ,
184171 IndexManifestFile .Factory indexManifestFileFactory ,
185- FileStoreScan scan ,
186- int numBucket ,
187- MemorySize manifestTargetSize ,
188- MemorySize manifestFullCompactionSize ,
189- int manifestMergeMinCount ,
190- boolean dynamicPartitionOverwrite ,
191- String branchName ,
172+ Supplier <FileStoreScan > scanSupplier ,
192173 StatsFileHandler statsFileHandler ,
193174 BucketMode bucketMode ,
194- @ Nullable Integer manifestReadParallelism ,
195175 List <CommitPreCallback > commitPreCallbacks ,
196176 List <CommitCallback > commitCallbacks ,
197- int commitMaxRetries ,
198- long commitTimeout ,
199- long commitMinRetryWait ,
200- long commitMaxRetryWait ,
201- boolean rowTrackingEnabled ,
202- boolean discardDuplicateFiles ,
203177 ConflictDetection .Factory conflictDetectFactory ,
204- @ Nullable StrictModeChecker strictModeChecker ,
205178 @ Nullable CommitRollback rollback ) {
206179 this .snapshotCommit = snapshotCommit ;
207180 this .fileIO = fileIO ;
@@ -210,26 +183,17 @@ public FileStoreCommitImpl(
210183 this .commitUser = commitUser ;
211184 this .partitionType = partitionType ;
212185 this .options = options ;
213- this .partitionDefaultName = partitionDefaultName ;
214186 this .pathFactory = pathFactory ;
215187 this .snapshotManager = snapshotManager ;
216188 this .manifestFile = manifestFileFactory .create ();
217189 this .manifestList = manifestListFactory .create ();
218190 this .indexManifestFile = indexManifestFileFactory .create ();
219191 this .rollback = rollback ;
220- this .scanner = new CommitScanner (scan , indexManifestFile , options );
221- this .numBucket = numBucket ;
222- this .manifestTargetSize = manifestTargetSize ;
223- this .manifestFullCompactionSize = manifestFullCompactionSize ;
224- this .manifestMergeMinCount = manifestMergeMinCount ;
225- this .dynamicPartitionOverwrite = dynamicPartitionOverwrite ;
226- this .branchName = branchName ;
227- this .manifestReadParallelism = manifestReadParallelism ;
192+ this .scanner = new CommitScanner (scanSupplier .get (), indexManifestFile , options );
228193 this .commitPreCallbacks = commitPreCallbacks ;
229194 this .commitCallbacks = commitCallbacks ;
230- this .commitMaxRetries = commitMaxRetries ;
231- this .commitTimeout = commitTimeout ;
232- this .retryWaiter = new RetryWaiter (commitMinRetryWait , commitMaxRetryWait );
195+ this .retryWaiter =
196+ new RetryWaiter (options .commitMinRetryWait (), options .commitMaxRetryWait ());
233197 this .partitionComputer =
234198 new InternalRowPartitionComputer (
235199 options .partitionDefaultName (),
@@ -240,9 +204,16 @@ public FileStoreCommitImpl(
240204 this .commitMetrics = null ;
241205 this .statsFileHandler = statsFileHandler ;
242206 this .bucketMode = bucketMode ;
243- this .rowTrackingEnabled = rowTrackingEnabled ;
244- this .discardDuplicateFiles = discardDuplicateFiles ;
245- this .strictModeChecker = strictModeChecker ;
207+ this .strictModeChecker =
208+ options .commitStrictModeLastSafeSnapshot ()
209+ .map (
210+ id ->
211+ new StrictModeChecker (
212+ snapshotManager ,
213+ commitUser ,
214+ scanSupplier .get (),
215+ id ))
216+ .orElse (null );
246217 this .conflictDetection = conflictDetectFactory .create (scanner );
247218 this .commitCleaner = new CommitCleaner (manifestList , manifestFile , indexManifestFile );
248219 }
@@ -468,7 +439,7 @@ public int overwritePartition(
468439 boolean skipOverwrite = false ;
469440 // partition filter is built from static or dynamic partition according to properties
470441 PartitionPredicate partitionFilter = null ;
471- if (dynamicPartitionOverwrite ) {
442+ if (partitionType . getFieldCount () > 0 && options . dynamicPartitionOverwrite () ) {
472443 if (changes .appendTableFiles .isEmpty ()) {
473444 // in dynamic mode, if there is no changes to commit, no data will be deleted
474445 skipOverwrite = true ;
@@ -482,7 +453,8 @@ public int overwritePartition(
482453 } else {
483454 // partition may be partial partition fields, so here must use predicate way.
484455 Predicate partitionPredicate =
485- createPartitionPredicate (partition , partitionType , partitionDefaultName );
456+ createPartitionPredicate (
457+ partition , partitionType , options .partitionDefaultName ());
486458 partitionFilter =
487459 PartitionPredicate .fromPredicate (partitionType , partitionPredicate );
488460 // sanity check, all changes must be done within the given partition
@@ -613,16 +585,19 @@ public void dropPartitions(List<Map<String, String>> partitions, long commitIden
613585 PartitionPredicate partitionFilter ;
614586 if (fullMode ) {
615587 List <BinaryRow > binaryPartitions =
616- createBinaryPartitions (partitions , partitionType , partitionDefaultName );
588+ createBinaryPartitions (
589+ partitions , partitionType , options .partitionDefaultName ());
617590 partitionFilter = PartitionPredicate .fromMultiple (partitionType , binaryPartitions );
618591 } else {
619- // partitions may be partial partition fields, so here must to use predicate way.
592+ // partitions may be partial partition fields, so here must use predicate way.
620593 Predicate predicate =
621594 partitions .stream ()
622595 .map (
623596 partition ->
624597 createPartitionPredicate (
625- partition , partitionType , partitionDefaultName ))
598+ partition ,
599+ partitionType ,
600+ options .partitionDefaultName ()))
626601 .reduce (PredicateBuilder ::or )
627602 .orElseThrow (
628603 () -> new RuntimeException ("Failed to get partition filter." ));
@@ -688,7 +663,7 @@ public FileIO fileIO() {
688663 }
689664
690665 private ManifestEntryChanges collectChanges (List <CommitMessage > commitMessages ) {
691- ManifestEntryChanges changes = new ManifestEntryChanges (numBucket );
666+ ManifestEntryChanges changes = new ManifestEntryChanges (options . bucket () );
692667 commitMessages .forEach (changes ::collect );
693668 LOG .info ("Finished collecting changes, including: {}" , changes );
694669 return changes ;
@@ -730,12 +705,12 @@ private int tryCommit(
730705
731706 retryResult = (RetryCommitResult ) result ;
732707
733- if (System .currentTimeMillis () - startMillis > commitTimeout
734- || retryCount >= commitMaxRetries ) {
708+ if (System .currentTimeMillis () - startMillis > options . commitTimeout ()
709+ || retryCount >= options . commitMaxRetries () ) {
735710 String message =
736711 String .format (
737712 "Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs." ,
738- commitTimeout , retryCount );
713+ options . commitTimeout () , retryCount );
739714 throw new RuntimeException (message , retryResult .exception );
740715 }
741716
@@ -761,7 +736,11 @@ private int tryOverwritePartition(
761736 return tryCommit (
762737 latestSnapshot ->
763738 scanner .readOverwriteChanges (
764- numBucket , changes , indexFiles , latestSnapshot , partitionFilter ),
739+ options .bucket (),
740+ changes ,
741+ indexFiles ,
742+ latestSnapshot ,
743+ partitionFilter ),
765744 identifier ,
766745 watermark ,
767746 properties ,
@@ -836,7 +815,8 @@ CommitResult tryCommitOnce(
836815 }
837816
838817 List <SimpleFileEntry > baseDataFiles = new ArrayList <>();
839- boolean discardDuplicate = discardDuplicateFiles && commitKind == CommitKind .APPEND ;
818+ boolean discardDuplicate =
819+ options .commitDiscardDuplicateFiles () && commitKind == CommitKind .APPEND ;
840820 if (latestSnapshot != null && (discardDuplicate || detectConflicts )) {
841821 // latestSnapshotId is different from the snapshot id we've checked for conflicts,
842822 // so we have to check again
@@ -920,14 +900,14 @@ CommitResult tryCommitOnce(
920900 ManifestFileMerger .merge (
921901 mergeBeforeManifests ,
922902 manifestFile ,
923- manifestTargetSize .getBytes (),
924- manifestMergeMinCount ,
925- manifestFullCompactionSize .getBytes (),
903+ options . manifestTargetSize () .getBytes (),
904+ options . manifestMergeMinCount () ,
905+ options . manifestFullCompactionThresholdSize () .getBytes (),
926906 partitionType ,
927- manifestReadParallelism );
907+ options . scanManifestParallelism () );
928908 baseManifestList = manifestList .write (mergeAfterManifests );
929909
930- if (rowTrackingEnabled ) {
910+ if (options . rowTrackingEnabled () ) {
931911 RowTrackingAssigned assigned =
932912 assignRowTracking (newSnapshotId , firstRowIdStart , deltaFiles );
933913 nextRowIdStart = assigned .nextRowIdStart ;
@@ -1097,12 +1077,12 @@ public void compactManifest() {
10971077 break ;
10981078 }
10991079
1100- if (System .currentTimeMillis () - startMillis > commitTimeout
1101- || retryCount >= commitMaxRetries ) {
1080+ if (System .currentTimeMillis () - startMillis > options . commitTimeout ()
1081+ || retryCount >= options . commitMaxRetries () ) {
11021082 throw new RuntimeException (
11031083 String .format (
11041084 "Commit failed after %s millis with %s retries, there maybe exist commit conflicts between multiple jobs." ,
1105- commitTimeout , retryCount ));
1085+ options . commitTimeout () , retryCount ));
11061086 }
11071087
11081088 retryWaiter .retryWait (retryCount );
@@ -1126,11 +1106,11 @@ private boolean compactManifestOnce() {
11261106 ManifestFileMerger .merge (
11271107 mergeBeforeManifests ,
11281108 manifestFile ,
1129- manifestTargetSize .getBytes (),
1109+ options . manifestTargetSize () .getBytes (),
11301110 1 ,
11311111 1 ,
11321112 partitionType ,
1133- manifestReadParallelism );
1113+ options . scanManifestParallelism () );
11341114
11351115 if (new HashSet <>(mergeBeforeManifests ).equals (new HashSet <>(mergeAfterManifests ))) {
11361116 // no need to commit this snapshot, because no compact were happened
@@ -1173,7 +1153,7 @@ private boolean commitSnapshotImpl(Snapshot newSnapshot, List<PartitionEntry> de
11731153 for (PartitionEntry entry : deltaStatistics ) {
11741154 statistics .add (entry .toPartitionStatistics (partitionComputer ));
11751155 }
1176- return snapshotCommit .commit (newSnapshot , branchName , statistics );
1156+ return snapshotCommit .commit (newSnapshot , options . branch () , statistics );
11771157 } catch (Throwable e ) {
11781158 // exception when performing the atomic rename,
11791159 // we cannot clean up because we can't determine the success
0 commit comments