Skip to content

Commit 5f61f47

Browse files
authored
[paimon] Avoid cast to KeyValueFileStore to get keyComparator in paimon (apache#1649)
1 parent 5c8f781 commit 5f61f47

File tree

2 files changed

+28
-4
lines changed

2 files changed

+28
-4
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@
2020
import org.apache.fluss.lake.source.SortedRecordReader;
2121
import org.apache.fluss.row.InternalRow;
2222

23-
import org.apache.paimon.KeyValueFileStore;
2423
import org.apache.paimon.predicate.Predicate;
2524
import org.apache.paimon.table.FileStoreTable;
25+
import org.apache.paimon.table.PrimaryKeyTableUtils;
2626
import org.apache.paimon.types.RowType;
27+
import org.apache.paimon.utils.KeyComparatorSupplier;
2728

2829
import javax.annotation.Nullable;
2930

@@ -42,10 +43,13 @@ public PaimonSortedRecordReader(
4243
@Nullable Predicate predicate)
4344
throws IOException {
4445
super(fileStoreTable, split, project, predicate);
46+
RowType pkKeyType =
47+
new RowType(
48+
PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR.keyFields(
49+
fileStoreTable.schema()));
50+
4551
this.comparator =
46-
toFlussRowComparator(
47-
paimonRowType,
48-
((KeyValueFileStore) fileStoreTable.store()).newKeyComparator());
52+
toFlussRowComparator(paimonRowType, new KeyComparatorSupplier(pkKeyType).get());
4953
}
5054

5155
@Override

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSourceTestBase.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import org.apache.fluss.metadata.TablePath;
2424
import org.apache.fluss.utils.CloseableIterator;
2525

26+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
27+
import org.apache.flink.table.api.EnvironmentSettings;
28+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
2629
import org.apache.flink.types.Row;
2730
import org.apache.paimon.catalog.Catalog;
2831
import org.apache.paimon.catalog.CatalogContext;
@@ -60,10 +63,27 @@ protected static void beforeAll() {
6063
Configuration configuration = new Configuration();
6164
configuration.setString("type", "paimon");
6265
configuration.setString("warehouse", tempWarehouseDir.toString());
66+
configuration.setString("user", "root");
67+
configuration.setString("password", "root-password");
6368
lakeStorage = new PaimonLakeStorage(configuration);
6469
paimonCatalog =
6570
CatalogFactory.createCatalog(
6671
CatalogContext.create(Options.fromMap(configuration.toMap())));
72+
initPaimonPrivilege();
73+
}
74+
75+
// Test for paimon privilege table
76+
public static void initPaimonPrivilege() {
77+
StreamTableEnvironment streamTEnv =
78+
StreamTableEnvironment.create(
79+
StreamExecutionEnvironment.getExecutionEnvironment(),
80+
EnvironmentSettings.inStreamingMode());
81+
streamTEnv.executeSql(
82+
String.format(
83+
"create catalog %s with ('type'='paimon', 'warehouse' = '%s')",
84+
"paimon_catalog", tempWarehouseDir));
85+
streamTEnv.executeSql(
86+
"CALL paimon_catalog.sys.init_file_based_privilege('root-password');");
6787
}
6888

6989
public void createTable(TablePath tablePath, Schema schema) throws Exception {

0 commit comments

Comments
 (0)