Skip to content

Commit 44f81bf

Browse files
naivedogger authored and luoyuxia committed
[lake] Fix TieringEnumerator will always fail to generate splits when re-create table with same name (#1244)
1 parent 21a1d89 commit 44f81bf

File tree

4 files changed

+131
-26
lines changed

4 files changed

+131
-26
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -377,12 +377,11 @@ private ListOffsetsResult listOffsets(
377377
Collection<Integer> buckets,
378378
OffsetSpec offsetSpec) {
379379
Long partitionId = null;
380-
metadataUpdater.checkAndUpdateTableMetadata(
381-
Collections.singleton(physicalTablePath.getTablePath()));
380+
metadataUpdater.updateTableOrPartitionMetadata(physicalTablePath.getTablePath(), null);
382381
long tableId = metadataUpdater.getTableId(physicalTablePath.getTablePath());
383382
// if partition name is not null, we need to check and update partition metadata
384383
if (physicalTablePath.getPartitionName() != null) {
385-
metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath);
384+
metadataUpdater.updatePhysicalTableMetadata(Collections.singleton(physicalTablePath));
386385
partitionId = metadataUpdater.getPartitionIdOrElseThrow(physicalTablePath);
387386
}
388387
Map<Integer, ListOffsetsRequest> requestMap =

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@
4646
import org.apache.paimon.catalog.Catalog;
4747
import org.apache.paimon.catalog.CatalogContext;
4848
import org.apache.paimon.catalog.CatalogFactory;
49+
import org.apache.paimon.catalog.Identifier;
4950
import org.apache.paimon.options.Options;
51+
import org.apache.paimon.reader.RecordReader;
52+
import org.apache.paimon.table.FileStoreTable;
53+
import org.apache.paimon.utils.CloseableIterator;
5054
import org.junit.jupiter.api.AfterAll;
5155
import org.junit.jupiter.api.BeforeAll;
5256
import org.junit.jupiter.api.BeforeEach;
@@ -57,6 +61,7 @@
5761
import java.util.ArrayList;
5862
import java.util.Arrays;
5963
import java.util.HashMap;
64+
import java.util.Iterator;
6065
import java.util.List;
6166
import java.util.Map;
6267
import java.util.Optional;
@@ -354,6 +359,20 @@ protected long createPkTable(TablePath tablePath, int bucketNum) throws Exceptio
354359
return createTable(tablePath, table1Descriptor);
355360
}
356361

362+
protected void dropTable(TablePath tablePath) throws Exception {
363+
admin.dropTable(tablePath, false).get();
364+
Identifier tableIdentifier = toPaimonIdentifier(tablePath);
365+
try {
366+
paimonCatalog.dropTable(tableIdentifier, false);
367+
} catch (Catalog.TableNotExistException e) {
368+
// do nothing, table not exists
369+
}
370+
}
371+
372+
private Identifier toPaimonIdentifier(TablePath tablePath) {
373+
return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
374+
}
375+
357376
protected void assertReplicaStatus(
358377
TablePath tablePath,
359378
long tableId,
@@ -416,4 +435,29 @@ protected void waitUtilBucketSynced(TableBucket tb) {
416435
Duration.ofMinutes(2),
417436
"bucket " + tb + "not synced");
418437
}
438+
439+
protected void checkDataInPaimonPrimayKeyTable(
440+
TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
441+
Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
442+
getPaimonRowCloseableIterator(tablePath);
443+
for (InternalRow expectedRow : expectedRows) {
444+
org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
445+
assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0));
446+
assertThat(row.getString(1).toString()).isEqualTo(expectedRow.getString(1).toString());
447+
}
448+
}
449+
450+
protected CloseableIterator<org.apache.paimon.data.InternalRow> getPaimonRowCloseableIterator(
451+
TablePath tablePath) throws Exception {
452+
Identifier tableIdentifier =
453+
Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
454+
455+
paimonCatalog = getPaimonCatalog();
456+
457+
FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(tableIdentifier);
458+
459+
RecordReader<org.apache.paimon.data.InternalRow> reader =
460+
table.newRead().createReader(table.newReadBuilder().newScan().plan());
461+
return reader.toCloseableIterator();
462+
}
419463
}

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -189,17 +189,6 @@ private void checkDataInPaimonAppendOnlyTable(
189189
assertThat(flussRowIterator.hasNext()).isFalse();
190190
}
191191

192-
private void checkDataInPaimonPrimayKeyTable(
193-
TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
194-
Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
195-
getPaimonRowCloseableIterator(tablePath);
196-
for (InternalRow expectedRow : expectedRows) {
197-
org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
198-
assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0));
199-
assertThat(row.getString(1).toString()).isEqualTo(expectedRow.getString(1).toString());
200-
}
201-
}
202-
203192
private void checkDataInPaimonAppendOnlyPartitionedTable(
204193
TablePath tablePath,
205194
Map<String, String> partitionSpec,
@@ -221,18 +210,6 @@ private void checkDataInPaimonAppendOnlyPartitionedTable(
221210
assertThat(flussRowIterator.hasNext()).isFalse();
222211
}
223212

224-
private CloseableIterator<org.apache.paimon.data.InternalRow> getPaimonRowCloseableIterator(
225-
TablePath tablePath) throws Exception {
226-
Identifier tableIdentifier =
227-
Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
228-
229-
FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(tableIdentifier);
230-
231-
RecordReader<org.apache.paimon.data.InternalRow> reader =
232-
table.newRead().createReader(table.newReadBuilder().newScan().plan());
233-
return reader.toCloseableIterator();
234-
}
235-
236213
private CloseableIterator<org.apache.paimon.data.InternalRow> getPaimonRowCloseableIterator(
237214
TablePath tablePath, Map<String, String> partitionSpec) throws Exception {
238215
Identifier tableIdentifier =
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.lake.paimon.tiering;
19+
20+
import com.alibaba.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
21+
import com.alibaba.fluss.metadata.TableBucket;
22+
import com.alibaba.fluss.metadata.TablePath;
23+
import com.alibaba.fluss.row.InternalRow;
24+
25+
import org.apache.flink.core.execution.JobClient;
26+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
27+
import org.junit.jupiter.api.BeforeAll;
28+
import org.junit.jupiter.api.Test;
29+
30+
import java.util.Arrays;
31+
import java.util.List;
32+
33+
import static com.alibaba.fluss.testutils.DataTestUtils.row;
34+
35+
/** A Test case for dropping a pktable after tiering and creating one with the same tablePath. */
36+
class ReCreateSameTableAfterTieringTest extends FlinkPaimonTieringTestBase {
37+
protected static final String DEFAULT_DB = "fluss";
38+
39+
private static StreamExecutionEnvironment execEnv;
40+
41+
@BeforeAll
42+
protected static void beforeAll() {
43+
FlinkPaimonTieringTestBase.beforeAll();
44+
execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
45+
execEnv.setParallelism(2);
46+
execEnv.enableCheckpointing(1000);
47+
}
48+
49+
@Test
50+
void testReCreateSameTable() throws Exception {
51+
// create a pk table, write some records and wait until snapshot finished
52+
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable_drop");
53+
long t1Id = createPkTable(t1);
54+
TableBucket t1Bucket = new TableBucket(t1Id, 0);
55+
// write records
56+
List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
57+
writeRows(t1, rows, false);
58+
waitUntilSnapshot(t1Id, 1, 0);
59+
// then start tiering job
60+
JobClient jobClient = buildTieringJob(execEnv);
61+
62+
// check the status of replica after synced
63+
assertReplicaStatus(t1Bucket, 3);
64+
// check data in paimon
65+
checkDataInPaimonPrimayKeyTable(t1, rows);
66+
67+
// then drop the table
68+
dropTable(t1);
69+
// and create a new table with the same table path
70+
long t2Id = createPkTable(t1);
71+
TableBucket t2Bucket = new TableBucket(t2Id, 0);
72+
// write some new records
73+
List<InternalRow> newRows = Arrays.asList(row(4, "v4"), row(5, "v5"));
74+
writeRows(t1, newRows, false);
75+
// new table, so the snapshot id should be 0
76+
waitUntilSnapshot(t2Id, 1, 0);
77+
// check the status of replica after synced
78+
assertReplicaStatus(t2Bucket, 2);
79+
// check data in paimon
80+
checkDataInPaimonPrimayKeyTable(t1, newRows);
81+
82+
// stop the tiering job
83+
jobClient.cancel().get();
84+
}
85+
}

0 commit comments

Comments
 (0)