Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,12 @@ object ConfigTransforms {
}
)

def updateInitialTickDuration(tick: NonNegativeFiniteDuration): ConfigTransform = {
ConfigTransforms.updateAllSvAppFoundDsoConfigs_(
_.copy(initialTickDuration = tick)
)
}

def noDevNet: ConfigTransform =
updateAllSvAppFoundDsoConfigs_(_.focus(_.isDevNet).replace(false))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,12 @@ case class EnvironmentDefinition(
.replace(NonNegativeFiniteDuration.Zero)
)(conf)
)
.addConfigTransform((_, conf) =>
ConfigTransforms.updateAllAutomationConfigs(
// disable round based triggers because tests don't advance time for the triggers to run
_.focus(_.enableNewRewardTriggerScheduling).replace(false)
)(conf)
)
.withSequencerConnectionsFromScanDisabled(10_000)

override lazy val environmentFactory: EnvironmentFactory[SpliceConfig, SpliceEnvironment] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ class AutomationControlIntegrationTest
// start only sv1 but not sv2-4, to speed up the test
.simpleTopology1Sv(this.getClass.getSimpleName)
// Very short round ticks
.addConfigTransforms((_, config) =>
ConfigTransforms.updateAllSvAppFoundDsoConfigs_(
_.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500))
)(config)
.addConfigTransform((_, config) =>
ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config)
)
// Start rounds trigger in paused state
.addConfigTransforms((_, config) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ class ExternalPartySetupProposalIntegrationTest
.withPausedTrigger[ExpireIssuingMiningRoundTrigger]
)(config),
(_, config) =>
ConfigTransforms.updateAllSvAppFoundDsoConfigs_(
_.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500))
ConfigTransforms.updateInitialTickDuration(
NonNegativeFiniteDuration.ofMillis(500)
)(config),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ class FeaturedAppActivityMarkerIntegrationTest
.addConfigTransforms((_, config) =>
ConfigTransforms.updateAllSvAppFoundDsoConfigs_(
_.copy(
initialTickDuration = NonNegativeFiniteDuration.ofMillis(500),
initialFeaturedAppActivityMarkerAmount = Some(1.0),
initialAmuletPrice = 1.0,
)
)(config)
)
.addConfigTransform((_, config) =>
ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config)
)
.addConfigTransforms((_, config) =>
updateAutomationConfig(ConfigurableApp.Sv)(
_.withPausedTrigger[AdvanceOpenMiningRoundTrigger]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ class ScanHistoryBackfillingIntegrationTest
.withPausedTrigger[DeleteCorruptAcsSnapshotTrigger]
)(config)
)
.addConfigTransforms((_, config) =>
ConfigTransforms.updateAllSvAppFoundDsoConfigs_(
_.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500))
)(config)
.addConfigTransform((_, config) =>
ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config)
)
.addConfigTransforms((_, config) =>
ConfigTransforms.updateAllScanAppConfigs((_, scanConfig) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,8 @@ class ScanIntegrationTest extends IntegrationTest with WalletTestUtil with TimeT
)
)(config)
)
.addConfigTransforms((_, config) =>
ConfigTransforms.updateAllSvAppFoundDsoConfigs_(
_.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500))
)(config)
.addConfigTransform((_, config) =>
ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config)
)
.withTrafficTopupsEnabled

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ class UnclaimedSvRewardsScriptIntegrationTest
.withPausedTrigger[ArchiveClosedMiningRoundsTrigger]
)(config)
)
.addConfigTransforms((_, config) =>
ConfigTransforms.updateAllSvAppFoundDsoConfigs_(
_.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500))
)(config)
.addConfigTransform((_, config) =>
ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config)
)
.withTrafficTopupsDisabled

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ class UpdateHistoryIntegrationTest
_.withPausedTrigger[AdvanceOpenMiningRoundTrigger]
)(config)
)
.addConfigTransforms((_, config) =>
ConfigTransforms.updateAllSvAppFoundDsoConfigs_(
_.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500))
)(config)
.addConfigTransform((_, config) =>
ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config)
)
.withTrafficTopupsDisabled

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ class WalletExpirationsIntegrationTest
)(config)
)
// Very short round ticks
.addConfigTransforms((_, config) =>
ConfigTransforms.updateAllSvAppFoundDsoConfigs_(
_.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500))
)(config)
.addConfigTransform((_, config) =>
ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config)
)
// Start rounds trigger in paused state
.addConfigTransforms((_, config) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,8 @@ class WalletManualRoundsIntegrationTest
)(config)
)
// Very short round ticks
.addConfigTransforms((_, config) =>
ConfigTransforms.updateAllSvAppFoundDsoConfigs_(
_.copy(initialTickDuration = NonNegativeFiniteDuration.ofMillis(500))
)(config)
.addConfigTransform((_, config) =>
ConfigTransforms.updateInitialTickDuration(NonNegativeFiniteDuration.ofMillis(500))(config)
)
// Start rounds trigger in paused state
.addConfigTransforms((_, config) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.automation

import com.digitalasset.canton.logging.pretty.Pretty
import com.digitalasset.canton.tracing.TraceContext
import io.opentelemetry.api.trace.Tracer
import org.apache.pekko.stream.Materializer
import org.lfdecentralizedtrust.splice.automation.RoundBasedRewardTrigger.RoundBasedTask

import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

abstract class RoundBasedRewardTrigger[T <: RoundBasedTask: Pretty]()(implicit
ec: ExecutionContext,
mat: Materializer,
tracer: Tracer,
) extends PollingParallelTaskExecutionTrigger[T] {
private val nextRunTime = new AtomicReference[Option[(Long, Instant)]](None)

private val isNewSchedulingLogicEnabled: Boolean = context.config.enableNewRewardTriggerScheduling

// if the new logic is disable then use the old behaviour that uses increased polling intervals
override protected def isRewardOperationTrigger: Boolean = !isNewSchedulingLogicEnabled

override protected def retrieveTasks()(implicit tc: TraceContext): Future[Seq[T]] = {
if (isNewSchedulingLogicEnabled) {
if (shouldRun) {
retrieveAvailableTasksForRound().map(tasks => {
tasks.minByOption(_.roundDetails._1) match {
case Some(firstRound) =>
val (roundNumber, roundOpening) = firstRound.roundDetails
val lastRunWasForAnOlderRound = nextRunTime.get().forall(_._1 < roundNumber)
if (lastRunWasForAnOlderRound) {
@SuppressWarnings(Array("org.wartremover.warts.IterableOps"))
val minRunTime = Seq(roundOpening, context.clock.now.toInstant).max
val maxRunTime = roundOpening.plus(firstRound.tickDuration)
val minRunAt = randomInstantBetween(minRunTime, maxRunTime)
nextRunTime.set(
Some(
roundNumber -> minRunAt
)
)
if (shouldRun) {
logger.info(
s"Running for $tasks because the calculated run time $minRunAt is now (between $minRunTime and $maxRunTime)."
)
tasks
} else {
logger.info(
s"Will run for $tasks at min time $minRunAt (computed for interval between $minRunTime and $maxRunTime)."
)
Seq.empty
}
} else {
logger.info(
s"Running for $tasks as the round still matches the next run ${nextRunTime.get()}."
)
tasks
}
case None =>
// no tasks available
tasks
}
})
} else Future.successful(Seq.empty)
} else {
retrieveAvailableTasksForRound()
}
}

protected def retrieveAvailableTasksForRound()(implicit tc: TraceContext): Future[Seq[T]]

private def shouldRun = {
nextRunTime
.get()
.fold(true) { case (_, runAt) =>
runAt.isBefore(context.clock.now.toInstant) || runAt.equals(context.clock.now.toInstant)
}
}

private def randomInstantBetween(start: Instant, end: Instant): Instant = {
val range = Duration.between(start, end)
if (start.isBefore(end) || range.toMillis <= 0)
start
else {
val randomMillisInRange = Random.nextLong(range.toMillis)
start.plusMillis(randomMillisInRange)
}
}

}

object RoundBasedRewardTrigger {

trait RoundBasedTask {
def roundDetails: (Long, Instant)
def tickDuration: Duration
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ case class AutomationConfig(
* `[pollingInterval * (1 - 0.5*pollingJitter), pollingInterval * (1 + 0.5 * pollingJitter)]`
*/
pollingJitter: Double = 0.2,
/** Enabled schedling reward operations unfiromly across the first tick of a round opening.
*/
enableNewRewardTriggerScheduling: Boolean = true,
/** Reward operations can result in spikes overloading sequencers on each round switch so we
* use a lower polling interval of 1/3 tick with tick = 600s
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class AdvanceOpenMiningRoundTrigger(
val rounds = task.work.openRounds
for {
dsoRules <- store.getDsoRules()
_ = logger.debug(
_ = logger.info(
s"Starting work as for ${task.work}"
)
amuletPriceVotes <- store.listSvAmuletPriceVotes()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,38 @@
package org.lfdecentralizedtrust.splice.sv.automation.singlesv

import cats.data.OptionT
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.topology.ParticipantId
import com.digitalasset.canton.topology.store.TopologyStoreId
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.MonadUtil
import io.opentelemetry.api.trace.Tracer
import org.apache.pekko.stream.Materializer
import org.lfdecentralizedtrust.splice.automation.RoundBasedRewardTrigger.RoundBasedTask
import org.lfdecentralizedtrust.splice.automation.{
PollingParallelTaskExecutionTrigger,
RoundBasedRewardTrigger,
TaskOutcome,
TaskSuccess,
TriggerContext,
}
import org.lfdecentralizedtrust.splice.codegen.java.da.types.Tuple2
import org.lfdecentralizedtrust.splice.codegen.java.splice.amuletconfig.PackageConfig
import org.lfdecentralizedtrust.splice.codegen.java.splice.dso.svstate.SvRewardState
import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.DsoRules
import org.lfdecentralizedtrust.splice.codegen.java.da.types.Tuple2
import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.TopologyTransactionType.AuthorizedState
import org.lfdecentralizedtrust.splice.environment.{
DarResources,
ParticipantAdminConnection,
SpliceLedgerConnection,
}
import org.lfdecentralizedtrust.splice.store.MiningRoundsStore.OpenMiningRoundContract
import org.lfdecentralizedtrust.splice.sv.config.BeneficiaryConfig
import org.lfdecentralizedtrust.splice.sv.store.SvDsoStore
import org.lfdecentralizedtrust.splice.store.MiningRoundsStore.OpenMiningRoundContract
import org.lfdecentralizedtrust.splice.sv.util.SvUtil
import org.lfdecentralizedtrust.splice.util.{AmuletConfigSchedule, AssignedContract}
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.topology.ParticipantId
import com.digitalasset.canton.topology.store.TopologyStoreId
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.MonadUtil
import io.opentelemetry.api.trace.Tracer
import org.apache.pekko.stream.Materializer
import org.lfdecentralizedtrust.splice.codegen.java.splice.amuletconfig.PackageConfig
import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.TopologyTransactionType.AuthorizedState

import java.time.temporal.ChronoUnit
import java.time.{Duration, Instant}
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters.*
import scala.math.Ordering.Implicits.*
Expand All @@ -47,14 +50,12 @@ class ReceiveSvRewardCouponTrigger(
override val ec: ExecutionContext,
mat: Materializer,
override val tracer: Tracer,
) extends PollingParallelTaskExecutionTrigger[ReceiveSvRewardCouponTrigger.Task] {

override def isRewardOperationTrigger: Boolean = true
) extends RoundBasedRewardTrigger[ReceiveSvRewardCouponTrigger.Task] {

private val svParty = store.key.svParty
private val dsoParty = store.key.dsoParty

override protected def retrieveTasks()(implicit
override protected def retrieveAvailableTasksForRound()(implicit
tc: TraceContext
): Future[Seq[ReceiveSvRewardCouponTrigger.Task]] = {
for {
Expand Down Expand Up @@ -232,9 +233,10 @@ object ReceiveSvRewardCouponTrigger {
rewardState: AssignedContract[SvRewardState.ContractId, SvRewardState],
round: OpenMiningRoundContract,
beneficiaries: Seq[BeneficiaryConfig],
) extends PrettyPrinting {
import org.lfdecentralizedtrust.splice.util.PrettyInstances.*
) extends PrettyPrinting
with RoundBasedTask {
import com.digitalasset.canton.participant.pretty.Implicits.prettyContractId
import org.lfdecentralizedtrust.splice.util.PrettyInstances.*

override def pretty: Pretty[this.type] =
prettyOfClass(
Expand All @@ -243,6 +245,12 @@ object ReceiveSvRewardCouponTrigger {
param("rewardState", _.rewardState),
param("round", _.round),
)

override def roundDetails: (Long, Instant) =
Long.unbox(round.payload.round.number) -> round.payload.opensAt

override def tickDuration: Duration =
Duration.of(round.payload.tickDuration.microseconds, ChronoUnit.MICROS)
}

def svLatestVettedPackages(packages: PackageConfig): Seq[String] = Seq(
Expand Down
4 changes: 4 additions & 0 deletions docs/src/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Upcoming
and relies purely on network-level access control for securing the validator, i.e. anyone with access to your node
can act on your behalf.

- Reward collection

- Changed the behavior of automation around rewards and coupons to run for the first time in the interval of ``round open time`` -> ``round open time + tick duration``. This might increase the observed duration between rewards and coupons being issued and until they are collected. Once the first tick elapses retries will happen more aggressively.

0.4.20
------

Expand Down
Loading