Skip to content

Commit 3c7fded

Browse files
committed
fix
1 parent 341c32a commit 3c7fded

File tree

1 file changed

+4
-6
lines changed

1 file changed

+4
-6
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020

2121
import org.apache.paimon.data.InternalRow;
2222
import org.apache.paimon.schema.TableSchema;
23-
import org.apache.paimon.table.sink.AppendTableRowKeyExtractor;
2423
import org.apache.paimon.table.sink.ChannelComputer;
25-
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
24+
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
2625

2726
/** This is only for partitioned unaware-buckets Append only table. */
2827
public class RowDataHashPartitionChannelComputer implements ChannelComputer<InternalRow> {
@@ -32,7 +31,7 @@ public class RowDataHashPartitionChannelComputer implements ChannelComputer<Inte
3231
private final TableSchema schema;
3332

3433
private transient int numChannels;
35-
private transient KeyAndBucketExtractor<InternalRow> extractor;
34+
private transient RowPartitionKeyExtractor extractor;
3635

3736
public RowDataHashPartitionChannelComputer(TableSchema schema) {
3837
this.schema = schema;
@@ -41,13 +40,12 @@ public RowDataHashPartitionChannelComputer(TableSchema schema) {
4140
@Override
4241
public void setup(int numChannels) {
4342
this.numChannels = numChannels;
44-
this.extractor = new AppendTableRowKeyExtractor(schema);
43+
this.extractor = new RowPartitionKeyExtractor(schema);
4544
}
4645

4746
@Override
4847
public int channel(InternalRow record) {
49-
extractor.setRecord(record);
50-
return ChannelComputer.select(extractor.partition(), 0, numChannels);
48+
return ChannelComputer.select(extractor.partition(record), 0, numChannels);
5149
}
5250

5351
@Override

0 commit comments

Comments
 (0)