Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.AuthorizationException;
import org.apache.fluss.exception.DatabaseAlreadyExistException;
import org.apache.fluss.exception.DatabaseNotEmptyException;
import org.apache.fluss.exception.DatabaseNotExistException;
Expand All @@ -35,10 +40,15 @@
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.KvSnapshotNotExistException;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.exception.NoRebalanceInProgressException;
import org.apache.fluss.exception.NonPrimaryKeyTableException;
import org.apache.fluss.exception.PartitionAlreadyExistsException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.RebalanceFailureException;
import org.apache.fluss.exception.SchemaNotExistException;
import org.apache.fluss.exception.ServerNotExistException;
import org.apache.fluss.exception.ServerTagAlreadyExistException;
import org.apache.fluss.exception.ServerTagNotExistException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
Expand All @@ -60,6 +70,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -492,4 +503,90 @@ ListOffsetsResult listOffsets(
* @return A CompletableFuture indicating completion of the operation.
*/
CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> configs);

/**
* Add server tag to the specified tabletServers, one tabletServer can only have one serverTag.
*
* <p>If one tabletServer failed adding tag, none of the tags will take effect.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
* exist.
* <li>{@link ServerTagAlreadyExistException} If the server tag already exists for any one of
* the tabletServers.
* </ul>
*
* @param tabletServers the tabletServers we want to add server tags.
* @param serverTag the server tag to be added.
*/
CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag);

/**
* Remove server tag from the specified tabletServers.
*
* <p>If one tabletServer failed removing tag, none of the tags will be removed.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not
* exist.
* <li>{@link ServerTagNotExistException} If the server tag does not exist for any one of the
* tabletServers.
* </ul>
*
* @param tabletServers the tabletServers we want to remove server tags.
*/
CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, ServerTag serverTag);

/**
* Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's
* bucket load.
*
* <p>More details, Fluss collects the cluster's load information and optimizes to perform load
* balancing according to the user-defined {@code priorityGoals}.
*
* <p>Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted
* rebalance task exists, an {@link RebalanceFailureException} will be thrown.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link RebalanceFailureException} If the rebalance failed. Such as there is an ongoing
* execution.
* </ul>
*
* @param priorityGoals the goals to be optimized.
* @param dryRun Calculate and return the rebalance optimization proposal, but do not execute
* it.
* @return the generated rebalance plan for all the tableBuckets which need to do rebalance.
*/
CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
List<GoalType> priorityGoals, boolean dryRun);

/**
* List the rebalance process.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
* </ul>
*
* @return the rebalance process for all the tableBuckets doing rebalance.
*/
CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess();

/**
* Cannel the rebalance task.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have cluster
* permissions.
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
* </ul>
*/
CompletableFuture<Void> cancelRebalance();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.LeaderNotAvailableException;
Expand All @@ -44,6 +48,7 @@
import org.apache.fluss.rpc.gateway.AdminGateway;
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.AddServerTagRequest;
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
import org.apache.fluss.rpc.messages.AlterTableRequest;
import org.apache.fluss.rpc.messages.CreateAclsRequest;
Expand Down Expand Up @@ -72,6 +77,7 @@
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
import org.apache.fluss.rpc.messages.PbPartitionSpec;
import org.apache.fluss.rpc.messages.PbTablePath;
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
import org.apache.fluss.rpc.messages.TableExistsRequest;
import org.apache.fluss.rpc.messages.TableExistsResponse;
import org.apache.fluss.rpc.protocol.ApiError;
Expand Down Expand Up @@ -531,6 +537,37 @@ public CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> confi
return future;
}

@Override
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
AddServerTagRequest request = new AddServerTagRequest().setServerTag(serverTag.value);
tabletServers.forEach(request::addServerId);
return gateway.addServerTag(request).thenApply(r -> null);
}

@Override
public CompletableFuture<Void> removeServerTag(
List<Integer> tabletServers, ServerTag serverTag) {
RemoveServerTagRequest request = new RemoveServerTagRequest().setServerTag(serverTag.value);
tabletServers.forEach(request::addServerId);
return gateway.removeServerTag(request).thenApply(r -> null);
}

@Override
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
List<GoalType> priorityGoals, boolean dryRun) {
throw new UnsupportedOperationException("Support soon");
}

@Override
public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
throw new UnsupportedOperationException("Support soon");
}

@Override
public CompletableFuture<Void> cancelRebalance() {
throw new UnsupportedOperationException("Support soon");
}

@Override
public void close() {
// nothing to do yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.AutoPartitionTimeUnit;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
Expand All @@ -42,6 +43,9 @@
import org.apache.fluss.exception.PartitionAlreadyExistsException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.SchemaNotExistException;
import org.apache.fluss.exception.ServerNotExistException;
import org.apache.fluss.exception.ServerTagAlreadyExistException;
import org.apache.fluss.exception.ServerTagNotExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
import org.apache.fluss.exception.TooManyBucketsException;
Expand All @@ -64,6 +68,7 @@
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.types.DataTypes;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -1436,4 +1441,63 @@ public void testSystemsColumns() throws Exception {
+ "Please use other names for these columns. "
+ "The reserved system columns are: __offset, __timestamp, __bucket");
}

@Test
public void testAddAndRemoveServerTags() throws Exception {
ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
// 1.add server tag to a none exists server.
assertThatThrownBy(
() ->
admin.addServerTag(
Collections.singletonList(100),
ServerTag.PERMANENT_OFFLINE)
.get())
.cause()
.isInstanceOf(ServerNotExistException.class)
.hasMessageContaining("Server 100 not exists when trying to add server tag.");

// 2.add server tag for server 0,1.
admin.addServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
// TODO use api to get serverTags instead of getting from zk directly
assertThat(zkClient.getServerTags()).isPresent();
assertThat(zkClient.getServerTags().get().getServerTags())
.containsEntry(0, ServerTag.PERMANENT_OFFLINE)
.containsEntry(1, ServerTag.PERMANENT_OFFLINE);

// 3.add server tag for server 0,2. error will be thrown and tag for 2 will not be added.
assertThatThrownBy(
() ->
admin.addServerTag(Arrays.asList(0, 2), ServerTag.PERMANENT_OFFLINE)
.get())
.cause()
.isInstanceOf(ServerTagAlreadyExistException.class)
.hasMessageContaining("Server tag PERMANENT_OFFLINE already exists for server 0.");

// 4.remove server tag for server 100
assertThatThrownBy(
() ->
admin.removeServerTag(
Collections.singletonList(100),
ServerTag.PERMANENT_OFFLINE)
.get())
.cause()
.isInstanceOf(ServerNotExistException.class)
.hasMessageContaining("Server 100 not exists when trying to removing server tag.");

// 5.remove server tag for server 0,1.
admin.removeServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get();
assertThat(zkClient.getServerTags()).isPresent();
assertThat(zkClient.getServerTags().get().getServerTags()).isEmpty();

// 6.remove server tag for server 2. error will be thrown and tag for 2 will not be removed.
assertThatThrownBy(
() ->
admin.removeServerTag(
Collections.singletonList(0),
ServerTag.PERMANENT_OFFLINE)
.get())
.cause()
.isInstanceOf(ServerTagNotExistException.class)
.hasMessageContaining("Server tag PERMANENT_OFFLINE not exists for server 0.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.cluster.rebalance;

import org.apache.fluss.annotation.PublicEvolving;

import java.util.Arrays;

/**
* The type of goal to optimize.
*
* @since 0.8
*/
@PublicEvolving
public enum GoalType {
/**
* Goal to generate replica movement tasks to ensure that the number of replicas on each
* tabletServer is near balanced.
*/
REPLICA_DISTRIBUTION_GOAL(0),

/**
* Goal to generate leadership movement and leader replica movement tasks to ensure that the
* number of leader replicas on each tabletServer is near balanced.
*/
LEADER_REPLICA_DISTRIBUTION_GOAL(1);

public final int value;

GoalType(int value) {
this.value = value;
}

public static GoalType valueOf(int value) {
if (value == REPLICA_DISTRIBUTION_GOAL.value) {
return REPLICA_DISTRIBUTION_GOAL;
} else if (value == LEADER_REPLICA_DISTRIBUTION_GOAL.value) {
return LEADER_REPLICA_DISTRIBUTION_GOAL;
} else {
throw new IllegalArgumentException(
String.format(
"Value %s must be one of %s", value, Arrays.asList(GoalType.values())));
}
}
}
Loading