Skip to content

Commit 05fb8de

Browse files
committed
[server] Servers with PERMANENT_OFFLINE and TEMPORARY_OFFLINE tag are no longer assigned replicas
1 parent 170a35e commit 05fb8de

File tree

3 files changed

+72
-8
lines changed

3 files changed

+72
-8
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,9 @@ public void startup() {
239239
HashSet<ServerInfo> tabletServerInfoList =
240240
new HashSet<>(coordinatorContext.getLiveTabletServers().values());
241241
serverMetadataCache.updateMetadata(
242-
coordinatorContext.getCoordinatorServerInfo(), tabletServerInfoList);
242+
coordinatorContext.getCoordinatorServerInfo(),
243+
tabletServerInfoList,
244+
coordinatorContext.getServerTags());
243245
updateTabletServerMetadataCacheWhenStartup(tabletServerInfoList);
244246

245247
// start table manager
@@ -856,7 +858,8 @@ private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) {
856858
// update coordinatorServer metadata cache for the newly added tablet server.
857859
serverMetadataCache.updateMetadata(
858860
coordinatorContext.getCoordinatorServerInfo(),
859-
new HashSet<>(coordinatorContext.getLiveTabletServers().values()));
861+
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
862+
coordinatorContext.getServerTags());
860863
// update server info for all tablet servers.
861864
updateTabletServerMetadataCache(
862865
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
@@ -913,7 +916,9 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent
913916
new HashSet<>(coordinatorContext.getLiveTabletServers().values());
914917
// update coordinatorServer metadata cache.
915918
serverMetadataCache.updateMetadata(
916-
coordinatorContext.getCoordinatorServerInfo(), serverInfos);
919+
coordinatorContext.getCoordinatorServerInfo(),
920+
serverInfos,
921+
coordinatorContext.getServerTags());
917922
updateTabletServerMetadataCache(serverInfos, null, null, Collections.emptySet());
918923

919924
TableBucketStateMachine tableBucketStateMachine = tableManager.getTableBucketStateMachine();
@@ -986,6 +991,11 @@ private AddServerTagResponse processAddServerTag(AddServerTagEvent event) {
986991

987992
// Then update coordinatorContext.
988993
serverIds.forEach(serverId -> coordinatorContext.putServerTag(serverId, serverTag));
994+
// update coordinatorServer metadata cache with the newly added server tag
995+
serverMetadataCache.updateMetadata(
996+
coordinatorContext.getCoordinatorServerInfo(),
997+
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
998+
coordinatorContext.getServerTags());
989999

9901000
return addServerTagResponse;
9911001
}
@@ -1028,6 +1038,11 @@ private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent even
10281038

10291039
// Then update coordinatorContext.
10301040
serverIds.forEach(coordinatorContext::removeServerTag);
1041+
// update coordinatorServer metadata cache to reflect the removed server tag
1042+
serverMetadataCache.updateMetadata(
1043+
coordinatorContext.getCoordinatorServerInfo(),
1044+
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
1045+
coordinatorContext.getServerTags());
10311046

10321047
return removeServerTagResponse;
10331048
}

fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataCache.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.cluster.ServerNode;
2121
import org.apache.fluss.cluster.TabletServerInfo;
22+
import org.apache.fluss.cluster.rebalance.ServerTag;
2223
import org.apache.fluss.server.coordinator.CoordinatorServer;
2324

2425
import javax.annotation.Nullable;
@@ -27,11 +28,13 @@
2728
import java.util.Collections;
2829
import java.util.HashMap;
2930
import java.util.HashSet;
31+
import java.util.Iterator;
3032
import java.util.Map;
3133
import java.util.Optional;
3234
import java.util.Set;
3335
import java.util.concurrent.locks.Lock;
3436
import java.util.concurrent.locks.ReentrantLock;
37+
import java.util.stream.Collectors;
3538

3639
import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
3740

@@ -42,7 +45,7 @@ public class CoordinatorMetadataCache implements ServerMetadataCache {
4245

4346
@GuardedBy("metadataLock")
4447
private volatile NodeMetadataSnapshot metadataSnapshot =
45-
new NodeMetadataSnapshot(null, Collections.emptyMap());
48+
new NodeMetadataSnapshot(null, Collections.emptyMap(), Collections.emptyMap());
4649

4750
public CoordinatorMetadataCache() {}
4851

@@ -91,7 +94,37 @@ public Set<TabletServerInfo> getAliveTabletServerInfos() {
9194
return Collections.unmodifiableSet(tabletServerInfos);
9295
}
9396

94-
public void updateMetadata(ServerInfo coordinatorServer, Set<ServerInfo> serverInfoSet) {
97+
/**
98+
* Servers with {@code PERMANENT_OFFLINE} and {@code TEMPORARY_OFFLINE} tags are no longer
99+
* returned here. So that no new replicas will be assigned to these servers.
100+
*/
101+
@Override
102+
public TabletServerInfo[] getLiveServers() {
103+
Set<TabletServerInfo> aliveTabletServerInfosWithoutOfflineServerTag =
104+
getAliveTabletServerInfos().stream()
105+
.filter(
106+
info ->
107+
!metadataSnapshot.serverTags.containsKey(info.getId())
108+
|| (metadataSnapshot.serverTags.get(info.getId())
109+
!= ServerTag.TEMPORARY_OFFLINE
110+
&& metadataSnapshot.serverTags.get(
111+
info.getId())
112+
!= ServerTag.PERMANENT_OFFLINE))
113+
.collect(Collectors.toSet());
114+
TabletServerInfo[] server =
115+
new TabletServerInfo[aliveTabletServerInfosWithoutOfflineServerTag.size()];
116+
Iterator<TabletServerInfo> iterator =
117+
aliveTabletServerInfosWithoutOfflineServerTag.iterator();
118+
for (int i = 0; i < aliveTabletServerInfosWithoutOfflineServerTag.size(); i++) {
119+
server[i] = iterator.next();
120+
}
121+
return server;
122+
}
123+
124+
public void updateMetadata(
125+
ServerInfo coordinatorServer,
126+
Set<ServerInfo> serverInfoSet,
127+
Map<Integer, ServerTag> serverTagMap) {
95128
inLock(
96129
metadataLock,
97130
() -> {
@@ -101,19 +134,23 @@ public void updateMetadata(ServerInfo coordinatorServer, Set<ServerInfo> serverI
101134
}
102135

103136
this.metadataSnapshot =
104-
new NodeMetadataSnapshot(coordinatorServer, newAliveTableServers);
137+
new NodeMetadataSnapshot(
138+
coordinatorServer, newAliveTableServers, serverTagMap);
105139
});
106140
}
107141

108142
private static class NodeMetadataSnapshot {
109143
final @Nullable ServerInfo coordinatorServer;
110144
final Map<Integer, ServerInfo> aliveTabletServers;
145+
final Map<Integer, ServerTag> serverTags;
111146

112147
private NodeMetadataSnapshot(
113148
@Nullable ServerInfo coordinatorServer,
114-
Map<Integer, ServerInfo> aliveTabletServers) {
149+
Map<Integer, ServerInfo> aliveTabletServers,
150+
Map<Integer, ServerTag> serverTags) {
115151
this.coordinatorServer = coordinatorServer;
116152
this.aliveTabletServers = aliveTabletServers;
153+
this.serverTags = serverTags;
117154
}
118155
}
119156
}

fluss-server/src/test/java/org/apache/fluss/server/metadata/CoordinatorMetadataCacheTest.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
import org.apache.fluss.cluster.Endpoint;
2121
import org.apache.fluss.cluster.ServerType;
2222
import org.apache.fluss.cluster.TabletServerInfo;
23+
import org.apache.fluss.cluster.rebalance.ServerTag;
2324

2425
import org.junit.jupiter.api.BeforeEach;
2526
import org.junit.jupiter.api.Test;
2627

2728
import java.util.Arrays;
29+
import java.util.HashMap;
2830
import java.util.HashSet;
31+
import java.util.Map;
2932
import java.util.Set;
3033

3134
import static org.assertj.core.api.Assertions.assertThat;
@@ -36,6 +39,7 @@ public class CoordinatorMetadataCacheTest {
3639

3740
private ServerInfo coordinatorServer;
3841
private Set<ServerInfo> aliveTableServers;
42+
private Map<Integer, ServerTag> serverTagMap;
3943

4044
@BeforeEach
4145
public void setup() {
@@ -68,11 +72,15 @@ public void setup() {
6872
"rack2",
6973
Endpoint.fromListenersString("INTERNAL://localhost:104"),
7074
ServerType.TABLET_SERVER)));
75+
76+
serverTagMap = new HashMap<>();
77+
serverTagMap.put(0, ServerTag.PERMANENT_OFFLINE);
78+
serverTagMap.put(1, ServerTag.TEMPORARY_OFFLINE);
7179
}
7280

7381
@Test
7482
void testCoordinatorServerMetadataCache() {
75-
serverMetadataCache.updateMetadata(coordinatorServer, aliveTableServers);
83+
serverMetadataCache.updateMetadata(coordinatorServer, aliveTableServers, serverTagMap);
7684
assertThat(serverMetadataCache.getCoordinatorServer("CLIENT"))
7785
.isEqualTo(coordinatorServer.node("CLIENT"));
7886
assertThat(serverMetadataCache.getCoordinatorServer("INTERNAL"))
@@ -85,5 +93,9 @@ void testCoordinatorServerMetadataCache() {
8593
new TabletServerInfo(0, "rack0"),
8694
new TabletServerInfo(1, "rack1"),
8795
new TabletServerInfo(2, "rack2"));
96+
// server 0 with PERMANENT_OFFLINE tag and server 1 with TEMPORARY_OFFLINE tag are no longer
97+
// considered alive
98+
assertThat(serverMetadataCache.getLiveServers())
99+
.containsExactlyInAnyOrder(new TabletServerInfo(2, "rack2"));
88100
}
89101
}

0 commit comments

Comments
 (0)