Skip to content

Commit f25019a

Browse files
author
Liebing
committed
[server] Servers with PERMANENT_OFFLINE and TEMPORARY_OFFLINE tag are no longer assigned replicas
1 parent 2368e6a commit f25019a

File tree

3 files changed

+66
-8
lines changed

3 files changed

+66
-8
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,9 @@ public void startup() {
247247
HashSet<ServerInfo> tabletServerInfoList =
248248
new HashSet<>(coordinatorContext.getLiveTabletServers().values());
249249
serverMetadataCache.updateMetadata(
250-
coordinatorContext.getCoordinatorServerInfo(), tabletServerInfoList);
250+
coordinatorContext.getCoordinatorServerInfo(),
251+
tabletServerInfoList,
252+
coordinatorContext.getServerTags());
251253
updateTabletServerMetadataCacheWhenStartup(tabletServerInfoList);
252254

253255
// start table manager
@@ -818,7 +820,8 @@ private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) {
818820
// update coordinatorServer metadata cache for the new added table server.
819821
serverMetadataCache.updateMetadata(
820822
coordinatorContext.getCoordinatorServerInfo(),
821-
new HashSet<>(coordinatorContext.getLiveTabletServers().values()));
823+
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
824+
coordinatorContext.getServerTags());
822825
// update server info for all tablet servers.
823826
updateTabletServerMetadataCache(
824827
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
@@ -874,7 +877,9 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent
874877
new HashSet<>(coordinatorContext.getLiveTabletServers().values());
875878
// update coordinatorServer metadata cache.
876879
serverMetadataCache.updateMetadata(
877-
coordinatorContext.getCoordinatorServerInfo(), serverInfos);
880+
coordinatorContext.getCoordinatorServerInfo(),
881+
serverInfos,
882+
coordinatorContext.getServerTags());
878883
updateTabletServerMetadataCache(serverInfos, null, null, Collections.emptySet());
879884

880885
TableBucketStateMachine tableBucketStateMachine = tableManager.getTableBucketStateMachine();
@@ -947,6 +952,11 @@ private AddServerTagResponse processAddServerTag(AddServerTagEvent event) {
947952

948953
// Then update coordinatorContext.
949954
serverIds.forEach(serverId -> coordinatorContext.putServerTag(serverId, serverTag));
955+
// update coordinatorServer metadata cache for the new added serverTag
956+
serverMetadataCache.updateMetadata(
957+
coordinatorContext.getCoordinatorServerInfo(),
958+
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
959+
coordinatorContext.getServerTags());
950960

951961
return addServerTagResponse;
952962
}
@@ -989,6 +999,11 @@ private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent even
989999

9901000
// Then update coordinatorContext.
9911001
serverIds.forEach(coordinatorContext::removeServerTag);
1002+
// update coordinatorServer metadata cache for the new removed serverTag
1003+
serverMetadataCache.updateMetadata(
1004+
coordinatorContext.getCoordinatorServerInfo(),
1005+
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
1006+
coordinatorContext.getServerTags());
9921007

9931008
return removeServerTagResponse;
9941009
}

fluss-server/src/main/java/com/alibaba/fluss/server/metadata/CoordinatorMetadataCache.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.alibaba.fluss.cluster.ServerNode;
2121
import com.alibaba.fluss.cluster.TabletServerInfo;
22+
import com.alibaba.fluss.cluster.maintencance.ServerTag;
2223
import com.alibaba.fluss.server.coordinator.CoordinatorServer;
2324

2425
import javax.annotation.Nullable;
@@ -27,11 +28,13 @@
2728
import java.util.Collections;
2829
import java.util.HashMap;
2930
import java.util.HashSet;
31+
import java.util.Iterator;
3032
import java.util.Map;
3133
import java.util.Optional;
3234
import java.util.Set;
3335
import java.util.concurrent.locks.Lock;
3436
import java.util.concurrent.locks.ReentrantLock;
37+
import java.util.stream.Collectors;
3538

3639
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
3740

@@ -42,7 +45,7 @@ public class CoordinatorMetadataCache implements ServerMetadataCache {
4245

4346
@GuardedBy("metadataLock")
4447
private volatile NodeMetadataSnapshot metadataSnapshot =
45-
new NodeMetadataSnapshot(null, Collections.emptyMap());
48+
new NodeMetadataSnapshot(null, Collections.emptyMap(), Collections.emptyMap());
4649

4750
public CoordinatorMetadataCache() {}
4851

@@ -91,7 +94,33 @@ public Set<TabletServerInfo> getAliveTabletServerInfos() {
9194
return Collections.unmodifiableSet(tabletServerInfos);
9295
}
9396

94-
public void updateMetadata(ServerInfo coordinatorServer, Set<ServerInfo> serverInfoSet) {
97+
@Override
98+
public TabletServerInfo[] getLiveServers() {
99+
Set<TabletServerInfo> aliveTabletServerInfosWithoutOfflineServerTag =
100+
getAliveTabletServerInfos().stream()
101+
.filter(
102+
info ->
103+
!metadataSnapshot.serverTags.containsKey(info.getId())
104+
|| (metadataSnapshot.serverTags.get(info.getId())
105+
!= ServerTag.TEMPORARY_OFFLINE
106+
&& metadataSnapshot.serverTags.get(
107+
info.getId())
108+
!= ServerTag.PERMANENT_OFFLINE))
109+
.collect(Collectors.toSet());
110+
TabletServerInfo[] server =
111+
new TabletServerInfo[aliveTabletServerInfosWithoutOfflineServerTag.size()];
112+
Iterator<TabletServerInfo> iterator =
113+
aliveTabletServerInfosWithoutOfflineServerTag.iterator();
114+
for (int i = 0; i < aliveTabletServerInfosWithoutOfflineServerTag.size(); i++) {
115+
server[i] = iterator.next();
116+
}
117+
return server;
118+
}
119+
120+
public void updateMetadata(
121+
ServerInfo coordinatorServer,
122+
Set<ServerInfo> serverInfoSet,
123+
Map<Integer, ServerTag> serverTagMap) {
95124
inLock(
96125
metadataLock,
97126
() -> {
@@ -101,19 +130,23 @@ public void updateMetadata(ServerInfo coordinatorServer, Set<ServerInfo> serverI
101130
}
102131

103132
this.metadataSnapshot =
104-
new NodeMetadataSnapshot(coordinatorServer, newAliveTableServers);
133+
new NodeMetadataSnapshot(
134+
coordinatorServer, newAliveTableServers, serverTagMap);
105135
});
106136
}
107137

108138
private static class NodeMetadataSnapshot {
109139
final @Nullable ServerInfo coordinatorServer;
110140
final Map<Integer, ServerInfo> aliveTabletServers;
141+
final Map<Integer, ServerTag> serverTags;
111142

112143
private NodeMetadataSnapshot(
113144
@Nullable ServerInfo coordinatorServer,
114-
Map<Integer, ServerInfo> aliveTabletServers) {
145+
Map<Integer, ServerInfo> aliveTabletServers,
146+
Map<Integer, ServerTag> serverTags) {
115147
this.coordinatorServer = coordinatorServer;
116148
this.aliveTabletServers = aliveTabletServers;
149+
this.serverTags = serverTags;
117150
}
118151
}
119152
}

fluss-server/src/test/java/com/alibaba/fluss/server/metadata/CoordinatorMetadataCacheTest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
import com.alibaba.fluss.cluster.Endpoint;
2121
import com.alibaba.fluss.cluster.ServerType;
2222
import com.alibaba.fluss.cluster.TabletServerInfo;
23+
import com.alibaba.fluss.cluster.maintencance.ServerTag;
2324

2425
import org.junit.jupiter.api.BeforeEach;
2526
import org.junit.jupiter.api.Test;
2627

2728
import java.util.Arrays;
29+
import java.util.HashMap;
2830
import java.util.HashSet;
31+
import java.util.Map;
2932
import java.util.Set;
3033

3134
import static org.assertj.core.api.Assertions.assertThat;
@@ -36,6 +39,7 @@ public class CoordinatorMetadataCacheTest {
3639

3740
private ServerInfo coordinatorServer;
3841
private Set<ServerInfo> aliveTableServers;
42+
private Map<Integer, ServerTag> serverTagMap;
3943

4044
@BeforeEach
4145
public void setup() {
@@ -68,11 +72,15 @@ public void setup() {
6872
"rack2",
6973
Endpoint.fromListenersString("INTERNAL://localhost:104"),
7074
ServerType.TABLET_SERVER)));
75+
76+
serverTagMap = new HashMap<>();
77+
serverTagMap.put(0, ServerTag.PERMANENT_OFFLINE);
78+
serverTagMap.put(1, ServerTag.TEMPORARY_OFFLINE);
7179
}
7280

7381
@Test
7482
void testCoordinatorServerMetadataCache() {
75-
serverMetadataCache.updateMetadata(coordinatorServer, aliveTableServers);
83+
serverMetadataCache.updateMetadata(coordinatorServer, aliveTableServers, serverTagMap);
7684
assertThat(serverMetadataCache.getCoordinatorServer("CLIENT"))
7785
.isEqualTo(coordinatorServer.node("CLIENT"));
7886
assertThat(serverMetadataCache.getCoordinatorServer("INTERNAL"))
@@ -85,5 +93,7 @@ void testCoordinatorServerMetadataCache() {
8593
new TabletServerInfo(0, "rack0"),
8694
new TabletServerInfo(1, "rack1"),
8795
new TabletServerInfo(2, "rack2"));
96+
assertThat(serverMetadataCache.getLiveServers())
97+
.containsExactlyInAnyOrder(new TabletServerInfo(2, "rack2"));
8898
}
8999
}

0 commit comments

Comments
 (0)