Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.AuthorizationException;
import org.apache.fluss.exception.DatabaseAlreadyExistException;
import org.apache.fluss.exception.DatabaseNotEmptyException;
import org.apache.fluss.exception.DatabaseNotExistException;
Expand All @@ -35,10 +38,15 @@
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.KvSnapshotNotExistException;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.exception.NoRebalanceInProgressException;
import org.apache.fluss.exception.NonPrimaryKeyTableException;
import org.apache.fluss.exception.PartitionAlreadyExistsException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.RebalanceFailureException;
import org.apache.fluss.exception.SchemaNotExistException;
import org.apache.fluss.exception.ServerNotExistException;
import org.apache.fluss.exception.ServerTagAlreadyExistException;
import org.apache.fluss.exception.ServerTagNotExistException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
Expand Down Expand Up @@ -492,4 +500,94 @@ ListOffsetsResult listOffsets(
* @return A CompletableFuture indicating completion of the operation.
*/
CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> configs);

/**
* Add server tag to the specified tabletServers, one tabletServer can only have one serverTag.
*
* <p>If one tabletServer failed adding tag, none of the tags will take effect.
*
* <p>If one tabletServer already has a serverTag, and the serverTag is same with the existing
* one, this operation will be ignored.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
* exist.
* <li>{@link ServerTagAlreadyExistException} If the server tag already exists for any one of
* the tabletServers, and the server tag is different from the existing one.
* </ul>
*
* @param tabletServers the tabletServers we want to add server tags.
* @param serverTag the server tag to be added.
*/
CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag);

/**
* Remove server tag from the specified tabletServers.
*
* <p>If one tabletServer failed removing tag, none of the tags will be removed.
*
* <p>No exception will be thrown if the server already has no any server tag now.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
* exist.
* <li>{@link ServerTagNotExistException} If the server tag does not exist for any one of the
* tabletServers.
* </ul>
*
* @param tabletServers the tabletServers we want to remove server tags.
*/
CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, ServerTag serverTag);

/**
* Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's
* bucket load.
*
* <p>More details, Fluss collects the cluster's load information and optimizes to perform load
* balancing according to the user-defined {@code priorityGoals}.
*
* <p>Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted
* rebalance task exists, an {@link RebalanceFailureException} will be thrown.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link RebalanceFailureException} If the rebalance failed. Such as there is an ongoing
* execution.
* </ul>
*
* @param priorityGoals the goals to be optimized.
* @param dryRun Calculate and return the rebalance optimization proposal, but do not execute
* it.
* @return the generated rebalance plan for all the tableBuckets which need to do rebalance.
*/
CompletableFuture<RebalancePlan> rebalance(List<GoalType> priorityGoals, boolean dryRun);

/**
* List the rebalance progress.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
* </ul>
*
* @return the rebalance process.
*/
CompletableFuture<RebalanceProgress> listRebalanceProgress();

/**
* Cannel the rebalance task.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
* </ul>
*/
CompletableFuture<Void> cancelRebalance();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.LeaderNotAvailableException;
Expand Down Expand Up @@ -531,6 +533,33 @@ public CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> confi
return future;
}

@Override
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
throw new UnsupportedOperationException("Support soon");
}

@Override
public CompletableFuture<Void> removeServerTag(
List<Integer> tabletServers, ServerTag serverTag) {
throw new UnsupportedOperationException("Support soon");
}

@Override
public CompletableFuture<RebalancePlan> rebalance(
List<GoalType> priorityGoals, boolean dryRun) {
throw new UnsupportedOperationException("Support soon");
}

@Override
public CompletableFuture<RebalanceProgress> listRebalanceProgress() {
throw new UnsupportedOperationException("Support soon");
}

@Override
public CompletableFuture<Void> cancelRebalance() {
throw new UnsupportedOperationException("Support soon");
}

@Override
public void close() {
// nothing to do yet
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.admin;

import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
import org.apache.fluss.metadata.TableBucket;

import java.util.Map;

/**
* The rebalance plan.
*
* @since 0.9
*/
public class RebalancePlan {

private final Map<TableBucket, RebalancePlanForBucket> planForBucketMap;

public RebalancePlan(Map<TableBucket, RebalancePlanForBucket> planForBucketMap) {
this.planForBucketMap = planForBucketMap;
}

public Map<TableBucket, RebalancePlanForBucket> getPlanForBucketMap() {
return planForBucketMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.admin;

import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
import org.apache.fluss.cluster.rebalance.RebalanceStatus;
import org.apache.fluss.metadata.TableBucket;

import java.util.Map;

import static org.apache.fluss.utils.Preconditions.checkNotNull;

/**
* The rebalance progress.
*
* @since 0.9
*/
public class RebalanceProgress {

/** The rebalance status for the overall rebalance. */
private final RebalanceStatus rebalanceStatus;

/** The rebalance progress for the overall rebalance. Between 0.0d to 1.0d */
private final double progress;

/** The rebalance progress for each tabletBucket. */
private final Map<TableBucket, RebalanceResultForBucket> progressForBucketMap;

public RebalanceProgress(
RebalanceStatus rebalanceStatus,
double progress,
Map<TableBucket, RebalanceResultForBucket> progressForBucketMap) {
// TODO: we may derive the overall progress and status from progressForBucketMap
this.rebalanceStatus = checkNotNull(rebalanceStatus);
this.progress = progress;
this.progressForBucketMap = checkNotNull(progressForBucketMap);
}

public RebalanceStatus status() {
return rebalanceStatus;
}

public double progress() {
return progress;
}

public Map<TableBucket, RebalanceResultForBucket> progressForBucketMap() {
return progressForBucketMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.cluster.rebalance;

import org.apache.fluss.annotation.PublicEvolving;

import java.util.Arrays;

/**
* The type of goal to optimize.
*
* @since 0.9
*/
@PublicEvolving
public enum GoalType {
/**
* Goal to generate replica movement tasks to ensure that the number of replicas on each
* tabletServer is near balanced.
*/
REPLICA_DISTRIBUTION_GOAL(0),

/**
* Goal to generate leadership movement and leader replica movement tasks to ensure that the
* number of leader replicas on each tabletServer is near balanced.
*/
LEADER_DISTRIBUTION_GOAL(1);

public final int value;

GoalType(int value) {
this.value = value;
}

public static GoalType valueOf(int value) {
if (value == REPLICA_DISTRIBUTION_GOAL.value) {
return REPLICA_DISTRIBUTION_GOAL;
} else if (value == LEADER_DISTRIBUTION_GOAL.value) {
return LEADER_DISTRIBUTION_GOAL;
} else {
throw new IllegalArgumentException(
String.format(
"Value %s must be one of %s", value, Arrays.asList(GoalType.values())));
}
}
}
Loading