
Commit 6f39876

address yuxia's comments
1 parent a313d4f commit 6f39876

2 files changed, +18 -43 lines changed


fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 1 addition & 1 deletion

@@ -84,7 +84,7 @@
  * <p>The enumerator is responsible for:
  *
  * <ul>
- *   <li>Get the all splits(lake split + snapshot split + log split) for a table of Fluss to be
+ *   <li>Get the all splits(lake split + kv snapshot split + log split) for a table of Fluss to be
  *       read.
  *   <li>Assign the splits to readers with the guarantee that the splits belong to the same bucket
  *       will be assigned to same reader.
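For readers unfamiliar with the guarantee described in this Javadoc, the sketch below shows what a bucket-stable assignment can look like. It is only an illustration under assumptions: the class name BucketStableAssigner and the modulo mapping are hypothetical and are not the actual FlinkSourceEnumerator logic, which additionally has to handle lake, kv snapshot, and log splits.

// Minimal, hypothetical sketch (not Fluss code): route every split of a bucket
// to the same reader by deriving the reader index from the bucket id alone.
final class BucketStableAssigner {
    private final int numReaders;

    BucketStableAssigner(int numReaders) {
        this.numReaders = numReaders;
    }

    /** Splits that share a bucket id always map to the same reader index. */
    int readerFor(int bucketId) {
        return Math.floorMod(bucketId, numReaders);
    }
}

Because the mapping depends only on the bucket id, a bucket's kv snapshot split and its log split land on the same reader, which is the guarantee the Javadoc above describes.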

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java

Lines changed: 17 additions & 42 deletions

@@ -772,12 +772,17 @@ void testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws Exception {
         // first of all, start tiering
         JobClient jobClient = buildTieringJob(execEnv);

-        String tableName = "expired_partition_pkTable";
+        String tableName = "expired_partition_pk_table";
         TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+        // create table and write data
         Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
-        // create table & write initial data
+        Function<String, List<InternalRow>> rowGenerator =
+                (partition) ->
+                        Arrays.asList(
+                                row(3, "string", partition), row(30, "another_string", partition));
         long tableId =
-                preparePKTableFullType(tablePath, DEFAULT_BUCKET_NUM, true, bucketLogEndOffset);
+                prepareSimplePKTable(
+                        tablePath, DEFAULT_BUCKET_NUM, true, rowGenerator, bucketLogEndOffset);

         // wait until records has been synced
         waitUntilBucketSynced(tablePath, tableId, DEFAULT_BUCKET_NUM, true);
@@ -787,62 +792,31 @@ void testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws Exception {
         assertThat(partitionNameByIds.size()).isGreaterThan(0);

         // Build expected rows for all partitions
+        // Simple PK table has 3 columns: c1 (INT), c2 (STRING), c3 (STRING) where c3 is partition
         List<Row> expectedAllRows = new ArrayList<>();
         for (String partition : partitionNameByIds.values()) {
-            expectedAllRows.add(
-                    Row.of(
-                            false,
-                            (byte) 1,
-                            (short) 2,
-                            3,
-                            4L,
-                            5.1f,
-                            6.0d,
-                            "string",
-                            Decimal.fromUnscaledLong(9, 5, 2),
-                            Decimal.fromBigDecimal(new java.math.BigDecimal(10), 20, 0),
-                            TimestampLtz.fromEpochMillis(1698235273182L),
-                            TimestampLtz.fromEpochMillis(1698235273182L, 5000),
-                            TimestampNtz.fromMillis(1698235273183L),
-                            TimestampNtz.fromMillis(1698235273183L, 6000),
-                            new byte[] {1, 2, 3, 4},
-                            partition));
-            expectedAllRows.add(
-                    Row.of(
-                            true,
-                            (byte) 10,
-                            (short) 20,
-                            30,
-                            40L,
-                            50.1f,
-                            60.0d,
-                            "another_string",
-                            Decimal.fromUnscaledLong(90, 5, 2),
-                            Decimal.fromBigDecimal(new java.math.BigDecimal(100), 20, 0),
-                            TimestampLtz.fromEpochMillis(1698235273200L),
-                            TimestampLtz.fromEpochMillis(1698235273200L, 5000),
-                            TimestampNtz.fromMillis(1698235273201L),
-                            TimestampNtz.fromMillis(1698235273201L, 6000),
-                            new byte[] {1, 2, 3, 4},
-                            partition));
+            expectedAllRows.add(Row.of(3, "string", partition));
+            expectedAllRows.add(Row.of(30, "another_string", partition));
         }

         // Select one partition to drop (expire in Fluss)
         Long partitionToDropId = partitionNameByIds.keySet().iterator().next();
         String partitionToDropName = partitionNameByIds.get(partitionToDropId);

         // Filter rows that belong to the partition to be dropped
+        // c3 is the partition column (index 2)
         List<Row> rowsInExpiredPartition =
                 expectedAllRows.stream()
-                        .filter(row -> partitionToDropName.equals(row.getField(15)))
+                        .filter(row -> partitionToDropName.equals(row.getField(2)))
                         .collect(Collectors.toList());
         assertThat(rowsInExpiredPartition).isNotEmpty();

         // Now drop the partition in Fluss (make it expired)
         // The partition data still exists in Paimon
+        // c3 is the partition column for simple PK table
         admin.dropPartition(
                         tablePath,
-                        new PartitionSpec(Collections.singletonMap("c16", partitionToDropName)),
+                        new PartitionSpec(Collections.singletonMap("c3", partitionToDropName)),
                         false)
                 .get();

@@ -871,12 +845,13 @@ void testUnionReadPartitionsExistInPaimonButExpiredInFluss() throws Exception {
                 expectedAllRows.stream().map(Row::toString).collect(Collectors.toList()));

         // Test partition filter - query only the expired partition
+        // c3 is the partition column for simple PK table
         String sqlWithPartitionFilter =
                 "select"
                         + " /*+ OPTIONS('scan.partition.discovery.interval'='100ms') */"
                         + " * FROM "
                         + tableName
-                        + " WHERE c16 = '"
+                        + " WHERE c3 = '"
                         + partitionToDropName
                         + "'";
         iterator = streamTEnv.executeSql(sqlWithPartitionFilter).collect();
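The prepareSimplePKTable helper that replaces preparePKTableFullType is not part of this diff, so its behavior can only be inferred. The self-contained Java sketch below illustrates the rowGenerator pattern the commit introduces: the caller supplies a function from partition name to rows, so the preparation helper no longer hard-codes the 16-column fixture. SimpleRow is a stand-in for Fluss's InternalRow and rowsFor is a hypothetical stand-in for the helper; neither is actual Fluss or test code.

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Illustration only: how a partition-to-rows generator decouples row content
// from the table-preparation helper.
public class RowGeneratorSketch {

    // Stand-in for Fluss's InternalRow, used only to keep this sketch compilable.
    record SimpleRow(int c1, String c2, String c3) {}

    // Stand-in for the part of prepareSimplePKTable that asks the generator
    // which rows to write for a given partition.
    static List<SimpleRow> rowsFor(
            Function<String, List<SimpleRow>> rowGenerator, String partition) {
        return rowGenerator.apply(partition);
    }

    public static void main(String[] args) {
        Function<String, List<SimpleRow>> rowGenerator =
                partition ->
                        Arrays.asList(
                                new SimpleRow(3, "string", partition),
                                new SimpleRow(30, "another_string", partition));
        // Prints the two rows generated for a sample partition value.
        System.out.println(rowsFor(rowGenerator, "2024"));
    }
}

The payoff of this pattern in the test above is that the expected rows (Row.of(3, "string", partition) and Row.of(30, "another_string", partition)) and the written rows come from the same two-row definition, instead of a separately maintained 16-field fixture.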
