From 749689fd9b95be645e7ee414ae35621a519c085a Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Mon, 21 Jul 2025 19:52:38 +0800 Subject: [PATCH 1/2] [server] Introduce new rebalance API --- .../org/apache/fluss/client/admin/Admin.java | 97 +++++++++++ .../apache/fluss/client/admin/FlussAdmin.java | 31 ++++ .../fluss/cluster/rebalance/GoalType.java | 60 +++++++ .../rebalance/RebalancePlanForBucket.java | 112 +++++++++++++ .../rebalance/RebalanceResultForBucket.java | 77 +++++++++ .../rebalance/RebalanceStatusForBucket.java | 48 ++++++ .../fluss/cluster/rebalance/ServerTag.java | 52 ++++++ .../NoRebalanceInProgressException.java | 34 ++++ .../exception/RebalanceFailureException.java | 38 +++++ .../exception/ServerNotExistException.java | 34 ++++ .../ServerTagAlreadyExistException.java | 34 ++++ .../exception/ServerTagNotExistException.java | 35 ++++ .../fluss/rpc/gateway/AdminGateway.java | 26 +++ .../apache/fluss/rpc/protocol/ApiKeys.java | 7 +- .../org/apache/fluss/rpc/protocol/Errors.java | 14 +- fluss-rpc/src/main/proto/FlussApi.proto | 75 +++++++++ .../coordinator/CoordinatorService.java | 38 +++++ .../fluss/server/zk/ZooKeeperClient.java | 47 ++++++ .../fluss/server/zk/data/RebalancePlan.java | 107 ++++++++++++ .../zk/data/RebalancePlanJsonSerde.java | 156 ++++++++++++++++++ .../fluss/server/zk/data/ServerTags.java | 65 ++++++++ .../server/zk/data/ServerTagsJsonSerde.java | 67 ++++++++ .../apache/fluss/server/zk/data/ZkData.java | 42 +++++ .../coordinator/TestCoordinatorGateway.java | 38 +++++ .../fluss/server/zk/ZooKeeperClientTest.java | 78 +++++++++ .../zk/data/RebalancePlanJsonSerdeTest.java | 97 +++++++++++ .../zk/data/ServerTagsJsonSerdeTest.java | 50 ++++++ 27 files changed, 1557 insertions(+), 2 deletions(-) create mode 100644 fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/exception/RebalanceFailureException.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/exception/ServerTagAlreadyExistException.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/exception/ServerTagNotExistException.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTags.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerde.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerdeTest.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java index 1ef5c30ebd..feca0e5f68 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java @@ -22,9 +22,14 @@ import org.apache.fluss.client.metadata.KvSnapshots; import org.apache.fluss.client.metadata.LakeSnapshot; import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.rebalance.GoalType; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.cluster.AlterConfig; import org.apache.fluss.config.cluster.ConfigEntry; +import org.apache.fluss.exception.AuthorizationException; import org.apache.fluss.exception.DatabaseAlreadyExistException; import org.apache.fluss.exception.DatabaseNotEmptyException; import org.apache.fluss.exception.DatabaseNotExistException; @@ -35,10 +40,15 @@ import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.KvSnapshotNotExistException; import org.apache.fluss.exception.LakeTableSnapshotNotExistException; +import org.apache.fluss.exception.NoRebalanceInProgressException; import org.apache.fluss.exception.NonPrimaryKeyTableException; import org.apache.fluss.exception.PartitionAlreadyExistsException; import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.exception.RebalanceFailureException; import org.apache.fluss.exception.SchemaNotExistException; +import org.apache.fluss.exception.ServerNotExistException; +import org.apache.fluss.exception.ServerTagAlreadyExistException; +import org.apache.fluss.exception.ServerTagNotExistException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.exception.TableNotPartitionedException; @@ -60,6 +70,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -492,4 +503,90 @@ ListOffsetsResult listOffsets( * @return A CompletableFuture indicating completion of the operation. */ CompletableFuture alterClusterConfigs(Collection configs); + + /** + * Add server tag to the specified tabletServers, one tabletServer can only have one serverTag. + * + *

If one tabletServer failed adding tag, none of the tags will take effect. + * + *

+ * + * @param tabletServers the tabletServers we want to add server tags. + * @param serverTag the server tag to be added. + */ + CompletableFuture addServerTag(List tabletServers, ServerTag serverTag); + + /** + * Remove server tag from the specified tabletServers. + * + *

If one tabletServer failed removing tag, none of the tags will be removed. + * + *

    + *
  • {@link AuthorizationException} If the authenticated user doesn't have cluster + * permissions. + *
  • {@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not + * exist. + *
  • {@link ServerTagNotExistException} If the server tag does not exist for any one of the + * tabletServers. + *
+ * + * @param tabletServers the tabletServers we want to remove server tags. + */ + CompletableFuture removeServerTag(List tabletServers, ServerTag serverTag); + + /** + * Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's + * bucket load. + * + *

More details, Fluss collects the cluster's load information and optimizes to perform load + * balancing according to the user-defined {@code priorityGoals}. + * + *

Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted + * rebalance task exists, an {@link RebalanceFailureException} will be thrown. + * + *

    + *
  • {@link AuthorizationException} If the authenticated user doesn't have cluster + * permissions. + *
  • {@link RebalanceFailureException} If the rebalance failed. Such as there is an ongoing + * execution. + *
+ * + * @param priorityGoals the goals to be optimized. + * @param dryRun Calculate and return the rebalance optimization proposal, but do not execute + * it. + * @return the generated rebalance plan for all the tableBuckets which need to do rebalance. + */ + CompletableFuture> rebalance( + List priorityGoals, boolean dryRun); + + /** + * List the rebalance process. + * + *
    + *
  • {@link AuthorizationException} If the authenticated user doesn't have cluster + * permissions. + *
  • {@link NoRebalanceInProgressException} If there are no rebalance tasks in progress. + *
+ * + * @return the rebalance process for all the tableBuckets doing rebalance. + */ + CompletableFuture> listRebalanceProcess(); + + /** + * Cannel the rebalance task. + * + *
    + *
  • {@link AuthorizationException} If the authenticated user doesn't have cluster + * permissions. + *
  • {@link NoRebalanceInProgressException} If there are no rebalance tasks in progress. + *
+ */ + CompletableFuture cancelRebalance(); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index 27124c70fd..cdf9a037f8 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -24,6 +24,10 @@ import org.apache.fluss.client.utils.ClientRpcMessageUtils; import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.rebalance.GoalType; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.cluster.AlterConfig; import org.apache.fluss.config.cluster.ConfigEntry; import org.apache.fluss.exception.LeaderNotAvailableException; @@ -531,6 +535,33 @@ public CompletableFuture alterClusterConfigs(Collection confi return future; } + @Override + public CompletableFuture addServerTag(List tabletServers, ServerTag serverTag) { + throw new UnsupportedOperationException("Support soon"); + } + + @Override + public CompletableFuture removeServerTag( + List tabletServers, ServerTag serverTag) { + throw new UnsupportedOperationException("Support soon"); + } + + @Override + public CompletableFuture> rebalance( + List priorityGoals, boolean dryRun) { + throw new UnsupportedOperationException("Support soon"); + } + + @Override + public CompletableFuture> listRebalanceProcess() { + throw new UnsupportedOperationException("Support soon"); + } + + @Override + public CompletableFuture cancelRebalance() { + throw new UnsupportedOperationException("Support soon"); + } + @Override public void close() { // nothing to do yet diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java new file mode 100644 index 0000000000..6dc7624805 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.util.Arrays; + +/** + * The type of goal to optimize. + * + * @since 0.8 + */ +@PublicEvolving +public enum GoalType { + /** + * Goal to generate replica movement tasks to ensure that the number of replicas on each + * tabletServer is near balanced. + */ + REPLICA_DISTRIBUTION_GOAL(0), + + /** + * Goal to generate leadership movement and leader replica movement tasks to ensure that the + * number of leader replicas on each tabletServer is near balanced. + */ + LEADER_REPLICA_DISTRIBUTION_GOAL(1); + + public final int value; + + GoalType(int value) { + this.value = value; + } + + public static GoalType valueOf(int value) { + if (value == REPLICA_DISTRIBUTION_GOAL.value) { + return REPLICA_DISTRIBUTION_GOAL; + } else if (value == LEADER_REPLICA_DISTRIBUTION_GOAL.value) { + return LEADER_REPLICA_DISTRIBUTION_GOAL; + } else { + throw new IllegalArgumentException( + String.format( + "Value %s must be one of %s", value, Arrays.asList(GoalType.values()))); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java new file mode 100644 index 0000000000..bedce22663 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.metadata.TableBucket; + +import java.util.List; +import java.util.Objects; + +/** + * a generated rebalance plan for a tableBucket. + * + * @since 0.8 + */ +@PublicEvolving +public class RebalancePlanForBucket { + private final TableBucket tableBucket; + private final int originalLeader; + private final int newLeader; + private final List originReplicas; + private final List newReplicas; + + public RebalancePlanForBucket( + TableBucket tableBucket, + int originalLeader, + int newLeader, + List originReplicas, + List newReplicas) { + this.tableBucket = tableBucket; + this.originalLeader = originalLeader; + this.newLeader = newLeader; + this.originReplicas = originReplicas; + this.newReplicas = newReplicas; + } + + public TableBucket getTableBucket() { + return tableBucket; + } + + public int getBucketId() { + return tableBucket.getBucket(); + } + + public Integer getOriginalLeader() { + return originalLeader; + } + + public Integer getNewLeader() { + return newLeader; + } + + public List getOriginReplicas() { + return originReplicas; + } + + public List getNewReplicas() { + return newReplicas; + } + + @Override + public String toString() { + return "RebalancePlanForBucket{" + + "tableBucket=" + + tableBucket + + ", originalLeader=" + + originalLeader + + ", newLeader=" + + newLeader + + ", originReplicas=" + + originReplicas + + ", newReplicas=" + + newReplicas + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RebalancePlanForBucket that = (RebalancePlanForBucket) o; + return Objects.equals(tableBucket, that.tableBucket) + && originalLeader == that.originalLeader + && newLeader == that.newLeader + && Objects.equals(originReplicas, that.originReplicas) + && Objects.equals(newReplicas, that.newReplicas); + } + + @Override + public int hashCode() { + return Objects.hash(tableBucket, originalLeader, newLeader, originReplicas, newReplicas); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java new file mode 100644 index 0000000000..c477e524b3 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.metadata.TableBucket; + +import java.util.List; + +/** + * Status of rebalance process for a tabletBucket. + * + * @since 0.8 + */ +@PublicEvolving +public class RebalanceResultForBucket { + private final RebalancePlanForBucket rebalancePlanForBucket; + private RebalanceStatusForBucket rebalanceStatusForBucket; + + public RebalanceResultForBucket( + RebalancePlanForBucket rebalancePlanForBucket, + RebalanceStatusForBucket rebalanceStatusForBucket) { + this.rebalancePlanForBucket = rebalancePlanForBucket; + this.rebalanceStatusForBucket = rebalanceStatusForBucket; + } + + public TableBucket tableBucket() { + return rebalancePlanForBucket.getTableBucket(); + } + + public RebalancePlanForBucket planForBucket() { + return rebalancePlanForBucket; + } + + public List newReplicas() { + return rebalancePlanForBucket.getNewReplicas(); + } + + public RebalanceResultForBucket setNewStatus(RebalanceStatusForBucket status) { + this.rebalanceStatusForBucket = status; + return this; + } + + public RebalanceStatusForBucket status() { + return rebalanceStatusForBucket; + } + + public static RebalanceResultForBucket of( + RebalancePlanForBucket planForBucket, RebalanceStatusForBucket status) { + return new RebalanceResultForBucket(planForBucket, status); + } + + @Override + public String toString() { + return "RebalanceResultForBucket{" + + "rebalancePlanForBucket=" + + rebalancePlanForBucket + + ", rebalanceStatusForBucket=" + + rebalanceStatusForBucket + + '}'; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java new file mode 100644 index 0000000000..e8c0e46733 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Rebalance status for single bucket. + * + * @since 0.8 + */ +@PublicEvolving +public enum RebalanceStatusForBucket { + PENDING(1), + REBALANCING(2), + FAILED(3), + COMPLETED(4); + + private final int code; + + RebalanceStatusForBucket(int code) { + this.code = code; + } + + public static RebalanceStatusForBucket of(int code) { + for (RebalanceStatusForBucket status : RebalanceStatusForBucket.values()) { + if (status.code == code) { + return status; + } + } + return null; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java new file mode 100644 index 0000000000..66849060d4 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.util.Arrays; + +/** + * The tag of tabletServer. + * + * @since 0.8 + */ +@PublicEvolving +public enum ServerTag { + PERMANENT_OFFLINE(0), + TEMPORARY_OFFLINE(1); + + public final int value; + + ServerTag(int value) { + this.value = value; + } + + public static ServerTag valueOf(int value) { + if (value == PERMANENT_OFFLINE.value) { + return PERMANENT_OFFLINE; + } else if (value == TEMPORARY_OFFLINE.value) { + return TEMPORARY_OFFLINE; + } else { + throw new IllegalArgumentException( + String.format( + "Value %s must be one of %s", + value, Arrays.asList(ServerTag.values()))); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java b/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java new file mode 100644 index 0000000000..8b052a5100 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Thrown if there are no rebalance tasks in progress when list rebalance process. + * + * @since 0.8 + */ +@PublicEvolving +public class NoRebalanceInProgressException extends ApiException { + private static final long serialVersionUID = 1L; + + public NoRebalanceInProgressException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/RebalanceFailureException.java b/fluss-common/src/main/java/org/apache/fluss/exception/RebalanceFailureException.java new file mode 100644 index 0000000000..0dcf260b0c --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/RebalanceFailureException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.VisibleForTesting; + +/** + * This exception is thrown if rebalance failed. + * + * @since 0.8 + */ +@VisibleForTesting +public class RebalanceFailureException extends ApiException { + private static final long serialVersionUID = 1L; + + public RebalanceFailureException(String message) { + super(message); + } + + public RebalanceFailureException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java new file mode 100644 index 0000000000..2bdbe621e8 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Thrown if a server does not exist in the cluster. + * + * @since 0.8 + */ +@PublicEvolving +public class ServerNotExistException extends ApiException { + private static final long serialVersionUID = 1L; + + public ServerNotExistException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagAlreadyExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagAlreadyExistException.java new file mode 100644 index 0000000000..a3d4259b13 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagAlreadyExistException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Thrown if a server tag already exists for specify tabletServer in the cluster. + * + * @since 0.8 + */ +@PublicEvolving +public class ServerTagAlreadyExistException extends ApiException { + private static final long serialVersionUID = 1L; + + public ServerTagAlreadyExistException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagNotExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagNotExistException.java new file mode 100644 index 0000000000..bd62672c72 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagNotExistException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Thrown if a server tag not exist for specify tabletServer in the cluster. + * + * @since 0.8 + */ +@PublicEvolving +public class ServerTagNotExistException extends ApiException { + + private static final long serialVersionUID = 1L; + + public ServerTagNotExistException(String message) { + super(message); + } +} diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java index 7b322ff67e..996342dfcb 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java @@ -17,10 +17,14 @@ package org.apache.fluss.rpc.gateway; +import org.apache.fluss.rpc.messages.AddServerTagRequest; +import org.apache.fluss.rpc.messages.AddServerTagResponse; import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest; import org.apache.fluss.rpc.messages.AlterClusterConfigsResponse; import org.apache.fluss.rpc.messages.AlterTableRequest; import org.apache.fluss.rpc.messages.AlterTableResponse; +import org.apache.fluss.rpc.messages.CancelRebalanceRequest; +import org.apache.fluss.rpc.messages.CancelRebalanceResponse; import org.apache.fluss.rpc.messages.CreateAclsRequest; import org.apache.fluss.rpc.messages.CreateAclsResponse; import org.apache.fluss.rpc.messages.CreateDatabaseRequest; @@ -37,6 +41,12 @@ import org.apache.fluss.rpc.messages.DropPartitionResponse; import org.apache.fluss.rpc.messages.DropTableRequest; import org.apache.fluss.rpc.messages.DropTableResponse; +import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest; +import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse; +import org.apache.fluss.rpc.messages.RebalanceRequest; +import org.apache.fluss.rpc.messages.RebalanceResponse; +import org.apache.fluss.rpc.messages.RemoveServerTagRequest; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.protocol.ApiKeys; import org.apache.fluss.rpc.protocol.RPC; @@ -120,6 +130,22 @@ public interface AdminGateway extends AdminReadOnlyGateway { CompletableFuture alterClusterConfigs( AlterClusterConfigsRequest request); + @RPC(api = ApiKeys.ADD_SERVER_TAG) + CompletableFuture addServerTag(AddServerTagRequest request); + + @RPC(api = ApiKeys.REMOVE_SERVER_TAG) + CompletableFuture removeServerTag(RemoveServerTagRequest request); + + @RPC(api = ApiKeys.REBALANCE) + CompletableFuture rebalance(RebalanceRequest request); + + @RPC(api = ApiKeys.LIST_REBALANCE_PROCESS) + CompletableFuture listRebalanceProcess( + ListRebalanceProcessRequest request); + + @RPC(api = ApiKeys.CANCEL_REBALANCE) + CompletableFuture cancelRebalance(CancelRebalanceRequest request); + // todo: rename table & alter table } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index 8acec7a36d..97e14d79f0 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -74,7 +74,12 @@ public enum ApiKeys { CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE), ALTER_TABLE(1044, 0, 0, PUBLIC), DESCRIBE_CLUSTER_CONFIGS(1045, 0, 0, PUBLIC), - ALTER_CLUSTER_CONFIGS(1046, 0, 0, PUBLIC); + ALTER_CLUSTER_CONFIGS(1046, 0, 0, PUBLIC), + ADD_SERVER_TAG(1047, 0, 0, PUBLIC), + REMOVE_SERVER_TAG(1048, 0, 0, PUBLIC), + REBALANCE(1049, 0, 0, PUBLIC), + LIST_REBALANCE_PROCESS(1050, 0, 0, PUBLIC), + CANCEL_REBALANCE(1051, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java index 4b49a58475..5ee652cce2 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java @@ -52,6 +52,7 @@ import org.apache.fluss.exception.LogOffsetOutOfRangeException; import org.apache.fluss.exception.LogStorageException; import org.apache.fluss.exception.NetworkException; +import org.apache.fluss.exception.NoRebalanceInProgressException; import org.apache.fluss.exception.NonPrimaryKeyTableException; import org.apache.fluss.exception.NotEnoughReplicasAfterAppendException; import org.apache.fluss.exception.NotEnoughReplicasException; @@ -60,11 +61,15 @@ import org.apache.fluss.exception.OutOfOrderSequenceException; import org.apache.fluss.exception.PartitionAlreadyExistsException; import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.exception.RebalanceFailureException; import org.apache.fluss.exception.RecordTooLargeException; import org.apache.fluss.exception.RetriableAuthenticationException; import org.apache.fluss.exception.SchemaNotExistException; import org.apache.fluss.exception.SecurityDisabledException; import org.apache.fluss.exception.SecurityTokenException; +import org.apache.fluss.exception.ServerNotExistException; +import org.apache.fluss.exception.ServerTagAlreadyExistException; +import org.apache.fluss.exception.ServerTagNotExistException; import org.apache.fluss.exception.StorageException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; @@ -228,7 +233,14 @@ public enum Errors { INVALID_ALTER_TABLE_EXCEPTION( 56, "The alter table is invalid.", InvalidAlterTableException::new), DELETION_DISABLED_EXCEPTION( - 57, "Deletion operations are disabled on this table.", DeletionDisabledException::new); + 57, "Deletion operations are disabled on this table.", DeletionDisabledException::new), + SERVER_NOT_EXIST_EXCEPTION(58, "The server is not exist.", ServerNotExistException::new), + SEVER_TAG_ALREADY_EXIST_EXCEPTION( + 59, "The server tag already exist.", ServerTagAlreadyExistException::new), + SEVER_TAG_NOT_EXIST_EXCEPTION(60, "The server tag not exist.", ServerTagNotExistException::new), + REBALANCE_FAILURE_EXCEPTION(61, "The rebalance task failure.", RebalanceFailureException::new), + NO_REBALANCE_IN_PROGRESS_EXCEPTION( + 62, "No rebalance task in progress.", NoRebalanceInProgressException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index d2917734ce..891a295d16 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -574,6 +574,44 @@ message AlterClusterConfigsRequest{ message AlterClusterConfigsResponse{ } +message AddServerTagRequest { + repeated int32 server_ids = 1 [packed = true]; + required int32 server_tag = 2; +} + +message AddServerTagResponse { +} + +message RemoveServerTagRequest { + repeated int32 server_ids = 1 [packed = true]; + required int32 server_tag = 2; +} + +message RemoveServerTagResponse { +} + +message RebalanceRequest { + repeated int32 goals = 1 [packed = true]; + required bool dry_run = 2; +} + +message RebalanceResponse { + repeated PbRebalancePlanForTable plan_for_table = 1; +} + +message ListRebalanceProcessRequest { +} + +message ListRebalanceProcessResponse { + repeated PbRebalanceProcessForTable process_for_table = 1; +} + +message CancelRebalanceRequest { +} + +message CancelRebalanceResponse { +} + // --------------- Inner classes ---------------- @@ -955,3 +993,40 @@ message PbDescribeConfig { required string config_source = 3; } +message PbRebalancePlanForTable { + required int64 table_id = 1; + repeated PbRebalancePlanForPartition partitions_plan = 2; // for none-partition table, this is empty + repeated PbRebalancePlanForBucket buckets_plan = 3; // for partition table, this is empty + +} + +message PbRebalancePlanForPartition { + required int64 partition_id = 1; + repeated PbRebalancePlanForBucket buckets_plan = 2; +} + +message PbRebalancePlanForBucket { + required int32 bucket_id = 1; + optional int32 original_leader = 2; + optional int32 new_leader = 3; + repeated int32 original_replicas = 4 [packed = true]; + repeated int32 new_replicas = 5 [packed = true]; +} + +message PbRebalanceProcessForTable { + required int64 table_id = 1; + repeated PbRebalanceProcessForPartition partitions_process = 2; + repeated PbRebalanceProcessForBucket buckets_process = 3; +} + +message PbRebalanceProcessForPartition { + required int64 partition_id = 1; + repeated PbRebalanceProcessForBucket buckets_process = 2; +} + +message PbRebalanceProcessForBucket { + required int32 bucket_id = 1; + repeated int32 original_replicas = 2 [packed = true]; + repeated int32 new_replicas = 3 [packed = true]; + required int32 rebalance_status = 4; +} \ No newline at end of file diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 3d58112070..95d98e4f94 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -44,12 +44,16 @@ import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.AddServerTagRequest; +import org.apache.fluss.rpc.messages.AddServerTagResponse; import org.apache.fluss.rpc.messages.AdjustIsrRequest; import org.apache.fluss.rpc.messages.AdjustIsrResponse; import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest; import org.apache.fluss.rpc.messages.AlterClusterConfigsResponse; import org.apache.fluss.rpc.messages.AlterTableRequest; import org.apache.fluss.rpc.messages.AlterTableResponse; +import org.apache.fluss.rpc.messages.CancelRebalanceRequest; +import org.apache.fluss.rpc.messages.CancelRebalanceResponse; import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest; import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse; import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; @@ -76,11 +80,17 @@ import org.apache.fluss.rpc.messages.DropTableResponse; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse; +import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest; +import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; import org.apache.fluss.rpc.messages.PbAlterConfig; import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable; import org.apache.fluss.rpc.messages.PbHeartbeatRespForTable; +import org.apache.fluss.rpc.messages.RebalanceRequest; +import org.apache.fluss.rpc.messages.RebalanceResponse; +import org.apache.fluss.rpc.messages.RemoveServerTagRequest; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.netty.server.Session; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.security.acl.AclBinding; @@ -733,6 +743,34 @@ public CompletableFuture alterClusterConfigs( return future; } + @Override + public CompletableFuture addServerTag(AddServerTagRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + + @Override + public CompletableFuture removeServerTag( + RemoveServerTagRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + + @Override + public CompletableFuture rebalance(RebalanceRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + + @Override + public CompletableFuture listRebalanceProcess( + ListRebalanceProcessRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + + @Override + public CompletableFuture cancelRebalance( + CancelRebalanceRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + @VisibleForTesting public DataLakeFormat getDataLakeFormat() { return lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index dfc2c004d0..749fdec2a0 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -42,8 +42,10 @@ import org.apache.fluss.server.zk.data.DatabaseRegistration; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.PartitionAssignment; +import org.apache.fluss.server.zk.data.RebalancePlan; import org.apache.fluss.server.zk.data.RemoteLogManifestHandle; import org.apache.fluss.server.zk.data.ResourceAcl; +import org.apache.fluss.server.zk.data.ServerTags; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.server.zk.data.TabletServerRegistration; @@ -63,11 +65,13 @@ import org.apache.fluss.server.zk.data.ZkData.PartitionSequenceIdZNode; import org.apache.fluss.server.zk.data.ZkData.PartitionZNode; import org.apache.fluss.server.zk.data.ZkData.PartitionsZNode; +import org.apache.fluss.server.zk.data.ZkData.RebalanceZNode; import org.apache.fluss.server.zk.data.ZkData.ResourceAclNode; import org.apache.fluss.server.zk.data.ZkData.SchemaZNode; import org.apache.fluss.server.zk.data.ZkData.SchemasZNode; import org.apache.fluss.server.zk.data.ZkData.ServerIdZNode; import org.apache.fluss.server.zk.data.ZkData.ServerIdsZNode; +import org.apache.fluss.server.zk.data.ZkData.ServerTagsZNode; import org.apache.fluss.server.zk.data.ZkData.TableIdZNode; import org.apache.fluss.server.zk.data.ZkData.TableSequenceIdZNode; import org.apache.fluss.server.zk.data.ZkData.TableZNode; @@ -1188,6 +1192,49 @@ public void insertConfigChangeNotification() throws Exception { ZkData.ConfigEntityChangeNotificationSequenceZNode.encode()); } + // -------------------------------------------------------------------------------------------- + // Maintenance + // -------------------------------------------------------------------------------------------- + + public void registerServerTags(ServerTags newServerTags) throws Exception { + String path = ServerTagsZNode.path(); + if (getOrEmpty(path).isPresent()) { + zkClient.setData().forPath(path, ServerTagsZNode.encode(newServerTags)); + } else { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, ServerTagsZNode.encode(newServerTags)); + } + } + + public Optional getServerTags() throws Exception { + String path = ServerTagsZNode.path(); + return getOrEmpty(path).map(ServerTagsZNode::decode); + } + + public void registerRebalancePlan(RebalancePlan rebalancePlan) throws Exception { + String path = RebalanceZNode.path(); + if (getOrEmpty(path).isPresent()) { + zkClient.setData().forPath(path, RebalanceZNode.encode(rebalancePlan)); + } else { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, RebalanceZNode.encode(rebalancePlan)); + } + } + + public Optional getRebalancePlan() throws Exception { + String path = RebalanceZNode.path(); + return getOrEmpty(path).map(RebalanceZNode::decode); + } + + public void deleteRebalancePlan() throws Exception { + String path = RebalanceZNode.path(); + deletePath(path); + } + // -------------------------------------------------------------------------------------------- // Utils // -------------------------------------------------------------------------------------------- diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java new file mode 100644 index 0000000000..00c87bcbc3 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePartition; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * The generated rebalance plan for this cluster. + * + *

The latest execution rebalance plan will be stored in {@link ZkData.RebalanceZNode}. + * + * @see RebalancePlanJsonSerde for json serialization and deserialization. + */ +public class RebalancePlan { + + /** A mapping from tableBucket to RebalancePlanForBuckets of none-partitioned table. */ + private final Map> planForBuckets; + + /** A mapping from tableBucket to RebalancePlanForBuckets of partitioned table. */ + private final Map> + planForBucketsOfPartitionedTable; + + public RebalancePlan(Map bucketPlan) { + this.planForBuckets = new HashMap<>(); + this.planForBucketsOfPartitionedTable = new HashMap<>(); + + for (Map.Entry entry : bucketPlan.entrySet()) { + TableBucket tableBucket = entry.getKey(); + RebalancePlanForBucket rebalancePlanForBucket = entry.getValue(); + if (tableBucket.getPartitionId() == null) { + planForBuckets + .computeIfAbsent(tableBucket.getTableId(), k -> new ArrayList<>()) + .add(rebalancePlanForBucket); + } else { + TablePartition tp = + new TablePartition(tableBucket.getTableId(), tableBucket.getPartitionId()); + planForBucketsOfPartitionedTable + .computeIfAbsent(tp, k -> new ArrayList<>()) + .add(rebalancePlanForBucket); + } + } + } + + public Map> getPlanForBuckets() { + return planForBuckets; + } + + public Map> getPlanForBucketsOfPartitionedTable() { + return planForBucketsOfPartitionedTable; + } + + @Override + public String toString() { + return "RebalancePlan{" + + "planForBuckets=" + + planForBuckets + + ", planForBucketsOfPartitionedTable=" + + planForBucketsOfPartitionedTable + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RebalancePlan that = (RebalancePlan) o; + + if (!Objects.equals(planForBuckets, that.planForBuckets)) { + return false; + } + return Objects.equals( + planForBucketsOfPartitionedTable, that.planForBucketsOfPartitionedTable); + } + + @Override + public int hashCode() { + return Objects.hash(planForBuckets, planForBucketsOfPartitionedTable); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java new file mode 100644 index 0000000000..b79cd46e30 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** Json serializer and deserializer for {@link RebalancePlan}. */ +public class RebalancePlanJsonSerde + implements JsonSerializer, JsonDeserializer { + + public static final RebalancePlanJsonSerde INSTANCE = new RebalancePlanJsonSerde(); + + private static final String VERSION_KEY = "version"; + private static final String REBALANCE_PLAN = "rebalance_plan"; + + private static final String TABLE_ID = "table_id"; + private static final String PARTITION_ID = "partition_id"; + + private static final String BUCKETS = "buckets"; + private static final String BUCKET_ID = "bucket_id"; + private static final String ORIGINAL_LEADER = "original_leader"; + private static final String NEW_LEADER = "new_leader"; + private static final String ORIGIN_REPLICAS = "origin_replicas"; + private static final String NEW_REPLICAS = "new_replicas"; + + private static final int VERSION = 1; + + @Override + public void serialize(RebalancePlan rebalancePlan, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeNumberField(VERSION_KEY, VERSION); + + generator.writeArrayFieldStart(REBALANCE_PLAN); + // first to write none-partitioned tables. + for (Map.Entry> entry : + rebalancePlan.getPlanForBuckets().entrySet()) { + generator.writeStartObject(); + generator.writeNumberField(TABLE_ID, entry.getKey()); + generator.writeArrayFieldStart(BUCKETS); + for (RebalancePlanForBucket bucketPlan : entry.getValue()) { + serializeRebalancePlanForBucket(generator, bucketPlan); + } + generator.writeEndArray(); + generator.writeEndObject(); + } + + // then to write partitioned tables. + for (Map.Entry> entry : + rebalancePlan.getPlanForBucketsOfPartitionedTable().entrySet()) { + generator.writeStartObject(); + generator.writeNumberField(TABLE_ID, entry.getKey().getTableId()); + generator.writeNumberField(PARTITION_ID, entry.getKey().getPartitionId()); + generator.writeArrayFieldStart(BUCKETS); + for (RebalancePlanForBucket bucketPlan : entry.getValue()) { + serializeRebalancePlanForBucket(generator, bucketPlan); + } + generator.writeEndArray(); + generator.writeEndObject(); + } + + generator.writeEndArray(); + + generator.writeEndObject(); + } + + @Override + public RebalancePlan deserialize(JsonNode node) { + JsonNode rebalancePlanNode = node.get(REBALANCE_PLAN); + Map planForBuckets = new HashMap<>(); + + for (JsonNode tablePartitionPlanNode : rebalancePlanNode) { + long tableId = tablePartitionPlanNode.get(TABLE_ID).asLong(); + + Long partitionId = null; + if (tablePartitionPlanNode.has(PARTITION_ID)) { + partitionId = tablePartitionPlanNode.get(PARTITION_ID).asLong(); + } + + JsonNode bucketPlanNodes = tablePartitionPlanNode.get(BUCKETS); + for (JsonNode bucketPlanNode : bucketPlanNodes) { + int bucketId = bucketPlanNode.get(BUCKET_ID).asInt(); + TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); + + int originLeader = bucketPlanNode.get(ORIGINAL_LEADER).asInt(); + + int newLeader = bucketPlanNode.get(NEW_LEADER).asInt(); + + List originReplicas = new ArrayList<>(); + Iterator elements = bucketPlanNode.get(ORIGIN_REPLICAS).elements(); + while (elements.hasNext()) { + originReplicas.add(elements.next().asInt()); + } + + List newReplicas = new ArrayList<>(); + elements = bucketPlanNode.get(NEW_REPLICAS).elements(); + while (elements.hasNext()) { + newReplicas.add(elements.next().asInt()); + } + + planForBuckets.put( + tableBucket, + new RebalancePlanForBucket( + tableBucket, originLeader, newLeader, originReplicas, newReplicas)); + } + } + + return new RebalancePlan(planForBuckets); + } + + private void serializeRebalancePlanForBucket( + JsonGenerator generator, RebalancePlanForBucket bucketPlan) throws IOException { + generator.writeStartObject(); + generator.writeNumberField(BUCKET_ID, bucketPlan.getBucketId()); + generator.writeNumberField(ORIGINAL_LEADER, bucketPlan.getOriginalLeader()); + generator.writeNumberField(NEW_LEADER, bucketPlan.getNewLeader()); + generator.writeArrayFieldStart(ORIGIN_REPLICAS); + for (Integer replica : bucketPlan.getOriginReplicas()) { + generator.writeNumber(replica); + } + generator.writeEndArray(); + generator.writeArrayFieldStart(NEW_REPLICAS); + for (Integer replica : bucketPlan.getNewReplicas()) { + generator.writeNumber(replica); + } + generator.writeEndArray(); + generator.writeEndObject(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTags.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTags.java new file mode 100644 index 0000000000..edddcaaf75 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTags.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.ServerTag; + +import java.util.Map; +import java.util.Objects; + +/** + * The latest {@link ServerTags} of tabletServers in {@link ZkData.ServerTagsZNode}. It is used to + * store the serverTags information in zookeeper. + * + * @see ServerTagsJsonSerde for json serialization and deserialization. + */ +public class ServerTags { + + // a mapping from tabletServer id to serverTag. + private final Map serverTags; + + public ServerTags(Map serverTags) { + this.serverTags = serverTags; + } + + public Map getServerTags() { + return serverTags; + } + + @Override + public String toString() { + return "ServerTags{" + "serverTags=" + serverTags + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ServerTags that = (ServerTags) o; + return Objects.equals(serverTags, that.serverTags); + } + + @Override + public int hashCode() { + return Objects.hash(serverTags); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerde.java new file mode 100644 index 0000000000..7df94c74e4 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerde.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** Json serializer and deserializer for {@link ServerTags}. */ +public class ServerTagsJsonSerde + implements JsonSerializer, JsonDeserializer { + + public static final ServerTagsJsonSerde INSTANCE = new ServerTagsJsonSerde(); + + private static final String VERSION_KEY = "version"; + private static final String SERVER_TAGS = "server_tags"; + private static final int VERSION = 1; + + @Override + public void serialize(ServerTags serverTags, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeNumberField(VERSION_KEY, VERSION); + generator.writeObjectFieldStart(SERVER_TAGS); + for (Map.Entry entry : serverTags.getServerTags().entrySet()) { + generator.writeNumberField(String.valueOf(entry.getKey()), entry.getValue().value); + } + generator.writeEndObject(); + + generator.writeEndObject(); + } + + @Override + public ServerTags deserialize(JsonNode node) { + JsonNode serverTagsNode = node.get(SERVER_TAGS); + Map serverTags = new HashMap<>(); + Iterator fieldNames = serverTagsNode.fieldNames(); + while (fieldNames.hasNext()) { + String serverId = fieldNames.next(); + serverTags.put( + Integer.valueOf(serverId), + ServerTag.valueOf(serverTagsNode.get(serverId).asInt())); + } + return new ServerTags(serverTags); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index bb33e17ff5..4798623a74 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -613,6 +613,25 @@ public static LakeTable decode(byte[] json) { } } + /** + * The znode for server tags. The znode path is: + * + *

/tabletServers/server_tags + */ + public static final class ServerTagsZNode { + public static String path() { + return "/tabletservers/server_tags"; + } + + public static byte[] encode(ServerTags serverTag) { + return JsonSerdeUtils.writeValueAsBytes(serverTag, ServerTagsJsonSerde.INSTANCE); + } + + public static ServerTags decode(byte[] json) { + return JsonSerdeUtils.readValue(json, ServerTagsJsonSerde.INSTANCE); + } + } + // ------------------------------------------------------------------------------------------ // ZNodes for ACL(Access Control List). // ------------------------------------------------------------------------------------------ @@ -776,4 +795,27 @@ public static byte[] encode() { return new byte[0]; } } + + // ------------------------------------------------------------------------------------------ + // ZNodes under "/cluster/" + // ------------------------------------------------------------------------------------------ + + /** + * The znode for rebalance. The znode path is: + * + *

/cluster/rebalance + */ + public static final class RebalanceZNode { + public static String path() { + return "/cluster/rebalance"; + } + + public static byte[] encode(RebalancePlan rebalancePlan) { + return JsonSerdeUtils.writeValueAsBytes(rebalancePlan, RebalancePlanJsonSerde.INSTANCE); + } + + public static RebalancePlan decode(byte[] json) { + return JsonSerdeUtils.readValue(json, RebalancePlanJsonSerde.INSTANCE); + } + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java index be15c4d151..ffba559fa0 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java @@ -22,6 +22,8 @@ import org.apache.fluss.exception.NetworkException; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.AddServerTagRequest; +import org.apache.fluss.rpc.messages.AddServerTagResponse; import org.apache.fluss.rpc.messages.AdjustIsrRequest; import org.apache.fluss.rpc.messages.AdjustIsrResponse; import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest; @@ -30,6 +32,8 @@ import org.apache.fluss.rpc.messages.AlterTableResponse; import org.apache.fluss.rpc.messages.ApiVersionsRequest; import org.apache.fluss.rpc.messages.ApiVersionsResponse; +import org.apache.fluss.rpc.messages.CancelRebalanceRequest; +import org.apache.fluss.rpc.messages.CancelRebalanceResponse; import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest; import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse; import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; @@ -80,10 +84,16 @@ import org.apache.fluss.rpc.messages.ListDatabasesResponse; import org.apache.fluss.rpc.messages.ListPartitionInfosRequest; import org.apache.fluss.rpc.messages.ListPartitionInfosResponse; +import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest; +import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse; import org.apache.fluss.rpc.messages.ListTablesRequest; import org.apache.fluss.rpc.messages.ListTablesResponse; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; +import org.apache.fluss.rpc.messages.RebalanceRequest; +import org.apache.fluss.rpc.messages.RebalanceResponse; +import org.apache.fluss.rpc.messages.RemoveServerTagRequest; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; import org.apache.fluss.rpc.messages.TableExistsResponse; import org.apache.fluss.rpc.protocol.ApiError; @@ -339,6 +349,34 @@ public CompletableFuture lakeTieringHeartbeat( throw new UnsupportedOperationException(); } + @Override + public CompletableFuture addServerTag(AddServerTagRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture removeServerTag( + RemoveServerTagRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture rebalance(RebalanceRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture listRebalanceProcess( + ListRebalanceProcessRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture cancelRebalance( + CancelRebalanceRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture controlledShutdown( ControlledShutdownRequest request) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java index f42d381faf..f46acaf30d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java @@ -19,6 +19,8 @@ import org.apache.fluss.cluster.Endpoint; import org.apache.fluss.cluster.TabletServerInfo; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.metadata.Schema; @@ -33,6 +35,8 @@ import org.apache.fluss.server.zk.data.CoordinatorAddress; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.PartitionAssignment; +import org.apache.fluss.server.zk.data.RebalancePlan; +import org.apache.fluss.server.zk.data.ServerTags; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.server.zk.data.TabletServerRegistration; @@ -552,6 +556,80 @@ void testPartition() throws Exception { assertThat(partitions).containsExactly("p2"); } + @Test + void testServerTag() throws Exception { + Map serverTags = new HashMap<>(); + serverTags.put(0, ServerTag.PERMANENT_OFFLINE); + serverTags.put(1, ServerTag.TEMPORARY_OFFLINE); + + zookeeperClient.registerServerTags(new ServerTags(serverTags)); + assertThat(zookeeperClient.getServerTags()).hasValue(new ServerTags(serverTags)); + + // update server tags. + serverTags.put(0, ServerTag.TEMPORARY_OFFLINE); + serverTags.remove(1); + zookeeperClient.registerServerTags(new ServerTags(serverTags)); + assertThat(zookeeperClient.getServerTags()).hasValue(new ServerTags(serverTags)); + + zookeeperClient.registerServerTags(new ServerTags(Collections.emptyMap())); + assertThat(zookeeperClient.getServerTags()) + .hasValue(new ServerTags(Collections.emptyMap())); + } + + @Test + void testRebalancePlan() throws Exception { + Map bucketPlan = new HashMap<>(); + bucketPlan.put( + new TableBucket(0L, 0), + new RebalancePlanForBucket( + new TableBucket(0L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + bucketPlan.put( + new TableBucket(0L, 1), + new RebalancePlanForBucket( + new TableBucket(0L, 1), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + bucketPlan.put( + new TableBucket(1L, 1L, 0), + new RebalancePlanForBucket( + new TableBucket(1L, 1L, 0), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + bucketPlan.put( + new TableBucket(1L, 1L, 1), + new RebalancePlanForBucket( + new TableBucket(1L, 1L, 1), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + zookeeperClient.registerRebalancePlan(new RebalancePlan(bucketPlan)); + assertThat(zookeeperClient.getRebalancePlan()).hasValue(new RebalancePlan(bucketPlan)); + + bucketPlan = new HashMap<>(); + bucketPlan.put( + new TableBucket(0L, 0), + new RebalancePlanForBucket( + new TableBucket(0L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + zookeeperClient.registerRebalancePlan(new RebalancePlan(bucketPlan)); + assertThat(zookeeperClient.getRebalancePlan()).hasValue(new RebalancePlan(bucketPlan)); + + zookeeperClient.deleteRebalancePlan(); + assertThat(zookeeperClient.getRebalancePlan()).isEmpty(); + } + @Test void testZookeeperConfigPath() throws Exception { final Configuration config = new Configuration(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java new file mode 100644 index 0000000000..64b6da43e5 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.utils.json.JsonSerdeTestBase; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** Test for {@link RebalancePlanJsonSerde}. */ +public class RebalancePlanJsonSerdeTest extends JsonSerdeTestBase { + + RebalancePlanJsonSerdeTest() { + super(RebalancePlanJsonSerde.INSTANCE); + } + + @Override + protected RebalancePlan[] createObjects() { + Map bucketPlan = new HashMap<>(); + bucketPlan.put( + new TableBucket(0L, 0), + new RebalancePlanForBucket( + new TableBucket(0L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + bucketPlan.put( + new TableBucket(0L, 1), + new RebalancePlanForBucket( + new TableBucket(0L, 1), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + + bucketPlan.put( + new TableBucket(1L, 0L, 0), + new RebalancePlanForBucket( + new TableBucket(1L, 0L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + bucketPlan.put( + new TableBucket(1L, 0L, 1), + new RebalancePlanForBucket( + new TableBucket(1L, 0L, 1), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + + bucketPlan.put( + new TableBucket(1L, 1L, 0), + new RebalancePlanForBucket( + new TableBucket(1L, 1L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + return new RebalancePlan[] {new RebalancePlan(bucketPlan)}; + } + + @Override + protected String[] expectedJsons() { + return new String[] { + "{\"version\":1,\"rebalance_plan\":" + + "[{\"table_id\":0,\"buckets\":" + + "[{\"bucket_id\":1,\"original_leader\":1,\"new_leader\":1,\"origin_replicas\":[0,1,2],\"new_replicas\":[1,2,3]}," + + "{\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5]}]}," + + "{\"table_id\":1,\"partition_id\":0,\"buckets\":[" + + "{\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5]}," + + "{\"bucket_id\":1,\"original_leader\":1,\"new_leader\":1,\"origin_replicas\":[0,1,2],\"new_replicas\":[1,2,3]}]}," + + "{\"table_id\":1,\"partition_id\":1,\"buckets\":[" + + "{\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5]}]}]}" + }; + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerdeTest.java new file mode 100644 index 0000000000..8dd4e7f454 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerdeTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.utils.json.JsonSerdeTestBase; + +import java.util.HashMap; +import java.util.Map; + +/** Test for {@link ServerTagsJsonSerde}. */ +public class ServerTagsJsonSerdeTest extends JsonSerdeTestBase { + + ServerTagsJsonSerdeTest() { + super(ServerTagsJsonSerde.INSTANCE); + } + + @Override + protected ServerTags[] createObjects() { + Map serverTags = new HashMap<>(); + serverTags.put(0, ServerTag.PERMANENT_OFFLINE); + serverTags.put(1, ServerTag.TEMPORARY_OFFLINE); + + Map serverTags2 = new HashMap<>(); + + return new ServerTags[] {new ServerTags(serverTags), new ServerTags(serverTags2)}; + } + + protected String[] expectedJsons() { + return new String[] { + "{\"version\":1,\"server_tags\":{\"0\":0,\"1\":1}}", + "{\"version\":1,\"server_tags\":{}}" + }; + } +} From 5255b80cd6777071a44e800a625d89704f9143ec Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Thu, 24 Jul 2025 10:42:11 +0800 Subject: [PATCH 2/2] [server] Support AddServerTag and RemoveServerTag --- .../apache/fluss/client/admin/FlussAdmin.java | 10 +- .../fluss/client/admin/FlussAdminITCase.java | 64 ++++++++++ .../fluss/cluster/rebalance/ServerTag.java | 6 + .../coordinator/CoordinatorContext.java | 25 ++++ .../CoordinatorEventProcessor.java | 109 ++++++++++++++++++ .../coordinator/CoordinatorService.java | 28 ++++- .../coordinator/event/AddServerTagEvent.java | 52 +++++++++ .../event/RemoveServerTagEvent.java | 52 +++++++++ 8 files changed, 342 insertions(+), 4 deletions(-) create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/AddServerTagEvent.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RemoveServerTagEvent.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index cdf9a037f8..c9b5094747 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -48,6 +48,7 @@ import org.apache.fluss.rpc.gateway.AdminGateway; import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.AddServerTagRequest; import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest; import org.apache.fluss.rpc.messages.AlterTableRequest; import org.apache.fluss.rpc.messages.CreateAclsRequest; @@ -76,6 +77,7 @@ import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket; import org.apache.fluss.rpc.messages.PbPartitionSpec; import org.apache.fluss.rpc.messages.PbTablePath; +import org.apache.fluss.rpc.messages.RemoveServerTagRequest; import org.apache.fluss.rpc.messages.TableExistsRequest; import org.apache.fluss.rpc.messages.TableExistsResponse; import org.apache.fluss.rpc.protocol.ApiError; @@ -537,13 +539,17 @@ public CompletableFuture alterClusterConfigs(Collection confi @Override public CompletableFuture addServerTag(List tabletServers, ServerTag serverTag) { - throw new UnsupportedOperationException("Support soon"); + AddServerTagRequest request = new AddServerTagRequest().setServerTag(serverTag.value); + tabletServers.forEach(request::addServerId); + return gateway.addServerTag(request).thenApply(r -> null); } @Override public CompletableFuture removeServerTag( List tabletServers, ServerTag serverTag) { - throw new UnsupportedOperationException("Support soon"); + RemoveServerTagRequest request = new RemoveServerTagRequest().setServerTag(serverTag.value); + tabletServers.forEach(request::addServerId); + return gateway.removeServerTag(request).thenApply(r -> null); } @Override diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 62026ca2f5..5c04745e8e 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -24,6 +24,7 @@ import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.AutoPartitionTimeUnit; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; @@ -42,6 +43,9 @@ import org.apache.fluss.exception.PartitionAlreadyExistsException; import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.exception.SchemaNotExistException; +import org.apache.fluss.exception.ServerNotExistException; +import org.apache.fluss.exception.ServerTagAlreadyExistException; +import org.apache.fluss.exception.ServerTagNotExistException; import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.exception.TableNotPartitionedException; import org.apache.fluss.exception.TooManyBucketsException; @@ -64,6 +68,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle; +import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.BeforeEach; @@ -1436,4 +1441,63 @@ public void testSystemsColumns() throws Exception { + "Please use other names for these columns. " + "The reserved system columns are: __offset, __timestamp, __bucket"); } + + @Test + public void testAddAndRemoveServerTags() throws Exception { + ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + // 1.add server tag to a none exists server. + assertThatThrownBy( + () -> + admin.addServerTag( + Collections.singletonList(100), + ServerTag.PERMANENT_OFFLINE) + .get()) + .cause() + .isInstanceOf(ServerNotExistException.class) + .hasMessageContaining("Server 100 not exists when trying to add server tag."); + + // 2.add server tag for server 0,1. + admin.addServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get(); + // TODO use api to get serverTags instead of getting from zk directly + assertThat(zkClient.getServerTags()).isPresent(); + assertThat(zkClient.getServerTags().get().getServerTags()) + .containsEntry(0, ServerTag.PERMANENT_OFFLINE) + .containsEntry(1, ServerTag.PERMANENT_OFFLINE); + + // 3.add server tag for server 0,2. error will be thrown and tag for 2 will not be added. + assertThatThrownBy( + () -> + admin.addServerTag(Arrays.asList(0, 2), ServerTag.PERMANENT_OFFLINE) + .get()) + .cause() + .isInstanceOf(ServerTagAlreadyExistException.class) + .hasMessageContaining("Server tag PERMANENT_OFFLINE already exists for server 0."); + + // 4.remove server tag for server 100 + assertThatThrownBy( + () -> + admin.removeServerTag( + Collections.singletonList(100), + ServerTag.PERMANENT_OFFLINE) + .get()) + .cause() + .isInstanceOf(ServerNotExistException.class) + .hasMessageContaining("Server 100 not exists when trying to removing server tag."); + + // 5.remove server tag for server 0,1. + admin.removeServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get(); + assertThat(zkClient.getServerTags()).isPresent(); + assertThat(zkClient.getServerTags().get().getServerTags()).isEmpty(); + + // 6.remove server tag for server 2. error will be thrown and tag for 2 will not be removed. + assertThatThrownBy( + () -> + admin.removeServerTag( + Collections.singletonList(0), + ServerTag.PERMANENT_OFFLINE) + .get()) + .cause() + .isInstanceOf(ServerTagNotExistException.class) + .hasMessageContaining("Server tag PERMANENT_OFFLINE not exists for server 0."); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java index 66849060d4..5e20b34f73 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java @@ -28,7 +28,13 @@ */ @PublicEvolving public enum ServerTag { + /** + * The tabletServer is permanently offline. Such as the host where the tabletServer on is + * upcoming decommissioning. + */ PERMANENT_OFFLINE(0), + + /** The tabletServer is temporarily offline. Such as the tabletServer is upcoming upgrading. */ TEMPORARY_OFFLINE(1); public final int value; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java index a5f34aa17f..e811e01456 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java @@ -18,6 +18,7 @@ package org.apache.fluss.server.coordinator; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableBucketReplica; @@ -102,6 +103,9 @@ public class CoordinatorContext { */ private final Map> replicasOnOffline = new HashMap<>(); + /** A mapping from tabletServers to server tag. */ + private final Map serverTags = new HashMap<>(); + private ServerInfo coordinatorServerInfo = null; private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH; @@ -635,6 +639,26 @@ public void removePartition(TablePartition tablePartition) { } } + public void initSeverTags(Map initialServerTags) { + serverTags.putAll(initialServerTags); + } + + public void putServerTag(int serverId, ServerTag serverTag) { + serverTags.put(serverId, serverTag); + } + + public Map getServerTags() { + return new HashMap<>(serverTags); + } + + public Optional getServerTag(int serverId) { + return Optional.ofNullable(serverTags.get(serverId)); + } + + public void removeServerTag(int serverId) { + serverTags.remove(serverId); + } + private void clearTablesState() { tableAssignments.clear(); partitionAssignments.clear(); @@ -656,6 +680,7 @@ public void resetContext() { // clear the live tablet servers liveTabletServers.clear(); shuttingDownTabletServers.clear(); + serverTags.clear(); } public int getTotalPartitionCount() { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 4a6fd89f06..3811b781c4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -21,6 +21,7 @@ import org.apache.fluss.cluster.Endpoint; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.FencedLeaderEpochException; @@ -28,8 +29,12 @@ import org.apache.fluss.exception.IneligibleReplicaException; import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.InvalidUpdateVersionException; +import org.apache.fluss.exception.ServerNotExistException; +import org.apache.fluss.exception.ServerTagAlreadyExistException; +import org.apache.fluss.exception.ServerTagNotExistException; import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.exception.TabletServerNotAvailableException; +import org.apache.fluss.exception.UnknownServerException; import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.SchemaInfo; @@ -38,14 +43,17 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePartition; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.messages.AddServerTagResponse; import org.apache.fluss.rpc.messages.AdjustIsrResponse; import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse; import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse; import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse; import org.apache.fluss.rpc.messages.ControlledShutdownResponse; import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.server.coordinator.event.AccessContextEvent; +import org.apache.fluss.server.coordinator.event.AddServerTagEvent; import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent; @@ -65,6 +73,7 @@ import org.apache.fluss.server.coordinator.event.NotifyKvSnapshotOffsetEvent; import org.apache.fluss.server.coordinator.event.NotifyLakeTableOffsetEvent; import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent; +import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent; import org.apache.fluss.server.coordinator.event.SchemaChangeEvent; import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher; import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher; @@ -86,6 +95,7 @@ import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.RemoteLogManifestHandle; +import org.apache.fluss.server.zk.data.ServerTags; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TabletServerRegistration; import org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode; @@ -326,6 +336,11 @@ private void initCoordinatorContext() throws Exception { // init tablet server channels coordinatorChannelManager.startup(internalServerNodes); + // load server tags. + zooKeeperClient + .getServerTags() + .ifPresent(tags -> coordinatorContext.initSeverTags(tags.getServerTags())); + // load all tables long start4loadTables = System.currentTimeMillis(); List autoPartitionTables = new ArrayList<>(); @@ -553,6 +568,16 @@ public void process(CoordinatorEvent event) { completeFromCallable( controlledShutdownEvent.getRespCallback(), () -> tryProcessControlledShutdown(controlledShutdownEvent)); + } else if (event instanceof AddServerTagEvent) { + AddServerTagEvent addServerTagEvent = (AddServerTagEvent) event; + completeFromCallable( + addServerTagEvent.getRespCallback(), + () -> processAddServerTag(addServerTagEvent)); + } else if (event instanceof RemoveServerTagEvent) { + RemoveServerTagEvent removeServerTagEvent = (RemoveServerTagEvent) event; + completeFromCallable( + removeServerTagEvent.getRespCallback(), + () -> processRemoveServerTag(removeServerTagEvent)); } else if (event instanceof AccessContextEvent) { AccessContextEvent accessContextEvent = (AccessContextEvent) event; processAccessContext(accessContextEvent); @@ -973,6 +998,90 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent updateTabletServerMetadataCache(serverInfos, null, null, bucketsWithOfflineLeader); } + private AddServerTagResponse processAddServerTag(AddServerTagEvent event) { + AddServerTagResponse addServerTagResponse = new AddServerTagResponse(); + List serverIds = event.getServerIds(); + ServerTag serverTag = event.getServerTag(); + + // Verify that dose serverTag exist for input serverIds. If any of them exists, throw + // an error and none of them will be written to coordinatorContext and zk. + Map liveTabletServers = coordinatorContext.getLiveTabletServers(); + for (Integer serverId : serverIds) { + if (!liveTabletServers.containsKey(serverId)) { + throw new ServerNotExistException( + String.format( + "Server %s not exists when trying to add server tag.", serverId)); + } + + if (coordinatorContext.getServerTag(serverId).isPresent()) { + throw new ServerTagAlreadyExistException( + String.format( + "Server tag %s already exists for server %s.", + serverTag, serverId)); + } + } + + // First register to zk, and then update coordinatorContext. + Map serverTags = coordinatorContext.getServerTags(); + for (Integer serverId : serverIds) { + serverTags.put(serverId, serverTag); + } + + try { + zooKeeperClient.registerServerTags(new ServerTags(serverTags)); + } catch (Exception e) { + LOG.error("Error when register server tags to zookeeper.", e); + throw new UnknownServerException("Error when register server tags to zookeeper.", e); + } + + // Then update coordinatorContext. + serverIds.forEach(serverId -> coordinatorContext.putServerTag(serverId, serverTag)); + + return addServerTagResponse; + } + + private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent event) { + RemoveServerTagResponse removeServerTagResponse = new RemoveServerTagResponse(); + List serverIds = event.getServerIds(); + ServerTag serverTag = event.getServerTag(); + + // Verify that dose serverTag not exist for input serverIds. If any of them not exists, + // throw an error and none of them will be removed form coordinatorContext and zk. + Map liveTabletServers = coordinatorContext.getLiveTabletServers(); + for (Integer serverId : serverIds) { + if (!liveTabletServers.containsKey(serverId)) { + throw new ServerNotExistException( + String.format( + "Server %s not exists when trying to removing server tag.", + serverId)); + } + + if (!coordinatorContext.getServerTag(serverId).isPresent()) { + throw new ServerTagNotExistException( + String.format( + "Server tag %s not exists for server %s.", serverTag, serverId)); + } + } + + // First register to zk, and then update coordinatorContext. + Map serverTags = coordinatorContext.getServerTags(); + for (Integer serverId : serverIds) { + serverTags.remove(serverId); + } + + try { + zooKeeperClient.registerServerTags(new ServerTags(serverTags)); + } catch (Exception e) { + LOG.error("Error when register server tags to zookeeper.", e); + throw new UnknownServerException("Error when register server tags to zookeeper.", e); + } + + // Then update coordinatorContext. + serverIds.forEach(coordinatorContext::removeServerTag); + + return removeServerTagResponse; + } + private List tryProcessAdjustIsr( Map leaderAndIsrList) { // TODO verify leader epoch. diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 95d98e4f94..ffc32ff41d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -20,6 +20,7 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.cluster.ServerType; import org.apache.fluss.cluster.TabletServerInfo; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.config.cluster.AlterConfig; @@ -104,12 +105,14 @@ import org.apache.fluss.server.authorizer.AclDeleteResult; import org.apache.fluss.server.authorizer.Authorizer; import org.apache.fluss.server.coordinator.event.AccessContextEvent; +import org.apache.fluss.server.coordinator.event.AddServerTagEvent; import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent; import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent; import org.apache.fluss.server.coordinator.event.EventManager; +import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent; import org.apache.fluss.server.entity.CommitKvSnapshotData; import org.apache.fluss.server.entity.LakeTieringTableInfo; import org.apache.fluss.server.entity.TablePropertyChanges; @@ -128,6 +131,7 @@ import javax.annotation.Nullable; import java.io.UncheckedIOException; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -745,13 +749,33 @@ public CompletableFuture alterClusterConfigs( @Override public CompletableFuture addServerTag(AddServerTagRequest request) { - throw new UnsupportedOperationException("Support soon!"); + CompletableFuture response = new CompletableFuture<>(); + eventManagerSupplier + .get() + .put( + new AddServerTagEvent( + Arrays.stream(request.getServerIds()) + .boxed() + .collect(Collectors.toList()), + ServerTag.valueOf(request.getServerTag()), + response)); + return response; } @Override public CompletableFuture removeServerTag( RemoveServerTagRequest request) { - throw new UnsupportedOperationException("Support soon!"); + CompletableFuture response = new CompletableFuture<>(); + eventManagerSupplier + .get() + .put( + new RemoveServerTagEvent( + Arrays.stream(request.getServerIds()) + .boxed() + .collect(Collectors.toList()), + ServerTag.valueOf(request.getServerTag()), + response)); + return response; } @Override diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/AddServerTagEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/AddServerTagEvent.java new file mode 100644 index 0000000000..b6e7af8886 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/AddServerTagEvent.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event; + +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.rpc.messages.AddServerTagResponse; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** An event for add server tag. */ +public class AddServerTagEvent implements CoordinatorEvent { + private final List serverIds; + private final ServerTag serverTag; + private final CompletableFuture respCallback; + + public AddServerTagEvent( + List serverIds, + ServerTag serverTag, + CompletableFuture respCallback) { + this.serverIds = serverIds; + this.serverTag = serverTag; + this.respCallback = respCallback; + } + + public List getServerIds() { + return serverIds; + } + + public ServerTag getServerTag() { + return serverTag; + } + + public CompletableFuture getRespCallback() { + return respCallback; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RemoveServerTagEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RemoveServerTagEvent.java new file mode 100644 index 0000000000..ede6fdeb0c --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RemoveServerTagEvent.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event; + +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** An event for remove server tag. */ +public class RemoveServerTagEvent implements CoordinatorEvent { + private final List serverIds; + private final ServerTag serverTag; + private final CompletableFuture respCallback; + + public RemoveServerTagEvent( + List serverIds, + ServerTag serverTag, + CompletableFuture respCallback) { + this.serverIds = serverIds; + this.serverTag = serverTag; + this.respCallback = respCallback; + } + + public List getServerIds() { + return serverIds; + } + + public ServerTag getServerTag() { + return serverTag; + } + + public CompletableFuture getRespCallback() { + return respCallback; + } +}