@@ -313,6 +313,12 @@ import akka.util.{ ByteString, Timeout }
}
}

override def preResolveShard[M, E](entity: scaladsl.Entity[M, E], shard: String): Unit =
classicSharding.preResolveShard(entity.typeKey.name, shard)

override def preResolveShard[M, E](entity: javadsl.Entity[M, E], shard: String): Unit =
classicSharding.preResolveShard(entity.typeKey.name, shard)

override lazy val shardState: ActorRef[ClusterShardingQuery] = {
import akka.actor.typed.scaladsl.adapter._
val behavior = ShardingState.behavior(classicSharding)
@@ -225,6 +225,13 @@ abstract class ClusterSharding {
* The default `ShardAllocationStrategy` is configured by `least-shard-allocation-strategy` properties.
*/
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy

/**
* Direct the shard region or proxy actor for the given entity to resolve the location of the given shard
* and cache it. This may result in the shard being allocated on some node in the cluster. No message will
* be sent to any entity within the shard.
*/
def preResolveShard[M, E](entity: Entity[M, E], shard: String): Unit
Review comment (Contributor):
Perhaps we should follow the existing convention here too and use actor messages. We have that for ClusterShardingQuery and ShardCommand/Passivate. I think we can expand ShardCommand with a PreResolveShard message for this.

}

object Entity {
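Not part of the diff: a hypothetical sketch of the message-based alternative raised in the review comment above, where the existing ShardCommand protocol (which already carries Passivate) would gain a PreResolveShard message. All names below are illustrative, not taken from this change.

import akka.actor.typed.ActorRef

object ShardCommandSketch {
  sealed trait ShardCommand
  // Existing message shape, shown for context
  final case class Passivate[M](entity: ActorRef[M]) extends ShardCommand
  // Proposed addition: ask the shard region to resolve (and possibly allocate)
  // the shard without delivering anything to its entities
  final case class PreResolveShard(shardId: String) extends ShardCommand
}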
@@ -232,6 +232,12 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding =
*/
@InternalApi private[akka] def asJava: javadsl.ClusterSharding = javadslSelf

/**
* Direct the shard region or proxy actor for the given entity to resolve the location of the given shard
* and cache it. This may result in the shard being allocated on some node in the cluster. No message will
* be sent to any entity within the shard.
*/
def preResolveShard[M, E](entity: Entity[M, E], shard: String): Unit
}

object Entity {
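Not part of the diff: a minimal usage sketch of the new typed scaladsl method, assuming a hypothetical "Counter" entity type. The object and key names are illustrative.

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey }

object PreResolveShardUsage {
  sealed trait Command

  val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("Counter")

  def init(system: ActorSystem[_]): Unit = {
    val sharding = ClusterSharding(system)
    val entity = Entity(TypeKey)(_ => Behaviors.ignore[Command])
    sharding.init(entity)
    // Resolve (and possibly allocate) shard "42" before the first real message,
    // without delivering anything to the entities in that shard.
    sharding.preResolveShard(entity, "42")
  }
}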
@@ -649,6 +649,16 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
}
}

/**
* Direct the [[ShardRegion]] actor responsible for the named entity type to resolve the location
* of the given shard and cache it. If the `ShardRegion` already knows the location, it will not do anything,
* otherwise it will request the home of the shard, which may result in the shard being allocated on
* some node in the cluster. No message will be sent to any entity within the shard.
Review comment (Contributor):
Would it even be possible to automate this (if the feature is enabled in config)? For remember entities we already have unallocatedShards in the Coordinator State. Could we make use of that and automatically pre-allocate shards from the coordinator? Once a shard has been in use, it will always be allocated again as soon as possible (best effort).

*/
def preResolveShard(typeName: String, shard: ShardRegion.ShardId): Unit = {
Review comment (Contributor):
For the classic API we should follow the existing convention of having this as a public message that is sent to the shardRegion instead of adding this extra method. See for example GracefulShutdown.

shardRegion(typeName) ! ShardRegion.ResolveShard(shard)
}

/**
* Retrieve the actor reference of the [[ShardRegion]] actor that will act as a proxy to the
* named entity type running in another data center. A proxy within the same data center can be accessed
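Not part of the diff: a minimal usage sketch for the classic extension, showing both the new `preResolveShard` method and the message-based form the review comment suggests. The "Counter" type name and shard id are illustrative, and the entity type is assumed to have been started already.

import akka.actor.ActorSystem
import akka.cluster.sharding.{ ClusterSharding, ShardRegion }

object ClassicPreResolveUsage {
  def warmUp(system: ActorSystem): Unit = {
    // New method added in this diff
    ClusterSharding(system).preResolveShard("Counter", "42")
    // Equivalent, following the GracefulShutdown-style convention of sending a
    // public message directly to the shard region actor
    ClusterSharding(system).shardRegion("Counter") ! ShardRegion.ResolveShard("42")
  }
}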
@@ -225,6 +225,14 @@ object ShardRegion {
*/
final case class ShardInitialized(shardId: ShardId)

/**
* Send this message to the `ShardRegion` actor to request that it resolve the location of the given
* shard without sending a message to any entity within the shard. If the `ShardRegion` does not know
* the location of the shard, it will request it from the `ShardCoordinator`; if the shard has not
* yet been allocated, the `ShardCoordinator` will allocate it.
*/
final case class ResolveShard(shard: ShardId) extends ShardRegionCommand

sealed trait ShardRegionQuery

/**
@@ -438,7 +446,7 @@ ShardRegion {
*
* Discover if the shard region is registered with the coordinator.
* Not serializable as only to be sent to the local shard region
* Response is [[ShardRegionState]]
* Response is [[ShardRegionStatus]]
*/
@InternalApi
private[akka] object GetShardRegionStatus extends ShardRegionQuery
@@ -1022,6 +1030,19 @@ private[akka] class ShardRegion(
shardBuffers.totalSize)
context.stop(self)

case ResolveShard(shardId) =>
if (shardId == null || shardId == "") {
log.warning("{}: Shard must not be empty, not resolving", typeName)
} else
regionByShard.get(shardId) match {
case Some(_) => () // already resolved
case None =>
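// Not resolved yet; only ask the coordinator when no resolution is already in
// flight (buffered messages for this shard imply a GetShardHome request is pending)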
if (!shardBuffers.contains(shardId)) {
log.debug("{}: Request shard [{}] home. Coordinator [{}]", typeName, shardId, coordinator)
coordinator.foreach(_ ! GetShardHome(shardId))
}
}

case _ => unhandled(cmd)
}

@@ -14,6 +14,10 @@ import akka.cluster.{ Cluster, MemberStatus }
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.testkit.{ AkkaSpec, DeadLettersFilter, TestProbe, WithLogCapturing }
import akka.testkit.TestEvent.Mute
import akka.cluster.sharding.ShardRegion.GetClusterShardingStats
import akka.cluster.sharding.ShardRegion.ClusterShardingStats
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.ShardRegion.ResolveShard

object ShardRegionSpec {
val host = "127.0.0.1"
@@ -124,9 +128,51 @@ class ShardRegionSpec extends AkkaSpec(ShardRegionSpec.config) with WithLogCaptu
}
}

val coordinator = sysA.actorSelection(s"/system/sharding/${shardTypeName}Coordinator/singleton/coordinator")
within(2.seconds) {
awaitAssert {
coordinator.tell(GetClusterShardingStats(1.second), testActor)

// No shards allocated
val regions = expectMsgType[ClusterShardingStats].regions
regions.size shouldBe 2
regions.foreach {
case (_, regionStats) =>
regionStats.stats shouldBe empty
regionStats.failed shouldBe empty
}
}
}

region1.tell(1, p1.ref)
p1.expectMsg(1)

within(1.second) {
awaitAssert {
coordinator.tell(GetClusterShardingStats(1.second), testActor)
val regions = expectMsgType[ClusterShardingStats].regions
regions.size shouldBe 2
regions.map(_._2.failed.size).sum shouldBe 0
regions.foldLeft(Set.empty[ShardId]) { (shards, kv) =>
shards ++ kv._2.stats.keySet
} should contain theSameElementsAs (Set("1"))
}
}

// pre-allocate shard 2 from region 1
region1 ! ResolveShard("2")
within(1.second) {
awaitAssert {
coordinator.tell(GetClusterShardingStats(1.second), testActor)
val regions = expectMsgType[ClusterShardingStats].regions
regions.size shouldBe 2
regions.map(_._2.failed.size).sum shouldBe 0
regions.foldLeft(Set.empty[ShardId]) { (shards, kv) =>
shards ++ kv._2.stats.keySet
} should contain theSameElementsAs (Set("1", "2"))
}
}

region2.tell(2, p2.ref)
p2.expectMsg(2)
