diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index fc02882a35a..fe9fb06f655 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -313,6 +313,12 @@ import akka.util.{ ByteString, Timeout } } } + override def preResolveShard[M, E](entity: scaladsl.Entity[M, E], shard: String): Unit = + classicSharding.preResolveShard(entity.typeKey.name, shard) + + override def preResolveShard[M, E](entity: javadsl.Entity[M, E], shard: String): Unit = + classicSharding.preResolveShard(entity.typeKey.name, shard) + override lazy val shardState: ActorRef[ClusterShardingQuery] = { import akka.actor.typed.scaladsl.adapter._ val behavior = ShardingState.behavior(classicSharding) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index 95fd8be391c..c0712e45810 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -225,6 +225,13 @@ abstract class ClusterSharding { * The default `ShardAllocationStrategy` is configured by `least-shard-allocation-strategy` properties. */ def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy + + /** + * Direct the shard region or proxy actor for the given entity to resolve the location of the given shard + * and cache it. This may result in the shard being allocated on some node in the cluster. No message will + * be sent to any entity within the shard. + */ + def preResolveShard[M, E](entity: Entity[M, E], shard: String): Unit } object Entity { diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index cbf7c05be0a..26b835cdbbd 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -232,6 +232,12 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding = */ @InternalApi private[akka] def asJava: javadsl.ClusterSharding = javadslSelf + /** + * Direct the shard region or proxy actor for the given entity to resolve the location of the given shard + * and cache it. This may result in the shard being allocated on some node in the cluster. No message will + * be sent to any entity within the shard. + */ + def preResolveShard[M, E](entity: Entity[M, E], shard: String): Unit } object Entity { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index a3bc03942c1..3db2d8a1497 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -649,6 +649,16 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { } } + /** + * Direct the [[ShardRegion]] actor responsible for the named entity type to resolve the location + * of the given shard and cache it. If the `ShardRegion` already knows the location, it will not do anything, + * otherwise it will request the home of the shard, which may result in the shard being allocated on + * some node in the cluster. No message will be sent to any entity within the shard. + */ + def preResolveShard(typeName: String, shard: ShardRegion.ShardId): Unit = { + shardRegion(typeName) ! ShardRegion.ResolveShard(shard) + } + /** * Retrieve the actor reference of the [[ShardRegion]] actor that will act as a proxy to the * named entity type running in another data center. A proxy within the same data center can be accessed diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 9d35b5e7ad0..e314cbc0995 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -225,6 +225,14 @@ object ShardRegion { */ final case class ShardInitialized(shardId: ShardId) + /** + * Sent this message to the `ShardRegion` actor to request that the `ShardRegion` attempt to resolve + * this shard without sending a message to any entity within the shard. If the `ShardRegion` does not + * know the location of the shard, it will request the shard's location from the `ShardCoordinator`; if + * the `ShardCoordinator` has not allocated the shard, the `ShardCoordinator` will allocate the shard. + */ + case class ResolveShard(shard: ShardId) extends ShardRegionCommand + sealed trait ShardRegionQuery /** @@ -438,7 +446,7 @@ object ShardRegion { * * Discover if the shard region is registered with the coordinator. * Not serializable as only to be sent to the local shard region - * Response is [[ShardRegionState]] + * Response is [[ShardRegionStatus]] */ @InternalApi private[akka] object GetShardRegionStatus extends ShardRegionQuery @@ -1022,6 +1030,19 @@ private[akka] class ShardRegion( shardBuffers.totalSize) context.stop(self) + case ResolveShard(shardId) => + if (shardId == null || shardId == "") { + log.warning("{}: Shard must not be empty, not resolving", typeName) + } else + regionByShard.get(shardId) match { + case Some(_) => () // already resolved + case None => + if (!shardBuffers.contains(shardId)) { + log.debug("{}: Request shard [{}] home. Coordinator [{}]", typeName, shardId, coordinator) + coordinator.foreach(_ ! GetShardHome(shardId)) + } + } + case _ => unhandled(cmd) } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardRegionSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardRegionSpec.scala index eef3afea80d..eec8817e115 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardRegionSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardRegionSpec.scala @@ -14,6 +14,10 @@ import akka.cluster.{ Cluster, MemberStatus } import akka.cluster.ClusterEvent.CurrentClusterState import akka.testkit.{ AkkaSpec, DeadLettersFilter, TestProbe, WithLogCapturing } import akka.testkit.TestEvent.Mute +import akka.cluster.sharding.ShardRegion.GetClusterShardingStats +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.sharding.ShardRegion.ShardId +import akka.cluster.sharding.ShardRegion.ResolveShard object ShardRegionSpec { val host = "127.0.0.1" @@ -124,9 +128,51 @@ class ShardRegionSpec extends AkkaSpec(ShardRegionSpec.config) with WithLogCaptu } } + val coordinator = sysA.actorSelection(s"/system/sharding/${shardTypeName}Coordinator/singleton/coordinator") + within(2.seconds) { + awaitAssert { + coordinator.tell(GetClusterShardingStats(1.second), testActor) + + // No shards allocated + val regions = expectMsgType[ClusterShardingStats].regions + regions.size shouldBe 2 + regions.foreach { + case (_, regionStats) => + regionStats.stats shouldBe empty + regionStats.failed shouldBe empty + } + } + } + region1.tell(1, p1.ref) p1.expectMsg(1) + within(1.second) { + awaitAssert { + coordinator.tell(GetClusterShardingStats(1.second), testActor) + val regions = expectMsgType[ClusterShardingStats].regions + regions.size shouldBe 2 + regions.map(_._2.failed.size).sum shouldBe 0 + regions.foldLeft(Set.empty[ShardId]) { (shards, kv) => + shards ++ kv._2.stats.keySet + } should contain theSameElementsAs (Set("1")) + } + } + + // pre-allocate shard 2 from region 1 + region1 ! ResolveShard("2") + within(1.second) { + awaitAssert { + coordinator.tell(GetClusterShardingStats(1.second), testActor) + val regions = expectMsgType[ClusterShardingStats].regions + regions.size shouldBe 2 + regions.map(_._2.failed.size).sum shouldBe 0 + regions.foldLeft(Set.empty[ShardId]) { (shards, kv) => + shards ++ kv._2.stats.keySet + } should contain theSameElementsAs (Set("1", "2")) + } + } + region2.tell(2, p2.ref) p2.expectMsg(2)