From 1ea02c2b5cf421ccd36af79ba6073bb412771807 Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Mon, 21 Jul 2025 19:52:38 +0800 Subject: [PATCH 1/3] [server] Introduce new rebalance API --- .../org/apache/fluss/client/admin/Admin.java | 97 +++++++++++ .../apache/fluss/client/admin/FlussAdmin.java | 31 ++++ .../fluss/cluster/rebalance/GoalType.java | 60 +++++++ .../rebalance/RebalancePlanForBucket.java | 112 +++++++++++++ .../rebalance/RebalanceResultForBucket.java | 77 +++++++++ .../rebalance/RebalanceStatusForBucket.java | 48 ++++++ .../fluss/cluster/rebalance/ServerTag.java | 52 ++++++ .../NoRebalanceInProgressException.java | 34 ++++ .../exception/RebalanceFailureException.java | 38 +++++ .../exception/ServerNotExistException.java | 34 ++++ .../ServerTagAlreadyExistException.java | 34 ++++ .../exception/ServerTagNotExistException.java | 35 ++++ .../fluss/rpc/gateway/AdminGateway.java | 26 +++ .../apache/fluss/rpc/protocol/ApiKeys.java | 7 +- .../org/apache/fluss/rpc/protocol/Errors.java | 14 +- fluss-rpc/src/main/proto/FlussApi.proto | 75 +++++++++ .../coordinator/CoordinatorService.java | 38 +++++ .../fluss/server/zk/ZooKeeperClient.java | 47 ++++++ .../fluss/server/zk/data/RebalancePlan.java | 107 ++++++++++++ .../zk/data/RebalancePlanJsonSerde.java | 156 ++++++++++++++++++ .../fluss/server/zk/data/ServerTags.java | 65 ++++++++ .../server/zk/data/ServerTagsJsonSerde.java | 67 ++++++++ .../apache/fluss/server/zk/data/ZkData.java | 42 +++++ .../coordinator/TestCoordinatorGateway.java | 38 +++++ .../fluss/server/zk/ZooKeeperClientTest.java | 78 +++++++++ .../zk/data/RebalancePlanJsonSerdeTest.java | 97 +++++++++++ .../zk/data/ServerTagsJsonSerdeTest.java | 50 ++++++ 27 files changed, 1557 insertions(+), 2 deletions(-) create mode 100644 fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/exception/RebalanceFailureException.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/exception/ServerTagAlreadyExistException.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/exception/ServerTagNotExistException.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTags.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerde.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerdeTest.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java index 1ef5c30ebd..feca0e5f68 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java @@ -22,9 +22,14 @@ import org.apache.fluss.client.metadata.KvSnapshots; import org.apache.fluss.client.metadata.LakeSnapshot; import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.rebalance.GoalType; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.cluster.AlterConfig; import org.apache.fluss.config.cluster.ConfigEntry; +import org.apache.fluss.exception.AuthorizationException; import org.apache.fluss.exception.DatabaseAlreadyExistException; import org.apache.fluss.exception.DatabaseNotEmptyException; import org.apache.fluss.exception.DatabaseNotExistException; @@ -35,10 +40,15 @@ import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.KvSnapshotNotExistException; import org.apache.fluss.exception.LakeTableSnapshotNotExistException; +import org.apache.fluss.exception.NoRebalanceInProgressException; import org.apache.fluss.exception.NonPrimaryKeyTableException; import org.apache.fluss.exception.PartitionAlreadyExistsException; import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.exception.RebalanceFailureException; import org.apache.fluss.exception.SchemaNotExistException; +import org.apache.fluss.exception.ServerNotExistException; +import org.apache.fluss.exception.ServerTagAlreadyExistException; +import org.apache.fluss.exception.ServerTagNotExistException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.exception.TableNotPartitionedException; @@ -60,6 +70,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -492,4 +503,90 @@ ListOffsetsResult listOffsets( * @return A CompletableFuture indicating completion of the operation. */ CompletableFuture alterClusterConfigs(Collection configs); + + /** + * Add server tag to the specified tabletServers, one tabletServer can only have one serverTag. + * + *

If one tabletServer failed adding tag, none of the tags will take effect. + * + *

+ * + * @param tabletServers the tabletServers we want to add server tags. + * @param serverTag the server tag to be added. + */ + CompletableFuture addServerTag(List tabletServers, ServerTag serverTag); + + /** + * Remove server tag from the specified tabletServers. + * + *

If one tabletServer failed removing tag, none of the tags will be removed. + * + *

    + *
  • {@link AuthorizationException} If the authenticated user doesn't have cluster + * permissions. + *
  • {@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not + * exist. + *
  • {@link ServerTagNotExistException} If the server tag does not exist for any one of the + * tabletServers. + *
+ * + * @param tabletServers the tabletServers we want to remove server tags. + */ + CompletableFuture removeServerTag(List tabletServers, ServerTag serverTag); + + /** + * Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's + * bucket load. + * + *

More details, Fluss collects the cluster's load information and optimizes to perform load + * balancing according to the user-defined {@code priorityGoals}. + * + *

Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted + * rebalance task exists, an {@link RebalanceFailureException} will be thrown. + * + *

    + *
  • {@link AuthorizationException} If the authenticated user doesn't have cluster + * permissions. + *
  • {@link RebalanceFailureException} If the rebalance failed. Such as there is an ongoing + * execution. + *
+ * + * @param priorityGoals the goals to be optimized. + * @param dryRun Calculate and return the rebalance optimization proposal, but do not execute + * it. + * @return the generated rebalance plan for all the tableBuckets which need to do rebalance. + */ + CompletableFuture> rebalance( + List priorityGoals, boolean dryRun); + + /** + * List the rebalance process. + * + *
    + *
  • {@link AuthorizationException} If the authenticated user doesn't have cluster + * permissions. + *
  • {@link NoRebalanceInProgressException} If there are no rebalance tasks in progress. + *
+ * + * @return the rebalance process for all the tableBuckets doing rebalance. + */ + CompletableFuture> listRebalanceProcess(); + + /** + * Cannel the rebalance task. + * + *
    + *
  • {@link AuthorizationException} If the authenticated user doesn't have cluster + * permissions. + *
  • {@link NoRebalanceInProgressException} If there are no rebalance tasks in progress. + *
+ */ + CompletableFuture cancelRebalance(); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index 27124c70fd..cdf9a037f8 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -24,6 +24,10 @@ import org.apache.fluss.client.utils.ClientRpcMessageUtils; import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.rebalance.GoalType; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.cluster.AlterConfig; import org.apache.fluss.config.cluster.ConfigEntry; import org.apache.fluss.exception.LeaderNotAvailableException; @@ -531,6 +535,33 @@ public CompletableFuture alterClusterConfigs(Collection confi return future; } + @Override + public CompletableFuture addServerTag(List tabletServers, ServerTag serverTag) { + throw new UnsupportedOperationException("Support soon"); + } + + @Override + public CompletableFuture removeServerTag( + List tabletServers, ServerTag serverTag) { + throw new UnsupportedOperationException("Support soon"); + } + + @Override + public CompletableFuture> rebalance( + List priorityGoals, boolean dryRun) { + throw new UnsupportedOperationException("Support soon"); + } + + @Override + public CompletableFuture> listRebalanceProcess() { + throw new UnsupportedOperationException("Support soon"); + } + + @Override + public CompletableFuture cancelRebalance() { + throw new UnsupportedOperationException("Support soon"); + } + @Override public void close() { // nothing to do yet diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java new file mode 100644 index 0000000000..6dc7624805 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.util.Arrays; + +/** + * The type of goal to optimize. + * + * @since 0.8 + */ +@PublicEvolving +public enum GoalType { + /** + * Goal to generate replica movement tasks to ensure that the number of replicas on each + * tabletServer is near balanced. + */ + REPLICA_DISTRIBUTION_GOAL(0), + + /** + * Goal to generate leadership movement and leader replica movement tasks to ensure that the + * number of leader replicas on each tabletServer is near balanced. + */ + LEADER_REPLICA_DISTRIBUTION_GOAL(1); + + public final int value; + + GoalType(int value) { + this.value = value; + } + + public static GoalType valueOf(int value) { + if (value == REPLICA_DISTRIBUTION_GOAL.value) { + return REPLICA_DISTRIBUTION_GOAL; + } else if (value == LEADER_REPLICA_DISTRIBUTION_GOAL.value) { + return LEADER_REPLICA_DISTRIBUTION_GOAL; + } else { + throw new IllegalArgumentException( + String.format( + "Value %s must be one of %s", value, Arrays.asList(GoalType.values()))); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java new file mode 100644 index 0000000000..bedce22663 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.metadata.TableBucket; + +import java.util.List; +import java.util.Objects; + +/** + * a generated rebalance plan for a tableBucket. + * + * @since 0.8 + */ +@PublicEvolving +public class RebalancePlanForBucket { + private final TableBucket tableBucket; + private final int originalLeader; + private final int newLeader; + private final List originReplicas; + private final List newReplicas; + + public RebalancePlanForBucket( + TableBucket tableBucket, + int originalLeader, + int newLeader, + List originReplicas, + List newReplicas) { + this.tableBucket = tableBucket; + this.originalLeader = originalLeader; + this.newLeader = newLeader; + this.originReplicas = originReplicas; + this.newReplicas = newReplicas; + } + + public TableBucket getTableBucket() { + return tableBucket; + } + + public int getBucketId() { + return tableBucket.getBucket(); + } + + public Integer getOriginalLeader() { + return originalLeader; + } + + public Integer getNewLeader() { + return newLeader; + } + + public List getOriginReplicas() { + return originReplicas; + } + + public List getNewReplicas() { + return newReplicas; + } + + @Override + public String toString() { + return "RebalancePlanForBucket{" + + "tableBucket=" + + tableBucket + + ", originalLeader=" + + originalLeader + + ", newLeader=" + + newLeader + + ", originReplicas=" + + originReplicas + + ", newReplicas=" + + newReplicas + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RebalancePlanForBucket that = (RebalancePlanForBucket) o; + return Objects.equals(tableBucket, that.tableBucket) + && originalLeader == that.originalLeader + && newLeader == that.newLeader + && Objects.equals(originReplicas, that.originReplicas) + && Objects.equals(newReplicas, that.newReplicas); + } + + @Override + public int hashCode() { + return Objects.hash(tableBucket, originalLeader, newLeader, originReplicas, newReplicas); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java new file mode 100644 index 0000000000..c477e524b3 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.metadata.TableBucket; + +import java.util.List; + +/** + * Status of rebalance process for a tabletBucket. + * + * @since 0.8 + */ +@PublicEvolving +public class RebalanceResultForBucket { + private final RebalancePlanForBucket rebalancePlanForBucket; + private RebalanceStatusForBucket rebalanceStatusForBucket; + + public RebalanceResultForBucket( + RebalancePlanForBucket rebalancePlanForBucket, + RebalanceStatusForBucket rebalanceStatusForBucket) { + this.rebalancePlanForBucket = rebalancePlanForBucket; + this.rebalanceStatusForBucket = rebalanceStatusForBucket; + } + + public TableBucket tableBucket() { + return rebalancePlanForBucket.getTableBucket(); + } + + public RebalancePlanForBucket planForBucket() { + return rebalancePlanForBucket; + } + + public List newReplicas() { + return rebalancePlanForBucket.getNewReplicas(); + } + + public RebalanceResultForBucket setNewStatus(RebalanceStatusForBucket status) { + this.rebalanceStatusForBucket = status; + return this; + } + + public RebalanceStatusForBucket status() { + return rebalanceStatusForBucket; + } + + public static RebalanceResultForBucket of( + RebalancePlanForBucket planForBucket, RebalanceStatusForBucket status) { + return new RebalanceResultForBucket(planForBucket, status); + } + + @Override + public String toString() { + return "RebalanceResultForBucket{" + + "rebalancePlanForBucket=" + + rebalancePlanForBucket + + ", rebalanceStatusForBucket=" + + rebalanceStatusForBucket + + '}'; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java new file mode 100644 index 0000000000..e8c0e46733 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Rebalance status for single bucket. + * + * @since 0.8 + */ +@PublicEvolving +public enum RebalanceStatusForBucket { + PENDING(1), + REBALANCING(2), + FAILED(3), + COMPLETED(4); + + private final int code; + + RebalanceStatusForBucket(int code) { + this.code = code; + } + + public static RebalanceStatusForBucket of(int code) { + for (RebalanceStatusForBucket status : RebalanceStatusForBucket.values()) { + if (status.code == code) { + return status; + } + } + return null; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java new file mode 100644 index 0000000000..66849060d4 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.util.Arrays; + +/** + * The tag of tabletServer. + * + * @since 0.8 + */ +@PublicEvolving +public enum ServerTag { + PERMANENT_OFFLINE(0), + TEMPORARY_OFFLINE(1); + + public final int value; + + ServerTag(int value) { + this.value = value; + } + + public static ServerTag valueOf(int value) { + if (value == PERMANENT_OFFLINE.value) { + return PERMANENT_OFFLINE; + } else if (value == TEMPORARY_OFFLINE.value) { + return TEMPORARY_OFFLINE; + } else { + throw new IllegalArgumentException( + String.format( + "Value %s must be one of %s", + value, Arrays.asList(ServerTag.values()))); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java b/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java new file mode 100644 index 0000000000..8b052a5100 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Thrown if there are no rebalance tasks in progress when list rebalance process. + * + * @since 0.8 + */ +@PublicEvolving +public class NoRebalanceInProgressException extends ApiException { + private static final long serialVersionUID = 1L; + + public NoRebalanceInProgressException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/RebalanceFailureException.java b/fluss-common/src/main/java/org/apache/fluss/exception/RebalanceFailureException.java new file mode 100644 index 0000000000..0dcf260b0c --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/RebalanceFailureException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.VisibleForTesting; + +/** + * This exception is thrown if rebalance failed. + * + * @since 0.8 + */ +@VisibleForTesting +public class RebalanceFailureException extends ApiException { + private static final long serialVersionUID = 1L; + + public RebalanceFailureException(String message) { + super(message); + } + + public RebalanceFailureException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java new file mode 100644 index 0000000000..2bdbe621e8 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Thrown if a server does not exist in the cluster. + * + * @since 0.8 + */ +@PublicEvolving +public class ServerNotExistException extends ApiException { + private static final long serialVersionUID = 1L; + + public ServerNotExistException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagAlreadyExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagAlreadyExistException.java new file mode 100644 index 0000000000..a3d4259b13 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagAlreadyExistException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Thrown if a server tag already exists for specify tabletServer in the cluster. + * + * @since 0.8 + */ +@PublicEvolving +public class ServerTagAlreadyExistException extends ApiException { + private static final long serialVersionUID = 1L; + + public ServerTagAlreadyExistException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagNotExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagNotExistException.java new file mode 100644 index 0000000000..bd62672c72 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagNotExistException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Thrown if a server tag not exist for specify tabletServer in the cluster. + * + * @since 0.8 + */ +@PublicEvolving +public class ServerTagNotExistException extends ApiException { + + private static final long serialVersionUID = 1L; + + public ServerTagNotExistException(String message) { + super(message); + } +} diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java index 7b322ff67e..996342dfcb 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java @@ -17,10 +17,14 @@ package org.apache.fluss.rpc.gateway; +import org.apache.fluss.rpc.messages.AddServerTagRequest; +import org.apache.fluss.rpc.messages.AddServerTagResponse; import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest; import org.apache.fluss.rpc.messages.AlterClusterConfigsResponse; import org.apache.fluss.rpc.messages.AlterTableRequest; import org.apache.fluss.rpc.messages.AlterTableResponse; +import org.apache.fluss.rpc.messages.CancelRebalanceRequest; +import org.apache.fluss.rpc.messages.CancelRebalanceResponse; import org.apache.fluss.rpc.messages.CreateAclsRequest; import org.apache.fluss.rpc.messages.CreateAclsResponse; import org.apache.fluss.rpc.messages.CreateDatabaseRequest; @@ -37,6 +41,12 @@ import org.apache.fluss.rpc.messages.DropPartitionResponse; import org.apache.fluss.rpc.messages.DropTableRequest; import org.apache.fluss.rpc.messages.DropTableResponse; +import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest; +import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse; +import org.apache.fluss.rpc.messages.RebalanceRequest; +import org.apache.fluss.rpc.messages.RebalanceResponse; +import org.apache.fluss.rpc.messages.RemoveServerTagRequest; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.protocol.ApiKeys; import org.apache.fluss.rpc.protocol.RPC; @@ -120,6 +130,22 @@ public interface AdminGateway extends AdminReadOnlyGateway { CompletableFuture alterClusterConfigs( AlterClusterConfigsRequest request); + @RPC(api = ApiKeys.ADD_SERVER_TAG) + CompletableFuture addServerTag(AddServerTagRequest request); + + @RPC(api = ApiKeys.REMOVE_SERVER_TAG) + CompletableFuture removeServerTag(RemoveServerTagRequest request); + + @RPC(api = ApiKeys.REBALANCE) + CompletableFuture rebalance(RebalanceRequest request); + + @RPC(api = ApiKeys.LIST_REBALANCE_PROCESS) + CompletableFuture listRebalanceProcess( + ListRebalanceProcessRequest request); + + @RPC(api = ApiKeys.CANCEL_REBALANCE) + CompletableFuture cancelRebalance(CancelRebalanceRequest request); + // todo: rename table & alter table } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index 8acec7a36d..97e14d79f0 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -74,7 +74,12 @@ public enum ApiKeys { CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE), ALTER_TABLE(1044, 0, 0, PUBLIC), DESCRIBE_CLUSTER_CONFIGS(1045, 0, 0, PUBLIC), - ALTER_CLUSTER_CONFIGS(1046, 0, 0, PUBLIC); + ALTER_CLUSTER_CONFIGS(1046, 0, 0, PUBLIC), + ADD_SERVER_TAG(1047, 0, 0, PUBLIC), + REMOVE_SERVER_TAG(1048, 0, 0, PUBLIC), + REBALANCE(1049, 0, 0, PUBLIC), + LIST_REBALANCE_PROCESS(1050, 0, 0, PUBLIC), + CANCEL_REBALANCE(1051, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java index 4b49a58475..5ee652cce2 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java @@ -52,6 +52,7 @@ import org.apache.fluss.exception.LogOffsetOutOfRangeException; import org.apache.fluss.exception.LogStorageException; import org.apache.fluss.exception.NetworkException; +import org.apache.fluss.exception.NoRebalanceInProgressException; import org.apache.fluss.exception.NonPrimaryKeyTableException; import org.apache.fluss.exception.NotEnoughReplicasAfterAppendException; import org.apache.fluss.exception.NotEnoughReplicasException; @@ -60,11 +61,15 @@ import org.apache.fluss.exception.OutOfOrderSequenceException; import org.apache.fluss.exception.PartitionAlreadyExistsException; import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.exception.RebalanceFailureException; import org.apache.fluss.exception.RecordTooLargeException; import org.apache.fluss.exception.RetriableAuthenticationException; import org.apache.fluss.exception.SchemaNotExistException; import org.apache.fluss.exception.SecurityDisabledException; import org.apache.fluss.exception.SecurityTokenException; +import org.apache.fluss.exception.ServerNotExistException; +import org.apache.fluss.exception.ServerTagAlreadyExistException; +import org.apache.fluss.exception.ServerTagNotExistException; import org.apache.fluss.exception.StorageException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; @@ -228,7 +233,14 @@ public enum Errors { INVALID_ALTER_TABLE_EXCEPTION( 56, "The alter table is invalid.", InvalidAlterTableException::new), DELETION_DISABLED_EXCEPTION( - 57, "Deletion operations are disabled on this table.", DeletionDisabledException::new); + 57, "Deletion operations are disabled on this table.", DeletionDisabledException::new), + SERVER_NOT_EXIST_EXCEPTION(58, "The server is not exist.", ServerNotExistException::new), + SEVER_TAG_ALREADY_EXIST_EXCEPTION( + 59, "The server tag already exist.", ServerTagAlreadyExistException::new), + SEVER_TAG_NOT_EXIST_EXCEPTION(60, "The server tag not exist.", ServerTagNotExistException::new), + REBALANCE_FAILURE_EXCEPTION(61, "The rebalance task failure.", RebalanceFailureException::new), + NO_REBALANCE_IN_PROGRESS_EXCEPTION( + 62, "No rebalance task in progress.", NoRebalanceInProgressException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index d2917734ce..891a295d16 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -574,6 +574,44 @@ message AlterClusterConfigsRequest{ message AlterClusterConfigsResponse{ } +message AddServerTagRequest { + repeated int32 server_ids = 1 [packed = true]; + required int32 server_tag = 2; +} + +message AddServerTagResponse { +} + +message RemoveServerTagRequest { + repeated int32 server_ids = 1 [packed = true]; + required int32 server_tag = 2; +} + +message RemoveServerTagResponse { +} + +message RebalanceRequest { + repeated int32 goals = 1 [packed = true]; + required bool dry_run = 2; +} + +message RebalanceResponse { + repeated PbRebalancePlanForTable plan_for_table = 1; +} + +message ListRebalanceProcessRequest { +} + +message ListRebalanceProcessResponse { + repeated PbRebalanceProcessForTable process_for_table = 1; +} + +message CancelRebalanceRequest { +} + +message CancelRebalanceResponse { +} + // --------------- Inner classes ---------------- @@ -955,3 +993,40 @@ message PbDescribeConfig { required string config_source = 3; } +message PbRebalancePlanForTable { + required int64 table_id = 1; + repeated PbRebalancePlanForPartition partitions_plan = 2; // for none-partition table, this is empty + repeated PbRebalancePlanForBucket buckets_plan = 3; // for partition table, this is empty + +} + +message PbRebalancePlanForPartition { + required int64 partition_id = 1; + repeated PbRebalancePlanForBucket buckets_plan = 2; +} + +message PbRebalancePlanForBucket { + required int32 bucket_id = 1; + optional int32 original_leader = 2; + optional int32 new_leader = 3; + repeated int32 original_replicas = 4 [packed = true]; + repeated int32 new_replicas = 5 [packed = true]; +} + +message PbRebalanceProcessForTable { + required int64 table_id = 1; + repeated PbRebalanceProcessForPartition partitions_process = 2; + repeated PbRebalanceProcessForBucket buckets_process = 3; +} + +message PbRebalanceProcessForPartition { + required int64 partition_id = 1; + repeated PbRebalanceProcessForBucket buckets_process = 2; +} + +message PbRebalanceProcessForBucket { + required int32 bucket_id = 1; + repeated int32 original_replicas = 2 [packed = true]; + repeated int32 new_replicas = 3 [packed = true]; + required int32 rebalance_status = 4; +} \ No newline at end of file diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 4239b61679..7633df59d4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -44,12 +44,16 @@ import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.AddServerTagRequest; +import org.apache.fluss.rpc.messages.AddServerTagResponse; import org.apache.fluss.rpc.messages.AdjustIsrRequest; import org.apache.fluss.rpc.messages.AdjustIsrResponse; import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest; import org.apache.fluss.rpc.messages.AlterClusterConfigsResponse; import org.apache.fluss.rpc.messages.AlterTableRequest; import org.apache.fluss.rpc.messages.AlterTableResponse; +import org.apache.fluss.rpc.messages.CancelRebalanceRequest; +import org.apache.fluss.rpc.messages.CancelRebalanceResponse; import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest; import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse; import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; @@ -76,11 +80,17 @@ import org.apache.fluss.rpc.messages.DropTableResponse; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse; +import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest; +import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; import org.apache.fluss.rpc.messages.PbAlterConfig; import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable; import org.apache.fluss.rpc.messages.PbHeartbeatRespForTable; +import org.apache.fluss.rpc.messages.RebalanceRequest; +import org.apache.fluss.rpc.messages.RebalanceResponse; +import org.apache.fluss.rpc.messages.RemoveServerTagRequest; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.netty.server.Session; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.security.acl.AclBinding; @@ -740,6 +750,34 @@ public CompletableFuture alterClusterConfigs( return future; } + @Override + public CompletableFuture addServerTag(AddServerTagRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + + @Override + public CompletableFuture removeServerTag( + RemoveServerTagRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + + @Override + public CompletableFuture rebalance(RebalanceRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + + @Override + public CompletableFuture listRebalanceProcess( + ListRebalanceProcessRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + + @Override + public CompletableFuture cancelRebalance( + CancelRebalanceRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + @VisibleForTesting public DataLakeFormat getDataLakeFormat() { return lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index dfc2c004d0..749fdec2a0 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -42,8 +42,10 @@ import org.apache.fluss.server.zk.data.DatabaseRegistration; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.PartitionAssignment; +import org.apache.fluss.server.zk.data.RebalancePlan; import org.apache.fluss.server.zk.data.RemoteLogManifestHandle; import org.apache.fluss.server.zk.data.ResourceAcl; +import org.apache.fluss.server.zk.data.ServerTags; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.server.zk.data.TabletServerRegistration; @@ -63,11 +65,13 @@ import org.apache.fluss.server.zk.data.ZkData.PartitionSequenceIdZNode; import org.apache.fluss.server.zk.data.ZkData.PartitionZNode; import org.apache.fluss.server.zk.data.ZkData.PartitionsZNode; +import org.apache.fluss.server.zk.data.ZkData.RebalanceZNode; import org.apache.fluss.server.zk.data.ZkData.ResourceAclNode; import org.apache.fluss.server.zk.data.ZkData.SchemaZNode; import org.apache.fluss.server.zk.data.ZkData.SchemasZNode; import org.apache.fluss.server.zk.data.ZkData.ServerIdZNode; import org.apache.fluss.server.zk.data.ZkData.ServerIdsZNode; +import org.apache.fluss.server.zk.data.ZkData.ServerTagsZNode; import org.apache.fluss.server.zk.data.ZkData.TableIdZNode; import org.apache.fluss.server.zk.data.ZkData.TableSequenceIdZNode; import org.apache.fluss.server.zk.data.ZkData.TableZNode; @@ -1188,6 +1192,49 @@ public void insertConfigChangeNotification() throws Exception { ZkData.ConfigEntityChangeNotificationSequenceZNode.encode()); } + // -------------------------------------------------------------------------------------------- + // Maintenance + // -------------------------------------------------------------------------------------------- + + public void registerServerTags(ServerTags newServerTags) throws Exception { + String path = ServerTagsZNode.path(); + if (getOrEmpty(path).isPresent()) { + zkClient.setData().forPath(path, ServerTagsZNode.encode(newServerTags)); + } else { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, ServerTagsZNode.encode(newServerTags)); + } + } + + public Optional getServerTags() throws Exception { + String path = ServerTagsZNode.path(); + return getOrEmpty(path).map(ServerTagsZNode::decode); + } + + public void registerRebalancePlan(RebalancePlan rebalancePlan) throws Exception { + String path = RebalanceZNode.path(); + if (getOrEmpty(path).isPresent()) { + zkClient.setData().forPath(path, RebalanceZNode.encode(rebalancePlan)); + } else { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, RebalanceZNode.encode(rebalancePlan)); + } + } + + public Optional getRebalancePlan() throws Exception { + String path = RebalanceZNode.path(); + return getOrEmpty(path).map(RebalanceZNode::decode); + } + + public void deleteRebalancePlan() throws Exception { + String path = RebalanceZNode.path(); + deletePath(path); + } + // -------------------------------------------------------------------------------------------- // Utils // -------------------------------------------------------------------------------------------- diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java new file mode 100644 index 0000000000..00c87bcbc3 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePartition; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * The generated rebalance plan for this cluster. + * + *

The latest execution rebalance plan will be stored in {@link ZkData.RebalanceZNode}. + * + * @see RebalancePlanJsonSerde for json serialization and deserialization. + */ +public class RebalancePlan { + + /** A mapping from tableBucket to RebalancePlanForBuckets of none-partitioned table. */ + private final Map> planForBuckets; + + /** A mapping from tableBucket to RebalancePlanForBuckets of partitioned table. */ + private final Map> + planForBucketsOfPartitionedTable; + + public RebalancePlan(Map bucketPlan) { + this.planForBuckets = new HashMap<>(); + this.planForBucketsOfPartitionedTable = new HashMap<>(); + + for (Map.Entry entry : bucketPlan.entrySet()) { + TableBucket tableBucket = entry.getKey(); + RebalancePlanForBucket rebalancePlanForBucket = entry.getValue(); + if (tableBucket.getPartitionId() == null) { + planForBuckets + .computeIfAbsent(tableBucket.getTableId(), k -> new ArrayList<>()) + .add(rebalancePlanForBucket); + } else { + TablePartition tp = + new TablePartition(tableBucket.getTableId(), tableBucket.getPartitionId()); + planForBucketsOfPartitionedTable + .computeIfAbsent(tp, k -> new ArrayList<>()) + .add(rebalancePlanForBucket); + } + } + } + + public Map> getPlanForBuckets() { + return planForBuckets; + } + + public Map> getPlanForBucketsOfPartitionedTable() { + return planForBucketsOfPartitionedTable; + } + + @Override + public String toString() { + return "RebalancePlan{" + + "planForBuckets=" + + planForBuckets + + ", planForBucketsOfPartitionedTable=" + + planForBucketsOfPartitionedTable + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RebalancePlan that = (RebalancePlan) o; + + if (!Objects.equals(planForBuckets, that.planForBuckets)) { + return false; + } + return Objects.equals( + planForBucketsOfPartitionedTable, that.planForBucketsOfPartitionedTable); + } + + @Override + public int hashCode() { + return Objects.hash(planForBuckets, planForBucketsOfPartitionedTable); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java new file mode 100644 index 0000000000..b79cd46e30 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** Json serializer and deserializer for {@link RebalancePlan}. */ +public class RebalancePlanJsonSerde + implements JsonSerializer, JsonDeserializer { + + public static final RebalancePlanJsonSerde INSTANCE = new RebalancePlanJsonSerde(); + + private static final String VERSION_KEY = "version"; + private static final String REBALANCE_PLAN = "rebalance_plan"; + + private static final String TABLE_ID = "table_id"; + private static final String PARTITION_ID = "partition_id"; + + private static final String BUCKETS = "buckets"; + private static final String BUCKET_ID = "bucket_id"; + private static final String ORIGINAL_LEADER = "original_leader"; + private static final String NEW_LEADER = "new_leader"; + private static final String ORIGIN_REPLICAS = "origin_replicas"; + private static final String NEW_REPLICAS = "new_replicas"; + + private static final int VERSION = 1; + + @Override + public void serialize(RebalancePlan rebalancePlan, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeNumberField(VERSION_KEY, VERSION); + + generator.writeArrayFieldStart(REBALANCE_PLAN); + // first to write none-partitioned tables. + for (Map.Entry> entry : + rebalancePlan.getPlanForBuckets().entrySet()) { + generator.writeStartObject(); + generator.writeNumberField(TABLE_ID, entry.getKey()); + generator.writeArrayFieldStart(BUCKETS); + for (RebalancePlanForBucket bucketPlan : entry.getValue()) { + serializeRebalancePlanForBucket(generator, bucketPlan); + } + generator.writeEndArray(); + generator.writeEndObject(); + } + + // then to write partitioned tables. + for (Map.Entry> entry : + rebalancePlan.getPlanForBucketsOfPartitionedTable().entrySet()) { + generator.writeStartObject(); + generator.writeNumberField(TABLE_ID, entry.getKey().getTableId()); + generator.writeNumberField(PARTITION_ID, entry.getKey().getPartitionId()); + generator.writeArrayFieldStart(BUCKETS); + for (RebalancePlanForBucket bucketPlan : entry.getValue()) { + serializeRebalancePlanForBucket(generator, bucketPlan); + } + generator.writeEndArray(); + generator.writeEndObject(); + } + + generator.writeEndArray(); + + generator.writeEndObject(); + } + + @Override + public RebalancePlan deserialize(JsonNode node) { + JsonNode rebalancePlanNode = node.get(REBALANCE_PLAN); + Map planForBuckets = new HashMap<>(); + + for (JsonNode tablePartitionPlanNode : rebalancePlanNode) { + long tableId = tablePartitionPlanNode.get(TABLE_ID).asLong(); + + Long partitionId = null; + if (tablePartitionPlanNode.has(PARTITION_ID)) { + partitionId = tablePartitionPlanNode.get(PARTITION_ID).asLong(); + } + + JsonNode bucketPlanNodes = tablePartitionPlanNode.get(BUCKETS); + for (JsonNode bucketPlanNode : bucketPlanNodes) { + int bucketId = bucketPlanNode.get(BUCKET_ID).asInt(); + TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); + + int originLeader = bucketPlanNode.get(ORIGINAL_LEADER).asInt(); + + int newLeader = bucketPlanNode.get(NEW_LEADER).asInt(); + + List originReplicas = new ArrayList<>(); + Iterator elements = bucketPlanNode.get(ORIGIN_REPLICAS).elements(); + while (elements.hasNext()) { + originReplicas.add(elements.next().asInt()); + } + + List newReplicas = new ArrayList<>(); + elements = bucketPlanNode.get(NEW_REPLICAS).elements(); + while (elements.hasNext()) { + newReplicas.add(elements.next().asInt()); + } + + planForBuckets.put( + tableBucket, + new RebalancePlanForBucket( + tableBucket, originLeader, newLeader, originReplicas, newReplicas)); + } + } + + return new RebalancePlan(planForBuckets); + } + + private void serializeRebalancePlanForBucket( + JsonGenerator generator, RebalancePlanForBucket bucketPlan) throws IOException { + generator.writeStartObject(); + generator.writeNumberField(BUCKET_ID, bucketPlan.getBucketId()); + generator.writeNumberField(ORIGINAL_LEADER, bucketPlan.getOriginalLeader()); + generator.writeNumberField(NEW_LEADER, bucketPlan.getNewLeader()); + generator.writeArrayFieldStart(ORIGIN_REPLICAS); + for (Integer replica : bucketPlan.getOriginReplicas()) { + generator.writeNumber(replica); + } + generator.writeEndArray(); + generator.writeArrayFieldStart(NEW_REPLICAS); + for (Integer replica : bucketPlan.getNewReplicas()) { + generator.writeNumber(replica); + } + generator.writeEndArray(); + generator.writeEndObject(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTags.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTags.java new file mode 100644 index 0000000000..edddcaaf75 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTags.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.ServerTag; + +import java.util.Map; +import java.util.Objects; + +/** + * The latest {@link ServerTags} of tabletServers in {@link ZkData.ServerTagsZNode}. It is used to + * store the serverTags information in zookeeper. + * + * @see ServerTagsJsonSerde for json serialization and deserialization. + */ +public class ServerTags { + + // a mapping from tabletServer id to serverTag. + private final Map serverTags; + + public ServerTags(Map serverTags) { + this.serverTags = serverTags; + } + + public Map getServerTags() { + return serverTags; + } + + @Override + public String toString() { + return "ServerTags{" + "serverTags=" + serverTags + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ServerTags that = (ServerTags) o; + return Objects.equals(serverTags, that.serverTags); + } + + @Override + public int hashCode() { + return Objects.hash(serverTags); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerde.java new file mode 100644 index 0000000000..7df94c74e4 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerde.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** Json serializer and deserializer for {@link ServerTags}. */ +public class ServerTagsJsonSerde + implements JsonSerializer, JsonDeserializer { + + public static final ServerTagsJsonSerde INSTANCE = new ServerTagsJsonSerde(); + + private static final String VERSION_KEY = "version"; + private static final String SERVER_TAGS = "server_tags"; + private static final int VERSION = 1; + + @Override + public void serialize(ServerTags serverTags, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeNumberField(VERSION_KEY, VERSION); + generator.writeObjectFieldStart(SERVER_TAGS); + for (Map.Entry entry : serverTags.getServerTags().entrySet()) { + generator.writeNumberField(String.valueOf(entry.getKey()), entry.getValue().value); + } + generator.writeEndObject(); + + generator.writeEndObject(); + } + + @Override + public ServerTags deserialize(JsonNode node) { + JsonNode serverTagsNode = node.get(SERVER_TAGS); + Map serverTags = new HashMap<>(); + Iterator fieldNames = serverTagsNode.fieldNames(); + while (fieldNames.hasNext()) { + String serverId = fieldNames.next(); + serverTags.put( + Integer.valueOf(serverId), + ServerTag.valueOf(serverTagsNode.get(serverId).asInt())); + } + return new ServerTags(serverTags); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index bb33e17ff5..4798623a74 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -613,6 +613,25 @@ public static LakeTable decode(byte[] json) { } } + /** + * The znode for server tags. The znode path is: + * + *

/tabletServers/server_tags + */ + public static final class ServerTagsZNode { + public static String path() { + return "/tabletservers/server_tags"; + } + + public static byte[] encode(ServerTags serverTag) { + return JsonSerdeUtils.writeValueAsBytes(serverTag, ServerTagsJsonSerde.INSTANCE); + } + + public static ServerTags decode(byte[] json) { + return JsonSerdeUtils.readValue(json, ServerTagsJsonSerde.INSTANCE); + } + } + // ------------------------------------------------------------------------------------------ // ZNodes for ACL(Access Control List). // ------------------------------------------------------------------------------------------ @@ -776,4 +795,27 @@ public static byte[] encode() { return new byte[0]; } } + + // ------------------------------------------------------------------------------------------ + // ZNodes under "/cluster/" + // ------------------------------------------------------------------------------------------ + + /** + * The znode for rebalance. The znode path is: + * + *

/cluster/rebalance + */ + public static final class RebalanceZNode { + public static String path() { + return "/cluster/rebalance"; + } + + public static byte[] encode(RebalancePlan rebalancePlan) { + return JsonSerdeUtils.writeValueAsBytes(rebalancePlan, RebalancePlanJsonSerde.INSTANCE); + } + + public static RebalancePlan decode(byte[] json) { + return JsonSerdeUtils.readValue(json, RebalancePlanJsonSerde.INSTANCE); + } + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java index be15c4d151..ffba559fa0 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java @@ -22,6 +22,8 @@ import org.apache.fluss.exception.NetworkException; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.AddServerTagRequest; +import org.apache.fluss.rpc.messages.AddServerTagResponse; import org.apache.fluss.rpc.messages.AdjustIsrRequest; import org.apache.fluss.rpc.messages.AdjustIsrResponse; import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest; @@ -30,6 +32,8 @@ import org.apache.fluss.rpc.messages.AlterTableResponse; import org.apache.fluss.rpc.messages.ApiVersionsRequest; import org.apache.fluss.rpc.messages.ApiVersionsResponse; +import org.apache.fluss.rpc.messages.CancelRebalanceRequest; +import org.apache.fluss.rpc.messages.CancelRebalanceResponse; import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest; import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse; import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; @@ -80,10 +84,16 @@ import org.apache.fluss.rpc.messages.ListDatabasesResponse; import org.apache.fluss.rpc.messages.ListPartitionInfosRequest; import org.apache.fluss.rpc.messages.ListPartitionInfosResponse; +import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest; +import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse; import org.apache.fluss.rpc.messages.ListTablesRequest; import org.apache.fluss.rpc.messages.ListTablesResponse; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; +import org.apache.fluss.rpc.messages.RebalanceRequest; +import org.apache.fluss.rpc.messages.RebalanceResponse; +import org.apache.fluss.rpc.messages.RemoveServerTagRequest; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; import org.apache.fluss.rpc.messages.TableExistsResponse; import org.apache.fluss.rpc.protocol.ApiError; @@ -339,6 +349,34 @@ public CompletableFuture lakeTieringHeartbeat( throw new UnsupportedOperationException(); } + @Override + public CompletableFuture addServerTag(AddServerTagRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture removeServerTag( + RemoveServerTagRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture rebalance(RebalanceRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture listRebalanceProcess( + ListRebalanceProcessRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture cancelRebalance( + CancelRebalanceRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture controlledShutdown( ControlledShutdownRequest request) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java index f42d381faf..f46acaf30d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java @@ -19,6 +19,8 @@ import org.apache.fluss.cluster.Endpoint; import org.apache.fluss.cluster.TabletServerInfo; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.metadata.Schema; @@ -33,6 +35,8 @@ import org.apache.fluss.server.zk.data.CoordinatorAddress; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.PartitionAssignment; +import org.apache.fluss.server.zk.data.RebalancePlan; +import org.apache.fluss.server.zk.data.ServerTags; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.server.zk.data.TabletServerRegistration; @@ -552,6 +556,80 @@ void testPartition() throws Exception { assertThat(partitions).containsExactly("p2"); } + @Test + void testServerTag() throws Exception { + Map serverTags = new HashMap<>(); + serverTags.put(0, ServerTag.PERMANENT_OFFLINE); + serverTags.put(1, ServerTag.TEMPORARY_OFFLINE); + + zookeeperClient.registerServerTags(new ServerTags(serverTags)); + assertThat(zookeeperClient.getServerTags()).hasValue(new ServerTags(serverTags)); + + // update server tags. + serverTags.put(0, ServerTag.TEMPORARY_OFFLINE); + serverTags.remove(1); + zookeeperClient.registerServerTags(new ServerTags(serverTags)); + assertThat(zookeeperClient.getServerTags()).hasValue(new ServerTags(serverTags)); + + zookeeperClient.registerServerTags(new ServerTags(Collections.emptyMap())); + assertThat(zookeeperClient.getServerTags()) + .hasValue(new ServerTags(Collections.emptyMap())); + } + + @Test + void testRebalancePlan() throws Exception { + Map bucketPlan = new HashMap<>(); + bucketPlan.put( + new TableBucket(0L, 0), + new RebalancePlanForBucket( + new TableBucket(0L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + bucketPlan.put( + new TableBucket(0L, 1), + new RebalancePlanForBucket( + new TableBucket(0L, 1), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + bucketPlan.put( + new TableBucket(1L, 1L, 0), + new RebalancePlanForBucket( + new TableBucket(1L, 1L, 0), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + bucketPlan.put( + new TableBucket(1L, 1L, 1), + new RebalancePlanForBucket( + new TableBucket(1L, 1L, 1), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + zookeeperClient.registerRebalancePlan(new RebalancePlan(bucketPlan)); + assertThat(zookeeperClient.getRebalancePlan()).hasValue(new RebalancePlan(bucketPlan)); + + bucketPlan = new HashMap<>(); + bucketPlan.put( + new TableBucket(0L, 0), + new RebalancePlanForBucket( + new TableBucket(0L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + zookeeperClient.registerRebalancePlan(new RebalancePlan(bucketPlan)); + assertThat(zookeeperClient.getRebalancePlan()).hasValue(new RebalancePlan(bucketPlan)); + + zookeeperClient.deleteRebalancePlan(); + assertThat(zookeeperClient.getRebalancePlan()).isEmpty(); + } + @Test void testZookeeperConfigPath() throws Exception { final Configuration config = new Configuration(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java new file mode 100644 index 0000000000..64b6da43e5 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.utils.json.JsonSerdeTestBase; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** Test for {@link RebalancePlanJsonSerde}. */ +public class RebalancePlanJsonSerdeTest extends JsonSerdeTestBase { + + RebalancePlanJsonSerdeTest() { + super(RebalancePlanJsonSerde.INSTANCE); + } + + @Override + protected RebalancePlan[] createObjects() { + Map bucketPlan = new HashMap<>(); + bucketPlan.put( + new TableBucket(0L, 0), + new RebalancePlanForBucket( + new TableBucket(0L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + bucketPlan.put( + new TableBucket(0L, 1), + new RebalancePlanForBucket( + new TableBucket(0L, 1), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + + bucketPlan.put( + new TableBucket(1L, 0L, 0), + new RebalancePlanForBucket( + new TableBucket(1L, 0L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + bucketPlan.put( + new TableBucket(1L, 0L, 1), + new RebalancePlanForBucket( + new TableBucket(1L, 0L, 1), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + + bucketPlan.put( + new TableBucket(1L, 1L, 0), + new RebalancePlanForBucket( + new TableBucket(1L, 1L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + return new RebalancePlan[] {new RebalancePlan(bucketPlan)}; + } + + @Override + protected String[] expectedJsons() { + return new String[] { + "{\"version\":1,\"rebalance_plan\":" + + "[{\"table_id\":0,\"buckets\":" + + "[{\"bucket_id\":1,\"original_leader\":1,\"new_leader\":1,\"origin_replicas\":[0,1,2],\"new_replicas\":[1,2,3]}," + + "{\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5]}]}," + + "{\"table_id\":1,\"partition_id\":0,\"buckets\":[" + + "{\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5]}," + + "{\"bucket_id\":1,\"original_leader\":1,\"new_leader\":1,\"origin_replicas\":[0,1,2],\"new_replicas\":[1,2,3]}]}," + + "{\"table_id\":1,\"partition_id\":1,\"buckets\":[" + + "{\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5]}]}]}" + }; + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerdeTest.java new file mode 100644 index 0000000000..8dd4e7f454 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerdeTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.utils.json.JsonSerdeTestBase; + +import java.util.HashMap; +import java.util.Map; + +/** Test for {@link ServerTagsJsonSerde}. */ +public class ServerTagsJsonSerdeTest extends JsonSerdeTestBase { + + ServerTagsJsonSerdeTest() { + super(ServerTagsJsonSerde.INSTANCE); + } + + @Override + protected ServerTags[] createObjects() { + Map serverTags = new HashMap<>(); + serverTags.put(0, ServerTag.PERMANENT_OFFLINE); + serverTags.put(1, ServerTag.TEMPORARY_OFFLINE); + + Map serverTags2 = new HashMap<>(); + + return new ServerTags[] {new ServerTags(serverTags), new ServerTags(serverTags2)}; + } + + protected String[] expectedJsons() { + return new String[] { + "{\"version\":1,\"server_tags\":{\"0\":0,\"1\":1}}", + "{\"version\":1,\"server_tags\":{}}" + }; + } +} From 9ddb717096e7d459ff1bded9c7edeb3f52e77b93 Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Sun, 28 Dec 2025 20:03:04 +0800 Subject: [PATCH 2/3] address jark's comments --- .../org/apache/fluss/client/admin/Admin.java | 19 +++--- .../apache/fluss/client/admin/FlussAdmin.java | 6 +- .../fluss/client/admin/RebalancePlan.java | 41 ++++++++++++ .../fluss/client/admin/RebalanceProgress.java | 62 +++++++++++++++++++ .../fluss/cluster/rebalance/GoalType.java | 8 +-- .../rebalance/RebalancePlanForBucket.java | 2 +- .../rebalance/RebalanceResultForBucket.java | 24 +++---- ...tusForBucket.java => RebalanceStatus.java} | 16 +++-- .../fluss/cluster/rebalance/ServerTag.java | 8 ++- .../NoRebalanceInProgressException.java | 2 +- .../exception/RebalanceFailureException.java | 2 +- .../exception/ServerNotExistException.java | 2 +- .../ServerTagAlreadyExistException.java | 2 +- .../exception/ServerTagNotExistException.java | 2 +- .../TabletServerNotAvailableException.java | 2 +- .../fluss/rpc/gateway/AdminGateway.java | 10 +-- .../apache/fluss/rpc/protocol/ApiKeys.java | 2 +- fluss-rpc/src/main/proto/FlussApi.proto | 49 +++++---------- .../coordinator/CoordinatorService.java | 8 +-- .../fluss/server/zk/ZooKeeperClient.java | 37 +++++------ .../fluss/server/zk/data/RebalancePlan.java | 30 ++++++--- .../zk/data/RebalancePlanJsonSerde.java | 9 ++- .../coordinator/TestCoordinatorGateway.java | 8 +-- .../fluss/server/zk/ZooKeeperClientTest.java | 21 ++++--- .../zk/data/RebalancePlanJsonSerdeTest.java | 6 +- 25 files changed, 245 insertions(+), 133 deletions(-) create mode 100644 fluss-client/src/main/java/org/apache/fluss/client/admin/RebalancePlan.java create mode 100644 fluss-client/src/main/java/org/apache/fluss/client/admin/RebalanceProgress.java rename fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/{RebalanceStatusForBucket.java => RebalanceStatus.java} (81%) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java index feca0e5f68..b5ae7b06b9 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java @@ -23,8 +23,6 @@ import org.apache.fluss.client.metadata.LakeSnapshot; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.rebalance.GoalType; -import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; -import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.cluster.AlterConfig; @@ -70,7 +68,6 @@ import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -509,13 +506,16 @@ ListOffsetsResult listOffsets( * *

If one tabletServer failed adding tag, none of the tags will take effect. * + *

If one tabletServer already has a serverTag, and the serverTag is same with the existing + * one, this operation will be ignored. + * *

    *
  • {@link AuthorizationException} If the authenticated user doesn't have cluster * permissions. *
  • {@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not * exist. *
  • {@link ServerTagAlreadyExistException} If the server tag already exists for any one of - * the tabletServers. + * the tabletServers, and the server tag is different from the existing one. *
* * @param tabletServers the tabletServers we want to add server tags. @@ -528,6 +528,8 @@ ListOffsetsResult listOffsets( * *

If one tabletServer failed removing tag, none of the tags will be removed. * + *

No exception will be thrown if the server already has no any server tag now. + * *

    *
  • {@link AuthorizationException} If the authenticated user doesn't have cluster * permissions. @@ -563,11 +565,10 @@ ListOffsetsResult listOffsets( * it. * @return the generated rebalance plan for all the tableBuckets which need to do rebalance. */ - CompletableFuture> rebalance( - List priorityGoals, boolean dryRun); + CompletableFuture rebalance(List priorityGoals, boolean dryRun); /** - * List the rebalance process. + * List the rebalance progress. * *
      *
    • {@link AuthorizationException} If the authenticated user doesn't have cluster @@ -575,9 +576,9 @@ CompletableFuture> rebalance( *
    • {@link NoRebalanceInProgressException} If there are no rebalance tasks in progress. *
    * - * @return the rebalance process for all the tableBuckets doing rebalance. + * @return the rebalance process. */ - CompletableFuture> listRebalanceProcess(); + CompletableFuture listRebalanceProgress(); /** * Cannel the rebalance task. diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index cdf9a037f8..5504ee2707 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -25,8 +25,6 @@ import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.rebalance.GoalType; -import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; -import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.cluster.AlterConfig; import org.apache.fluss.config.cluster.ConfigEntry; @@ -547,13 +545,13 @@ public CompletableFuture removeServerTag( } @Override - public CompletableFuture> rebalance( + public CompletableFuture rebalance( List priorityGoals, boolean dryRun) { throw new UnsupportedOperationException("Support soon"); } @Override - public CompletableFuture> listRebalanceProcess() { + public CompletableFuture listRebalanceProgress() { throw new UnsupportedOperationException("Support soon"); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/RebalancePlan.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/RebalancePlan.java new file mode 100644 index 0000000000..4e054a71b6 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/RebalancePlan.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.admin; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; + +import java.util.Map; + +/** + * The rebalance plan. + * + * @since 0.9 + */ +public class RebalancePlan { + + private final Map planForBucketMap; + + public RebalancePlan(Map planForBucketMap) { + this.planForBucketMap = planForBucketMap; + } + + public Map getPlanForBucketMap() { + return planForBucketMap; + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/RebalanceProgress.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/RebalanceProgress.java new file mode 100644 index 0000000000..990127f584 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/RebalanceProgress.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.admin; + +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; +import org.apache.fluss.metadata.TableBucket; + +import java.util.Map; + +/** + * The rebalance progress. + * + * @since 0.9 + */ +public class RebalanceProgress { + + /** The rebalance status for the overall rebalance. */ + private final RebalanceStatus rebalanceStatus; + + /** The rebalance progress for the overall rebalance. Between 0.0d to 1.0d */ + private final double progress; + + /** The rebalance progress for each tabletBucket. */ + private final Map processForBucketMap; + + public RebalanceProgress( + RebalanceStatus rebalanceStatus, + double progress, + Map processForBucketMap) { + this.rebalanceStatus = rebalanceStatus; + this.progress = progress; + this.processForBucketMap = processForBucketMap; + } + + public RebalanceStatus status() { + return rebalanceStatus; + } + + public double progress() { + return progress; + } + + public Map processForBucketMap() { + return processForBucketMap; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java index 6dc7624805..c9392497c1 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java @@ -24,7 +24,7 @@ /** * The type of goal to optimize. * - * @since 0.8 + * @since 0.9 */ @PublicEvolving public enum GoalType { @@ -38,7 +38,7 @@ public enum GoalType { * Goal to generate leadership movement and leader replica movement tasks to ensure that the * number of leader replicas on each tabletServer is near balanced. */ - LEADER_REPLICA_DISTRIBUTION_GOAL(1); + LEADER_DISTRIBUTION_GOAL(1); public final int value; @@ -49,8 +49,8 @@ public enum GoalType { public static GoalType valueOf(int value) { if (value == REPLICA_DISTRIBUTION_GOAL.value) { return REPLICA_DISTRIBUTION_GOAL; - } else if (value == LEADER_REPLICA_DISTRIBUTION_GOAL.value) { - return LEADER_REPLICA_DISTRIBUTION_GOAL; + } else if (value == LEADER_DISTRIBUTION_GOAL.value) { + return LEADER_DISTRIBUTION_GOAL; } else { throw new IllegalArgumentException( String.format( diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java index bedce22663..df5bde12b8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java @@ -26,7 +26,7 @@ /** * a generated rebalance plan for a tableBucket. * - * @since 0.8 + * @since 0.9 */ @PublicEvolving public class RebalancePlanForBucket { diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java index c477e524b3..161e4f24bb 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java @@ -25,25 +25,24 @@ /** * Status of rebalance process for a tabletBucket. * - * @since 0.8 + * @since 0.9 */ @PublicEvolving public class RebalanceResultForBucket { private final RebalancePlanForBucket rebalancePlanForBucket; - private RebalanceStatusForBucket rebalanceStatusForBucket; + private final RebalanceStatus rebalanceStatus; public RebalanceResultForBucket( - RebalancePlanForBucket rebalancePlanForBucket, - RebalanceStatusForBucket rebalanceStatusForBucket) { + RebalancePlanForBucket rebalancePlanForBucket, RebalanceStatus rebalanceStatus) { this.rebalancePlanForBucket = rebalancePlanForBucket; - this.rebalanceStatusForBucket = rebalanceStatusForBucket; + this.rebalanceStatus = rebalanceStatus; } public TableBucket tableBucket() { return rebalancePlanForBucket.getTableBucket(); } - public RebalancePlanForBucket planForBucket() { + public RebalancePlanForBucket plan() { return rebalancePlanForBucket; } @@ -51,17 +50,12 @@ public List newReplicas() { return rebalancePlanForBucket.getNewReplicas(); } - public RebalanceResultForBucket setNewStatus(RebalanceStatusForBucket status) { - this.rebalanceStatusForBucket = status; - return this; - } - - public RebalanceStatusForBucket status() { - return rebalanceStatusForBucket; + public RebalanceStatus status() { + return rebalanceStatus; } public static RebalanceResultForBucket of( - RebalancePlanForBucket planForBucket, RebalanceStatusForBucket status) { + RebalancePlanForBucket planForBucket, RebalanceStatus status) { return new RebalanceResultForBucket(planForBucket, status); } @@ -71,7 +65,7 @@ public String toString() { + "rebalancePlanForBucket=" + rebalancePlanForBucket + ", rebalanceStatusForBucket=" - + rebalanceStatusForBucket + + rebalanceStatus + '}'; } } diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatus.java similarity index 81% rename from fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java rename to fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatus.java index e8c0e46733..96951fe02b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatus.java @@ -20,12 +20,12 @@ import org.apache.fluss.annotation.PublicEvolving; /** - * Rebalance status for single bucket. + * Rebalance status. * - * @since 0.8 + * @since 0.9 */ @PublicEvolving -public enum RebalanceStatusForBucket { +public enum RebalanceStatus { PENDING(1), REBALANCING(2), FAILED(3), @@ -33,12 +33,16 @@ public enum RebalanceStatusForBucket { private final int code; - RebalanceStatusForBucket(int code) { + RebalanceStatus(int code) { this.code = code; } - public static RebalanceStatusForBucket of(int code) { - for (RebalanceStatusForBucket status : RebalanceStatusForBucket.values()) { + public int getCode() { + return code; + } + + public static RebalanceStatus of(int code) { + for (RebalanceStatus status : RebalanceStatus.values()) { if (status.code == code) { return status; } diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java index 66849060d4..ae20c1b7ab 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java @@ -24,11 +24,17 @@ /** * The tag of tabletServer. * - * @since 0.8 + * @since 0.9 */ @PublicEvolving public enum ServerTag { + /** + * The tabletServer is permanently offline. Such as the host where the tabletServer on is + * upcoming decommissioning. + */ PERMANENT_OFFLINE(0), + + /** The tabletServer is temporarily offline. Such as the tabletServer is upcoming upgrading. */ TEMPORARY_OFFLINE(1); public final int value; diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java b/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java index 8b052a5100..9c25cf1eba 100644 --- a/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java +++ b/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java @@ -22,7 +22,7 @@ /** * Thrown if there are no rebalance tasks in progress when list rebalance process. * - * @since 0.8 + * @since 0.9 */ @PublicEvolving public class NoRebalanceInProgressException extends ApiException { diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/RebalanceFailureException.java b/fluss-common/src/main/java/org/apache/fluss/exception/RebalanceFailureException.java index 0dcf260b0c..676832b291 100644 --- a/fluss-common/src/main/java/org/apache/fluss/exception/RebalanceFailureException.java +++ b/fluss-common/src/main/java/org/apache/fluss/exception/RebalanceFailureException.java @@ -22,7 +22,7 @@ /** * This exception is thrown if rebalance failed. * - * @since 0.8 + * @since 0.9 */ @VisibleForTesting public class RebalanceFailureException extends ApiException { diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java index 2bdbe621e8..e4a2ff6ab3 100644 --- a/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java @@ -22,7 +22,7 @@ /** * Thrown if a server does not exist in the cluster. * - * @since 0.8 + * @since 0.9 */ @PublicEvolving public class ServerNotExistException extends ApiException { diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagAlreadyExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagAlreadyExistException.java index a3d4259b13..8fd64ccab9 100644 --- a/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagAlreadyExistException.java +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagAlreadyExistException.java @@ -22,7 +22,7 @@ /** * Thrown if a server tag already exists for specify tabletServer in the cluster. * - * @since 0.8 + * @since 0.9 */ @PublicEvolving public class ServerTagAlreadyExistException extends ApiException { diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagNotExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagNotExistException.java index bd62672c72..ec6eef7794 100644 --- a/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagNotExistException.java +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ServerTagNotExistException.java @@ -22,7 +22,7 @@ /** * Thrown if a server tag not exist for specify tabletServer in the cluster. * - * @since 0.8 + * @since 0.9 */ @PublicEvolving public class ServerTagNotExistException extends ApiException { diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java b/fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java index 320daeff64..c04369a3e4 100644 --- a/fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java +++ b/fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java @@ -22,7 +22,7 @@ /** * Thrown when the tabletServer is not available. * - * @since 0.8 + * @since 0.9 */ @PublicEvolving public class TabletServerNotAvailableException extends ApiException { diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java index 996342dfcb..072a5954f1 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java @@ -41,8 +41,8 @@ import org.apache.fluss.rpc.messages.DropPartitionResponse; import org.apache.fluss.rpc.messages.DropTableRequest; import org.apache.fluss.rpc.messages.DropTableResponse; -import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest; -import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse; +import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest; +import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse; import org.apache.fluss.rpc.messages.RebalanceRequest; import org.apache.fluss.rpc.messages.RebalanceResponse; import org.apache.fluss.rpc.messages.RemoveServerTagRequest; @@ -139,9 +139,9 @@ CompletableFuture alterClusterConfigs( @RPC(api = ApiKeys.REBALANCE) CompletableFuture rebalance(RebalanceRequest request); - @RPC(api = ApiKeys.LIST_REBALANCE_PROCESS) - CompletableFuture listRebalanceProcess( - ListRebalanceProcessRequest request); + @RPC(api = ApiKeys.LIST_REBALANCE_PROGRESS) + CompletableFuture listRebalanceProgress( + ListRebalanceProgressRequest request); @RPC(api = ApiKeys.CANCEL_REBALANCE) CompletableFuture cancelRebalance(CancelRebalanceRequest request); diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index 97e14d79f0..388ec081dd 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -78,7 +78,7 @@ public enum ApiKeys { ADD_SERVER_TAG(1047, 0, 0, PUBLIC), REMOVE_SERVER_TAG(1048, 0, 0, PUBLIC), REBALANCE(1049, 0, 0, PUBLIC), - LIST_REBALANCE_PROCESS(1050, 0, 0, PUBLIC), + LIST_REBALANCE_PROGRESS(1050, 0, 0, PUBLIC), CANCEL_REBALANCE(1051, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 891a295d16..33fd34764a 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -596,14 +596,14 @@ message RebalanceRequest { } message RebalanceResponse { - repeated PbRebalancePlanForTable plan_for_table = 1; + repeated PbRebalancePlanForTable table_plan = 1; } -message ListRebalanceProcessRequest { +message ListRebalanceProgressRequest { } -message ListRebalanceProcessResponse { - repeated PbRebalanceProcessForTable process_for_table = 1; +message ListRebalanceProgressResponse { + repeated PbRebalanceProgressForTable table_process = 1; } message CancelRebalanceRequest { @@ -977,7 +977,6 @@ message PbRenameColumn { required string new_column_name = 2; } - message PbModifyColumn { required string column_name = 1; optional bytes data_type_json = 2; @@ -985,8 +984,6 @@ message PbModifyColumn { optional int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3 } - - message PbDescribeConfig { required string config_key = 1; optional string config_value = 2; @@ -995,38 +992,24 @@ message PbDescribeConfig { message PbRebalancePlanForTable { required int64 table_id = 1; - repeated PbRebalancePlanForPartition partitions_plan = 2; // for none-partition table, this is empty - repeated PbRebalancePlanForBucket buckets_plan = 3; // for partition table, this is empty - -} - -message PbRebalancePlanForPartition { - required int64 partition_id = 1; repeated PbRebalancePlanForBucket buckets_plan = 2; } -message PbRebalancePlanForBucket { - required int32 bucket_id = 1; - optional int32 original_leader = 2; - optional int32 new_leader = 3; - repeated int32 original_replicas = 4 [packed = true]; - repeated int32 new_replicas = 5 [packed = true]; -} - -message PbRebalanceProcessForTable { +message PbRebalanceProgressForTable { required int64 table_id = 1; - repeated PbRebalanceProcessForPartition partitions_process = 2; - repeated PbRebalanceProcessForBucket buckets_process = 3; + repeated PbRebalanceProgressForBucket buckets_progress = 2; } -message PbRebalanceProcessForPartition { - required int64 partition_id = 1; - repeated PbRebalanceProcessForBucket buckets_process = 2; +message PbRebalanceProgressForBucket { + required PbRebalancePlanForBucket reblance_plan = 1; + required int32 rebalance_status = 2; } -message PbRebalanceProcessForBucket { - required int32 bucket_id = 1; - repeated int32 original_replicas = 2 [packed = true]; - repeated int32 new_replicas = 3 [packed = true]; - required int32 rebalance_status = 4; +message PbRebalancePlanForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + optional int32 original_leader = 3; + optional int32 new_leader = 4; + repeated int32 original_replicas = 5 [packed = true]; + repeated int32 new_replicas = 6 [packed = true]; } \ No newline at end of file diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 7633df59d4..9eee45e600 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -80,8 +80,8 @@ import org.apache.fluss.rpc.messages.DropTableResponse; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse; -import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest; -import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse; +import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest; +import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; import org.apache.fluss.rpc.messages.PbAlterConfig; @@ -767,8 +767,8 @@ public CompletableFuture rebalance(RebalanceRequest request) } @Override - public CompletableFuture listRebalanceProcess( - ListRebalanceProcessRequest request) { + public CompletableFuture listRebalanceProgress( + ListRebalanceProgressRequest request) { throw new UnsupportedOperationException("Support soon!"); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index 749fdec2a0..a9e0af4d95 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -1198,14 +1198,15 @@ public void insertConfigChangeNotification() throws Exception { public void registerServerTags(ServerTags newServerTags) throws Exception { String path = ServerTagsZNode.path(); - if (getOrEmpty(path).isPresent()) { - zkClient.setData().forPath(path, ServerTagsZNode.encode(newServerTags)); - } else { - zkClient.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path, ServerTagsZNode.encode(newServerTags)); - } + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, ServerTagsZNode.encode(newServerTags)); + } + + public void updateServerTags(ServerTags newServerTags) throws Exception { + String path = ServerTagsZNode.path(); + zkClient.setData().forPath(path, ServerTagsZNode.encode(newServerTags)); } public Optional getServerTags() throws Exception { @@ -1215,24 +1216,20 @@ public Optional getServerTags() throws Exception { public void registerRebalancePlan(RebalancePlan rebalancePlan) throws Exception { String path = RebalanceZNode.path(); - if (getOrEmpty(path).isPresent()) { - zkClient.setData().forPath(path, RebalanceZNode.encode(rebalancePlan)); - } else { - zkClient.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path, RebalanceZNode.encode(rebalancePlan)); - } + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, RebalanceZNode.encode(rebalancePlan)); } - public Optional getRebalancePlan() throws Exception { + public void updateRebalancePlan(RebalancePlan rebalancePlan) throws Exception { String path = RebalanceZNode.path(); - return getOrEmpty(path).map(RebalanceZNode::decode); + zkClient.setData().forPath(path, RebalanceZNode.encode(rebalancePlan)); } - public void deleteRebalancePlan() throws Exception { + public Optional getRebalancePlan() throws Exception { String path = RebalanceZNode.path(); - deletePath(path); + return getOrEmpty(path).map(RebalanceZNode::decode); } // -------------------------------------------------------------------------------------------- diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java index 00c87bcbc3..80bfddeb28 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java @@ -18,6 +18,7 @@ package org.apache.fluss.server.zk.data; import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePartition; @@ -36,6 +37,9 @@ */ public class RebalancePlan { + /** The rebalance status. */ + private RebalanceStatus rebalanceStatus; + /** A mapping from tableBucket to RebalancePlanForBuckets of none-partitioned table. */ private final Map> planForBuckets; @@ -43,7 +47,9 @@ public class RebalancePlan { private final Map> planForBucketsOfPartitionedTable; - public RebalancePlan(Map bucketPlan) { + public RebalancePlan( + RebalanceStatus rebalanceStatus, Map bucketPlan) { + this.rebalanceStatus = rebalanceStatus; this.planForBuckets = new HashMap<>(); this.planForBucketsOfPartitionedTable = new HashMap<>(); @@ -64,6 +70,14 @@ public RebalancePlan(Map bucketPlan) { } } + public RebalanceStatus getRebalanceStatus() { + return rebalanceStatus; + } + + public void setRebalanceStatus(RebalanceStatus rebalanceStatus) { + this.rebalanceStatus = rebalanceStatus; + } + public Map> getPlanForBuckets() { return planForBuckets; } @@ -75,6 +89,8 @@ public Map> getPlanForBucketsOfPart @Override public String toString() { return "RebalancePlan{" + + "rebalanceStatus=" + + rebalanceStatus + "planForBuckets=" + planForBuckets + ", planForBucketsOfPartitionedTable=" @@ -92,16 +108,14 @@ public boolean equals(Object o) { } RebalancePlan that = (RebalancePlan) o; - - if (!Objects.equals(planForBuckets, that.planForBuckets)) { - return false; - } - return Objects.equals( - planForBucketsOfPartitionedTable, that.planForBucketsOfPartitionedTable); + return rebalanceStatus == that.rebalanceStatus + && Objects.equals(planForBuckets, that.planForBuckets) + && Objects.equals( + planForBucketsOfPartitionedTable, that.planForBucketsOfPartitionedTable); } @Override public int hashCode() { - return Objects.hash(planForBuckets, planForBucketsOfPartitionedTable); + return Objects.hash(rebalanceStatus, planForBuckets, planForBucketsOfPartitionedTable); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java index b79cd46e30..6588d879c5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java @@ -18,6 +18,7 @@ package org.apache.fluss.server.zk.data; import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceStatus; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePartition; import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; @@ -39,6 +40,7 @@ public class RebalancePlanJsonSerde public static final RebalancePlanJsonSerde INSTANCE = new RebalancePlanJsonSerde(); private static final String VERSION_KEY = "version"; + private static final String REBALANCE_STATUS = "rebalance_status"; private static final String REBALANCE_PLAN = "rebalance_plan"; private static final String TABLE_ID = "table_id"; @@ -57,6 +59,7 @@ public class RebalancePlanJsonSerde public void serialize(RebalancePlan rebalancePlan, JsonGenerator generator) throws IOException { generator.writeStartObject(); generator.writeNumberField(VERSION_KEY, VERSION); + generator.writeNumberField(REBALANCE_STATUS, rebalancePlan.getRebalanceStatus().getCode()); generator.writeArrayFieldStart(REBALANCE_PLAN); // first to write none-partitioned tables. @@ -94,8 +97,10 @@ public void serialize(RebalancePlan rebalancePlan, JsonGenerator generator) thro @Override public RebalancePlan deserialize(JsonNode node) { JsonNode rebalancePlanNode = node.get(REBALANCE_PLAN); - Map planForBuckets = new HashMap<>(); + RebalanceStatus rebalanceStatus = RebalanceStatus.of(node.get(REBALANCE_STATUS).asInt()); + + Map planForBuckets = new HashMap<>(); for (JsonNode tablePartitionPlanNode : rebalancePlanNode) { long tableId = tablePartitionPlanNode.get(TABLE_ID).asLong(); @@ -132,7 +137,7 @@ public RebalancePlan deserialize(JsonNode node) { } } - return new RebalancePlan(planForBuckets); + return new RebalancePlan(rebalanceStatus, planForBuckets); } private void serializeRebalancePlanForBucket( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java index ffba559fa0..c8500645e6 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java @@ -84,8 +84,8 @@ import org.apache.fluss.rpc.messages.ListDatabasesResponse; import org.apache.fluss.rpc.messages.ListPartitionInfosRequest; import org.apache.fluss.rpc.messages.ListPartitionInfosResponse; -import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest; -import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse; +import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest; +import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse; import org.apache.fluss.rpc.messages.ListTablesRequest; import org.apache.fluss.rpc.messages.ListTablesResponse; import org.apache.fluss.rpc.messages.MetadataRequest; @@ -366,8 +366,8 @@ public CompletableFuture rebalance(RebalanceRequest request) } @Override - public CompletableFuture listRebalanceProcess( - ListRebalanceProcessRequest request) { + public CompletableFuture listRebalanceProgress( + ListRebalanceProgressRequest request) { throw new UnsupportedOperationException(); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java index f46acaf30d..2cc8ab9400 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java @@ -67,6 +67,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.PENDING; import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -568,10 +570,10 @@ void testServerTag() throws Exception { // update server tags. serverTags.put(0, ServerTag.TEMPORARY_OFFLINE); serverTags.remove(1); - zookeeperClient.registerServerTags(new ServerTags(serverTags)); + zookeeperClient.updateServerTags(new ServerTags(serverTags)); assertThat(zookeeperClient.getServerTags()).hasValue(new ServerTags(serverTags)); - zookeeperClient.registerServerTags(new ServerTags(Collections.emptyMap())); + zookeeperClient.updateServerTags(new ServerTags(Collections.emptyMap())); assertThat(zookeeperClient.getServerTags()) .hasValue(new ServerTags(Collections.emptyMap())); } @@ -611,8 +613,9 @@ void testRebalancePlan() throws Exception { 1, Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 3))); - zookeeperClient.registerRebalancePlan(new RebalancePlan(bucketPlan)); - assertThat(zookeeperClient.getRebalancePlan()).hasValue(new RebalancePlan(bucketPlan)); + zookeeperClient.registerRebalancePlan(new RebalancePlan(PENDING, bucketPlan)); + assertThat(zookeeperClient.getRebalancePlan()) + .hasValue(new RebalancePlan(PENDING, bucketPlan)); bucketPlan = new HashMap<>(); bucketPlan.put( @@ -623,11 +626,13 @@ void testRebalancePlan() throws Exception { 3, Arrays.asList(0, 1, 2), Arrays.asList(3, 4, 5))); - zookeeperClient.registerRebalancePlan(new RebalancePlan(bucketPlan)); - assertThat(zookeeperClient.getRebalancePlan()).hasValue(new RebalancePlan(bucketPlan)); + zookeeperClient.updateRebalancePlan(new RebalancePlan(PENDING, bucketPlan)); + assertThat(zookeeperClient.getRebalancePlan()) + .hasValue(new RebalancePlan(PENDING, bucketPlan)); - zookeeperClient.deleteRebalancePlan(); - assertThat(zookeeperClient.getRebalancePlan()).isEmpty(); + zookeeperClient.updateRebalancePlan(new RebalancePlan(COMPLETED, bucketPlan)); + assertThat(zookeeperClient.getRebalancePlan()) + .hasValue(new RebalancePlan(COMPLETED, bucketPlan)); } @Test diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java index 64b6da43e5..da801de76c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java @@ -25,6 +25,8 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.PENDING; + /** Test for {@link RebalancePlanJsonSerde}. */ public class RebalancePlanJsonSerdeTest extends JsonSerdeTestBase { @@ -77,13 +79,13 @@ protected RebalancePlan[] createObjects() { 3, Arrays.asList(0, 1, 2), Arrays.asList(3, 4, 5))); - return new RebalancePlan[] {new RebalancePlan(bucketPlan)}; + return new RebalancePlan[] {new RebalancePlan(PENDING, bucketPlan)}; } @Override protected String[] expectedJsons() { return new String[] { - "{\"version\":1,\"rebalance_plan\":" + "{\"version\":1,\"rebalance_status\":1,\"rebalance_plan\":" + "[{\"table_id\":0,\"buckets\":" + "[{\"bucket_id\":1,\"original_leader\":1,\"new_leader\":1,\"origin_replicas\":[0,1,2],\"new_replicas\":[1,2,3]}," + "{\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5]}]}," From fa8500a062c160094a3ae6aedb99f3d79003caf3 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Sun, 28 Dec 2025 21:01:12 +0800 Subject: [PATCH 3/3] minor improvements --- .../fluss/client/admin/RebalanceProgress.java | 15 +++++++++------ .../rebalance/RebalanceResultForBucket.java | 8 +++++--- .../fluss/cluster/rebalance/RebalanceStatus.java | 2 +- fluss-rpc/src/main/proto/FlussApi.proto | 4 ++-- .../fluss/server/zk/data/RebalancePlan.java | 8 ++------ .../fluss/server/zk/ZooKeeperClientTest.java | 10 +++++----- .../zk/data/RebalancePlanJsonSerdeTest.java | 4 ++-- 7 files changed, 26 insertions(+), 25 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/RebalanceProgress.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/RebalanceProgress.java index 990127f584..43738ca6c4 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/RebalanceProgress.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/RebalanceProgress.java @@ -23,6 +23,8 @@ import java.util.Map; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + /** * The rebalance progress. * @@ -37,15 +39,16 @@ public class RebalanceProgress { private final double progress; /** The rebalance progress for each tabletBucket. */ - private final Map processForBucketMap; + private final Map progressForBucketMap; public RebalanceProgress( RebalanceStatus rebalanceStatus, double progress, - Map processForBucketMap) { - this.rebalanceStatus = rebalanceStatus; + Map progressForBucketMap) { + // TODO: we may derive the overall progress and status from progressForBucketMap + this.rebalanceStatus = checkNotNull(rebalanceStatus); this.progress = progress; - this.processForBucketMap = processForBucketMap; + this.progressForBucketMap = checkNotNull(progressForBucketMap); } public RebalanceStatus status() { @@ -56,7 +59,7 @@ public double progress() { return progress; } - public Map processForBucketMap() { - return processForBucketMap; + public Map progressForBucketMap() { + return progressForBucketMap; } } diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java index 161e4f24bb..e472ebd0ae 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java @@ -22,6 +22,8 @@ import java.util.List; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + /** * Status of rebalance process for a tabletBucket. * @@ -34,8 +36,8 @@ public class RebalanceResultForBucket { public RebalanceResultForBucket( RebalancePlanForBucket rebalancePlanForBucket, RebalanceStatus rebalanceStatus) { - this.rebalancePlanForBucket = rebalancePlanForBucket; - this.rebalanceStatus = rebalanceStatus; + this.rebalancePlanForBucket = checkNotNull(rebalancePlanForBucket); + this.rebalanceStatus = checkNotNull(rebalanceStatus); } public TableBucket tableBucket() { @@ -64,7 +66,7 @@ public String toString() { return "RebalanceResultForBucket{" + "rebalancePlanForBucket=" + rebalancePlanForBucket - + ", rebalanceStatusForBucket=" + + ", rebalanceStatus=" + rebalanceStatus + '}'; } diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatus.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatus.java index 96951fe02b..cb7b6f92dd 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatus.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatus.java @@ -26,7 +26,7 @@ */ @PublicEvolving public enum RebalanceStatus { - PENDING(1), + NOT_STARTED(1), REBALANCING(2), FAILED(3), COMPLETED(4); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 33fd34764a..f5dae90c7e 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -603,7 +603,7 @@ message ListRebalanceProgressRequest { } message ListRebalanceProgressResponse { - repeated PbRebalanceProgressForTable table_process = 1; + repeated PbRebalanceProgressForTable table_progress = 1; } message CancelRebalanceRequest { @@ -1001,7 +1001,7 @@ message PbRebalanceProgressForTable { } message PbRebalanceProgressForBucket { - required PbRebalancePlanForBucket reblance_plan = 1; + required PbRebalancePlanForBucket rebalance_plan = 1; required int32 rebalance_status = 2; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java index 80bfddeb28..da9f019c35 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java @@ -37,8 +37,8 @@ */ public class RebalancePlan { - /** The rebalance status. */ - private RebalanceStatus rebalanceStatus; + /** The rebalance status for the overall rebalance. */ + private final RebalanceStatus rebalanceStatus; /** A mapping from tableBucket to RebalancePlanForBuckets of none-partitioned table. */ private final Map> planForBuckets; @@ -74,10 +74,6 @@ public RebalanceStatus getRebalanceStatus() { return rebalanceStatus; } - public void setRebalanceStatus(RebalanceStatus rebalanceStatus) { - this.rebalanceStatus = rebalanceStatus; - } - public Map> getPlanForBuckets() { return planForBuckets; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java index 2cc8ab9400..64da4b95cb 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java @@ -68,7 +68,7 @@ import java.util.stream.Collectors; import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED; -import static org.apache.fluss.cluster.rebalance.RebalanceStatus.PENDING; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED; import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -613,9 +613,9 @@ void testRebalancePlan() throws Exception { 1, Arrays.asList(0, 1, 2), Arrays.asList(1, 2, 3))); - zookeeperClient.registerRebalancePlan(new RebalancePlan(PENDING, bucketPlan)); + zookeeperClient.registerRebalancePlan(new RebalancePlan(NOT_STARTED, bucketPlan)); assertThat(zookeeperClient.getRebalancePlan()) - .hasValue(new RebalancePlan(PENDING, bucketPlan)); + .hasValue(new RebalancePlan(NOT_STARTED, bucketPlan)); bucketPlan = new HashMap<>(); bucketPlan.put( @@ -626,9 +626,9 @@ void testRebalancePlan() throws Exception { 3, Arrays.asList(0, 1, 2), Arrays.asList(3, 4, 5))); - zookeeperClient.updateRebalancePlan(new RebalancePlan(PENDING, bucketPlan)); + zookeeperClient.updateRebalancePlan(new RebalancePlan(NOT_STARTED, bucketPlan)); assertThat(zookeeperClient.getRebalancePlan()) - .hasValue(new RebalancePlan(PENDING, bucketPlan)); + .hasValue(new RebalancePlan(NOT_STARTED, bucketPlan)); zookeeperClient.updateRebalancePlan(new RebalancePlan(COMPLETED, bucketPlan)); assertThat(zookeeperClient.getRebalancePlan()) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java index da801de76c..1bd8ad426e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java @@ -25,7 +25,7 @@ import java.util.HashMap; import java.util.Map; -import static org.apache.fluss.cluster.rebalance.RebalanceStatus.PENDING; +import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED; /** Test for {@link RebalancePlanJsonSerde}. */ public class RebalancePlanJsonSerdeTest extends JsonSerdeTestBase { @@ -79,7 +79,7 @@ protected RebalancePlan[] createObjects() { 3, Arrays.asList(0, 1, 2), Arrays.asList(3, 4, 5))); - return new RebalancePlan[] {new RebalancePlan(PENDING, bucketPlan)}; + return new RebalancePlan[] {new RebalancePlan(NOT_STARTED, bucketPlan)}; } @Override