Skip to content

Commit 14059bc

Browse files
committed
[core] Remote SST should work for upgrade files
1 parent 985b733 commit 14059bc

File tree

2 files changed

+34
-13
lines changed

2 files changed

+34
-13
lines changed

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,10 @@ private CompactResult rewriteOrProduceChangelog(
184184

185185
if (rewriteCompactFile) {
186186
notifyRewriteCompactBefore(before);
187-
after = notifyRewriteCompactAfter(after);
188187
}
189188

189+
after = notifyRewriteCompactAfter(after);
190+
190191
List<DataFileMeta> changelogFiles =
191192
changelogFileWriter != null
192193
? changelogFileWriter.result()

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupRemoteFileTableTest.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -120,21 +120,13 @@ private void innerTestRemoteFile(boolean schemaEvolution, boolean notCompatible)
120120
ReadBuilder readBuilder = table.newReadBuilder();
121121
List<Split> splits = readBuilder.newScan().plan().splits();
122122
assertThat(splits).hasSize(1);
123-
DataSplit firstSplit = (DataSplit) splits.get(0);
124-
DataFileMeta firstFile = firstSplit.dataFiles().get(0);
125-
List<String> extraFiles = firstFile.extraFiles();
126-
String extraFile = extraFiles.get(0);
127-
// data-410685c7-4cc2-47d7-9dec-393f6cfe9d64-0.parquet.115.position.v1.lookup
128-
assertThat(extraFile).endsWith(".position.v1.lookup");
129-
long lookupFileSize =
130-
fileIO.getFileSize(
131-
new Path(new Path(tempPath.toUri()), "default.db/t/bucket-0/" + extraFile));
132-
String[] split = extraFile.split("\\.");
133-
assertThat(split[split.length - 4]).isEqualTo(String.valueOf(lookupFileSize));
123+
allShouldHaveRemoteSst(splits);
134124

135125
// third write with lookup but no data file
136126

137127
// delete file first
128+
DataSplit firstSplit = (DataSplit) splits.get(0);
129+
DataFileMeta firstFile = firstSplit.dataFiles().get(0);
138130
Path firstPath =
139131
table.store()
140132
.pathFactory()
@@ -174,9 +166,12 @@ private void innerTestRemoteFile(boolean schemaEvolution, boolean notCompatible)
174166
}
175167
}
176168

169+
// check remote lookup files
170+
splits = readBuilder.newScan().plan().splits();
171+
allShouldHaveRemoteSst(splits);
172+
177173
// restore file and check reading
178174
fileIO.copyFile(tmpPath, firstPath, false);
179-
splits = readBuilder.newScan().plan().splits();
180175
List<GenericRow> result = new ArrayList<>();
181176
readBuilder
182177
.newRead()
@@ -191,6 +186,31 @@ private void innerTestRemoteFile(boolean schemaEvolution, boolean notCompatible)
191186
GenericRow.of(5, 1));
192187
}
193188

189+
private void allShouldHaveRemoteSst(List<Split> check) {
190+
for (Split split : check) {
191+
for (DataFileMeta file : ((DataSplit) split).dataFiles()) {
192+
List<String> extraFiles = file.extraFiles();
193+
String extraFile = extraFiles.get(0);
194+
// data-410685c7-4cc2-47d7-9dec-393f6cfe9d64-0.parquet.115.position.v1.lookup
195+
assertThat(extraFile).endsWith(".position.v1.lookup");
196+
long lookupFileSize;
197+
try {
198+
lookupFileSize =
199+
LocalFileIO.create()
200+
.getFileSize(
201+
new Path(
202+
new Path(tempPath.toUri()),
203+
"default.db/t/bucket-0/" + extraFile));
204+
} catch (IOException e) {
205+
throw new RuntimeException(e);
206+
}
207+
String[] extraFileSplit = extraFile.split("\\.");
208+
assertThat(extraFileSplit[extraFileSplit.length - 4])
209+
.isEqualTo(String.valueOf(lookupFileSize));
210+
}
211+
}
212+
}
213+
194214
@Test
195215
public void testRemoteFileLevelThreshold() throws Exception {
196216
Options options = new Options();

0 commit comments

Comments
 (0)