Skip to content

Commit d6722e7

Browse files
committed
instantiate converters once
1 parent ad646e8 commit d6722e7

File tree

4 files changed

+33
-14
lines changed

4 files changed

+33
-14
lines changed

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,6 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
188188
BigDecimal bd = (BigDecimal) v;
189189
BigDecimal scaled = bd.setScale(scale, RoundingMode.HALF_UP);
190190

191-
// Validate precision after scaling; precision is the number of digits in the unscaled
192-
// value.
193191
if (scaled.precision() > precision) {
194192
throw new IllegalArgumentException(
195193
String.format(

fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ final class PojoType<T> {
3737
private final Class<T> pojoClass;
3838
private final Constructor<T> defaultConstructor;
3939
private final Map<String, Property> properties; // property name -> property
40-
// Mapping of primitive types to their boxed counterparts to avoid long if-chains
4140
private static final Map<Class<?>, Class<?>> PRIMITIVE_TO_BOXED = createPrimitiveToBoxedMap();
4241

4342
private PojoType(Class<T> pojoClass, Constructor<T> ctor, Map<String, Property> props) {

fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriter.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@ class TypedAppendWriter<T> implements AppendWriter<T> {
3333
private final AppendWriterImpl delegate;
3434
private final Class<T> pojoClass;
3535
private final RowType tableSchema;
36+
private final PojoToRowConverter<T> pojoToRowConverter;
3637

3738
TypedAppendWriter(AppendWriterImpl delegate, Class<T> pojoClass, TableInfo tableInfo) {
3839
this.delegate = delegate;
3940
this.pojoClass = pojoClass;
4041
this.tableSchema = tableInfo.getRowType();
42+
this.pojoToRowConverter = PojoToRowConverter.of(pojoClass, tableSchema, tableSchema);
4143
}
4244

4345
@Override
@@ -50,10 +52,7 @@ public CompletableFuture<AppendResult> append(T record) {
5052
if (record instanceof InternalRow) {
5153
return delegate.append((InternalRow) record);
5254
}
53-
// TODO: initialize this on the constructor and reuse
54-
PojoToRowConverter<T> converter =
55-
PojoToRowConverter.of(pojoClass, tableSchema, tableSchema);
56-
InternalRow row = converter.toRow(record);
55+
InternalRow row = pojoToRowConverter.toRow(record);
5756
return delegate.append(row);
5857
}
5958
}

fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriter.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.fluss.row.InternalRow;
2424
import org.apache.fluss.types.RowType;
2525

26+
import javax.annotation.Nullable;
27+
2628
import java.util.concurrent.CompletableFuture;
2729

2830
/**
@@ -42,6 +44,13 @@ public void flush() {
4244
private final RowType tableSchema;
4345
private final int[] targetColumns; // may be null
4446

47+
private final RowType pkProjection;
48+
@Nullable private final RowType targetProjection;
49+
50+
private final PojoToRowConverter<T> pojoToRowConverter;
51+
private final PojoToRowConverter<T> pkConverter;
52+
@Nullable private final PojoToRowConverter<T> targetConverter;
53+
4554
TypedUpsertWriter(
4655
UpsertWriterImpl delegate,
4756
Class<T> pojoClass,
@@ -52,6 +61,19 @@ public void flush() {
5261
this.tableInfo = tableInfo;
5362
this.tableSchema = tableInfo.getRowType();
5463
this.targetColumns = targetColumns;
64+
65+
// Precompute projections
66+
this.pkProjection = this.tableSchema.project(tableInfo.getPhysicalPrimaryKeys());
67+
this.targetProjection =
68+
(targetColumns == null) ? null : this.tableSchema.project(targetColumns);
69+
70+
// Initialize reusable converters
71+
this.pojoToRowConverter = PojoToRowConverter.of(pojoClass, tableSchema, tableSchema);
72+
this.pkConverter = PojoToRowConverter.of(pojoClass, tableSchema, pkProjection);
73+
this.targetConverter =
74+
(targetProjection == null)
75+
? null
76+
: PojoToRowConverter.of(pojoClass, tableSchema, targetProjection);
5577
}
5678

5779
@Override
@@ -73,18 +95,19 @@ public CompletableFuture<DeleteResult> delete(T record) {
7395
}
7496

7597
private InternalRow convertPojo(T pojo, boolean forDelete) {
76-
RowType projection;
98+
final RowType projection;
99+
final PojoToRowConverter<T> converter;
77100
if (forDelete) {
78-
// for delete we only need primary key columns
79-
projection = tableSchema.project(tableInfo.getPhysicalPrimaryKeys());
80-
} else if (targetColumns != null) {
81-
projection = tableSchema.project(targetColumns);
101+
projection = pkProjection;
102+
converter = pkConverter;
103+
} else if (targetProjection != null && targetConverter != null) {
104+
projection = targetProjection;
105+
converter = targetConverter;
82106
} else {
83107
projection = tableSchema;
108+
converter = pojoToRowConverter;
84109
}
85110

86-
// TODO: initialize this on the constructor and reuse
87-
PojoToRowConverter<T> converter = PojoToRowConverter.of(pojoClass, tableSchema, projection);
88111
GenericRow projected = converter.toRow(pojo);
89112
if (projection == tableSchema) {
90113
return projected;

0 commit comments

Comments
 (0)