Skip to content

Commit 489bea6

Browse files
committed
[test] Fix thread-safe problems in CoordinatorEventProcessorTest
1 parent 5a1c4fd commit 489bea6

File tree

5 files changed

+306
-325
lines changed

5 files changed

+306
-325
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,16 @@ protected Map<Integer, List<Integer>> getTableAssignment(long tableId) {
402402
return tableAssignments.getOrDefault(tableId, Collections.emptyMap());
403403
}
404404

405+
@VisibleForTesting
406+
protected int replicaCounts(long tableId) {
407+
return getTableAssignment(tableId).values().stream().mapToInt(List::size).sum();
408+
}
409+
410+
@VisibleForTesting
411+
protected int replicaCounts(TablePartition tablePartition) {
412+
return getPartitionAssignment(tablePartition).values().stream().mapToInt(List::size).sum();
413+
}
414+
405415
@VisibleForTesting
406416
protected Map<Integer, List<Integer>> getPartitionAssignment(TablePartition tablePartition) {
407417
return partitionAssignments.getOrDefault(tablePartition, Collections.emptyMap());

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestResponse;
3939
import com.alibaba.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
4040
import com.alibaba.fluss.rpc.protocol.ApiError;
41+
import com.alibaba.fluss.server.coordinator.event.AccessContextEvent;
4142
import com.alibaba.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
4243
import com.alibaba.fluss.server.coordinator.event.CommitKvSnapshotEvent;
4344
import com.alibaba.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
@@ -418,11 +419,6 @@ private void loadAssignment(
418419
}
419420
}
420421

421-
@VisibleForTesting
422-
protected CoordinatorContext getCoordinatorContext() {
423-
return coordinatorContext;
424-
}
425-
426422
private void onShutdown() {
427423
// first shutdown table manager
428424
tableManager.shutdown();
@@ -485,6 +481,11 @@ public void process(CoordinatorEvent event) {
485481
completeFromCallable(
486482
commitLakeTableSnapshotEvent.getRespCallback(),
487483
() -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
484+
} else if (event instanceof AccessContextEvent) {
485+
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) event;
486+
processAccessContext(accessContextEvent);
487+
} else {
488+
LOG.warn("Unknown event type: {}", event.getClass().getName());
488489
}
489490
} finally {
490491
updateMetrics();
@@ -935,6 +936,15 @@ private CommitRemoteLogManifestResponse tryProcessCommitRemoteLogManifest(
935936
return response;
936937
}
937938

939+
private <T> void processAccessContext(AccessContextEvent<T> event) {
940+
try {
941+
T result = event.getAccessFunction().apply(coordinatorContext);
942+
event.getResultFuture().complete(result);
943+
} catch (Throwable t) {
944+
event.getResultFuture().completeExceptionally(t);
945+
}
946+
}
947+
938948
private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot(
939949
CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent) {
940950
CommitLakeTableSnapshotData commitLakeTableSnapshotData =
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) 2025 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.alibaba.fluss.server.coordinator.event;
18+
19+
import com.alibaba.fluss.server.coordinator.CoordinatorContext;
20+
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.Function;
23+
24+
/**
25+
* An event designed to safely access the {@link CoordinatorContext}. This is intended solely for
26+
* testing purposes. Since {@link CoordinatorContext} is not thread-safe, directly accessing it in
27+
* tests can lead to unsafe operations. This event ensures safe access to the {@link
28+
* CoordinatorContext} during testing
29+
*
30+
* @param <T> the type of the result of the access operation
31+
*/
32+
public class AccessContextEvent<T> implements CoordinatorEvent {
33+
34+
private final Function<CoordinatorContext, T> accessFunction;
35+
private final CompletableFuture<T> resultFuture;
36+
37+
public AccessContextEvent(Function<CoordinatorContext, T> accessFunction) {
38+
this.accessFunction = accessFunction;
39+
this.resultFuture = new CompletableFuture<>();
40+
}
41+
42+
public Function<CoordinatorContext, T> getAccessFunction() {
43+
return accessFunction;
44+
}
45+
46+
public CompletableFuture<T> getResultFuture() {
47+
return resultFuture;
48+
}
49+
}

0 commit comments

Comments
 (0)