Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1418,9 +1418,10 @@ public class ConfigOptions {
.noDefaultValue()
.withDescription(
"Defines the merge engine for the primary key table. By default, primary key table doesn't have merge engine. "
+ "The supported merge engines are `first_row` and `versioned`. "
+ "The supported merge engines are `first_row`, `versioned`, and `aggregate`. "
+ "The `first_row` merge engine will keep the first row of the same primary key. "
+ "The `versioned` merge engine will keep the row with the largest version of the same primary key.");
+ "The `versioned` merge engine will keep the row with the largest version of the same primary key. "
+ "The `aggregate` merge engine will aggregate rows with the same primary key using field-level aggregate functions.");

public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
// we may need to introduce "del-column" in the future to support delete operation
Expand All @@ -1431,6 +1432,23 @@ public class ConfigOptions {
"The column name of the version column for the `versioned` merge engine. "
+ "If the merge engine is set to `versioned`, the version column must be set.");

public static final ConfigOption<String> TABLE_AGGREGATE_DEFAULT_FUNCTION =
key("table.merge-engine.aggregate.default-function")
.stringType()
.noDefaultValue()
.withDescription(
"Default aggregate function for all non-primary-key fields when using aggregate merge engine. "
+ "If not set, 'last_value_ignore_nulls' will be used as default. "
+ "Field-specific aggregate functions can be configured using 'table.merge-engine.aggregate.<field-name>'.");

public static final ConfigOption<Boolean> TABLE_AGG_REMOVE_RECORD_ON_DELETE =
key("table.aggregation.remove-record-on-delete")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to remove the entire record when receiving a DELETE operation in aggregate merge engine. "
+ "By default, DELETE operations are not supported in aggregate merge engine.");

public static final ConfigOption<DeleteBehavior> TABLE_DELETE_BEHAVIOR =
key("table.delete.behavior")
.enumType(DeleteBehavior.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,37 @@ public Optional<String> getMergeEngineVersionColumn() {
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
}

/**
* Whether to remove record on delete for aggregate merge engine.
*
* @return true if record should be removed on delete, false otherwise
*/
public boolean getAggregationRemoveRecordOnDelete() {
return config.get(ConfigOptions.TABLE_AGG_REMOVE_RECORD_ON_DELETE);
}

/**
* Gets the listagg delimiter for a specific field in aggregate merge engine.
*
* <p>This method dynamically constructs the ConfigOption for field-specific delimiter. The
* configuration key pattern is: table.merge-engine.aggregate.{fieldName}.listagg-delimiter
*
* @param fieldName the field name
* @return the delimiter string, default to comma if not configured
*/
public String getFieldListaggDelimiter(String fieldName) {
return config.get(
ConfigBuilder.key(
"table.merge-engine.aggregate." + fieldName + ".listagg-delimiter")
.stringType()
.defaultValue(",")
.withDescription(
"Delimiter for listagg aggregation function of field '"
+ fieldName
+ "' in aggregate merge engine. "
+ "Default is comma ','."));
}

/** Gets the delete behavior of the table. */
public Optional<DeleteBehavior> getDeleteBehavior() {
return config.getOptional(ConfigOptions.TABLE_DELETE_BEHAVIOR);
Expand Down
111 changes: 111 additions & 0 deletions fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.metadata;

import org.apache.fluss.annotation.PublicEvolving;

import java.util.Locale;

/**
* Aggregation function for aggregate merge engine.
*
* <p>This enum represents all supported aggregation functions that can be applied to non-primary
* key columns in aggregation merge engine tables.
*/
@PublicEvolving
public enum AggFunction {
// Numeric aggregation
SUM("sum"),
PRODUCT("product"),
MAX("max"),
MIN("min"),

// Value selection
LAST_VALUE("last_value"),
LAST_VALUE_IGNORE_NULLS("last_value_ignore_nulls"),
FIRST_VALUE("first_value"),
FIRST_VALUE_IGNORE_NULLS("first_value_ignore_nulls"),

// String aggregation
LISTAGG("listagg"),
STRING_AGG("string_agg"), // Alias for LISTAGG - maps to same factory

// Boolean aggregation
BOOL_AND("bool_and"),
BOOL_OR("bool_or");

private final String identifier;

AggFunction(String identifier) {
this.identifier = identifier;
}

/**
* Returns the identifier string for this aggregation function.
*
* @return the identifier string
*/
public String getIdentifier() {
return identifier;
}

/**
* Converts a string to an AggFunction enum value.
*
* <p>This method supports multiple naming formats:
*
* <ul>
* <li>Underscore format: "last_value_ignore_nulls"
* <li>Hyphen format: "last-value-ignore-nulls"
* <li>Case insensitive matching
* </ul>
*
* <p>Note: For string_agg, this will return STRING_AGG enum, but the server-side factory will
* map it to the same implementation as listagg.
*
* @param name the aggregation function name
* @return the AggFunction enum value, or null if not found
*/
public static AggFunction fromString(String name) {
if (name == null || name.trim().isEmpty()) {
return null;
}

// Normalize the input: convert hyphens to underscores and lowercase
String normalized = name.replace('-', '_').toLowerCase(Locale.ROOT).trim();

// Try direct match with identifier
for (AggFunction aggFunc : values()) {
if (aggFunc.identifier.equals(normalized)) {
return aggFunc;
}
}

return null;
}

/**
* Converts this AggFunction to its string identifier.
*
* @return the identifier string
*/
@Override
public String toString() {
return identifier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,16 @@ public enum MergeEngineType {
* <li>Null version value is treated as the smallest version (i.e., Long.MIN_VALUE)
* </ul>
*/
VERSIONED;
VERSIONED,

// introduce AGGREGATE merge engine in the future
/**
* A merge engine that aggregates rows with the same primary key using field-level aggregate
* functions. Each non-primary-key field can have its own aggregate function (e.g., sum, max,
* min, last_value, etc.). This allows for flexible aggregation semantics at the field level.
*
* @since 0.9
*/
AGGREGATE;

/** Creates a {@link MergeEngineType} from the given string. */
public static MergeEngineType fromString(String type) {
Expand All @@ -63,6 +70,8 @@ public static MergeEngineType fromString(String type) {
return FIRST_ROW;
case "VERSIONED":
return VERSIONED;
case "AGGREGATE":
return AGGREGATE;
default:
throw new IllegalArgumentException("Unsupported merge engine type: " + type);
}
Expand Down
Loading