Skip to content

Commit 06cedb5

Browse files
committed
[core] Extract ChangelogManager from SnapshotManager
1 parent 4573ae4 commit 06cedb5

36 files changed

+694
-314
lines changed

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.paimon.tag.SuccessFileTagCallback;
5656
import org.apache.paimon.tag.TagAutoManager;
5757
import org.apache.paimon.types.RowType;
58+
import org.apache.paimon.utils.ChangelogManager;
5859
import org.apache.paimon.utils.FileStorePathFactory;
5960
import org.apache.paimon.utils.SegmentsCache;
6061
import org.apache.paimon.utils.SnapshotManager;
@@ -176,6 +177,11 @@ public SnapshotManager snapshotManager() {
176177
snapshotCache);
177178
}
178179

180+
@Override
181+
public ChangelogManager changelogManager() {
182+
return new ChangelogManager(fileIO, options.path(), options.branch());
183+
}
184+
179185
@Override
180186
public ManifestFile.Factory manifestFileFactory() {
181187
return manifestFileFactory(false);

paimon-core/src/main/java/org/apache/paimon/FileStore.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.paimon.table.sink.TagCallback;
4040
import org.apache.paimon.tag.TagAutoManager;
4141
import org.apache.paimon.types.RowType;
42+
import org.apache.paimon.utils.ChangelogManager;
4243
import org.apache.paimon.utils.FileStorePathFactory;
4344
import org.apache.paimon.utils.SegmentsCache;
4445
import org.apache.paimon.utils.SnapshotManager;
@@ -61,6 +62,8 @@ public interface FileStore<T> {
6162

6263
SnapshotManager snapshotManager();
6364

65+
ChangelogManager changelogManager();
66+
6467
RowType partitionType();
6568

6669
CoreOptions options();

paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.paimon.manifest.ManifestList;
3131
import org.apache.paimon.schema.SchemaManager;
3232
import org.apache.paimon.table.FileStoreTable;
33+
import org.apache.paimon.utils.ChangelogManager;
3334
import org.apache.paimon.utils.DateTimeUtils;
3435
import org.apache.paimon.utils.FileStorePathFactory;
3536
import org.apache.paimon.utils.FileSystemBranchManager;
@@ -60,10 +61,10 @@
6061
import java.util.stream.Collectors;
6162

6263
import static java.util.Collections.emptyList;
64+
import static org.apache.paimon.utils.ChangelogManager.CHANGELOG_PREFIX;
6365
import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
64-
import static org.apache.paimon.utils.SnapshotManager.CHANGELOG_PREFIX;
65-
import static org.apache.paimon.utils.SnapshotManager.EARLIEST;
66-
import static org.apache.paimon.utils.SnapshotManager.LATEST;
66+
import static org.apache.paimon.utils.HintFileUtils.EARLIEST;
67+
import static org.apache.paimon.utils.HintFileUtils.LATEST;
6768
import static org.apache.paimon.utils.SnapshotManager.SNAPSHOT_PREFIX;
6869
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
6970

@@ -133,6 +134,7 @@ protected void cleanSnapshotDir(
133134
for (String branch : branches) {
134135
FileStoreTable branchTable = table.switchToBranch(branch);
135136
SnapshotManager snapshotManager = branchTable.snapshotManager();
137+
ChangelogManager changelogManager = branchTable.changelogManager();
136138

137139
// specially handle the snapshot directory
138140
List<Pair<Path, Long>> nonSnapshotFiles =
@@ -146,7 +148,7 @@ protected void cleanSnapshotDir(
146148

147149
// specially handle the changelog directory
148150
List<Pair<Path, Long>> nonChangelogFiles =
149-
tryGetNonChangelogFiles(snapshotManager.changelogDirectory(), this::oldEnough);
151+
tryGetNonChangelogFiles(changelogManager.changelogDirectory(), this::oldEnough);
150152
nonChangelogFiles.forEach(
151153
nonChangelogFile ->
152154
cleanFile(
@@ -234,10 +236,11 @@ protected void cleanFile(Path path) {
234236
protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException {
235237
FileStoreTable branchTable = table.switchToBranch(branch);
236238
SnapshotManager snapshotManager = branchTable.snapshotManager();
239+
ChangelogManager changelogManager = branchTable.changelogManager();
237240
TagManager tagManager = branchTable.tagManager();
238241
Set<Snapshot> readSnapshots = new HashSet<>(snapshotManager.safelyGetAllSnapshots());
239242
readSnapshots.addAll(tagManager.taggedSnapshots());
240-
readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs());
243+
readSnapshots.addAll(changelogManager.safelyGetAllChangelogs());
241244
return readSnapshots;
242245
}
243246

paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.paimon.table.sink.TagCallback;
4444
import org.apache.paimon.tag.TagAutoManager;
4545
import org.apache.paimon.types.RowType;
46+
import org.apache.paimon.utils.ChangelogManager;
4647
import org.apache.paimon.utils.FileStorePathFactory;
4748
import org.apache.paimon.utils.SegmentsCache;
4849
import org.apache.paimon.utils.SnapshotManager;
@@ -79,6 +80,12 @@ public SnapshotManager snapshotManager() {
7980
return wrapped.snapshotManager();
8081
}
8182

83+
@Override
84+
public ChangelogManager changelogManager() {
85+
privilegeChecker.assertCanSelectOrInsert(identifier);
86+
return wrapped.changelogManager();
87+
}
88+
8289
@Override
8390
public RowType partitionType() {
8491
return wrapped.partitionType();

paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.paimon.table.source.StreamDataTableScan;
3838
import org.apache.paimon.table.source.snapshot.SnapshotReader;
3939
import org.apache.paimon.utils.BranchManager;
40+
import org.apache.paimon.utils.ChangelogManager;
4041
import org.apache.paimon.utils.SnapshotManager;
4142
import org.apache.paimon.utils.TagManager;
4243

@@ -65,6 +66,12 @@ public SnapshotManager snapshotManager() {
6566
return wrapped.snapshotManager();
6667
}
6768

69+
@Override
70+
public ChangelogManager changelogManager() {
71+
privilegeChecker.assertCanSelectOrInsert(identifier);
72+
return wrapped.changelogManager();
73+
}
74+
6875
@Override
6976
public OptionalLong latestSnapshotId() {
7077
privilegeChecker.assertCanSelectOrInsert(identifier);

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.paimon.tag.TagPreview;
6262
import org.apache.paimon.utils.BranchManager;
6363
import org.apache.paimon.utils.CatalogBranchManager;
64+
import org.apache.paimon.utils.ChangelogManager;
6465
import org.apache.paimon.utils.FileSystemBranchManager;
6566
import org.apache.paimon.utils.InternalRowPartitionComputer;
6667
import org.apache.paimon.utils.Preconditions;
@@ -259,6 +260,7 @@ public SnapshotReader newSnapshotReader() {
259260
tableSchema,
260261
coreOptions(),
261262
snapshotManager(),
263+
changelogManager(),
262264
splitGenerator(),
263265
nonPartitionFilterConsumer(),
264266
DefaultValueAssigner.create(tableSchema),
@@ -282,6 +284,7 @@ public StreamDataTableScan newStreamScan() {
282284
coreOptions(),
283285
newSnapshotReader(),
284286
snapshotManager(),
287+
changelogManager(),
285288
supportStreamingReadOverwrite(),
286289
DefaultValueAssigner.create(tableSchema));
287290
}
@@ -419,16 +422,27 @@ public SnapshotManager snapshotManager() {
419422
return store().snapshotManager();
420423
}
421424

425+
@Override
426+
public ChangelogManager changelogManager() {
427+
return store().changelogManager();
428+
}
429+
422430
@Override
423431
public ExpireSnapshots newExpireSnapshots() {
424432
return new ExpireSnapshotsImpl(
425-
snapshotManager(), store().newSnapshotDeletion(), store().newTagManager());
433+
snapshotManager(),
434+
changelogManager(),
435+
store().newSnapshotDeletion(),
436+
store().newTagManager());
426437
}
427438

428439
@Override
429440
public ExpireSnapshots newExpireChangelog() {
430441
return new ExpireChangelogImpl(
431-
snapshotManager(), tagManager(), store().newChangelogDeletion());
442+
snapshotManager(),
443+
changelogManager(),
444+
tagManager(),
445+
store().newChangelogDeletion());
432446
}
433447

434448
@Override
@@ -750,6 +764,7 @@ public FileStoreTable switchToBranch(String branchName) {
750764
private RollbackHelper rollbackHelper() {
751765
return new RollbackHelper(
752766
snapshotManager(),
767+
changelogManager(),
753768
tagManager(),
754769
fileIO,
755770
store().newSnapshotDeletion(),

paimon-core/src/main/java/org/apache/paimon/table/DataTable.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.table.source.DataTableScan;
2525
import org.apache.paimon.table.source.snapshot.SnapshotReader;
2626
import org.apache.paimon.utils.BranchManager;
27+
import org.apache.paimon.utils.ChangelogManager;
2728
import org.apache.paimon.utils.SnapshotManager;
2829
import org.apache.paimon.utils.TagManager;
2930

@@ -39,6 +40,8 @@ public interface DataTable extends InnerTable {
3940

4041
SnapshotManager snapshotManager();
4142

43+
ChangelogManager changelogManager();
44+
4245
SchemaManager schemaManager();
4346

4447
TagManager tagManager();

paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.paimon.table.source.StreamDataTableScan;
4141
import org.apache.paimon.table.source.snapshot.SnapshotReader;
4242
import org.apache.paimon.utils.BranchManager;
43+
import org.apache.paimon.utils.ChangelogManager;
4344
import org.apache.paimon.utils.SegmentsCache;
4445
import org.apache.paimon.utils.SimpleFileReader;
4546
import org.apache.paimon.utils.SnapshotManager;
@@ -95,6 +96,11 @@ public SnapshotManager snapshotManager() {
9596
return wrapped.snapshotManager();
9697
}
9798

99+
@Override
100+
public ChangelogManager changelogManager() {
101+
return wrapped.changelogManager();
102+
}
103+
98104
@Override
99105
public SchemaManager schemaManager() {
100106
return wrapped.schemaManager();

paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.manifest.ExpireFileEntry;
2525
import org.apache.paimon.operation.ChangelogDeletion;
2626
import org.apache.paimon.options.ExpireConfig;
27+
import org.apache.paimon.utils.ChangelogManager;
2728
import org.apache.paimon.utils.Preconditions;
2829
import org.apache.paimon.utils.SnapshotManager;
2930
import org.apache.paimon.utils.TagManager;
@@ -45,6 +46,7 @@ public class ExpireChangelogImpl implements ExpireSnapshots {
4546
public static final Logger LOG = LoggerFactory.getLogger(ExpireChangelogImpl.class);
4647

4748
private final SnapshotManager snapshotManager;
49+
private final ChangelogManager changelogManager;
4850
private final ConsumerManager consumerManager;
4951
private final ChangelogDeletion changelogDeletion;
5052
private final TagManager tagManager;
@@ -53,9 +55,11 @@ public class ExpireChangelogImpl implements ExpireSnapshots {
5355

5456
public ExpireChangelogImpl(
5557
SnapshotManager snapshotManager,
58+
ChangelogManager changelogManager,
5659
TagManager tagManager,
5760
ChangelogDeletion changelogDeletion) {
5861
this.snapshotManager = snapshotManager;
62+
this.changelogManager = changelogManager;
5963
this.tagManager = tagManager;
6064
this.consumerManager =
6165
new ConsumerManager(
@@ -90,11 +94,11 @@ public int expire() {
9094
return 0;
9195
}
9296

93-
Long latestChangelogId = snapshotManager.latestLongLivedChangelogId();
97+
Long latestChangelogId = changelogManager.latestLongLivedChangelogId();
9498
if (latestChangelogId == null) {
9599
return 0;
96100
}
97-
Long earliestChangelogId = snapshotManager.earliestLongLivedChangelogId();
101+
Long earliestChangelogId = changelogManager.earliestLongLivedChangelogId();
98102
if (earliestChangelogId == null) {
99103
return 0;
100104
}
@@ -123,8 +127,8 @@ public int expire() {
123127
maxExclusive = Math.min(maxExclusive, latestChangelogId);
124128

125129
for (long id = min; id <= maxExclusive; id++) {
126-
if (snapshotManager.longLivedChangelogExists(id)
127-
&& olderThanMills <= snapshotManager.longLivedChangelog(id).timeMillis()) {
130+
if (changelogManager.longLivedChangelogExists(id)
131+
&& olderThanMills <= changelogManager.longLivedChangelog(id).timeMillis()) {
128132
return expireUntil(earliestChangelogId, id);
129133
}
130134
}
@@ -140,13 +144,13 @@ public int expireUntil(long earliestId, long endExclusiveId) {
140144

141145
List<Snapshot> skippingSnapshots =
142146
findSkippingTags(taggedSnapshots, earliestId, endExclusiveId);
143-
skippingSnapshots.add(snapshotManager.changelog(endExclusiveId));
147+
skippingSnapshots.add(changelogManager.changelog(endExclusiveId));
144148
Set<String> manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots);
145149
for (long id = earliestId; id < endExclusiveId; id++) {
146150
if (LOG.isDebugEnabled()) {
147151
LOG.debug("Ready to delete changelog files from changelog #" + id);
148152
}
149-
Changelog changelog = snapshotManager.longLivedChangelog(id);
153+
Changelog changelog = changelogManager.longLivedChangelog(id);
150154
Predicate<ExpireFileEntry> skipper;
151155
try {
152156
skipper = changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
@@ -161,7 +165,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
161165

162166
changelogDeletion.cleanUnusedDataFiles(changelog, skipper);
163167
changelogDeletion.cleanUnusedManifests(changelog, manifestSkippSet);
164-
snapshotManager.fileIO().deleteQuietly(snapshotManager.longLivedChangelogPath(id));
168+
changelogManager.fileIO().deleteQuietly(changelogManager.longLivedChangelogPath(id));
165169
}
166170

167171
changelogDeletion.cleanEmptyDirectories();
@@ -171,7 +175,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
171175

172176
private void writeEarliestHintFile(long earliest) {
173177
try {
174-
snapshotManager.commitLongLivedChangelogEarliestHint(earliest);
178+
changelogManager.commitLongLivedChangelogEarliestHint(earliest);
175179
} catch (IOException e) {
176180
throw new UncheckedIOException(e);
177181
}

paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.paimon.manifest.ExpireFileEntry;
2626
import org.apache.paimon.operation.SnapshotDeletion;
2727
import org.apache.paimon.options.ExpireConfig;
28+
import org.apache.paimon.utils.ChangelogManager;
2829
import org.apache.paimon.utils.Preconditions;
2930
import org.apache.paimon.utils.SnapshotManager;
3031
import org.apache.paimon.utils.TagManager;
@@ -50,6 +51,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
5051
private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsImpl.class);
5152

5253
private final SnapshotManager snapshotManager;
54+
private final ChangelogManager changelogManager;
5355
private final ConsumerManager consumerManager;
5456
private final SnapshotDeletion snapshotDeletion;
5557
private final TagManager tagManager;
@@ -58,9 +60,11 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
5860

5961
public ExpireSnapshotsImpl(
6062
SnapshotManager snapshotManager,
63+
ChangelogManager changelogManager,
6164
SnapshotDeletion snapshotDeletion,
6265
TagManager tagManager) {
6366
this.snapshotManager = snapshotManager;
67+
this.changelogManager = changelogManager;
6468
this.consumerManager =
6569
new ConsumerManager(
6670
snapshotManager.fileIO(),
@@ -135,7 +139,8 @@ public int expireUntil(long earliestId, long endExclusiveId) {
135139
// No expire happens:
136140
// write the hint file in order to see the earliest snapshot directly next time
137141
// should avoid duplicate writes when the file exists
138-
if (snapshotManager.readHint(SnapshotManager.EARLIEST) == null) {
142+
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
143+
if (earliestSnapshotId == null) {
139144
writeEarliestHint(earliestId);
140145
}
141146

@@ -261,8 +266,8 @@ public int expireUntil(long earliestId, long endExclusiveId) {
261266

262267
private void commitChangelog(Changelog changelog) {
263268
try {
264-
snapshotManager.commitChangelog(changelog, changelog.id());
265-
snapshotManager.commitLongLivedChangelogLatestHint(changelog.id());
269+
changelogManager.commitChangelog(changelog, changelog.id());
270+
changelogManager.commitLongLivedChangelogLatestHint(changelog.id());
266271
} catch (IOException e) {
267272
throw new UncheckedIOException(e);
268273
}

0 commit comments

Comments
 (0)