Skip to content

Commit ef212e9

Browse files
authored
reimplement fix for akka/pekko cluster (#1594)
* Revert "revert #1568 due to test failures (#1587)" This reverts commit 7af03e5. * temp run nightly test in this PR * no need for square brackets because the set print adds them * logging to find issue * support tcp protocols * Update ClusterDaemon.scala * remove temp logging * try to fix issue in Remoting * extra tests * more tests * ignore udp tests * try to make tests tidy up after failures * Update MixedProtocolClusterSpec.scala * Update MixedProtocolClusterSpec.scala * run main cluster tests for PR
1 parent def84bf commit ef212e9

File tree

4 files changed

+275
-15
lines changed

4 files changed

+275
-15
lines changed

.github/workflows/build-test-prValidation.yml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,53 @@ jobs:
102102
-Dio.netty.leakDetection.level=PARANOID \
103103
validatePullRequest
104104
105+
pekko-classic-remoting-tests:
106+
name: Pekko Classic Remoting Tests
107+
runs-on: ubuntu-22.04
108+
if: github.repository == 'apache/pekko'
109+
strategy:
110+
fail-fast: false
111+
matrix:
112+
command:
113+
- cluster/test distributed-data/test cluster-tools/test cluster-metrics/test
114+
steps:
115+
- name: Checkout
116+
uses: actions/checkout@v4
117+
with:
118+
# we don't know which commit the last tag was on, so it's safer to fetch the entire repo so that previousStableVersion resolves
119+
fetch-depth: 0
120+
fetch-tags: true
121+
122+
- name: Setup Java 11
123+
uses: actions/setup-java@v4
124+
with:
125+
distribution: temurin
126+
java-version: 11
127+
128+
- name: Install sbt
129+
uses: sbt/setup-sbt@v1
130+
131+
- name: Cache Coursier cache
132+
uses: coursier/cache-action@v6
133+
134+
- name: Enable jvm-opts
135+
run: cp .jvmopts-ci .jvmopts
136+
137+
- name: sbt ${{ matrix.command }}
138+
env:
139+
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
140+
# note that this is not running any multi-jvm tests because multi-in-test=false
141+
run: |-
142+
sbt \
143+
-Djava.security.egd=file:/dev/./urandom \
144+
-Dpekko.remote.artery.enabled=off \
145+
-Dpekko.test.timefactor=2 \
146+
-Dpekko.actor.testkit.typed.timefactor=2 \
147+
-Dpekko.test.tags.exclude=gh-exclude,timing \
148+
-Dpekko.test.multi-in-test=false \
149+
-Dpekko.cluster.assert=on \
150+
clean ${{ matrix.command }}
151+
105152
jdk-21-extra-tests:
106153
name: Java 21 Extra Tests (including all tests that need Java 9+)
107154
runs-on: ubuntu-22.04

cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313

1414
package org.apache.pekko.cluster
1515

16+
import scala.annotation.nowarn
1617
import scala.collection.immutable
1718
import scala.concurrent.Future
1819
import scala.concurrent.Promise
1920
import scala.concurrent.duration._
2021
import scala.util.control.NonFatal
2122

22-
import scala.annotation.nowarn
2323
import com.typesafe.config.Config
2424

2525
import org.apache.pekko
@@ -30,13 +30,11 @@ import pekko.annotation.InternalApi
3030
import pekko.cluster.ClusterEvent._
3131
import pekko.cluster.MemberStatus._
3232
import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
33-
import pekko.event.ActorWithLogClass
34-
import pekko.event.Logging
33+
import pekko.event.{ ActorWithLogClass, Logging }
3534
import pekko.pattern.ask
36-
import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent }
35+
import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent, RemoteSettings }
3736
import pekko.remote.artery.QuarantinedEvent
38-
import pekko.util.Timeout
39-
import pekko.util.Version
37+
import pekko.util.{ Timeout, Version }
4038

4139
/**
4240
* Base trait for all cluster messages. All ClusterMessage's are serializable.
@@ -365,6 +363,13 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
365363
val statsEnabled = PublishStatsInterval.isFinite
366364
var gossipStats = GossipStats()
367365

366+
/**
 * Address protocols that `join` and `joining` accept from a peer node.
 *
 * Built from the configured `AcceptProtocolNames`, each in three forms: the bare name,
 * `<name>.tcp`, and `<name>.ssl.tcp`. The `.ssl.tcp` form is required because classic
 * Netty remoting with SSL enabled reports addresses with that scheme; without it such
 * nodes would be rejected even when both sides use the same protocol name.
 */
val acceptedProtocols: Set[String] = {
  val remoteSettings: RemoteSettings = new RemoteSettings(context.system.settings.config)
  val baseNames = remoteSettings.AcceptProtocolNames
  // "" keeps the bare protocol name; the other entries are the classic transport schemes
  val transportSuffixes = Set("", ".tcp", ".ssl.tcp")
  for {
    name <- baseNames
    suffix <- transportSuffixes
  } yield s"$name$suffix"
}
372+
368373
var seedNodes = SeedNodes
369374
var seedNodeProcess: Option[ActorRef] = None
370375
var seedNodeProcessCounter = 0 // for unique names
@@ -701,10 +706,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
701706
* which will reply with a `Welcome` message.
702707
*/
703708
def join(address: Address): Unit = {
704-
if (address.protocol != selfAddress.protocol)
709+
if (!acceptedProtocols.contains(address.protocol))
705710
logWarning(
706-
"Trying to join member with wrong protocol, but was ignored, expected [{}] but was [{}]",
707-
selfAddress.protocol,
711+
"Trying to join member with wrong protocol, but was ignored, expected any of {} but was [{}]",
712+
acceptedProtocols,
708713
address.protocol)
709714
else if (address.system != selfAddress.system)
710715
logWarning(
@@ -750,10 +755,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
750755
def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion: Version): Unit = {
751756
if (!preparingForShutdown) {
752757
val selfStatus = latestGossip.member(selfUniqueAddress).status
753-
if (joiningNode.address.protocol != selfAddress.protocol)
758+
if (!acceptedProtocols.contains(joiningNode.address.protocol))
754759
logWarning(
755-
"Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
756-
selfAddress.protocol,
760+
"Member with wrong protocol tried to join, but was ignored, expected any of {} but was [{}]",
761+
acceptedProtocols,
757762
joiningNode.address.protocol)
758763
else if (joiningNode.address.system != selfAddress.system)
759764
logWarning(
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.cluster
19+
20+
import com.typesafe.config.{ Config, ConfigFactory }
21+
22+
import org.apache.pekko.testkit.{ LongRunningTest, PekkoSpec }
23+
24+
object MixedProtocolClusterSpec {

  /** Parses `hocon` and layers it on top of `fallback`, so keys in `hocon` win. */
  private def overlay(hocon: String, fallback: Config): Config =
    ConfigFactory.parseString(hocon).withFallback(fallback)

  /**
   * Settings shared by every node in this spec: cluster provider, ephemeral ports for both
   * Artery and classic Netty, and both "pekko" and "akka" accepted as protocol names.
   */
  val baseConfig: Config =
    ConfigFactory.parseString("""
      pekko.actor.provider = "cluster"
      pekko.coordinated-shutdown.terminate-actor-system = on

      pekko.remote.artery.canonical.port = 0
      pekko.remote.classic.netty.tcp.port = 0
      pekko.remote.artery.advanced.aeron.idle-cpu-level = 3
      pekko.remote.accept-protocol-names = ["pekko", "akka"]

      pekko.cluster.jmx.multi-mbeans-in-same-jvm = on
      pekko.cluster.configuration-compatibility-check.enforce-on-join = off
      """)

  /** `baseConfig` with the Artery transport switched to Aeron UDP. */
  val configWithUdp: Config =
    overlay("""
      pekko.remote.artery.transport = "aeron-udp"
      """, baseConfig)

  /** Aeron UDP node that announces itself with the "pekko" protocol name. */
  val configWithPekkoUdp: Config =
    overlay("""
      pekko.remote.protocol-name = "pekko"
      """, configWithUdp)

  /** Aeron UDP node that announces itself with the "akka" protocol name. */
  val configWithAkkaUdp: Config =
    overlay("""
      pekko.remote.protocol-name = "akka"
      """, configWithUdp)

  // The two "Tcp" configs set no transport themselves; presumably they rely on the
  // default Artery transport being TCP - confirm against reference.conf.

  /** `baseConfig` node that announces itself with the "pekko" protocol name. */
  val configWithPekkoTcp: Config =
    overlay("""
      pekko.remote.protocol-name = "pekko"
      """, baseConfig)

  /** `baseConfig` node that announces itself with the "akka" protocol name. */
  val configWithAkkaTcp: Config =
    overlay("""
      pekko.remote.protocol-name = "akka"
      """, baseConfig)

  /** `baseConfig` with Artery disabled and the classic Netty TCP transport enabled. */
  val configWithNetty: Config =
    overlay("""
      pekko.remote.artery.enabled = false
      pekko.remote.classic {
        enabled-transports = ["pekko.remote.classic.netty.tcp"]
      }
      """, baseConfig)

  /** Classic Netty node that announces itself with the "pekko" protocol name. */
  val configWithPekkoNetty: Config =
    overlay("""
      pekko.remote.protocol-name = "pekko"
      """, configWithNetty)

  /** Classic Netty node that announces itself with the "akka" protocol name. */
  val configWithAkkaNetty: Config =
    overlay("""
      pekko.remote.protocol-name = "akka"
      """, configWithNetty)
}
83+
84+
/**
 * Checks that nodes announcing themselves with the "akka" and "pekko" protocol names can
 * form a single cluster, in both join directions, for each transport configuration defined
 * in the companion object.
 */
class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit {

  import MixedProtocolClusterSpec._

  /**
   * Starts one actor system with `firstNodeConfig`, a second one with `joiningNodeConfig`,
   * forms a cluster from them and waits until the second node is reported as 'Up'.
   *
   * Every actor system created here is shut down afterwards, also when the assertion
   * fails, so a failing case does not leak systems into the following tests.
   *
   * @param firstNodeConfig   configuration of the node that is started first
   * @param joiningNodeConfig configuration of the node whose 'Up' status is awaited
   */
  private def assertJoiningNodeBecomesUp(firstNodeConfig: Config, joiningNodeConfig: Config): Unit = {
    val clusterTestUtil = new ClusterTestUtil(system.name)
    try {
      clusterTestUtil.newActorSystem(firstNodeConfig)
      val joiningNode = clusterTestUtil.newActorSystem(joiningNodeConfig)
      clusterTestUtil.formCluster()

      awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'")
    } finally {
      clusterTestUtil.shutdownAll()
    }
  }

  "A node using the akka protocol" must {

    // first node uses "pekko", the joining node uses "akka"
    "be allowed to join a cluster with a node using the pekko protocol (udp)" taggedAs LongRunningTest in {
      assertJoiningNodeBecomesUp(configWithPekkoUdp, configWithAkkaUdp)
    }

    "be allowed to join a cluster with a node using the pekko protocol (tcp)" taggedAs LongRunningTest in {
      assertJoiningNodeBecomesUp(configWithPekkoTcp, configWithAkkaTcp)
    }

    "be allowed to join a cluster with a node using the pekko protocol (netty)" taggedAs LongRunningTest in {
      assertJoiningNodeBecomesUp(configWithPekkoNetty, configWithAkkaNetty)
    }

    // first node uses "akka", the joining node uses "pekko"
    "allow a node using the pekko protocol to join the cluster (udp)" taggedAs LongRunningTest in {
      assertJoiningNodeBecomesUp(configWithAkkaUdp, configWithPekkoUdp)
    }

    "allow a node using the pekko protocol to join the cluster (tcp)" taggedAs LongRunningTest in {
      assertJoiningNodeBecomesUp(configWithAkkaTcp, configWithPekkoTcp)
    }

    "allow a node using the pekko protocol to join the cluster (netty)" taggedAs LongRunningTest in {
      assertJoiningNodeBecomesUp(configWithAkkaNetty, configWithPekkoNetty)
    }
  }
}

remote/src/main/scala/org/apache/pekko/remote/Remoting.scala

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,12 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
225225
Await.result(addressesPromise.future, StartupTimeout.duration)
226226
if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null)
227227

228-
transportMapping = transports
228+
val mapping = transports
229229
.groupBy {
230230
case (transport, _) => transport.schemeIdentifier
231231
}
232232
.map { case (k, v) => k -> v.toSet }
233+
transportMapping = addProtocolsToMap(mapping)
233234

234235
defaultAddress = transports.head._2
235236
addresses = transports.map { _._2 }.toSet
@@ -296,6 +297,21 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
296297
}
297298
}
298299
}
300+
301+
/**
 * Re-keys the transport mapping so that each transport is reachable under every name in
 * `AcceptProtocolNames`, not only under the scheme it was loaded with.
 *
 * The transport suffix of the original scheme is carried over to each new key:
 * `.ssl.tcp` and `.tcp` are preserved, any other scheme maps to the bare protocol name.
 * Preserving `.ssl.tcp` keeps SSL transports from being re-registered under the
 * non-SSL `<name>.tcp` key.
 *
 * When at most one protocol name is accepted the mapping is returned unchanged.
 *
 * @param map transports grouped by the scheme identifier they were loaded with
 * @return the mapping keyed by every accepted protocol name plus transport suffix
 */
private def addProtocolsToMap(
    map: Map[String, Set[(PekkoProtocolTransport, Address)]]): Map[String, Set[(PekkoProtocolTransport, Address)]] = {
  if (AcceptProtocolNames.size > 1) {
    map.flatMap { case (protocol, transports) =>
      // test the longer suffix first: ".ssl.tcp" also ends with ".tcp"
      val transportSuffix =
        if (protocol.endsWith(".ssl.tcp")) ".ssl.tcp"
        else if (protocol.endsWith(".tcp")) ".tcp"
        else ""
      AcceptProtocolNames.map(newProtocol => s"$newProtocol$transportSuffix" -> transports)
    }
  } else map
}
299315
}
300316

301317
/**
@@ -567,7 +583,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter)
567583
}
568584

569585
OneForOneStrategy(loggingEnabled = false) {
570-
case InvalidAssociation(localAddress, remoteAddress, reason, disassiciationInfo) =>
586+
case InvalidAssociation(localAddress, remoteAddress, reason, disassociationInfo) =>
571587
keepQuarantinedOr(remoteAddress) {
572588
val causedBy = if (reason.getCause == null) "" else s"Caused by: [${reason.getCause.getMessage}]"
573589
log.warning(
@@ -580,7 +596,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter)
580596
causedBy)
581597
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
582598
}
583-
disassiciationInfo.foreach {
599+
disassociationInfo.foreach {
584600
case AssociationHandle.Quarantined =>
585601
context.system.eventStream.publish(ThisActorSystemQuarantinedEvent(localAddress, remoteAddress))
586602
case _ => // do nothing

0 commit comments

Comments
 (0)