diff --git a/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/config/ConfigTransforms.scala b/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/config/ConfigTransforms.scala index 1b7cd06de9..7b81545823 100644 --- a/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/config/ConfigTransforms.scala +++ b/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/config/ConfigTransforms.scala @@ -333,6 +333,12 @@ object ConfigTransforms { } ) + def updateInitialTickDuration(tick: NonNegativeFiniteDuration): ConfigTransform = { + ConfigTransforms.updateAllSvAppFoundDsoConfigs_( + _.copy(initialTickDuration = tick) + ) + } + def noDevNet: ConfigTransform = updateAllSvAppFoundDsoConfigs_(_.focus(_.isDevNet).replace(false)) diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/EnvironmentDefinition.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/EnvironmentDefinition.scala index c2030918c3..ba2029680b 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/EnvironmentDefinition.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/EnvironmentDefinition.scala @@ -425,6 +425,12 @@ case class EnvironmentDefinition( .replace(NonNegativeFiniteDuration.Zero) )(conf) ) + .addConfigTransform((_, conf) => + ConfigTransforms.updateAllAutomationConfigs( + // disable round based triggers because tests don't advance time for the triggers to run + _.focus(_.enableNewRewardTriggerScheduling).replace(false) + )(conf) + ) .withSequencerConnectionsFromScanDisabled(10_000) override lazy val environmentFactory: EnvironmentFactory[SpliceConfig, SpliceEnvironment] = diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/AutomationControlIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/AutomationControlIntegrationTest.scala index 910b8da8cf..244c0edb3b 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/AutomationControlIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/AutomationControlIntegrationTest.scala @@ -22,10 +22,8 @@ class AutomationControlIntegrationTest // start only sv1 but not sv2-4, to speed up the test .simpleTopology1Sv(this.getClass.getSimpleName) // Very short round ticks - .addConfigTransforms((_, config) => - ConfigTransforms.updateAllSvAppFoundDsoConfigs_( - _.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500)) - )(config) + .addConfigTransform((_, config) => + ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config) ) // Start rounds trigger in paused state .addConfigTransforms((_, config) => diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ExternalPartySetupProposalIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ExternalPartySetupProposalIntegrationTest.scala index e1541682c6..892c432194 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ExternalPartySetupProposalIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ExternalPartySetupProposalIntegrationTest.scala @@ -90,8 +90,8 @@ class ExternalPartySetupProposalIntegrationTest .withPausedTrigger[ExpireIssuingMiningRoundTrigger] )(config), (_, config) => - ConfigTransforms.updateAllSvAppFoundDsoConfigs_( - _.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500)) + ConfigTransforms.updateInitialTickDuration( + NonNegativeFiniteDuration.ofMillis(500) )(config), ) diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/FeaturedAppActivityMarkerIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/FeaturedAppActivityMarkerIntegrationTest.scala index 948d33e746..d715b04d42 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/FeaturedAppActivityMarkerIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/FeaturedAppActivityMarkerIntegrationTest.scala @@ -32,12 +32,14 @@ class FeaturedAppActivityMarkerIntegrationTest .addConfigTransforms((_, config) => ConfigTransforms.updateAllSvAppFoundDsoConfigs_( _.copy( - initialTickDuration = NonNegativeFiniteDuration.ofMillis(500), initialFeaturedAppActivityMarkerAmount = Some(1.0), initialAmuletPrice = 1.0, ) )(config) ) + .addConfigTransform((_, config) => + ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config) + ) .addConfigTransforms((_, config) => updateAutomationConfig(ConfigurableApp.Sv)( _.withPausedTrigger[AdvanceOpenMiningRoundTrigger] diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala index 8ab26b8879..07fb7c77a7 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala @@ -65,10 +65,8 @@ class ScanHistoryBackfillingIntegrationTest .withPausedTrigger[DeleteCorruptAcsSnapshotTrigger] )(config) ) - .addConfigTransforms((_, config) => - ConfigTransforms.updateAllSvAppFoundDsoConfigs_( - _.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500)) - )(config) + .addConfigTransform((_, config) => + ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config) ) .addConfigTransforms((_, config) => ConfigTransforms.updateAllScanAppConfigs((_, scanConfig) => diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanIntegrationTest.scala index 550a0dea38..1620535b07 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanIntegrationTest.scala @@ -92,10 +92,8 @@ class ScanIntegrationTest extends IntegrationTest with WalletTestUtil with TimeT ) )(config) ) - .addConfigTransforms((_, config) => - ConfigTransforms.updateAllSvAppFoundDsoConfigs_( - _.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500)) - )(config) + .addConfigTransform((_, config) => + ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config) ) .withTrafficTopupsEnabled diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/UnclaimedSvRewardsScriptIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/UnclaimedSvRewardsScriptIntegrationTest.scala index 5543e3f2e8..cd27f2a1cb 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/UnclaimedSvRewardsScriptIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/UnclaimedSvRewardsScriptIntegrationTest.scala @@ -45,10 +45,8 @@ class UnclaimedSvRewardsScriptIntegrationTest .withPausedTrigger[ArchiveClosedMiningRoundsTrigger] )(config) ) - .addConfigTransforms((_, config) => - ConfigTransforms.updateAllSvAppFoundDsoConfigs_( - _.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500)) - )(config) + .addConfigTransform((_, config) => + ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config) ) .withTrafficTopupsDisabled diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/UpdateHistoryIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/UpdateHistoryIntegrationTest.scala index 90d7453559..2f438f70a2 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/UpdateHistoryIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/UpdateHistoryIntegrationTest.scala @@ -39,10 +39,8 @@ class UpdateHistoryIntegrationTest _.withPausedTrigger[AdvanceOpenMiningRoundTrigger] )(config) ) - .addConfigTransforms((_, config) => - ConfigTransforms.updateAllSvAppFoundDsoConfigs_( - _.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500)) - )(config) + .addConfigTransform((_, config) => + ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config) ) .withTrafficTopupsDisabled diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletExpirationsIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletExpirationsIntegrationTest.scala index 095d5bc0e6..e2832f0ef7 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletExpirationsIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletExpirationsIntegrationTest.scala @@ -43,10 +43,8 @@ class WalletExpirationsIntegrationTest )(config) ) // Very short round ticks - .addConfigTransforms((_, config) => - ConfigTransforms.updateAllSvAppFoundDsoConfigs_( - _.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500)) - )(config) + .addConfigTransform((_, config) => + ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config) ) // Start rounds trigger in paused state .addConfigTransforms((_, config) => diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletManualRoundsIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletManualRoundsIntegrationTest.scala index 3bf5c21f9d..d14e4f1693 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletManualRoundsIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletManualRoundsIntegrationTest.scala @@ -70,10 +70,8 @@ class WalletManualRoundsIntegrationTest )(config) ) // Very short round ticks - .addConfigTransforms((_, config) => - ConfigTransforms.updateAllSvAppFoundDsoConfigs_( - _.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500)) - )(config) + .addConfigTransform((_, config) => + ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config) ) // Start rounds trigger in paused state .addConfigTransforms((_, config) => diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/RoundBasedRewardTrigger.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/RoundBasedRewardTrigger.scala new file mode 100644 index 0000000000..f0289400c8 --- /dev/null +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/RoundBasedRewardTrigger.scala @@ -0,0 +1,103 @@ +// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package org.lfdecentralizedtrust.splice.automation + +import com.digitalasset.canton.logging.pretty.Pretty +import com.digitalasset.canton.tracing.TraceContext +import io.opentelemetry.api.trace.Tracer +import org.apache.pekko.stream.Materializer +import org.lfdecentralizedtrust.splice.automation.RoundBasedRewardTrigger.RoundBasedTask + +import java.time.{Duration, Instant} +import java.util.concurrent.atomic.AtomicReference +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Random + +abstract class RoundBasedRewardTrigger[T <: RoundBasedTask: Pretty]()(implicit + ec: ExecutionContext, + mat: Materializer, + tracer: Tracer, +) extends PollingParallelTaskExecutionTrigger[T] { + private val nextRunTime = new AtomicReference[Option[(Long, Instant)]](None) + + private val isNewSchedulingLogicEnabled: Boolean = context.config.enableNewRewardTriggerScheduling + + // if the new logic is disable then use the old behaviour that uses increased polling intervals + override protected def isRewardOperationTrigger: Boolean = !isNewSchedulingLogicEnabled + + override protected def retrieveTasks()(implicit tc: TraceContext): Future[Seq[T]] = { + if (isNewSchedulingLogicEnabled) { + if (shouldRun) { + retrieveAvailableTasksForRound().map(tasks => { + tasks.minByOption(_.roundDetails._1) match { + case Some(firstRound) => + val (roundNumber, roundOpening) = firstRound.roundDetails + val lastRunWasForAnOlderRound = nextRunTime.get().forall(_._1 < roundNumber) + if (lastRunWasForAnOlderRound) { + @SuppressWarnings(Array("org.wartremover.warts.IterableOps")) + val minRunTime = Seq(roundOpening, context.clock.now.toInstant).max + val maxRunTime = roundOpening.plus(firstRound.tickDuration) + val minRunAt = randomInstantBetween(minRunTime, maxRunTime) + nextRunTime.set( + Some( + roundNumber -> minRunAt + ) + ) + if (shouldRun) { + logger.info( + s"Running for $tasks because the calculated run time $minRunAt is now (between $minRunTime and $maxRunTime)." + ) + tasks + } else { + logger.info( + s"Will run for $tasks at min time $minRunAt (computed for interval between $minRunTime and $maxRunTime)." + ) + Seq.empty + } + } else { + logger.info( + s"Running for $tasks as the round still matches the next run ${nextRunTime.get()}." + ) + tasks + } + case None => + // no tasks available + tasks + } + }) + } else Future.successful(Seq.empty) + } else { + retrieveAvailableTasksForRound() + } + } + + protected def retrieveAvailableTasksForRound()(implicit tc: TraceContext): Future[Seq[T]] + + private def shouldRun = { + nextRunTime + .get() + .fold(true) { case (_, runAt) => + runAt.isBefore(context.clock.now.toInstant) || runAt.equals(context.clock.now.toInstant) + } + } + + private def randomInstantBetween(start: Instant, end: Instant): Instant = { + val range = Duration.between(start, end) + if (start.isBefore(end) || range.toMillis <= 0) + start + else { + val randomMillisInRange = Random.nextLong(range.toMillis) + start.plusMillis(randomMillisInRange) + } + } + +} + +object RoundBasedRewardTrigger { + + trait RoundBasedTask { + def roundDetails: (Long, Instant) + def tickDuration: Duration + } +} diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/config/AutomationConfig.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/config/AutomationConfig.scala index e44544e64f..043c0377dc 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/config/AutomationConfig.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/config/AutomationConfig.scala @@ -25,6 +25,9 @@ case class AutomationConfig( * `[pollingInterval * (1 - 0.5*pollingJitter), pollingInterval * (1 + 0.5 * pollingJitter)]` */ pollingJitter: Double = 0.2, + /** Enabled schedling reward operations unfiromly across the first tick of a round opening. + */ + enableNewRewardTriggerScheduling: Boolean = true, /** Reward operations can result in spikes overloading sequencers on each round switch so we * use a lower polling interval of 1/3 tick with tick = 600s */ diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/delegatebased/AdvanceOpenMiningRoundTrigger.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/delegatebased/AdvanceOpenMiningRoundTrigger.scala index fc66118e11..c65e7c840b 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/delegatebased/AdvanceOpenMiningRoundTrigger.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/delegatebased/AdvanceOpenMiningRoundTrigger.scala @@ -55,7 +55,7 @@ class AdvanceOpenMiningRoundTrigger( val rounds = task.work.openRounds for { dsoRules <- store.getDsoRules() - _ = logger.debug( + _ = logger.info( s"Starting work as for ${task.work}" ) amuletPriceVotes <- store.listSvAmuletPriceVotes() diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/ReceiveSvRewardCouponTrigger.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/ReceiveSvRewardCouponTrigger.scala index 1639525c7d..4e436ea77f 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/ReceiveSvRewardCouponTrigger.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/ReceiveSvRewardCouponTrigger.scala @@ -4,35 +4,38 @@ package org.lfdecentralizedtrust.splice.sv.automation.singlesv import cats.data.OptionT +import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting} +import com.digitalasset.canton.topology.ParticipantId +import com.digitalasset.canton.topology.store.TopologyStoreId +import com.digitalasset.canton.tracing.TraceContext +import com.digitalasset.canton.util.MonadUtil +import io.opentelemetry.api.trace.Tracer +import org.apache.pekko.stream.Materializer +import org.lfdecentralizedtrust.splice.automation.RoundBasedRewardTrigger.RoundBasedTask import org.lfdecentralizedtrust.splice.automation.{ - PollingParallelTaskExecutionTrigger, + RoundBasedRewardTrigger, TaskOutcome, TaskSuccess, TriggerContext, } +import org.lfdecentralizedtrust.splice.codegen.java.da.types.Tuple2 +import org.lfdecentralizedtrust.splice.codegen.java.splice.amuletconfig.PackageConfig import org.lfdecentralizedtrust.splice.codegen.java.splice.dso.svstate.SvRewardState import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.DsoRules -import org.lfdecentralizedtrust.splice.codegen.java.da.types.Tuple2 +import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.TopologyTransactionType.AuthorizedState import org.lfdecentralizedtrust.splice.environment.{ DarResources, ParticipantAdminConnection, SpliceLedgerConnection, } +import org.lfdecentralizedtrust.splice.store.MiningRoundsStore.OpenMiningRoundContract import org.lfdecentralizedtrust.splice.sv.config.BeneficiaryConfig import org.lfdecentralizedtrust.splice.sv.store.SvDsoStore -import org.lfdecentralizedtrust.splice.store.MiningRoundsStore.OpenMiningRoundContract import org.lfdecentralizedtrust.splice.sv.util.SvUtil import org.lfdecentralizedtrust.splice.util.{AmuletConfigSchedule, AssignedContract} -import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting} -import com.digitalasset.canton.topology.ParticipantId -import com.digitalasset.canton.topology.store.TopologyStoreId -import com.digitalasset.canton.tracing.TraceContext -import com.digitalasset.canton.util.MonadUtil -import io.opentelemetry.api.trace.Tracer -import org.apache.pekko.stream.Materializer -import org.lfdecentralizedtrust.splice.codegen.java.splice.amuletconfig.PackageConfig -import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.TopologyTransactionType.AuthorizedState +import java.time.temporal.ChronoUnit +import java.time.{Duration, Instant} import scala.concurrent.{ExecutionContext, Future} import scala.jdk.CollectionConverters.* import scala.math.Ordering.Implicits.* @@ -47,14 +50,12 @@ class ReceiveSvRewardCouponTrigger( override val ec: ExecutionContext, mat: Materializer, override val tracer: Tracer, -) extends PollingParallelTaskExecutionTrigger[ReceiveSvRewardCouponTrigger.Task] { - - override def isRewardOperationTrigger: Boolean = true +) extends RoundBasedRewardTrigger[ReceiveSvRewardCouponTrigger.Task] { private val svParty = store.key.svParty private val dsoParty = store.key.dsoParty - override protected def retrieveTasks()(implicit + override protected def retrieveAvailableTasksForRound()(implicit tc: TraceContext ): Future[Seq[ReceiveSvRewardCouponTrigger.Task]] = { for { @@ -232,9 +233,10 @@ object ReceiveSvRewardCouponTrigger { rewardState: AssignedContract[SvRewardState.ContractId, SvRewardState], round: OpenMiningRoundContract, beneficiaries: Seq[BeneficiaryConfig], - ) extends PrettyPrinting { - import org.lfdecentralizedtrust.splice.util.PrettyInstances.* + ) extends PrettyPrinting + with RoundBasedTask { import com.digitalasset.canton.participant.pretty.Implicits.prettyContractId + import org.lfdecentralizedtrust.splice.util.PrettyInstances.* override def pretty: Pretty[this.type] = prettyOfClass( @@ -243,6 +245,12 @@ object ReceiveSvRewardCouponTrigger { param("rewardState", _.rewardState), param("round", _.round), ) + + override def roundDetails: (Long, Instant) = + Long.unbox(round.payload.round.number) -> round.payload.opensAt + + override def tickDuration: Duration = + Duration.of(round.payload.tickDuration.microseconds, ChronoUnit.MICROS) } def svLatestVettedPackages(packages: PackageConfig): Seq[String] = Seq( diff --git a/docs/src/release_notes.rst b/docs/src/release_notes.rst index 44c1b42198..576faab8c4 100644 --- a/docs/src/release_notes.rst +++ b/docs/src/release_notes.rst @@ -17,6 +17,10 @@ Upcoming and relies purely on network-level access control for securing the validator, i.e. anyone with access to your node can act on your behalf. + - Reward collection + + - Changed the behavior of automation around rewards and coupons to run for the first time in the interval of ``round open time`` -> ``round open time + tick duration``. This might increase the observed duration between rewards and coupons being issued and until they are collected. Once the first tick elapses retries will happen more aggressively. + 0.4.20 ------