Skip to content

Commit ba18fb4

Browse files
Implement basic Aggregate Merge Engine
1 parent de2d9a0 commit ba18fb4

File tree

54 files changed

+6181
-23
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+6181
-23
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1418,9 +1418,10 @@ public class ConfigOptions {
14181418
.noDefaultValue()
14191419
.withDescription(
14201420
"Defines the merge engine for the primary key table. By default, primary key table doesn't have merge engine. "
1421-
+ "The supported merge engines are `first_row` and `versioned`. "
1421+
+ "The supported merge engines are `first_row`, `versioned`, and `aggregate`. "
14221422
+ "The `first_row` merge engine will keep the first row of the same primary key. "
1423-
+ "The `versioned` merge engine will keep the row with the largest version of the same primary key.");
1423+
+ "The `versioned` merge engine will keep the row with the largest version of the same primary key. "
1424+
+ "The `aggregate` merge engine will aggregate rows with the same primary key using field-level aggregate functions.");
14241425

14251426
public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
14261427
// we may need to introduce "del-column" in the future to support delete operation
@@ -1431,6 +1432,23 @@ public class ConfigOptions {
14311432
"The column name of the version column for the `versioned` merge engine. "
14321433
+ "If the merge engine is set to `versioned`, the version column must be set.");
14331434

1435+
public static final ConfigOption<String> TABLE_AGGREGATE_DEFAULT_FUNCTION =
1436+
key("table.merge-engine.aggregate.default-function")
1437+
.stringType()
1438+
.noDefaultValue()
1439+
.withDescription(
1440+
"Default aggregate function for all non-primary-key fields when using aggregate merge engine. "
1441+
+ "If not set, 'last_value_ignore_nulls' will be used as default. "
1442+
+ "Field-specific aggregate functions can be configured using 'table.merge-engine.aggregate.<field-name>'.");
1443+
1444+
public static final ConfigOption<Boolean> TABLE_AGG_REMOVE_RECORD_ON_DELETE =
1445+
key("table.aggregation.remove-record-on-delete")
1446+
.booleanType()
1447+
.defaultValue(false)
1448+
.withDescription(
1449+
"Whether to remove the entire record when receiving a DELETE operation in aggregate merge engine. "
1450+
+ "By default, DELETE operations are not supported in aggregate merge engine.");
1451+
14341452
public static final ConfigOption<DeleteBehavior> TABLE_DELETE_BEHAVIOR =
14351453
key("table.delete.behavior")
14361454
.enumType(DeleteBehavior.class)

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,37 @@ public Optional<String> getMergeEngineVersionColumn() {
118118
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
119119
}
120120

121+
/**
122+
* Whether to remove record on delete for aggregate merge engine.
123+
*
124+
* @return true if record should be removed on delete, false otherwise
125+
*/
126+
public boolean getAggregationRemoveRecordOnDelete() {
127+
return config.get(ConfigOptions.TABLE_AGG_REMOVE_RECORD_ON_DELETE);
128+
}
129+
130+
/**
131+
* Gets the listagg delimiter for a specific field in aggregate merge engine.
132+
*
133+
* <p>This method dynamically constructs the ConfigOption for field-specific delimiter. The
134+
* configuration key pattern is: table.merge-engine.aggregate.{fieldName}.listagg-delimiter
135+
*
136+
* @param fieldName the field name
137+
* @return the delimiter string, default to comma if not configured
138+
*/
139+
public String getFieldListaggDelimiter(String fieldName) {
140+
return config.get(
141+
ConfigBuilder.key(
142+
"table.merge-engine.aggregate." + fieldName + ".listagg-delimiter")
143+
.stringType()
144+
.defaultValue(",")
145+
.withDescription(
146+
"Delimiter for listagg aggregation function of field '"
147+
+ fieldName
148+
+ "' in aggregate merge engine. "
149+
+ "Default is comma ','."));
150+
}
151+
121152
/** Gets the delete behavior of the table. */
122153
public Optional<DeleteBehavior> getDeleteBehavior() {
123154
return config.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.metadata;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import java.util.Locale;
23+
24+
/**
25+
* Aggregation function for aggregate merge engine.
26+
*
27+
* <p>This enum represents all supported aggregation functions that can be applied to non-primary
28+
* key columns in aggregation merge engine tables.
29+
*/
30+
@PublicEvolving
31+
public enum AggFunction {
32+
// Numeric aggregation
33+
SUM("sum"),
34+
PRODUCT("product"),
35+
MAX("max"),
36+
MIN("min"),
37+
38+
// Value selection
39+
LAST_VALUE("last_value"),
40+
LAST_VALUE_IGNORE_NULLS("last_value_ignore_nulls"),
41+
FIRST_VALUE("first_value"),
42+
FIRST_VALUE_IGNORE_NULLS("first_value_ignore_nulls"),
43+
44+
// String aggregation
45+
LISTAGG("listagg"),
46+
STRING_AGG("string_agg"), // Alias for LISTAGG - maps to same factory
47+
48+
// Boolean aggregation
49+
BOOL_AND("bool_and"),
50+
BOOL_OR("bool_or");
51+
52+
private final String identifier;
53+
54+
AggFunction(String identifier) {
55+
this.identifier = identifier;
56+
}
57+
58+
/**
59+
* Returns the identifier string for this aggregation function.
60+
*
61+
* @return the identifier string
62+
*/
63+
public String getIdentifier() {
64+
return identifier;
65+
}
66+
67+
/**
68+
* Converts a string to an AggFunction enum value.
69+
*
70+
* <p>This method supports multiple naming formats:
71+
*
72+
* <ul>
73+
* <li>Underscore format: "last_value_ignore_nulls"
74+
* <li>Hyphen format: "last-value-ignore-nulls"
75+
* <li>Case insensitive matching
76+
* </ul>
77+
*
78+
* <p>Note: For string_agg, this will return STRING_AGG enum, but the server-side factory will
79+
* map it to the same implementation as listagg.
80+
*
81+
* @param name the aggregation function name
82+
* @return the AggFunction enum value, or null if not found
83+
*/
84+
public static AggFunction fromString(String name) {
85+
if (name == null || name.trim().isEmpty()) {
86+
return null;
87+
}
88+
89+
// Normalize the input: convert hyphens to underscores and lowercase
90+
String normalized = name.replace('-', '_').toLowerCase(Locale.ROOT).trim();
91+
92+
// Try direct match with identifier
93+
for (AggFunction aggFunc : values()) {
94+
if (aggFunc.identifier.equals(normalized)) {
95+
return aggFunc;
96+
}
97+
}
98+
99+
return null;
100+
}
101+
102+
/**
103+
* Converts this AggFunction to its string identifier.
104+
*
105+
* @return the identifier string
106+
*/
107+
@Override
108+
public String toString() {
109+
return identifier;
110+
}
111+
}

fluss-common/src/main/java/org/apache/fluss/metadata/MergeEngineType.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,16 @@ public enum MergeEngineType {
5252
* <li>Null version value is treated as the smallest version (i.e., Long.MIN_VALUE)
5353
* </ul>
5454
*/
55-
VERSIONED;
55+
VERSIONED,
5656

57-
// introduce AGGREGATE merge engine in the future
57+
/**
58+
* A merge engine that aggregates rows with the same primary key using field-level aggregate
59+
* functions. Each non-primary-key field can have its own aggregate function (e.g., sum, max,
60+
* min, last_value, etc.). This allows for flexible aggregation semantics at the field level.
61+
*
62+
* @since 0.9
63+
*/
64+
AGGREGATE;
5865

5966
/** Creates a {@link MergeEngineType} from the given string. */
6067
public static MergeEngineType fromString(String type) {
@@ -63,6 +70,8 @@ public static MergeEngineType fromString(String type) {
6370
return FIRST_ROW;
6471
case "VERSIONED":
6572
return VERSIONED;
73+
case "AGGREGATE":
74+
return AGGREGATE;
6675
default:
6776
throw new IllegalArgumentException("Unsupported merge engine type: " + type);
6877
}

0 commit comments

Comments
 (0)