
Commit 56ac569

[ci] works, with canton restartSource
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
1 parent 57e19e4 commit 56ac569

4 files changed: +90, -52 lines


apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanKeyValueProvider.scala

Lines changed: 7 additions & 6 deletions
@@ -21,26 +21,27 @@ class ScanKeyValueProvider(val store: KeyValueStore, val loggerFactory: NamedLog
   private val latestAcsSnapshotInBulkStorageKey = "latest_acs_snapshot_in_bulk_storage"
 
   final def setLatestAcsSnapshotsInBulkStorage(
-      timestamp: CantonTimestamp,
       migrationId: Long,
+      timestamp: CantonTimestamp,
   )(implicit tc: TraceContext): Future[Unit] = store.setValue(
     latestAcsSnapshotInBulkStorageKey,
-    AcsSnapshotTimestampMigration(timestamp, migrationId),
+    AcsSnapshotTimestampMigration(migrationId, timestamp),
   )
 
   final def getLatestAcsSnapshotInBulkStorage()(implicit
       tc: TraceContext,
       ec: ExecutionContext,
-  ): OptionT[Future, (CantonTimestamp, Long)] = {
-    val result: OptionT[Future, AcsSnapshotTimestampMigration] = store.readValueAndLogOnDecodingFailure(latestAcsSnapshotInBulkStorageKey)
-    result.map(result => (result.timestamp, result.migrationId))
+  ): OptionT[Future, (Long, CantonTimestamp)] = {
+    val result: OptionT[Future, AcsSnapshotTimestampMigration] =
+      store.readValueAndLogOnDecodingFailure(latestAcsSnapshotInBulkStorageKey)
+    result.map(result => (result.migrationId, result.timestamp))
   }
 }
 
 object ScanKeyValueProvider {
   final case class AcsSnapshotTimestampMigration(
-      timestamp: CantonTimestamp,
       migrationId: Long,
+      timestamp: CantonTimestamp,
   )
   implicit val timestampCodec: Codec[CantonTimestamp] =
     Codec
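
Note on the API change above: the setter, the stored AcsSnapshotTimestampMigration case class, and the getter now all use the same (migrationId, timestamp) ordering. A minimal usage sketch (Scala; recordProgress is a hypothetical helper name, and the wiring of the underlying KeyValueStore is assumed):

    import com.digitalasset.canton.data.CantonTimestamp
    import com.digitalasset.canton.tracing.TraceContext
    import scala.concurrent.{ExecutionContext, Future}

    // Sketch only: `provider` is assumed to be a ScanKeyValueProvider backed by a live KeyValueStore.
    def recordProgress(provider: ScanKeyValueProvider)(implicit
        tc: TraceContext,
        ec: ExecutionContext,
    ): Future[Option[(Long, CantonTimestamp)]] =
      for {
        // migrationId first, then timestamp: the ordering this commit standardizes on
        _ <- provider.setLatestAcsSnapshotsInBulkStorage(7L, CantonTimestamp.now())
        latest <- provider.getLatestAcsSnapshotInBulkStorage().value
      } yield latest // expected: Some((7L, <the timestamp just written>))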

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorage.scala

Lines changed: 27 additions & 18 deletions
@@ -13,7 +13,7 @@ import org.apache.pekko.actor.ActorSystem
 import org.apache.pekko.stream.scaladsl.{Keep, Source}
 import org.apache.pekko.pattern.after
 import org.apache.pekko.stream.{KillSwitch, KillSwitches}
-import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
+import org.lfdecentralizedtrust.splice.scan.store.{AcsSnapshotStore, ScanKeyValueProvider}
 
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.*
@@ -22,18 +22,19 @@ class AcsSnapshotBulkStorage(
     val config: BulkStorageConfig,
     val acsSnapshotStore: AcsSnapshotStore,
     val s3Connection: S3BucketConnection,
+    val kvProvider: ScanKeyValueProvider,
     override val loggerFactory: NamedLoggerFactory,
 )(implicit actorSystem: ActorSystem, tc: TraceContext, ec: ExecutionContext)
     extends NamedLogging {
 
   // TODO(#3429): persist progress (or conclude it from the S3 storage), and start from latest successfully dumped snapshot upon restart
-  private def getStartTimestamp: (Long, CantonTimestamp) = (0, CantonTimestamp.MinValue)
+  private def getStartTimestamp: Future[Option[(Long, CantonTimestamp)]] = kvProvider.getLatestAcsSnapshotInBulkStorage().value
 
   // When new snapshot is not yet available, how long to wait for a new one.
   // TODO(#3429): make it longer for prod (so consider making it configurable/overridable for tests)
   private val snapshotPollingInterval = 5.seconds
 
-  private def getAcsSnapshotTimestampAfter(
+  private def getAcsSnapshotTimestampsAfter(
       startMigrationId: Long,
       startTimestamp: CantonTimestamp,
   ): Source[(Long, CantonTimestamp), NotUsed] = {
@@ -66,15 +67,15 @@ class AcsSnapshotBulkStorage(
   /** This is the main implementation of the pipeline. It is a Pekko Source that gets a `start` timestamp, and starts dumping to S3
    * all snapshots (strictly) after `start`. It is an infinite source that should never complete.
    */
-  private def mksrc(
-      start: (Long, CantonTimestamp)
-  ): Source[(Long, CantonTimestamp), (KillSwitch, Future[Done])] = start match {
-    case (startMigrationId, startAfterTimestamp) =>
-      logger.debug(
-        s"Starting ACS snapshot dump source, from migration $startMigrationId, timestamp $startAfterTimestamp"
-      )
+  private def mksrc(): Source[(Long, CantonTimestamp), (KillSwitch, Future[Done])] = {
+    logger.debug("Starting ACS snapshot dump source")
     val base =
-      getAcsSnapshotTimestampAfter(startMigrationId, startAfterTimestamp)
+      Source.single[Unit](())
+        .mapAsync(1) { _ => getStartTimestamp }
+        .flatMapConcat {
+          case Some((startMigrationId, startAfterTimestamp)) => getAcsSnapshotTimestampsAfter(startMigrationId, startAfterTimestamp)
+          case None => getAcsSnapshotTimestampsAfter(0, CantonTimestamp.MinValue)
+        }
         .via(
           SingleAcsSnapshotBulkStorage.asFlow(
             config,
@@ -83,8 +84,16 @@
             loggerFactory,
           )
         )
+        .mapAsync(1) { case (migrationId, timestamp) =>
+          for {
+            _ <- kvProvider.setLatestAcsSnapshotsInBulkStorage(migrationId, timestamp)
+          } yield {
+            (migrationId, timestamp)
+          }
+        }
+
 
-    val withKs = base.viaMat(KillSwitches.single)(Keep.right)
+    val withKs = base.viaMat(KillSwitches.single)(Keep.right)
     withKs.watchTermination() { case (ks, done) => (ks: KillSwitch, done) }
   }
 
@@ -96,28 +105,28 @@
     // to Pekko's built-in RestartSource (for now, it's convenient to use Canton's ability to track state via lastEmittedElement)
     // TODO(#3429): tweak the retry parameters here
     val delay = FiniteDuration(5, "seconds")
-    val policy = new RetrySourcePolicy[(Long, CantonTimestamp), (Long, CantonTimestamp)] {
+    val policy = new RetrySourcePolicy[Unit, (Long, CantonTimestamp)] {
       override def shouldRetry(
-          lastState: (Long, CantonTimestamp),
+          lastState: Unit,
           lastEmittedElement: Option[(Long, CantonTimestamp)],
           lastFailure: Option[Throwable],
-      ): Option[(scala.concurrent.duration.FiniteDuration, (Long, CantonTimestamp))] = {
+      ): Option[(scala.concurrent.duration.FiniteDuration, Unit)] = {
         lastFailure.map { t =>
           logger.warn(
             s"Writing ACS snapshot to bulk storage failed with : ${ErrorUtil
              .messageWithStacktrace(t)}, will retry after delay of $delay from last successful timestamp $lastEmittedElement"
           )
           // Always retry (TODO(#3429): consider a max number of retries?)
-          delay -> lastEmittedElement.fold(lastState)(identity)
+          delay -> ()
         }
       }
     }
 
     PekkoUtil
       .restartSource(
         name = "acs-snapshot-dump",
-        initial = getStartTimestamp,
-        mkSource = (start: (Long, CantonTimestamp)) => mksrc(start),
+        initial = (),
+        mkSource = (_: Unit) => mksrc(),
         policy = policy,
       )
 
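
Note on the pipeline change above: the restart loop no longer threads (migrationId, timestamp) through PekkoUtil.restartSource as retry state. Instead, every (re)start of mksrc re-reads the persisted checkpoint via getStartTimestamp, and every element that clears the S3 flow writes a new checkpoint before flowing downstream, so progress survives full process restarts rather than only in-stream retries. One consequence: a failure between the S3 write and the checkpoint write re-dumps that snapshot on restart, i.e. the pipeline is effectively at-least-once. A generic sketch of this checkpoint-on-restart pattern (Scala; resumableSource, readCheckpoint, and writeCheckpoint are illustrative names, not from the codebase):

    import org.apache.pekko.NotUsed
    import org.apache.pekko.stream.scaladsl.Source
    import scala.concurrent.{ExecutionContext, Future}

    // Hypothetical helper showing the shape of the pattern this commit adopts.
    def resumableSource[A](
        readCheckpoint: () => Future[Option[A]], // here: kvProvider.getLatestAcsSnapshotInBulkStorage().value
        writeCheckpoint: A => Future[Unit], // here: kvProvider.setLatestAcsSnapshotsInBulkStorage
        elementsAfter: Option[A] => Source[A, NotUsed],
    )(implicit ec: ExecutionContext): Source[A, NotUsed] =
      Source
        .single(())
        .mapAsync(1)(_ => readCheckpoint()) // fetch persisted progress at every (re)start
        .flatMapConcat(elementsAfter) // resume strictly after the checkpoint, if any
        .mapAsync(1)(a => writeCheckpoint(a).map(_ => a)) // persist progress per element

Because the source is rebuilt from Unit state, the RetrySourcePolicy only decides whether to retry; where to resume is owned entirely by the key-value store.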

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/ScanKeyValueProviderTest.scala

Lines changed: 5 additions & 2 deletions
@@ -13,6 +13,7 @@ import org.lfdecentralizedtrust.splice.store.db.SplicePostgresTest
 import org.scalatest.matchers.should.Matchers
 
 import scala.concurrent.Future
+import scala.concurrent.duration._
 
 class ScanKeyValueProviderTest
     extends StoreTest
@@ -22,13 +23,15 @@ class ScanKeyValueProviderTest
   "ScanKeyValueProvider" should {
     "set and get acs snapshots timestamps" in {
       val ts = CantonTimestamp.now()
+      val ts2 = ts.add(1.minute)
       val migrationId = 7L
       for {
         provider <- mkProvider
-        _ <- provider.setLatestAcsSnapshotsInBulkStorage(ts, migrationId)
+        _ <- provider.setLatestAcsSnapshotsInBulkStorage(migrationId, ts)
+        _ <- provider.setLatestAcsSnapshotsInBulkStorage(migrationId, ts2)
         readBack <- provider.getLatestAcsSnapshotInBulkStorage().value
       } yield {
-        readBack.value shouldBe (ts, migrationId)
+        readBack.value shouldBe (migrationId, ts2)
       }
     }
   }

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorageTest.scala

Lines changed: 51 additions & 26 deletions
@@ -4,17 +4,24 @@
 package org.lfdecentralizedtrust.splice.scan.store.bulk
 
 import com.digitalasset.canton.data.CantonTimestamp
+import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
 import com.digitalasset.canton.logging.NamedLoggerFactory
 import com.digitalasset.canton.protocol.LfContractId
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.topology.PartyId
 import com.digitalasset.canton.tracing.TraceContext
 import com.digitalasset.canton.util.PekkoUtil.WithKillSwitch
 import com.digitalasset.canton.{HasActorSystem, HasExecutionContext}
 import org.apache.pekko.stream.scaladsl.Sink
 import org.apache.pekko.stream.testkit.scaladsl.TestSink
 import org.lfdecentralizedtrust.splice.http.v0.definitions as httpApi
-import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
+import org.lfdecentralizedtrust.splice.scan.store.{
+  AcsSnapshotStore,
+  ScanKeyValueProvider,
+  ScanKeyValueStore,
+}
 import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore.QueryAcsSnapshotResult
+import org.lfdecentralizedtrust.splice.store.db.SplicePostgresTest
 import org.lfdecentralizedtrust.splice.store.events.SpliceCreatedEvent
 import org.lfdecentralizedtrust.splice.store.{HardLimit, Limit, StoreTest}
 import org.lfdecentralizedtrust.splice.util.PackageQualifiedName
@@ -34,7 +41,8 @@ class AcsSnapshotBulkStorageTest
     extends StoreTest
     with HasExecutionContext
     with HasActorSystem
-    with HasS3Mock {
+    with HasS3Mock
+    with SplicePostgresTest {
 
   val acsSnapshotSize = 48500
   val bulkStorageTestConfig = BulkStorageConfig(
@@ -99,33 +107,38 @@ class AcsSnapshotBulkStorageTest
     withS3Mock {
       val store = new MockAcsSnapshotStore()
       val s3BucketConnection = getS3BucketConnectionWithInjectedErrors(loggerFactory)
-      val probe = new AcsSnapshotBulkStorage(
-        bulkStorageTestConfig,
-        store.store,
-        s3BucketConnection,
-        loggerFactory,
-      ).getSource
-        .runWith(TestSink.probe[WithKillSwitch[(Long, CantonTimestamp)]])
+      val ts1 = CantonTimestamp.tryFromInstant(Instant.ofEpochSecond(10))
+      val ts2 = CantonTimestamp.tryFromInstant(Instant.ofEpochSecond(20))
+      for {
+        kvProvider <- mkProvider
+        probe = new AcsSnapshotBulkStorage(
+          bulkStorageTestConfig,
+          store.store,
+          s3BucketConnection,
+          kvProvider,
+          loggerFactory,
+        ).getSource
+          .runWith(TestSink.probe[WithKillSwitch[(Long, CantonTimestamp)]])
 
-      clue("Initially, a single snapshot is dumped") {
-        probe.request(2)
-        probe.expectNext(2.minutes).value shouldBe (0, CantonTimestamp.tryFromInstant(
-          Instant.ofEpochSecond(10)
-        ))
-        probe.expectNoMessage(10.seconds)
-      }
+        _ = clue("Initially, a single snapshot is dumped") {
+          probe.request(2)
+          probe.expectNext(2.minutes).value shouldBe (0, ts1)
+          probe.expectNoMessage(10.seconds)
+        }
+        persistedTs1 <- kvProvider.getLatestAcsSnapshotInBulkStorage().value
+        _ = persistedTs1.value shouldBe (0, ts1)
 
-      clue("Add another snapshot to the store, it is also dumped") {
-        store.addSnapshot(CantonTimestamp.tryFromInstant(Instant.ofEpochSecond(20)))
-        val next = probe.expectNext(2.minutes)
-        next.value shouldBe (0, CantonTimestamp.tryFromInstant(
-          Instant.ofEpochSecond(20)
-        ))
-        probe.expectNoMessage(10.seconds)
-        next.killSwitch.shutdown()
+        _ = clue("Add another snapshot to the store, it is also dumped") {
+          store.addSnapshot(CantonTimestamp.tryFromInstant(Instant.ofEpochSecond(20)))
+          val next = probe.expectNext(2.minutes)
+          next.value shouldBe (0, ts2)
+          probe.expectNoMessage(10.seconds)
+          next.killSwitch.shutdown()
+        }
+        persistedTs2 <- kvProvider.getLatestAcsSnapshotInBulkStorage().value
+      } yield {
+        persistedTs2.value shouldBe (0, ts2)
       }
-
-      succeed
     }
   }
 }
@@ -254,4 +267,16 @@ class AcsSnapshotBulkStorageTest
     s3BucketConnectionWithErrors
   }
 
+  def mkProvider: Future[ScanKeyValueProvider] = {
+    ScanKeyValueStore(
+      dsoParty = dsoParty,
+      participantId = mkParticipantId("participant"),
+      storage,
+      loggerFactory,
+    ).map(new ScanKeyValueProvider(_, loggerFactory))
+  }
+
+  override protected def cleanDb(
+      storage: DbStorage
+  )(implicit traceContext: TraceContext): FutureUnlessShutdown[?] = resetAllAppTables(storage)
 }
