Skip to content

Commit 4aa3598

Browse files
committed
[core] Extract RowTrackingCommitUtils from FileStoreCommitImpl
1 parent b63b25d commit 4aa3598

File tree

2 files changed

+114
-62
lines changed

2 files changed

+114
-62
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 6 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.paimon.io.DataFilePathFactory;
3131
import org.apache.paimon.manifest.FileEntry;
3232
import org.apache.paimon.manifest.FileKind;
33-
import org.apache.paimon.manifest.FileSource;
3433
import org.apache.paimon.manifest.IndexManifestEntry;
3534
import org.apache.paimon.manifest.IndexManifestFile;
3635
import org.apache.paimon.manifest.ManifestCommittable;
@@ -50,6 +49,7 @@
5049
import org.apache.paimon.operation.commit.ConflictDetection.ConflictCheck;
5150
import org.apache.paimon.operation.commit.ManifestEntryChanges;
5251
import org.apache.paimon.operation.commit.RetryCommitResult;
52+
import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
5353
import org.apache.paimon.operation.commit.SuccessCommitResult;
5454
import org.apache.paimon.operation.metrics.CommitMetrics;
5555
import org.apache.paimon.operation.metrics.CommitStats;
@@ -62,7 +62,6 @@
6262
import org.apache.paimon.stats.Statistics;
6363
import org.apache.paimon.stats.StatsFileHandler;
6464
import org.apache.paimon.table.BucketMode;
65-
import org.apache.paimon.table.SpecialFields;
6665
import org.apache.paimon.table.sink.CommitCallback;
6766
import org.apache.paimon.table.sink.CommitMessage;
6867
import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -95,14 +94,14 @@
9594

9695
import static java.util.Collections.emptyList;
9796
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
98-
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
9997
import static org.apache.paimon.manifest.ManifestEntry.nullableRecordCount;
10098
import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
10199
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
102100
import static org.apache.paimon.operation.commit.ConflictDetection.hasConflictChecked;
103101
import static org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck;
104102
import static org.apache.paimon.operation.commit.ConflictDetection.noConflictCheck;
105103
import static org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions;
104+
import static org.apache.paimon.operation.commit.RowTrackingCommitUtils.assignRowTracking;
106105
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
107106
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
108107
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -971,14 +970,10 @@ CommitResult tryCommitOnce(
971970
baseManifestList = manifestList.write(mergeAfterManifests);
972971

973972
if (rowTrackingEnabled) {
974-
// assigned snapshot id to delta files
975-
List<ManifestEntry> snapshotAssigned = new ArrayList<>();
976-
assignSnapshotId(newSnapshotId, deltaFiles, snapshotAssigned);
977-
// assign row id for new files
978-
List<ManifestEntry> rowIdAssigned = new ArrayList<>();
979-
nextRowIdStart =
980-
assignRowTrackingMeta(firstRowIdStart, snapshotAssigned, rowIdAssigned);
981-
deltaFiles = rowIdAssigned;
973+
RowTrackingAssigned assigned =
974+
assignRowTracking(newSnapshotId, firstRowIdStart, deltaFiles);
975+
nextRowIdStart = assigned.nextRowIdStart;
976+
deltaFiles = assigned.assignedEntries;
982977
}
983978

984979
// the added records subtract the deleted records from
@@ -1132,57 +1127,6 @@ public boolean replaceManifestList(
11321127
return commitSnapshotImpl(newSnapshot, emptyList());
11331128
}
11341129

1135-
private long assignRowTrackingMeta(
1136-
long firstRowIdStart,
1137-
List<ManifestEntry> deltaFiles,
1138-
List<ManifestEntry> rowIdAssigned) {
1139-
if (deltaFiles.isEmpty()) {
1140-
return firstRowIdStart;
1141-
}
1142-
// assign row id for new files
1143-
long start = firstRowIdStart;
1144-
long blobStart = firstRowIdStart;
1145-
for (ManifestEntry entry : deltaFiles) {
1146-
checkArgument(
1147-
entry.file().fileSource().isPresent(),
1148-
"This is a bug, file source field for row-tracking table must present.");
1149-
boolean containsRowId =
1150-
entry.file().writeCols() != null
1151-
&& entry.file().writeCols().contains(SpecialFields.ROW_ID.name());
1152-
if (entry.file().fileSource().get().equals(FileSource.APPEND)
1153-
&& entry.file().firstRowId() == null
1154-
&& !containsRowId) {
1155-
if (isBlobFile(entry.file().fileName())) {
1156-
if (blobStart >= start) {
1157-
throw new IllegalStateException(
1158-
String.format(
1159-
"This is a bug, blobStart %d should be less than start %d when assigning a blob entry file.",
1160-
blobStart, start));
1161-
}
1162-
long rowCount = entry.file().rowCount();
1163-
rowIdAssigned.add(entry.assignFirstRowId(blobStart));
1164-
blobStart += rowCount;
1165-
} else {
1166-
long rowCount = entry.file().rowCount();
1167-
rowIdAssigned.add(entry.assignFirstRowId(start));
1168-
blobStart = start;
1169-
start += rowCount;
1170-
}
1171-
} else {
1172-
// for compact file, do not assign first row id.
1173-
rowIdAssigned.add(entry);
1174-
}
1175-
}
1176-
return start;
1177-
}
1178-
1179-
private void assignSnapshotId(
1180-
long snapshotId, List<ManifestEntry> deltaFiles, List<ManifestEntry> snapshotAssigned) {
1181-
for (ManifestEntry entry : deltaFiles) {
1182-
snapshotAssigned.add(entry.assignSequenceNumber(snapshotId, snapshotId));
1183-
}
1184-
}
1185-
11861130
public void compactManifest() {
11871131
int retryCount = 0;
11881132
long startMillis = System.currentTimeMillis();
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation.commit;
20+
21+
import org.apache.paimon.manifest.FileSource;
22+
import org.apache.paimon.manifest.ManifestEntry;
23+
import org.apache.paimon.table.SpecialFields;
24+
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.Optional;
28+
29+
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
30+
import static org.apache.paimon.utils.Preconditions.checkArgument;
31+
32+
/** Utils for row tracking commit. */
33+
public class RowTrackingCommitUtils {
34+
35+
public static RowTrackingAssigned assignRowTracking(
36+
long newSnapshotId, long firstRowIdStart, List<ManifestEntry> deltaFiles) {
37+
// assigned snapshot id to delta files
38+
List<ManifestEntry> snapshotAssigned = new ArrayList<>();
39+
assignSnapshotId(newSnapshotId, deltaFiles, snapshotAssigned);
40+
// assign row id for new files
41+
List<ManifestEntry> rowIdAssigned = new ArrayList<>();
42+
long nextRowIdStart =
43+
assignRowTrackingMeta(firstRowIdStart, snapshotAssigned, rowIdAssigned);
44+
return new RowTrackingAssigned(nextRowIdStart, rowIdAssigned);
45+
}
46+
47+
private static void assignSnapshotId(
48+
long snapshotId, List<ManifestEntry> deltaFiles, List<ManifestEntry> snapshotAssigned) {
49+
for (ManifestEntry entry : deltaFiles) {
50+
snapshotAssigned.add(entry.assignSequenceNumber(snapshotId, snapshotId));
51+
}
52+
}
53+
54+
private static long assignRowTrackingMeta(
55+
long firstRowIdStart,
56+
List<ManifestEntry> deltaFiles,
57+
List<ManifestEntry> rowIdAssigned) {
58+
if (deltaFiles.isEmpty()) {
59+
return firstRowIdStart;
60+
}
61+
// assign row id for new files
62+
long start = firstRowIdStart;
63+
long blobStart = firstRowIdStart;
64+
for (ManifestEntry entry : deltaFiles) {
65+
Optional<FileSource> fileSource = entry.file().fileSource();
66+
checkArgument(
67+
fileSource.isPresent(),
68+
"This is a bug, file source field for row-tracking table must present.");
69+
List<String> writeCols = entry.file().writeCols();
70+
boolean containsRowId =
71+
writeCols != null && writeCols.contains(SpecialFields.ROW_ID.name());
72+
if (fileSource.get().equals(FileSource.APPEND)
73+
&& entry.file().firstRowId() == null
74+
&& !containsRowId) {
75+
long rowCount = entry.file().rowCount();
76+
if (isBlobFile(entry.file().fileName())) {
77+
if (blobStart >= start) {
78+
throw new IllegalStateException(
79+
String.format(
80+
"This is a bug, blobStart %d should be less than start %d when assigning a blob entry file.",
81+
blobStart, start));
82+
}
83+
rowIdAssigned.add(entry.assignFirstRowId(blobStart));
84+
blobStart += rowCount;
85+
} else {
86+
rowIdAssigned.add(entry.assignFirstRowId(start));
87+
blobStart = start;
88+
start += rowCount;
89+
}
90+
} else {
91+
// for compact file, do not assign first row id.
92+
rowIdAssigned.add(entry);
93+
}
94+
}
95+
return start;
96+
}
97+
98+
/** Assigned results. */
99+
public static class RowTrackingAssigned {
100+
public final long nextRowIdStart;
101+
public final List<ManifestEntry> assignedEntries;
102+
103+
public RowTrackingAssigned(long nextRowIdStart, List<ManifestEntry> assignedEntries) {
104+
this.nextRowIdStart = nextRowIdStart;
105+
this.assignedEntries = assignedEntries;
106+
}
107+
}
108+
}

0 commit comments

Comments
 (0)