2323import org .apache .fluss .row .InternalRow ;
2424import org .apache .fluss .types .RowType ;
2525
26+ import javax .annotation .Nullable ;
27+
2628import java .util .concurrent .CompletableFuture ;
2729
2830/**
@@ -42,6 +44,13 @@ public void flush() {
4244 private final RowType tableSchema ;
4345 private final int [] targetColumns ; // may be null
4446
47+ private final RowType pkProjection ;
48+ @ Nullable private final RowType targetProjection ;
49+
50+ private final PojoToRowConverter <T > pojoToRowConverter ;
51+ private final PojoToRowConverter <T > pkConverter ;
52+ @ Nullable private final PojoToRowConverter <T > targetConverter ;
53+
4554 TypedUpsertWriter (
4655 UpsertWriterImpl delegate ,
4756 Class <T > pojoClass ,
@@ -52,6 +61,19 @@ public void flush() {
5261 this .tableInfo = tableInfo ;
5362 this .tableSchema = tableInfo .getRowType ();
5463 this .targetColumns = targetColumns ;
64+
65+ // Precompute projections
66+ this .pkProjection = this .tableSchema .project (tableInfo .getPhysicalPrimaryKeys ());
67+ this .targetProjection =
68+ (targetColumns == null ) ? null : this .tableSchema .project (targetColumns );
69+
70+ // Initialize reusable converters
71+ this .pojoToRowConverter = PojoToRowConverter .of (pojoClass , tableSchema , tableSchema );
72+ this .pkConverter = PojoToRowConverter .of (pojoClass , tableSchema , pkProjection );
73+ this .targetConverter =
74+ (targetProjection == null )
75+ ? null
76+ : PojoToRowConverter .of (pojoClass , tableSchema , targetProjection );
5577 }
5678
5779 @ Override
@@ -73,18 +95,19 @@ public CompletableFuture<DeleteResult> delete(T record) {
7395 }
7496
7597 private InternalRow convertPojo (T pojo , boolean forDelete ) {
76- RowType projection ;
98+ final RowType projection ;
99+ final PojoToRowConverter <T > converter ;
77100 if (forDelete ) {
78- // for delete we only need primary key columns
79- projection = tableSchema .project (tableInfo .getPhysicalPrimaryKeys ());
80- } else if (targetColumns != null ) {
81- projection = tableSchema .project (targetColumns );
101+ projection = pkProjection ;
102+ converter = pkConverter ;
103+ } else if (targetProjection != null && targetConverter != null ) {
104+ projection = targetProjection ;
105+ converter = targetConverter ;
82106 } else {
83107 projection = tableSchema ;
108+ converter = pojoToRowConverter ;
84109 }
85110
86- // TODO: initialize this on the constructor and reuse
87- PojoToRowConverter <T > converter = PojoToRowConverter .of (pojoClass , tableSchema , projection );
88111 GenericRow projected = converter .toRow (pojo );
89112 if (projection == tableSchema ) {
90113 return projected ;
0 commit comments