Skip to content

Commit becc385

Browse files
committed
feat: add self-healing mechanism for the shard state
1 parent 4af0506 commit becc385

File tree

9 files changed

+2729
-6
lines changed

9 files changed

+2729
-6
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Self-healing settings added to ClusterShardingSettings constructor
2+
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ClusterShardingSettings.this")
3+
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ClusterShardingSettings.copy")
4+
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ClusterShardingSettings.copy$default$*")

akka-cluster-sharding/src/main/resources/reference.conf

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,39 @@ akka.cluster.sharding {
466466
# coordinator for that entity type.
467467
disabled-after = 10s
468468
}
469+
470+
# Self-healing configuration for automatic shard state cleanup when cluster coordination is impaired.
471+
# When enabled, shards allocated to regions that have been unreachable beyond a configurable threshold
472+
# will be automatically deallocated, enabling graceful degradation and recovery.
473+
self-healing {
474+
# Enable/disable the self-healing mechanism.
475+
# When enabled, shards from regions that have been unreachable beyond the
476+
# stale-region-timeout will be automatically deallocated and can be reallocated
477+
# to healthy regions.
478+
enabled = off
479+
480+
# Time threshold after which an unreachable region's shards are considered stale
481+
# and will be deallocated. This should be longer than expected network hiccups
482+
# but shorter than manual intervention time.
483+
# Recommended: At least 2x the failure-detector acceptable-heartbeat-pause (default 3s).
484+
stale-region-timeout = 30s
485+
486+
# How often to check for stale regions.
487+
# This interval controls the resolution of the timeout detection.
488+
# Smaller values detect staleness faster but add more CPU overhead.
489+
check-interval = 5s
490+
491+
# Grace period after coordinator startup before self-healing activates.
492+
# This prevents premature cleanup during cluster formation when nodes
493+
# may be temporarily unreachable as they join.
494+
startup-grace-period = 60s
495+
496+
# Whether to only log warnings instead of actually deallocating shards.
497+
# Useful for testing/monitoring the self-healing behavior before enabling
498+
# full automatic cleanup. When enabled, logs what would be deallocated
499+
# without actually performing the deallocation.
500+
dry-run = off
501+
}
469502
}
470503
# //#sharding-ext-config
471504

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala

Lines changed: 141 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ object ClusterShardingSettings {
105105
))
106106
}
107107

108+
val selfHealingSettings = SelfHealingSettings(config.getConfig("self-healing"))
109+
108110
new ClusterShardingSettings(
109111
role = roleOption(config.getString("role")),
110112
rememberEntities = config.getBoolean("remember-entities"),
@@ -117,7 +119,8 @@ object ClusterShardingSettings {
117119
tuningParameters,
118120
config.getBoolean("coordinator-singleton-role-override"),
119121
coordinatorSingletonSettings,
120-
lease)
122+
lease,
123+
selfHealingSettings)
121124
}
122125

123126
/**
@@ -909,6 +912,95 @@ object ClusterShardingSettings {
909912
}
910913
}
911914

915+
/**
916+
* Settings for the self-healing mechanism that automatically removes stale shard state
917+
* when cluster coordination is impaired.
918+
*/
919+
object SelfHealingSettings {
920+
921+
/**
922+
* Create settings from a configuration with the same layout as
923+
* the default configuration `akka.cluster.sharding.self-healing`.
924+
*/
925+
def apply(config: Config): SelfHealingSettings = {
926+
new SelfHealingSettings(
927+
enabled = config.getBoolean("enabled"),
928+
staleRegionTimeout = config.getDuration("stale-region-timeout", MILLISECONDS).millis,
929+
checkInterval = config.getDuration("check-interval", MILLISECONDS).millis,
930+
startupGracePeriod = config.getDuration("startup-grace-period", MILLISECONDS).millis,
931+
dryRun = config.getBoolean("dry-run"))
932+
}
933+
934+
/**
935+
* Default settings with self-healing disabled.
936+
*/
937+
val disabled: SelfHealingSettings = new SelfHealingSettings(
938+
enabled = false,
939+
staleRegionTimeout = 30.seconds,
940+
checkInterval = 5.seconds,
941+
startupGracePeriod = 60.seconds,
942+
dryRun = false)
943+
}
944+
945+
/**
946+
* Settings for the self-healing mechanism that automatically removes stale shard state
947+
* when cluster coordination is impaired.
948+
*
949+
* @param enabled Whether self-healing is enabled
950+
* @param staleRegionTimeout Time threshold after which an unreachable region's shards are considered stale
951+
* @param checkInterval How often to check for stale regions
952+
* @param startupGracePeriod Grace period after coordinator startup before self-healing activates
953+
* @param dryRun If true, only log warnings without actually deallocating shards
954+
*/
955+
final class SelfHealingSettings(
956+
val enabled: Boolean,
957+
val staleRegionTimeout: FiniteDuration,
958+
val checkInterval: FiniteDuration,
959+
val startupGracePeriod: FiniteDuration,
960+
val dryRun: Boolean) {
961+
962+
require(staleRegionTimeout > Duration.Zero, "stale-region-timeout must be > 0")
963+
require(checkInterval > Duration.Zero, "check-interval must be > 0")
964+
require(startupGracePeriod >= Duration.Zero, "startup-grace-period must be >= 0")
965+
966+
def withEnabled(enabled: Boolean): SelfHealingSettings =
967+
copy(enabled = enabled)
968+
969+
def withStaleRegionTimeout(timeout: FiniteDuration): SelfHealingSettings =
970+
copy(staleRegionTimeout = timeout)
971+
972+
def withStaleRegionTimeout(timeout: java.time.Duration): SelfHealingSettings =
973+
copy(staleRegionTimeout = timeout.toScala)
974+
975+
def withCheckInterval(interval: FiniteDuration): SelfHealingSettings =
976+
copy(checkInterval = interval)
977+
978+
def withCheckInterval(interval: java.time.Duration): SelfHealingSettings =
979+
copy(checkInterval = interval.toScala)
980+
981+
def withStartupGracePeriod(period: FiniteDuration): SelfHealingSettings =
982+
copy(startupGracePeriod = period)
983+
984+
def withStartupGracePeriod(period: java.time.Duration): SelfHealingSettings =
985+
copy(startupGracePeriod = period.toScala)
986+
987+
def withDryRun(dryRun: Boolean): SelfHealingSettings =
988+
copy(dryRun = dryRun)
989+
990+
private def copy(
991+
enabled: Boolean = enabled,
992+
staleRegionTimeout: FiniteDuration = staleRegionTimeout,
993+
checkInterval: FiniteDuration = checkInterval,
994+
startupGracePeriod: FiniteDuration = startupGracePeriod,
995+
dryRun: Boolean = dryRun): SelfHealingSettings =
996+
new SelfHealingSettings(enabled, staleRegionTimeout, checkInterval, startupGracePeriod, dryRun)
997+
998+
override def toString: String =
999+
s"SelfHealingSettings(enabled=$enabled, staleRegionTimeout=$staleRegionTimeout, " +
1000+
s"checkInterval=$checkInterval, startupGracePeriod=$startupGracePeriod, " +
1001+
s"dryRun=$dryRun)"
1002+
}
1003+
9121004
class TuningParameters(
9131005
val coordinatorFailureBackoff: FiniteDuration,
9141006
val retryInterval: FiniteDuration,
@@ -1176,6 +1268,8 @@ object ClusterShardingSettings {
11761268
* Note that if you define a custom lease name and have several sharding entity types each one must have a unique
11771269
* lease name. If the lease name is undefined it will be derived from ActorSystem name and shard name,
11781270
* but that may result in too long lease names.
1271+
* @param selfHealingSettings Settings for the self-healing mechanism that automatically removes stale shard state
1272+
* when cluster coordination is impaired.
11791273
*/
11801274
final class ClusterShardingSettings(
11811275
val role: Option[String],
@@ -1189,8 +1283,40 @@ final class ClusterShardingSettings(
11891283
val tuningParameters: ClusterShardingSettings.TuningParameters,
11901284
val coordinatorSingletonOverrideRole: Boolean,
11911285
val coordinatorSingletonSettings: ClusterSingletonManagerSettings,
1192-
val leaseSettings: Option[LeaseUsageSettings])
1286+
val leaseSettings: Option[LeaseUsageSettings],
1287+
val selfHealingSettings: ClusterShardingSettings.SelfHealingSettings)
11931288
extends NoSerializationVerificationNeeded {
1289+
@deprecated(
1290+
"Use the ClusterShardingSettings factory methods or the constructor including selfHealingSettings instead",
1291+
"2.10.0")
1292+
def this(
1293+
role: Option[String],
1294+
rememberEntities: Boolean,
1295+
journalPluginId: String,
1296+
snapshotPluginId: String,
1297+
stateStoreMode: String,
1298+
rememberEntitiesStore: String,
1299+
passivationStrategySettings: ClusterShardingSettings.PassivationStrategySettings,
1300+
shardRegionQueryTimeout: FiniteDuration,
1301+
tuningParameters: ClusterShardingSettings.TuningParameters,
1302+
coordinatorSingletonOverrideRole: Boolean,
1303+
coordinatorSingletonSettings: ClusterSingletonManagerSettings,
1304+
leaseSettings: Option[LeaseUsageSettings]) =
1305+
this(
1306+
role,
1307+
rememberEntities,
1308+
journalPluginId,
1309+
snapshotPluginId,
1310+
stateStoreMode,
1311+
rememberEntitiesStore,
1312+
passivationStrategySettings,
1313+
shardRegionQueryTimeout,
1314+
tuningParameters,
1315+
coordinatorSingletonOverrideRole,
1316+
coordinatorSingletonSettings,
1317+
leaseSettings,
1318+
ClusterShardingSettings.SelfHealingSettings.disabled)
1319+
11941320
@deprecated(
11951321
"Use the ClusterShardingSettings factory methods or the constructor including coordinatorSingletonOverrideRole instead",
11961322
"2.6.20")
@@ -1218,7 +1344,8 @@ final class ClusterShardingSettings(
12181344
tuningParameters,
12191345
coordinatorSingletonOverrideRole = true,
12201346
coordinatorSingletonSettings,
1221-
leaseSettings)
1347+
leaseSettings,
1348+
ClusterShardingSettings.SelfHealingSettings.disabled)
12221349

12231350
@deprecated(
12241351
"Use the ClusterShardingSettings factory methods or the constructor including passivationStrategySettings instead",
@@ -1390,6 +1517,13 @@ final class ClusterShardingSettings(
13901517
coordinatorSingletonSettings: ClusterSingletonManagerSettings): ClusterShardingSettings =
13911518
copy(coordinatorSingletonSettings = coordinatorSingletonSettings)
13921519

1520+
/**
1521+
* Configure the self-healing mechanism for automatic shard state cleanup.
1522+
*/
1523+
def withSelfHealingSettings(
1524+
selfHealingSettings: ClusterShardingSettings.SelfHealingSettings): ClusterShardingSettings =
1525+
copy(selfHealingSettings = selfHealingSettings)
1526+
13931527
private def copy(
13941528
role: Option[String] = role,
13951529
rememberEntities: Boolean = rememberEntities,
@@ -1401,7 +1535,8 @@ final class ClusterShardingSettings(
14011535
tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters,
14021536
coordinatorSingletonOverrideRole: Boolean = coordinatorSingletonOverrideRole,
14031537
coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings,
1404-
leaseSettings: Option[LeaseUsageSettings] = leaseSettings): ClusterShardingSettings =
1538+
leaseSettings: Option[LeaseUsageSettings] = leaseSettings,
1539+
selfHealingSettings: ClusterShardingSettings.SelfHealingSettings = selfHealingSettings): ClusterShardingSettings =
14051540
new ClusterShardingSettings(
14061541
role,
14071542
rememberEntities,
@@ -1414,5 +1549,6 @@ final class ClusterShardingSettings(
14141549
tuningParameters,
14151550
coordinatorSingletonOverrideRole,
14161551
coordinatorSingletonSettings,
1417-
leaseSettings)
1552+
leaseSettings,
1553+
selfHealingSettings)
14181554
}

0 commit comments

Comments
 (0)