Skip to content

Commit e6e0b75

Browse files
committed
Gracefully leave the cluster and terminate the container instance when no running user actors are left
1 parent b5df6cb commit e6e0b75

4 files changed

Lines changed: 76 additions & 24 deletions

File tree

akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala

Lines changed: 25 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -48,15 +48,16 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
4848
instanceStorage.stop()
4949
}
5050

51-
private def launchUserActors: Unit = {
51+
private def launchUserActors(): Unit = {
5252
userActors.foreach(actor => {
5353
log.debug(s"Deploying actor ${actor.name} (${actor.fqn})")
5454
val clazz = Class.forName(actor.fqn)
55-
context.actorOf(Props(clazz), actor.name)
55+
val userActorRef = context.actorOf(Props(clazz), actor.name)
56+
context.watch(userActorRef)
5657
})
5758
}
5859

59-
private def notifyMonitoringService: Unit = {
60+
private def notifyMonitoringService(): Unit = {
6061
try {
6162
thisInstance.foreach(info => {
6263
val monitoringService = MonitoringService.createRemote(context.system)
@@ -69,7 +70,7 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
6970
}
7071
}
7172

72-
private def registerThisInstance: Unit = {
73+
private def registerThisInstance(): Unit = {
7374
if (!thisInstance.isDefined) {
7475
val actors = context.children.map(r => r.path.toStringWithoutAddress)
7576
val info = InstanceInfo(
@@ -91,11 +92,16 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
9192
})
9293
}
9394

95+
private def terminateThisInstance(): Unit = {
96+
cluster.leave(cluster.selfAddress)
97+
cluster.registerOnMemberRemoved(context.system.terminate())
98+
}
99+
94100
private def initializedReceive: Receive = {
95101
case _: InstanceId =>
96102
// The record was successfully saved to a storage.
97103
log.debug("Successfully registered this instance")
98-
notifyMonitoringService
104+
notifyMonitoringService()
99105
case OperationFailed(_, e) =>
100106
// Failed to save the record to a storage.
101107
log.error(e, "Failed to store this instance information. " +
@@ -105,24 +111,25 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
105111
self, RetryRegistration)
106112
case RetryRegistration =>
107113
log.info("Retrying instance registration process")
108-
registerThisInstance
114+
registerThisInstance()
109115
case StopInstance =>
110116
log.info("Termination command received. Stopping this instance")
111-
cluster.leave(cluster.selfAddress)
112-
context.system.terminate()
117+
terminateThisInstance()
118+
case Terminated(_) =>
119+
if (context.children.isEmpty) {
120+
log.info("No running user actors left. Terminating this instance")
121+
terminateThisInstance()
122+
}
113123
case JoinClusterTimeout =>
114124
// Safely ignore the timeout command.
115125
}
116126

117127
private def joiningTheClusterReceive: Receive = {
118-
case MemberUp(member) =>
119-
if (member.address == cluster.selfAddress) {
120-
log.debug("Successfully joined the cluster")
121-
launchUserActors
122-
cluster.unsubscribe(self)
123-
context.become(initializedReceive)
124-
registerThisInstance
125-
}
128+
case InstanceJoinedCluster =>
129+
log.debug("Successfully joined the cluster")
130+
launchUserActors()
131+
context.become(initializedReceive)
132+
registerThisInstance()
126133
case JoinClusterTimeout =>
127134
log.error(s"Couldn't join the cluster during ${joinClusterTimeout.toSeconds} seconds. " +
128135
"Terminating this instance...")
@@ -134,7 +141,7 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
134141
context.become(joiningTheClusterReceive)
135142
log.debug(s"Joining the cluster (master: $masterAddress)")
136143
cluster.join(masterAddress)
137-
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberUp])
144+
cluster.registerOnMemberUp(self ! InstanceJoinedCluster)
138145
// Scheduling a timeout command.
139146
context.system.scheduler.scheduleOnce(joinClusterTimeout, self, JoinClusterTimeout)
140147
}
@@ -146,6 +153,7 @@ object ContainerInstanceService {
146153
private case object JoinCluster
147154
private case object RetryRegistration
148155
private case object JoinClusterTimeout
156+
private case object InstanceJoinedCluster
149157
private[akkeeper] val DefaultRegistrationRetryInterval = 30 seconds
150158
private[akkeeper] val DefaultJoinClusterTimeout = 120 seconds
151159

akkeeper/src/main/scala/akkeeper/master/service/MonitoringService.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ private[akkeeper] class MonitoringService(instanceStorage: InstanceStorage.Async
6060
}
6161

6262
private def memberAutoDown(instanceId: InstanceId, instanceAddr: UniqueAddress): Unit = {
63-
val autoDownService = MemberAutoDownService.createLocal(context.system,
63+
val autoDownService = MemberAutoDownService.createLocal(context,
6464
instanceAddr, instanceId, instanceStorage)
6565
autoDownService ! MemberAutoDownService.PollInstanceStatus
6666
}

akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ package akkeeper.container.service
1717

1818
import akka.actor._
1919
import akka.cluster.{Cluster, UniqueAddress}
20-
import akka.testkit.{ImplicitSender, TestKit}
20+
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
2121
import akkeeper.{ActorTestUtils, AkkeeperException}
2222
import akkeeper.common._
2323
import akkeeper.master.service._
@@ -27,7 +27,7 @@ import com.typesafe.config.ConfigFactory
2727
import org.scalamock.scalatest.MockFactory
2828
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
2929

30-
import scala.concurrent.Future
30+
import scala.concurrent.{Future, Promise}
3131
import scala.concurrent.duration._
3232
import ContainerInstanceService._
3333
import ContainerInstanceServiceSpec._
@@ -50,14 +50,16 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
5050
}
5151

5252
private def createExpectedInstanceInfo(instanceId: InstanceId,
53-
addr: UniqueAddress): InstanceInfo = {
53+
addr: UniqueAddress,
54+
actorPath: String = "/system/testActor-1/akkeeperInstance/testActor"
55+
): InstanceInfo = {
5456
InstanceInfo(
5557
instanceId = instanceId,
5658
status = InstanceUp,
5759
containerName = instanceId.containerName,
5860
roles = Set("akkeeperMaster", "dc-default"),
5961
address = Some(addr),
60-
actors = Set("/system/testActor-1/akkeeperInstance/testActor")
62+
actors = Set(actorPath)
6163
)
6264
}
6365

@@ -98,7 +100,6 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
98100
expectMsg(TestPong)
99101

100102
gracefulActorStop(service)
101-
gracefulActorStop(masterServiceMock)
102103
}
103104

104105
it should "retry if the registration failed" in {
@@ -124,7 +125,6 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
124125
val maxWaitForNoMsg = numberOfAttempts.seconds
125126
expectNoMessage(maxWaitForNoMsg)
126127

127-
gracefulActorStop(masterServiceMock)
128128
gracefulActorStop(service)
129129
}
130130

@@ -145,6 +145,48 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
145145

146146
await(newSystem.whenTerminated)
147147
}
148+
149+
it should "terminate this instance if all user actors have been terminated" in {
150+
val newSystem = ActorSystem("ContainerInstanceServiceSpecTemp",
151+
ConfigFactory.load().withMasterPort.withMasterRole)
152+
val newCluster = Cluster(newSystem)
153+
val instanceId = InstanceId("container")
154+
val expectedInstanceInfo = createExpectedInstanceInfo(instanceId,
155+
newCluster.selfUniqueAddress, actorPath = "/user/akkeeperInstance/testActor")
156+
157+
val storage = mock[InstanceStorage.Async]
158+
(storage.start _).expects()
159+
(storage.stop _).expects()
160+
(storage.registerInstance _)
161+
.expects(expectedInstanceInfo)
162+
.returns(Future successful instanceId)
163+
164+
val actors = Seq(ActorLaunchContext("testActor", classOf[TestUserActor].getName))
165+
166+
ContainerInstanceService.createLocal(newSystem, actors, storage, instanceId, newCluster.selfAddress)
167+
168+
// Verify that the user actor was actually launched.
169+
val testProbe = TestProbe()(newSystem)
170+
val userActor = newSystem.actorSelection("/user/akkeeperInstance/testActor")
171+
172+
val joinPromise = Promise[Unit]
173+
newCluster.registerOnMemberUp(joinPromise.success(()))
174+
await(joinPromise.future)
175+
val waitTimeoutMs = 1000
176+
Thread.sleep(waitTimeoutMs)
177+
178+
userActor.tell(TestPing, testProbe.ref)
179+
testProbe.expectMsg(TestPong)
180+
181+
// Terminate the user actor.
182+
userActor ! TestTerminate
183+
184+
val leavePromise = Promise[Unit]
185+
newCluster.registerOnMemberRemoved(leavePromise.success(()))
186+
await(leavePromise.future)
187+
188+
await(newSystem.whenTerminated)
189+
}
148190
}
149191

150192
object ContainerInstanceServiceSpec {

akkeeper/src/test/scala/akkeeper/container/service/TestUserActor.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ import TestUserActor._
2121
class TestUserActor extends Actor {
2222
override def receive: Receive = {
2323
case TestPing => sender() ! TestPong
24+
case TestTerminate => context.stop(self)
2425
}
2526
}
2627

2728
object TestUserActor {
2829
case object TestPing
2930
case object TestPong
31+
case object TestTerminate
3032
}

0 commit comments

Comments
 (0)