Skip to content

Commit f391276

Browse files
committed
avoid deadlock
1 parent ed3a37a commit f391276

File tree

1 file changed

+23
-27
lines changed

1 file changed

+23
-27
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,20 @@ private void registerRebalanceFromZookeeper() {
114114

115115
public void registerRebalance(Map<TableBucket, RebalancePlanForBucket> rebalancePlan) {
116116
checkNotClosed();
117+
118+
registerTime = System.currentTimeMillis();
119+
// Register to zookeeper first.
120+
try {
121+
zkClient.registerRebalancePlan(new RebalancePlan(rebalancePlan));
122+
} catch (Exception e) {
123+
LOG.error("Error when register rebalance plan to zookeeper.", e);
124+
throw new RebalanceFailureException(
125+
"Error when register rebalance plan to zookeeper.", e);
126+
}
127+
117128
inLock(
118129
lock,
119130
() -> {
120-
registerTime = System.currentTimeMillis();
121-
// Register to zookeeper first.
122-
try {
123-
zkClient.registerRebalancePlan(new RebalancePlan(rebalancePlan));
124-
} catch (Exception e) {
125-
LOG.error("Error when register rebalance plan to zookeeper.", e);
126-
throw new RebalanceFailureException(
127-
"Error when register rebalance plan to zookeeper.", e);
128-
}
129-
130131
// Then, register to ongoingRebalanceTasks.
131132
rebalancePlan.forEach(
132133
((tableBucket, rebalancePlanForBucket) -> {
@@ -198,25 +199,20 @@ public boolean hasOngoingRebalance() {
198199

199200
public RebalancePlan generateRebalancePlan(List<Goal> goalsByPriority) throws Exception {
200201
checkNotClosed();
201-
return inLock(
202-
lock,
203-
() -> {
204-
List<RebalancePlanForBucket> rebalancePlanForBuckets;
205-
try {
206-
// Generate the latest cluster model.
207-
ClusterModel clusterModel = getClusterModel();
202+
List<RebalancePlanForBucket> rebalancePlanForBuckets;
203+
try {
204+
// Generate the latest cluster model.
205+
ClusterModel clusterModel = getClusterModel();
208206

209-
// do optimize.
210-
rebalancePlanForBuckets =
211-
goalOptimizer.doOptimizeOnce(clusterModel, goalsByPriority);
212-
} catch (Exception e) {
213-
LOG.error("Failed to generate rebalance plan.", e);
214-
throw e;
215-
}
207+
// do optimize.
208+
rebalancePlanForBuckets = goalOptimizer.doOptimizeOnce(clusterModel, goalsByPriority);
209+
} catch (Exception e) {
210+
LOG.error("Failed to generate rebalance plan.", e);
211+
throw e;
212+
}
216213

217-
// group by tableId and partitionId to generate rebalance plan.
218-
return buildRebalancePlan(rebalancePlanForBuckets);
219-
});
214+
// group by tableId and partitionId to generate rebalance plan.
215+
return buildRebalancePlan(rebalancePlanForBuckets);
220216
}
221217

222218
public @Nullable RebalancePlanForBucket getRebalancePlanForBucket(TableBucket tableBucket) {

0 commit comments

Comments
 (0)