Skip to content

Commit 5b9999a

Browse files
committed
[spark] Make RowIdIndexFieldsExtractor Serializable
1 parent 1072934 commit 5b9999a

File tree

3 files changed

+36
-10
lines changed

3 files changed

+36
-10
lines changed

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
/** Default global index builder. */
4646
public class DefaultGlobalIndexBuilder implements Serializable {
4747

48+
private static final long serialVersionUID = 1L;
49+
4850
private final FileStoreTable table;
4951
private final BinaryRow partition;
5052
private final RowType readType;

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/RowIdIndexFieldsExtractor.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,59 @@
2222
import org.apache.paimon.codegen.Projection;
2323
import org.apache.paimon.data.BinaryRow;
2424
import org.apache.paimon.data.InternalRow;
25+
import org.apache.paimon.data.InternalRow.FieldGetter;
2526
import org.apache.paimon.table.SpecialFields;
2627
import org.apache.paimon.types.RowType;
2728

2829
import javax.annotation.Nullable;
2930

31+
import java.io.Serializable;
3032
import java.util.List;
3133

3234
/** The extractor to get partition, index field and row id from records. */
33-
public class RowIdIndexFieldsExtractor {
35+
public class RowIdIndexFieldsExtractor implements Serializable {
36+
37+
private static final long serialVersionUID = 1L;
3438

35-
private final Projection partitionProjection;
36-
private final InternalRow.FieldGetter indexFieldGetter;
3739
private final int rowIdPos;
40+
private final RowType readType;
41+
private final List<String> partitionKeys;
42+
private final String indexField;
43+
44+
private transient Projection lazyPartitionProjection;
45+
private transient FieldGetter lazyIndexFieldGetter;
3846

3947
public RowIdIndexFieldsExtractor(
4048
RowType readType, List<String> partitionKeys, String indexField) {
41-
this.partitionProjection = CodeGenUtils.newProjection(readType, partitionKeys);
42-
int indexFieldPos = readType.getFieldIndex(indexField);
43-
this.indexFieldGetter =
44-
InternalRow.createFieldGetter(readType.getTypeAt(indexFieldPos), indexFieldPos);
49+
this.readType = readType;
50+
this.partitionKeys = partitionKeys;
51+
this.indexField = indexField;
4552
this.rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
4653
}
4754

55+
private Projection partitionProjection() {
56+
if (lazyPartitionProjection == null) {
57+
lazyPartitionProjection = CodeGenUtils.newProjection(readType, partitionKeys);
58+
}
59+
return lazyPartitionProjection;
60+
}
61+
62+
private FieldGetter indexFieldGetter() {
63+
if (lazyIndexFieldGetter == null) {
64+
int indexFieldPos = readType.getFieldIndex(indexField);
65+
lazyIndexFieldGetter =
66+
InternalRow.createFieldGetter(readType.getTypeAt(indexFieldPos), indexFieldPos);
67+
}
68+
return lazyIndexFieldGetter;
69+
}
70+
4871
public BinaryRow extractPartition(InternalRow record) {
49-
// projection will reuse returning record, copy is necessary
50-
return partitionProjection.apply(record).copy();
72+
return partitionProjection().apply(record).copy();
5173
}
5274

5375
@Nullable
5476
public Object extractIndexField(InternalRow record) {
55-
return indexFieldGetter.getFieldOrNull(record);
77+
return indexFieldGetter().getFieldOrNull(record);
5678
}
5779

5880
public Long extractRowId(InternalRow record) {

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
*/
5555
public class BTreeGlobalIndexBuilder implements Serializable {
5656

57+
private static final long serialVersionUID = 1L;
58+
5759
private static final double FLOATING = 1.2;
5860

5961
private final FileStoreTable table;

0 commit comments

Comments
 (0)