Skip to content

Commit 34dd539

Browse files
improve listagg performance
1 parent d34b27a commit 34dd539

File tree

3 files changed

+291
-5
lines changed

3 files changed

+291
-5
lines changed

fluss-common/src/main/java/org/apache/fluss/utils/BinaryStringUtils.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
package org.apache.fluss.utils;
1919

20+
import org.apache.fluss.row.BinarySegmentUtils;
2021
import org.apache.fluss.row.BinaryString;
2122
import org.apache.fluss.row.TimestampLtz;
2223
import org.apache.fluss.row.TimestampNtz;
2324

2425
import java.time.DateTimeException;
26+
import java.util.Arrays;
2527
import java.util.List;
2628
import java.util.TimeZone;
2729
import java.util.stream.Collectors;
@@ -81,4 +83,125 @@ public static TimestampLtz toTimestampLtz(
8183
BinaryString input, int precision, TimeZone localTimeZone) throws DateTimeException {
8284
return DateTimeUtils.parseTimestampData(input.toString(), precision, localTimeZone);
8385
}
86+
87+
/**
88+
* Concatenates input strings together into a single string. Returns NULL if any argument is
89+
* NULL.
90+
*
91+
* <p>This method is optimized to avoid unnecessary string conversions and memory allocations.
92+
* It directly operates on the underlying UTF-8 byte arrays of BinaryString objects.
93+
*
94+
* @param inputs the strings to concatenate
95+
* @return the concatenated string, or NULL if any input is NULL
96+
*/
97+
public static BinaryString concat(BinaryString... inputs) {
98+
return concat(Arrays.asList(inputs));
99+
}
100+
101+
/**
102+
* Concatenates input strings together into a single string. Returns NULL if any argument is
103+
* NULL.
104+
*
105+
* <p>This method is optimized to avoid unnecessary string conversions and memory allocations.
106+
* It directly operates on the underlying UTF-8 byte arrays of BinaryString objects.
107+
*
108+
* @param inputs the strings to concatenate
109+
* @return the concatenated string, or NULL if any input is NULL
110+
*/
111+
public static BinaryString concat(Iterable<BinaryString> inputs) {
112+
// Compute the total length of the result.
113+
int totalLength = 0;
114+
for (BinaryString input : inputs) {
115+
if (input == null) {
116+
return null;
117+
}
118+
119+
totalLength += input.getSizeInBytes();
120+
}
121+
122+
// Allocate a new byte array, and copy the inputs one by one into it.
123+
final byte[] result = new byte[totalLength];
124+
int offset = 0;
125+
for (BinaryString input : inputs) {
126+
if (input != null) {
127+
int len = input.getSizeInBytes();
128+
BinarySegmentUtils.copyToBytes(
129+
input.getSegments(), input.getOffset(), result, offset, len);
130+
offset += len;
131+
}
132+
}
133+
return BinaryString.fromBytes(result);
134+
}
135+
136+
/**
137+
* Concatenates input strings together into a single string using the separator. Returns NULL if
138+
* the separator is NULL.
139+
*
140+
* <p>Note: CONCAT_WS() does not skip any empty strings, however it does skip any NULL values
141+
* after the separator. For example, concat_ws(",", "a", null, "c") would yield "a,c".
142+
*
143+
* @param separator the separator to use between strings
144+
* @param inputs the strings to concatenate
145+
* @return the concatenated string with separator, or NULL if separator is NULL
146+
*/
147+
public static BinaryString concatWs(BinaryString separator, BinaryString... inputs) {
148+
return concatWs(separator, Arrays.asList(inputs));
149+
}
150+
151+
/**
152+
* Concatenates input strings together into a single string using the separator. Returns NULL if
153+
* the separator is NULL.
154+
*
155+
* <p>Note: CONCAT_WS() does not skip any empty strings, however it does skip any NULL values
156+
* after the separator. For example, concat_ws(",", "a", null, "c") would yield "a,c".
157+
*
158+
* @param separator the separator to use between strings
159+
* @param inputs the strings to concatenate
160+
* @return the concatenated string with separator, or NULL if separator is NULL
161+
*/
162+
public static BinaryString concatWs(BinaryString separator, Iterable<BinaryString> inputs) {
163+
if (null == separator) {
164+
return null;
165+
}
166+
167+
int numInputBytes = 0; // total number of bytes from the inputs
168+
int numInputs = 0; // number of non-null inputs
169+
for (BinaryString input : inputs) {
170+
if (input != null) {
171+
numInputBytes += input.getSizeInBytes();
172+
numInputs++;
173+
}
174+
}
175+
176+
if (numInputs == 0) {
177+
// Return an empty string if there is no input, or all the inputs are null.
178+
return BinaryString.EMPTY_UTF8;
179+
}
180+
181+
// Allocate a new byte array, and copy the inputs one by one into it.
182+
// The size of the new array is the size of all inputs, plus the separators.
183+
final byte[] result =
184+
new byte[numInputBytes + (numInputs - 1) * separator.getSizeInBytes()];
185+
int offset = 0;
186+
187+
int i = 0;
188+
for (BinaryString input : inputs) {
189+
if (input != null) {
190+
int len = input.getSizeInBytes();
191+
BinarySegmentUtils.copyToBytes(
192+
input.getSegments(), input.getOffset(), result, offset, len);
193+
offset += len;
194+
195+
i++;
196+
// Add separator if this is not the last input
197+
if (i < numInputs) {
198+
int sepLen = separator.getSizeInBytes();
199+
BinarySegmentUtils.copyToBytes(
200+
separator.getSegments(), separator.getOffset(), result, offset, sepLen);
201+
offset += sepLen;
202+
}
203+
}
204+
}
205+
return BinaryString.fromBytes(result);
206+
}
84207
}
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.utils;
19+
20+
import org.apache.fluss.row.BinaryString;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
26+
/** Test for {@link BinaryStringUtils}. */
27+
public class BinaryStringUtilsTest {
28+
29+
@Test
30+
public void testConcat() {
31+
// Test basic concatenation
32+
BinaryString s1 = BinaryString.fromString("hello");
33+
BinaryString s2 = BinaryString.fromString(" ");
34+
BinaryString s3 = BinaryString.fromString("world");
35+
36+
BinaryString result = BinaryStringUtils.concat(s1, s2, s3);
37+
assertThat(result.toString()).isEqualTo("hello world");
38+
39+
// Test concatenation with empty string
40+
BinaryString empty = BinaryString.fromString("");
41+
result = BinaryStringUtils.concat(s1, empty, s3);
42+
assertThat(result.toString()).isEqualTo("helloworld");
43+
44+
// Test concatenation with single string
45+
result = BinaryStringUtils.concat(s1);
46+
assertThat(result.toString()).isEqualTo("hello");
47+
48+
// Test concatenation with null
49+
result = BinaryStringUtils.concat(s1, null, s3);
50+
assertThat(result).isNull();
51+
52+
// Test concatenation with Chinese characters
53+
BinaryString chinese1 = BinaryString.fromString("你好");
54+
BinaryString chinese2 = BinaryString.fromString("世界");
55+
result = BinaryStringUtils.concat(chinese1, chinese2);
56+
assertThat(result.toString()).isEqualTo("你好世界");
57+
58+
// Test concatenation with mixed ASCII and multi-byte characters
59+
BinaryString mixed1 = BinaryString.fromString("Hello");
60+
BinaryString mixed2 = BinaryString.fromString("世界");
61+
BinaryString mixed3 = BinaryString.fromString("!");
62+
result = BinaryStringUtils.concat(mixed1, mixed2, mixed3);
63+
assertThat(result.toString()).isEqualTo("Hello世界!");
64+
}
65+
66+
@Test
67+
public void testConcatWs() {
68+
BinaryString sep = BinaryString.fromString(",");
69+
BinaryString s1 = BinaryString.fromString("a");
70+
BinaryString s2 = BinaryString.fromString("b");
71+
BinaryString s3 = BinaryString.fromString("c");
72+
73+
// Test basic concatenation with separator
74+
BinaryString result = BinaryStringUtils.concatWs(sep, s1, s2, s3);
75+
assertThat(result.toString()).isEqualTo("a,b,c");
76+
77+
// Test with null separator
78+
result = BinaryStringUtils.concatWs(null, s1, s2, s3);
79+
assertThat(result).isNull();
80+
81+
// Test with null values in inputs (should skip nulls)
82+
result = BinaryStringUtils.concatWs(sep, s1, null, s3);
83+
assertThat(result.toString()).isEqualTo("a,c");
84+
85+
// Test with all null inputs
86+
result = BinaryStringUtils.concatWs(sep, null, null, null);
87+
assertThat(result).isEqualTo(BinaryString.EMPTY_UTF8);
88+
89+
// Test with single input
90+
result = BinaryStringUtils.concatWs(sep, s1);
91+
assertThat(result.toString()).isEqualTo("a");
92+
93+
// Test with empty strings (should not skip empty strings)
94+
BinaryString empty = BinaryString.fromString("");
95+
result = BinaryStringUtils.concatWs(sep, s1, empty, s3);
96+
assertThat(result.toString()).isEqualTo("a,,c");
97+
98+
// Test with different separator
99+
BinaryString dashSep = BinaryString.fromString("-");
100+
result = BinaryStringUtils.concatWs(dashSep, s1, s2, s3);
101+
assertThat(result.toString()).isEqualTo("a-b-c");
102+
103+
// Test with multi-character separator
104+
BinaryString multiSep = BinaryString.fromString(" | ");
105+
result = BinaryStringUtils.concatWs(multiSep, s1, s2, s3);
106+
assertThat(result.toString()).isEqualTo("a | b | c");
107+
108+
// Test with Chinese characters
109+
BinaryString chineseSep = BinaryString.fromString(",");
110+
BinaryString chinese1 = BinaryString.fromString("你好");
111+
BinaryString chinese2 = BinaryString.fromString("世界");
112+
result = BinaryStringUtils.concatWs(chineseSep, chinese1, chinese2);
113+
assertThat(result.toString()).isEqualTo("你好,世界");
114+
}
115+
116+
@Test
117+
public void testConcatIterable() {
118+
BinaryString s1 = BinaryString.fromString("a");
119+
BinaryString s2 = BinaryString.fromString("b");
120+
BinaryString s3 = BinaryString.fromString("c");
121+
122+
// Test with iterable
123+
BinaryString result = BinaryStringUtils.concat(java.util.Arrays.asList(s1, s2, s3));
124+
assertThat(result.toString()).isEqualTo("abc");
125+
126+
// Test with null in iterable
127+
result = BinaryStringUtils.concat(java.util.Arrays.asList(s1, null, s3));
128+
assertThat(result).isNull();
129+
}
130+
131+
@Test
132+
public void testConcatWsIterable() {
133+
BinaryString sep = BinaryString.fromString(",");
134+
BinaryString s1 = BinaryString.fromString("a");
135+
BinaryString s2 = BinaryString.fromString("b");
136+
BinaryString s3 = BinaryString.fromString("c");
137+
138+
// Test with iterable
139+
BinaryString result = BinaryStringUtils.concatWs(sep, java.util.Arrays.asList(s1, s2, s3));
140+
assertThat(result.toString()).isEqualTo("a,b,c");
141+
142+
// Test with null values in iterable (should skip nulls)
143+
result = BinaryStringUtils.concatWs(sep, java.util.Arrays.asList(s1, null, s3));
144+
assertThat(result.toString()).isEqualTo("a,c");
145+
}
146+
147+
@Test
148+
public void testConcatPerformance() {
149+
// Test to ensure concat works correctly for repeated concatenations
150+
// simulating listagg aggregation
151+
BinaryString delimiter = BinaryString.fromString(",");
152+
BinaryString accumulator = BinaryString.fromString("item1");
153+
154+
for (int i = 2; i <= 10; i++) {
155+
BinaryString newItem = BinaryString.fromString("item" + i);
156+
accumulator = BinaryStringUtils.concat(accumulator, delimiter, newItem);
157+
}
158+
159+
assertThat(accumulator.toString())
160+
.isEqualTo("item1,item2,item3,item4,item5,item6,item7,item8,item9,item10");
161+
}
162+
}

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldListaggAgg.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,21 @@
2525
import org.apache.fluss.config.TableConfig;
2626
import org.apache.fluss.row.BinaryString;
2727
import org.apache.fluss.types.StringType;
28+
import org.apache.fluss.utils.BinaryStringUtils;
2829

2930
/** List aggregation aggregator - concatenates string values with a delimiter. */
3031
public class FieldListaggAgg extends FieldAggregator {
3132

3233
private static final long serialVersionUID = 1L;
3334

34-
private final String delimiter;
35+
private final BinaryString delimiter;
3536

3637
/**
 * Creates a list aggregator whose delimiter is looked up in the table configuration.
 *
 * @param name forwarded to the super constructor
 * @param dataType forwarded to the super constructor
 * @param options table configuration the delimiter is read from
 * @param field field name used for the delimiter lookup
 */
public FieldListaggAgg(String name, StringType dataType, TableConfig options, String field) {
3738
super(name, dataType);
3839
// Read delimiter from configuration: table.merge-engine.aggregate.<field>.listagg-delimiter
3940
// Default to comma if not configured
40-
this.delimiter = options.getFieldListaggDelimiter(field);
41+
// Convert the delimiter to a BinaryString once here, so agg() can pass it to concat directly
42+
this.delimiter = BinaryString.fromString(options.getFieldListaggDelimiter(field));
4143
}
4244

4345
@Override
@@ -49,8 +51,7 @@ public Object agg(Object accumulator, Object inputField) {
4951
BinaryString mergeFieldSD = (BinaryString) accumulator;
5052
BinaryString inFieldSD = (BinaryString) inputField;
5153

52-
// Concatenate strings using simple string concatenation
53-
String result = mergeFieldSD + delimiter + inFieldSD;
54-
return BinaryString.fromString(result);
54+
// Use optimized concat method to avoid string conversion and reduce memory allocation
55+
return BinaryStringUtils.concat(mergeFieldSD, delimiter, inFieldSD);
5556
}
5657
}

0 commit comments

Comments
 (0)