Skip to content

Commit d573805

Browse files
authored
chore: Java AbstractShardAllocationStrategy API should return CompletionStage (#32809)
1 parent 617ad47 commit d573805

File tree

1 file changed

+75
-3
lines changed

1 file changed

+75
-3
lines changed

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package akka.cluster.sharding
66

77
import java.util.UUID
8+
import java.util.concurrent.CompletionStage
89

910
import scala.annotation.nowarn
1011
import scala.collection.immutable
@@ -197,9 +198,15 @@ object ShardCoordinator {
197198

198199
/**
199200
* Java API: Java implementations of custom shard allocation and rebalancing logic used by the [[ShardCoordinator]]
200-
* should extend this abstract class and implement the two methods.
201+
* should extend this abstract class and override [[allocateNewShard]] and [[rebalanceShard]].
202+
*
203+
* Earlier versions of this API had different extension points. Overriding those is still supported
204+
* but they may be removed in a future release.
201205
*/
202206
abstract class AbstractShardAllocationStrategy extends ShardAllocationStrategy {
207+
import java.util.concurrent.CompletableFuture
208+
209+
@nowarn("cat=deprecation") // calls deprecated allocateShard
203210
override final def allocateShard(
204211
requester: ActorRef,
205212
shardId: ShardId,
@@ -209,6 +216,7 @@ object ShardCoordinator {
209216
allocateShard(requester, shardId, currentShardAllocations.asJava)
210217
}
211218

219+
@nowarn("cat=deprecation") // calls deprecated rebalance
212220
override final def rebalance(
213221
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
214222
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
@@ -220,6 +228,12 @@ object ShardCoordinator {
220228
/**
221229
* Invoked when the location of a new shard is to be decided.
222230
*
231+
* The default implementation defers to [[allocateNewShard]]. In earlier versions of this API, this
232+
* Scala Future-returning method was the extension point.
233+
*
234+
* New implementations of this class should prefer to override [[allocateNewShard]]. This method may be removed
235+
* in a future release.
236+
*
223237
* @param requester actor reference to the [[ShardRegion]] that requested the location of the
224238
* shard, can be returned if preference should be given to the node where the shard was first accessed
225239
* @param shardId the id of the shard to allocate
@@ -228,23 +242,81 @@ object ShardCoordinator {
228242
* @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
229243
* the references included in the `currentShardAllocations` parameter
230244
*/
245+
@deprecated("prefer allocateNewShard", "2.10.10")
231246
def allocateShard(
232247
requester: ActorRef,
233248
shardId: String,
234-
currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]]): Future[ActorRef]
249+
currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]]): Future[ActorRef] = {
250+
import scala.jdk.FutureConverters.CompletionStageOps
251+
allocateNewShard(requester, shardId, currentShardAllocations).asScala
252+
}
253+
254+
/**
255+
* Invoked when the location of a new shard is to be decided.
256+
*
257+
* New implementations of this class should override this method instead of overriding [[allocateShard]].
258+
*
259+
* For compatibility with earlier versions of this API, this method's default implementation returns
260+
* an immediately-failing [[java.util.concurrent.CompletionStage]].
261+
*
262+
* @param requester actor reference to the [[ShardRegion]] that requested the location of the
263+
* shard, can be returned if preference should be given to the node where the shard was first accessed
264+
* @param shardId the id of the shard to allocate
265+
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
266+
* in the order they were allocated
267+
* @return a [[java.util.concurrent.CompletionStage]] of the actor ref of the [[ShardRegion]] that is to be responsible for the shard,
268+
* must be one of the references included in the `currentShardAllocations` parameter
269+
*/
270+
@nowarn("msg=never used")
271+
def allocateNewShard(
272+
requester: ActorRef,
273+
shardId: String,
274+
currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]]): CompletionStage[ActorRef] =
275+
CompletableFuture.failedStage(new scala.NotImplementedError("Must override allocateNewShard or allocateShard"))
235276

236277
/**
237278
* Invoked periodically to decide which shards to rebalance to another location.
238279
*
280+
* The default implementation defers to [[rebalanceShards]]. In earlier versions of this API, this
281+
* Scala Future-returning method was the extension point.
282+
*
283+
* New implementations of this class should prefer to override [[rebalanceShards]]. This method may be removed in a
284+
* future release
285+
*
239286
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
240287
* in the order they were allocated
241288
* @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
242289
* you should not include these in the returned set
243290
* @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round
244291
*/
292+
@deprecated("prefer rebalanceShards", "2.10.10")
245293
def rebalance(
246294
currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]],
247-
rebalanceInProgress: java.util.Set[String]): Future[java.util.Set[String]]
295+
rebalanceInProgress: java.util.Set[String]): Future[java.util.Set[String]] = {
296+
import scala.jdk.FutureConverters.CompletionStageOps
297+
rebalanceShards(currentShardAllocations, rebalanceInProgress).asScala
298+
}
299+
300+
/**
301+
* Invoked periodically to decide which shards to rebalance to another location.
302+
*
303+
* New implementations of this class should override this method instead of overriding [[rebalance]].
304+
*
305+
* For compatibility with earlier versions of this API, this method's default implementation returns
306+
* an immediately-failing [[java.util.concurrent.CompletionStage]].
307+
*
308+
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
309+
* in the order they were allocated
310+
* @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
311+
* you should not include these in the returned set
312+
* @return a [[java.util.concurrent.CompletionStage]] of the set of shards to be migrated, may be empty to skip rebalance in this round
313+
*/
314+
@nowarn("msg=never used")
315+
def rebalanceShards(
316+
currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]],
317+
rebalanceInProgress: java.util.Set[String]): CompletionStage[java.util.Set[String]] =
318+
CompletableFuture.failedStage(new scala.NotImplementedError("Must override rebalanceShard or rebalance"))
319+
248320
}
249321

250322
private val emptyRebalanceResult = Future.successful(Set.empty[ShardId])

0 commit comments

Comments
 (0)