Skip to content

Commit 94d007b

Browse files
improve performance
1 parent 6eadbf4 commit 94d007b

File tree

7 files changed

+41
-44
lines changed

7 files changed

+41
-44
lines changed

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/AggregateRowMerger.java

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717

1818
package org.apache.fluss.server.kv.rowmerger;
1919

20-
/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
21-
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
22-
* additional information regarding copyright ownership. */
23-
2420
import org.apache.fluss.config.TableConfig;
2521
import org.apache.fluss.metadata.DeleteBehavior;
2622
import org.apache.fluss.metadata.KvFormat;
@@ -31,7 +27,6 @@
3127
import org.apache.fluss.row.BinaryRow;
3228
import org.apache.fluss.row.GenericRow;
3329
import org.apache.fluss.row.InternalRow;
34-
import org.apache.fluss.row.encode.RowEncoder;
3530
import org.apache.fluss.server.kv.rowmerger.aggregate.AggregationContext;
3631
import org.apache.fluss.server.kv.rowmerger.aggregate.AggregationContextCache;
3732
import org.apache.fluss.server.kv.rowmerger.aggregate.FieldAggregator;
@@ -94,40 +89,29 @@ private void aggregateFields(
9489
BinaryRow newRow,
9590
AggregationContext newContext,
9691
AggregationContext oldContext) {
97-
for (int i = 0; i < newContext.getFieldCount(); i++) {
98-
FieldAggregator aggregator = newContext.getAggregators()[i];
92+
// Cache loop-invariant values and array references for better performance
93+
int newFieldCount = newContext.getFieldCount();
94+
int oldFieldCount = oldContext.getFieldCount();
95+
FieldAggregator[] newAggregators = newContext.getAggregators();
96+
InternalRow.FieldGetter[] newFieldGetters = newContext.getFieldGetters();
97+
InternalRow.FieldGetter[] oldFieldGetters = oldContext.getFieldGetters();
98+
99+
for (int i = 0; i < newFieldCount; i++) {
100+
FieldAggregator aggregator = newAggregators[i];
99101

100102
// Get accumulator value from oldRow (null if field doesn't exist in old schema)
101103
Object accumulator =
102-
(i < oldContext.getFieldCount())
103-
? oldContext.getFieldGetters()[i].getFieldOrNull(oldRow)
104-
: null;
104+
(i < oldFieldCount) ? oldFieldGetters[i].getFieldOrNull(oldRow) : null;
105105

106106
// Get input value from newRow (null if field doesn't exist in new schema)
107-
Object inputField = newContext.getFieldGetters()[i].getFieldOrNull(newRow);
107+
Object inputField = newFieldGetters[i].getFieldOrNull(newRow);
108108

109109
// Apply aggregate function and store result
110110
Object mergedField = aggregator.agg(accumulator, inputField);
111111
accumulatorRow.setField(i, mergedField);
112112
}
113113
}
114114

115-
/**
116-
* Encode a GenericRow to BinaryRow using the new schema encoder.
117-
*
118-
* @param accumulatorRow the GenericRow to encode
119-
* @param newContext context for the new schema
120-
* @return the encoded BinaryRow
121-
*/
122-
private BinaryRow encodeRow(GenericRow accumulatorRow, AggregationContext newContext) {
123-
RowEncoder encoder = newContext.getRowEncoder();
124-
encoder.startNewRow();
125-
for (int i = 0; i < newContext.getFieldCount(); i++) {
126-
encoder.encodeField(i, accumulatorRow.getField(i));
127-
}
128-
return encoder.finishRow();
129-
}
130-
131115
@Override
132116
public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
133117
// First write: no existing row
@@ -158,7 +142,7 @@ public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
158142
aggregateFields(accumulatorRow, oldValue.row, newValue.row, newContext, oldContext);
159143

160144
// Encode aggregated row to BinaryRow
161-
BinaryRow mergedRow = encodeRow(accumulatorRow, newContext);
145+
BinaryRow mergedRow = newContext.encodeRow(accumulatorRow);
162146

163147
return new BinaryValue(newValue.schemaId, mergedRow);
164148
}
@@ -346,12 +330,7 @@ public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
346330
mergeRows(accumulatorRow, oldValue.row, newValue.row, oldContext, newContext);
347331

348332
// Encode and return merged row using new schema encoder
349-
RowEncoder encoder = newContext.getRowEncoder();
350-
encoder.startNewRow();
351-
for (int i = 0; i < newFieldCount; i++) {
352-
encoder.encodeField(i, accumulatorRow.getField(i));
353-
}
354-
BinaryRow mergedRow = encoder.finishRow();
333+
BinaryRow mergedRow = newContext.encodeRow(accumulatorRow);
355334

356335
return new BinaryValue(newValue.schemaId, mergedRow);
357336
}

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContext.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.fluss.metadata.AggFunction;
2222
import org.apache.fluss.metadata.KvFormat;
2323
import org.apache.fluss.metadata.Schema;
24+
import org.apache.fluss.row.BinaryRow;
25+
import org.apache.fluss.row.GenericRow;
2426
import org.apache.fluss.row.InternalRow;
2527
import org.apache.fluss.row.encode.RowEncoder;
2628
import org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory;
@@ -75,6 +77,23 @@ public int getFieldCount() {
7577
return fieldCount;
7678
}
7779

80+
/**
81+
* Encode a GenericRow to BinaryRow using this context's encoder.
82+
*
83+
* <p>This method is optimized to reuse the encoder from the context and minimize overhead
84+
* during row encoding.
85+
*
86+
* @param accumulatorRow the GenericRow to encode
87+
* @return the encoded BinaryRow
88+
*/
89+
public BinaryRow encodeRow(GenericRow accumulatorRow) {
90+
rowEncoder.startNewRow();
91+
for (int i = 0; i < fieldCount; i++) {
92+
rowEncoder.encodeField(i, accumulatorRow.getField(i));
93+
}
94+
return rowEncoder.finishRow();
95+
}
96+
7897
/**
7998
* Create an aggregation context for a given schema.
8099
*

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
* additional information regarding copyright ownership. */
2424

2525
import org.apache.fluss.types.DataType;
26+
import org.apache.fluss.types.DataTypeRoot;
2627

2728
import java.io.Serializable;
2829

@@ -32,11 +33,13 @@ public abstract class FieldAggregator implements Serializable {
3233
private static final long serialVersionUID = 1L;
3334

3435
protected final DataType fieldType;
36+
protected final DataTypeRoot typeRoot;
3537
protected final String name;
3638

3739
public FieldAggregator(String name, DataType dataType) {
3840
this.name = name;
3941
this.fieldType = dataType;
42+
this.typeRoot = dataType.getTypeRoot();
4043
}
4144

4245
/**

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldMaxAgg.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
* additional information regarding copyright ownership. */
2424

2525
import org.apache.fluss.types.DataType;
26-
import org.apache.fluss.types.DataTypeRoot;
2726
import org.apache.fluss.utils.InternalRowUtils;
2827

2928
/** Max aggregator - keeps the maximum value. */
@@ -40,8 +39,7 @@ public Object agg(Object accumulator, Object inputField) {
4039
if (accumulator == null || inputField == null) {
4140
return accumulator == null ? inputField : accumulator;
4241
}
43-
DataTypeRoot type = fieldType.getTypeRoot();
44-
return InternalRowUtils.compare(accumulator, inputField, type) < 0
42+
return InternalRowUtils.compare(accumulator, inputField, typeRoot) < 0
4543
? inputField
4644
: accumulator;
4745
}

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldMinAgg.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
* additional information regarding copyright ownership. */
2424

2525
import org.apache.fluss.types.DataType;
26-
import org.apache.fluss.types.DataTypeRoot;
2726
import org.apache.fluss.utils.InternalRowUtils;
2827

2928
/** Min aggregator - keeps the minimum value. */
@@ -41,8 +40,7 @@ public Object agg(Object accumulator, Object inputField) {
4140
return accumulator == null ? inputField : accumulator;
4241
}
4342

44-
DataTypeRoot type = fieldType.getTypeRoot();
45-
return InternalRowUtils.compare(accumulator, inputField, type) < 0
43+
return InternalRowUtils.compare(accumulator, inputField, typeRoot) < 0
4644
? accumulator
4745
: inputField;
4846
}

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldProductAgg.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public Object agg(Object accumulator, Object inputField) {
4747
Object product;
4848

4949
// ordered by type root definition
50-
switch (fieldType.getTypeRoot()) {
50+
switch (typeRoot) {
5151
case DECIMAL:
5252
Decimal mergeFieldDD = (Decimal) accumulator;
5353
Decimal inFieldDD = (Decimal) inputField;
@@ -82,7 +82,7 @@ public Object agg(Object accumulator, Object inputField) {
8282
String msg =
8383
String.format(
8484
"type %s not support in %s",
85-
fieldType.getTypeRoot().toString(), this.getClass().getName());
85+
typeRoot.toString(), this.getClass().getName());
8686
throw new IllegalArgumentException(msg);
8787
}
8888
return product;

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldSumAgg.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public Object agg(Object accumulator, Object inputField) {
4343
Object sum;
4444

4545
// ordered by type root definition
46-
switch (fieldType.getTypeRoot()) {
46+
switch (typeRoot) {
4747
case DECIMAL:
4848
Decimal mergeFieldDD = (Decimal) accumulator;
4949
Decimal inFieldDD = (Decimal) inputField;
@@ -80,7 +80,7 @@ public Object agg(Object accumulator, Object inputField) {
8080
String msg =
8181
String.format(
8282
"type %s not support in %s",
83-
fieldType.getTypeRoot().toString(), this.getClass().getName());
83+
typeRoot.toString(), this.getClass().getName());
8484
throw new IllegalArgumentException(msg);
8585
}
8686
return sum;

0 commit comments

Comments
 (0)