From 7b38dc57633c0be8fea879ccebabcde3f655937b Mon Sep 17 00:00:00 2001 From: Jon Moore Date: Wed, 12 Feb 2014 16:12:36 -0500 Subject: [PATCH] Allow for a Sirius initialization callback. Adds some messages to the SiriusSupervisor so that you can send it a message to register interest in an initialization event and get a message back once that has occurred. Added an onInitialized() method to SiriusImpl to hook that up to a callback. Also defined a new trait/interface `Sirius1Dot2` that extends `Sirius`. This seems like a good way to track the additional methods without losing backwards compatibility with version 1.1.4. In theory, this new interface would be modifiable up until the next release (which should be 1.2.0 in this case). Discussion points: * onInitialized takes a java.lang.Runnable as a callback to facilitate calling it from Java. Kinda kludgy, since from Scala it would just want a lazy `=> Unit`. * Does the approach with the additional interface seem ok? --- .../api/impl/Sirius1Dot2Extensions.scala | 36 +++++++++++++++++++ .../xfinity/sirius/api/impl/SiriusImpl.scala | 11 ++++-- .../sirius/api/impl/SiriusSupervisor.scala | 9 +++++ .../sirius/api/impl/SiriusImplTest.scala | 17 +++++++++ .../api/impl/SiriusSupervisorTest.scala | 26 ++++++++++++++ 5 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/com/comcast/xfinity/sirius/api/impl/Sirius1Dot2Extensions.scala diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/Sirius1Dot2Extensions.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/Sirius1Dot2Extensions.scala new file mode 100644 index 00000000..ef0514ed --- /dev/null +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/Sirius1Dot2Extensions.scala @@ -0,0 +1,36 @@ +/** + * Copyright 2014 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.comcast.xfinity.sirius.api.impl + +import com.comcast.xfinity.sirius.api.Sirius + +/** + * These are additional methods supported by an expanded Sirius interface. + */ +trait Sirius1Dot2Extensions { + /** + * Register a callback to be invoked once the Sirius subsystem has been + * initialized (i.e. log replay has completed). This may be called + * multiple times to install multiple init hook callbacks; each will be + * called once upon initialization. If Sirius has already been + * initialized, the callback will be invoked right away. + * + * @param initHook callback to run + */ + def onInitialized(initHook: Runnable): Unit +} + +trait Sirius1Dot2 extends Sirius with Sirius1Dot2Extensions diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala index 55a6262e..195189e0 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala @@ -25,11 +25,12 @@ import akka.actor._ import java.util.concurrent.Future import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog import com.comcast.xfinity.sirius.api.SiriusConfiguration -import scala.concurrent.{Await, Future => AkkaFuture} +import scala.concurrent.{Future => AkkaFuture, ExecutionContext, Await} import akka.util.Timeout import scala.concurrent.duration._ import status.NodeStats.FullNodeStatus import status.StatusWorker._ +import scala.concurrent.ExecutionContext.Implicits.global object SiriusImpl { @@ -61,7 +62,7 @@ object SiriusImpl { * @param actorSystem the actorSystem to use to create the Actors for Sirius */ class SiriusImpl(config: SiriusConfiguration, supProps: Props)(implicit val actorSystem: ActorSystem) - extends Sirius { + extends Sirius1Dot2 { val supName = config.getProp(SiriusConfiguration.SIRIUS_SUPERVISOR_NAME, "sirius") implicit val timeout: Timeout = @@ -131,6 +132,12 @@ class SiriusImpl(config: SiriusConfiguration, supProps: Props)(implicit val acto onShutdownHook = Some(() => shutdownHook) } + def onInitialized(initHook: Runnable) { + (supervisor ? SiriusSupervisor.RegisterInitHook) onSuccess { + case _ => initHook.run() + } + } + /** * Terminate this instance. Shuts down all associated Actors. */ diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala index 5d1d027e..4f2a92bb 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisor.scala @@ -44,6 +44,8 @@ object SiriusSupervisor { case object CheckPaxosMembership extends SupervisorMessage case class IsInitializedResponse(initialized: Boolean) + case object RegisterInitHook + case object Initialized /** * Factory for creating the children actors of SiriusSupervisor. @@ -133,6 +135,8 @@ private[impl] class SiriusSupervisor(childProvider: ChildProvider, config: Siriu val membershipCheckSchedule = context.system.scheduler. schedule(0 seconds, checkIntervalSecs seconds, self, CheckPaxosMembership) + var initHookClients : Set[ActorRef] = Set.empty + override def postStop() { membershipCheckSchedule.cancel() } @@ -149,9 +153,13 @@ private[impl] class SiriusSupervisor(childProvider: ChildProvider, config: Siriu context.become(initialized) sender ! SiriusSupervisor.IsInitializedResponse(initialized = true) + + initHookClients foreach { _ ! SiriusSupervisor.Initialized } + initHookClients = Set.empty } else { sender ! SiriusSupervisor.IsInitializedResponse(initialized = false) } + case SiriusSupervisor.RegisterInitHook => initHookClients += sender // Ignore other messages until Initialized. case _ => @@ -162,6 +170,7 @@ private[impl] class SiriusSupervisor(childProvider: ChildProvider, config: Siriu case logQuery: LogQuery => stateSup forward logQuery case membershipMessage: MembershipMessage => membershipActor forward membershipMessage case SiriusSupervisor.IsInitializedRequest => sender ! new SiriusSupervisor.IsInitializedResponse(true) + case SiriusSupervisor.RegisterInitHook => sender ! SiriusSupervisor.Initialized case statusQuery: StatusQuery => statusSubsystem forward statusQuery case compactionMessage: CompactionMessage => compactionManager match { case Some(actor) => actor forward compactionMessage diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusImplTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusImplTest.scala index b5dec599..d1da2fe4 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusImplTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusImplTest.scala @@ -30,6 +30,9 @@ import com.comcast.xfinity.sirius.{TimedTest, NiceTest} import com.comcast.xfinity.sirius.api.{SiriusConfiguration, SiriusResult} import status.NodeStats.FullNodeStatus import status.StatusWorker._ +import scala.concurrent.ExecutionContext.Implicits.global +import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.{Initialized, RegisterInitHook} +import scala.concurrent.{Await, Promise} object SiriusImplTestCompanion { @@ -91,6 +94,9 @@ class SiriusImplTest extends NiceTest with TimedTest { case GetStatus => sender ! mockNodeStatus this + case RegisterInitHook => + sender ! Initialized + this } }) @@ -105,6 +111,17 @@ class SiriusImplTest extends NiceTest with TimedTest { } describe("a SiriusImpl") { + it("should send a RegisterInitHook message to the supervisor actor when onInitialized is called") { + underTest.onInitialized(new Runnable() { def run() { } }) + supervisorActorProbe.expectMsg(SiriusSupervisor.RegisterInitHook) + } + + it("should call the initHook once the supervisor actor responds with an Initialized message") { + val p : Promise[Boolean] = Promise() + underTest.onInitialized(new Runnable() { def run() { p.success(true) }}) + assert(Await.result(p.future, 50 millis)) + } + it("should send a Get message to the supervisor actor when enqueueGet is called") { val key = "hello" val getFuture = underTest.enqueueGet(key) diff --git a/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisorTest.scala b/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisorTest.scala index 9caf90e3..271bcc63 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisorTest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/api/impl/SiriusSupervisorTest.scala @@ -116,6 +116,32 @@ class SiriusSupervisorTest extends NiceTest with BeforeAndAfterAll with TimedTes waitForTrue(stateAgent().supervisorInitialized, 5000, 250) } + it("should reply to a registered initHook once initialized") { + val probe = TestProbe() + probe.send(supervisor, SiriusSupervisor.RegisterInitHook) + initializeSupervisor(supervisor) + probe.expectMsg(SiriusSupervisor.Initialized) + } + + it("should reply to all registered initHooks once initialized") { + val probe1 = TestProbe() + val probe2 = TestProbe() + probe1.send(supervisor, SiriusSupervisor.RegisterInitHook) + probe2.send(supervisor, SiriusSupervisor.RegisterInitHook) + initializeSupervisor(supervisor) + probe1.expectMsg(SiriusSupervisor.Initialized) + probe2.expectMsg(SiriusSupervisor.Initialized) + } + + it("should reply immediately to an initHook registration once already initialized") { + val probe = TestProbe() + val stateAgent = supervisor.underlyingActor.siriusStateAgent + initializeSupervisor(supervisor) + waitForTrue(stateAgent().supervisorInitialized, 5000, 250) + probe.send(supervisor, SiriusSupervisor.RegisterInitHook) + probe.expectMsg(SiriusSupervisor.Initialized) + } + it("should forward MembershipMessages to the membershipActor") { initializeSupervisor(supervisor) initializeOrdering(supervisor, Some(paxosProbe.ref))