Skip to content

Commit a139595

Browse files
authored
[improve] improve hazelcast joiner, lite node can't be election as master (#8261)
1 parent caf37fc commit a139595

File tree

6 files changed

+389
-11
lines changed

6 files changed

+389
-11
lines changed

Diff for: seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java

+57
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.junit.jupiter.api.condition.OS;
3939

4040
import com.hazelcast.client.config.ClientConfig;
41+
import com.hazelcast.config.Config;
4142
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
4243
import lombok.SneakyThrows;
4344
import lombok.extern.slf4j.Slf4j;
@@ -306,6 +307,62 @@ public void pendingJobCancel() {
306307
}
307308
}
308309

310+
@Test
311+
public void testStartMasterNodeWithTcpIp() {
312+
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
313+
HazelcastInstanceImpl instance =
314+
SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);
315+
Assertions.assertNotNull(instance);
316+
Assertions.assertEquals(1, instance.getCluster().getMembers().size());
317+
instance.shutdown();
318+
}
319+
320+
@Test
321+
public void testStartMasterNodeWithMulticastJoin() {
322+
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
323+
seaTunnelConfig.setHazelcastConfig(Config.loadFromString(getMulticastConfig()));
324+
HazelcastInstanceImpl instance =
325+
SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);
326+
Assertions.assertNotNull(instance);
327+
Assertions.assertEquals(1, instance.getCluster().getMembers().size());
328+
instance.shutdown();
329+
}
330+
331+
@Test
332+
public void testCannotOnlyStartWorkerNodeWithTcpIp() {
333+
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
334+
Assertions.assertThrows(
335+
IllegalStateException.class,
336+
() -> {
337+
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
338+
});
339+
}
340+
341+
@Test
342+
public void testCannotOnlyStartWorkerNodeWithMulticastJoin() {
343+
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
344+
seaTunnelConfig.setHazelcastConfig(Config.loadFromString(getMulticastConfig()));
345+
Assertions.assertThrows(
346+
IllegalStateException.class,
347+
() -> {
348+
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
349+
});
350+
}
351+
352+
private String getMulticastConfig() {
353+
return "hazelcast:\n"
354+
+ " network:\n"
355+
+ " join:\n"
356+
+ " multicast:\n"
357+
+ " enabled: true\n"
358+
+ " multicast-group: 224.2.2.3\n"
359+
+ " multicast-port: 54327\n"
360+
+ " multicast-time-to-live: 32\n"
361+
+ " multicast-timeout-seconds: 2\n"
362+
+ " trusted-interfaces:\n"
363+
+ " - 192.168.1.1\n";
364+
}
365+
309366
private SeaTunnelClient createSeaTunnelClient(String clusterName) {
310367
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
311368
clientConfig.setClusterName(TestUtils.getClusterName(clusterName));

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelNodeContext.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,21 @@
1818
package org.apache.seatunnel.engine.server;
1919

2020
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
21+
import org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutDiscoveryJoiner;
22+
import org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutMulticastJoiner;
23+
import org.apache.seatunnel.engine.server.joiner.LiteNodeDropOutTcpIpJoiner;
2124

2225
import com.hazelcast.config.JoinConfig;
2326
import com.hazelcast.instance.impl.DefaultNodeContext;
2427
import com.hazelcast.instance.impl.Node;
2528
import com.hazelcast.instance.impl.NodeExtension;
2629
import com.hazelcast.internal.cluster.Joiner;
30+
import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
2731
import lombok.NonNull;
2832
import lombok.extern.slf4j.Slf4j;
2933

3034
import static com.hazelcast.config.ConfigAccessor.getActiveMemberNetworkConfig;
35+
import static com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_ENABLED;
3136

3237
@Slf4j
3338
public class SeaTunnelNodeContext extends DefaultNodeContext {
@@ -45,15 +50,28 @@ public NodeExtension createNodeExtension(@NonNull Node node) {
4550

4651
@Override
4752
public Joiner createJoiner(Node node) {
53+
4854
JoinConfig join =
4955
getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
5056
join.verify();
5157

52-
if (join.getTcpIpConfig().isEnabled()) {
58+
// update for seatunnel, lite member can not become master node
59+
if (join.getMulticastConfig().isEnabled() && node.multicastService != null) {
60+
log.info("Using LiteNodeDropOutMulticast Multicast discovery");
61+
return new LiteNodeDropOutMulticastJoiner(node);
62+
} else if (join.getTcpIpConfig().isEnabled()) {
5363
log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery");
5464
return new LiteNodeDropOutTcpIpJoiner(node);
65+
} else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
66+
|| isAnyAliasedConfigEnabled(join)
67+
|| join.isAutoDetectionEnabled()) {
68+
log.info("Using LiteNodeDropOutDiscoveryJoiner Discovery SPI");
69+
return new LiteNodeDropOutDiscoveryJoiner(node);
5570
}
71+
return null;
72+
}
5673

57-
return super.createJoiner(node);
74+
private boolean isAnyAliasedConfigEnabled(JoinConfig join) {
75+
return !AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
5876
}
5977
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.engine.server.joiner;
19+
20+
import com.hazelcast.cluster.Address;
21+
import com.hazelcast.cluster.impl.MemberImpl;
22+
import com.hazelcast.config.JoinConfig;
23+
import com.hazelcast.instance.EndpointQualifier;
24+
import com.hazelcast.instance.ProtocolType;
25+
import com.hazelcast.instance.impl.Node;
26+
import com.hazelcast.internal.config.AliasedDiscoveryConfigUtils;
27+
import com.hazelcast.internal.util.Preconditions;
28+
import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
29+
import com.hazelcast.internal.util.concurrent.IdleStrategy;
30+
import com.hazelcast.spi.discovery.DiscoveryNode;
31+
import com.hazelcast.spi.discovery.integration.DiscoveryService;
32+
import com.hazelcast.spi.properties.ClusterProperty;
33+
34+
import java.util.ArrayList;
35+
import java.util.Collection;
36+
import java.util.Collections;
37+
import java.util.Iterator;
38+
import java.util.Set;
39+
import java.util.concurrent.TimeUnit;
40+
41+
import static com.hazelcast.internal.config.AliasedDiscoveryConfigUtils.allUsePublicAddress;
42+
import static com.hazelcast.spi.properties.ClusterProperty.DISCOVERY_SPI_PUBLIC_IP_ENABLED;
43+
44+
public class LiteNodeDropOutDiscoveryJoiner extends LiteNodeDropOutTcpIpJoiner {
45+
46+
private final DiscoveryService discoveryService;
47+
private final boolean usePublicAddress;
48+
private final IdleStrategy idleStrategy;
49+
private final int maximumWaitingTimeBeforeJoinSeconds;
50+
51+
public LiteNodeDropOutDiscoveryJoiner(Node node) {
52+
super(node);
53+
this.idleStrategy =
54+
new BackoffIdleStrategy(
55+
0L,
56+
0L,
57+
TimeUnit.MILLISECONDS.toNanos(10L),
58+
TimeUnit.MILLISECONDS.toNanos(500L));
59+
this.maximumWaitingTimeBeforeJoinSeconds =
60+
node.getProperties().getInteger(ClusterProperty.WAIT_SECONDS_BEFORE_JOIN);
61+
this.discoveryService = node.discoveryService;
62+
this.usePublicAddress = usePublicAddress(node.getConfig().getNetworkConfig().getJoin());
63+
}
64+
65+
private boolean usePublicAddress(JoinConfig join) {
66+
return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
67+
|| allUsePublicAddress(
68+
AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
69+
}
70+
71+
protected Collection<Address> getPossibleAddressesForInitialJoin() {
72+
long deadLine =
73+
System.nanoTime()
74+
+ TimeUnit.SECONDS.toNanos((long) this.maximumWaitingTimeBeforeJoinSeconds);
75+
76+
for (int i = 0; System.nanoTime() < deadLine; ++i) {
77+
Collection<Address> possibleAddresses = this.getPossibleAddresses();
78+
if (!possibleAddresses.isEmpty()) {
79+
return possibleAddresses;
80+
}
81+
82+
this.idleStrategy.idle((long) i);
83+
}
84+
85+
return Collections.emptyList();
86+
}
87+
88+
protected Collection<Address> getPossibleAddresses() {
89+
Iterable<DiscoveryNode> discoveredNodes =
90+
(Iterable)
91+
Preconditions.checkNotNull(
92+
this.discoveryService.discoverNodes(),
93+
"Discovered nodes cannot be null!");
94+
MemberImpl localMember = this.node.nodeEngine.getLocalMember();
95+
Set<Address> localAddresses = this.node.getLocalAddressRegistry().getLocalAddresses();
96+
Collection<Address> possibleMembers = new ArrayList();
97+
Iterator var5 = discoveredNodes.iterator();
98+
99+
while (var5.hasNext()) {
100+
DiscoveryNode discoveryNode = (DiscoveryNode) var5.next();
101+
Address discoveredAddress =
102+
this.usePublicAddress
103+
? discoveryNode.getPublicAddress()
104+
: discoveryNode.getPrivateAddress();
105+
if (localAddresses.contains(discoveredAddress)) {
106+
if (!this.usePublicAddress && discoveryNode.getPublicAddress() != null) {
107+
localMember
108+
.getAddressMap()
109+
.put(
110+
EndpointQualifier.resolve(ProtocolType.CLIENT, "public"),
111+
this.publicAddress(localMember, discoveryNode));
112+
}
113+
} else {
114+
possibleMembers.add(discoveredAddress);
115+
}
116+
}
117+
118+
return possibleMembers;
119+
}
120+
121+
private Address publicAddress(MemberImpl localMember, DiscoveryNode discoveryNode) {
122+
if (localMember.getAddressMap().containsKey(EndpointQualifier.CLIENT)) {
123+
try {
124+
String publicHost = discoveryNode.getPublicAddress().getHost();
125+
int clientPort =
126+
((Address) localMember.getAddressMap().get(EndpointQualifier.CLIENT))
127+
.getPort();
128+
return new Address(publicHost, clientPort);
129+
} catch (Exception var5) {
130+
Exception e = var5;
131+
this.logger.fine(e);
132+
}
133+
}
134+
135+
return discoveryNode.getPublicAddress();
136+
}
137+
}

0 commit comments

Comments
 (0)