diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorage.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorage.scala index 5a4fc42f26..092801a2cd 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorage.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorage.scala @@ -3,39 +3,20 @@ package org.lfdecentralizedtrust.splice.scan.store.bulk -import scala.concurrent.ExecutionContext import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} import com.digitalasset.canton.tracing.TraceContext -import com.digitalasset.canton.util.{ErrorUtil, PekkoUtil} import com.digitalasset.canton.util.PekkoUtil.RetrySourcePolicy +import com.digitalasset.canton.util.{ErrorUtil, PekkoUtil} +import org.apache.pekko.{Done, NotUsed} import org.apache.pekko.actor.ActorSystem -import org.apache.pekko.stream.{KillSwitch, KillSwitches, OverflowStrategy} -import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source} -import org.apache.pekko.util.ByteString -import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings +import org.apache.pekko.stream.scaladsl.{Keep, Source} +import org.apache.pekko.pattern.after +import org.apache.pekko.stream.{KillSwitch, KillSwitches} import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore -import org.lfdecentralizedtrust.splice.store.HardLimit - -import scala.concurrent.Future -import io.circe.syntax.* - -import java.nio.ByteBuffer -import java.nio.charset.StandardCharsets -import java.util.concurrent.atomic.AtomicInteger -import scala.concurrent.duration.FiniteDuration - -import Position.* - -object Position { - sealed trait Position - - case object Start extends Position - case object End extends Position - - final case class Index(value: Long) extends Position -} +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.* class AcsSnapshotBulkStorage( val config: BulkStorageConfig, @@ -45,79 +26,89 @@ class AcsSnapshotBulkStorage( )(implicit actorSystem: ActorSystem, tc: TraceContext, ec: ExecutionContext) extends NamedLogging { - private def getAcsSnapshotChunk( - migrationId: Long, - timestamp: CantonTimestamp, - after: Option[Long], - ): Future[(Position, ByteString)] = { - for { - snapshot <- acsSnapshotStore.queryAcsSnapshot( - migrationId, - snapshot = timestamp, - after, - HardLimit.tryCreate(config.dbReadChunkSize), - Seq.empty, - Seq.empty, - ) - } yield { - val encoded = snapshot.createdEventsInPage.map(event => - CompactJsonScanHttpEncodings().javaToHttpCreatedEvent(event.eventId, event.event) - ) - val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n" - val contractsBytes = ByteString(contractsStr.getBytes(StandardCharsets.UTF_8)) - logger.debug( - s"Read ${encoded.length} contracts from ACS, to a bytestring of size ${contractsBytes.length} bytes" - ) - (snapshot.afterToken.fold(End: Position)(Index(_)), contractsBytes) - } - + // TODO(#3429): persist progress (or conclude it from the S3 storage), and start from latest successfully dumped snapshot upon restart + private def getStartTimestamp: (Long, CantonTimestamp) = (0, CantonTimestamp.MinValue) + + // When new snapshot is not yet available, how long to wait for a new one. 
+ // TODO(#3429): make it longer for prod (so consider making it configurable/overridable for tests) + private val snapshotPollingInterval = 5.seconds + + private def getAcsSnapshotTimestampAfter( + startMigrationId: Long, + startTimestamp: CantonTimestamp, + ): Source[(Long, CantonTimestamp), NotUsed] = { + Source + .unfoldAsync((startMigrationId, startTimestamp)) { + case (lastMigrationId: Long, lastTimestamp: CantonTimestamp) => + acsSnapshotStore.lookupSnapshotAfter(lastMigrationId, lastTimestamp).flatMap { + case Some(snapshot) => + logger.info( + s"next snapshot available, at migration ${snapshot.migrationId}, record time ${snapshot.snapshotRecordTime}" + ) + Future.successful( + Some( + ( + (snapshot.migrationId, snapshot.snapshotRecordTime), + Some((snapshot.migrationId, snapshot.snapshotRecordTime)), + ) + ) + ) + case None => + after(snapshotPollingInterval, actorSystem.scheduler) { + logger.debug("No new snapshot available, sleeping...") + Future.successful(Some(((lastMigrationId, lastTimestamp), None))) + } + } + } + .collect { case Some((migrationId, timestamp)) => (migrationId, timestamp) } } - def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = { + /** This is the main implementation of the pipeline. It is a Pekko Source that gets a `start` timestamp, and starts dumping to S3 + * all snapshots (strictly) after `start`. It is an infinite source that should never complete. + */ + private def mksrc( + start: (Long, CantonTimestamp) + ): Source[(Long, CantonTimestamp), (KillSwitch, Future[Done])] = start match { + case (startMigrationId, startAfterTimestamp) => + logger.debug( + s"Starting ACS snapshot dump source, from migration $startMigrationId, timestamp $startAfterTimestamp" + ) + val base = + getAcsSnapshotTimestampAfter(startMigrationId, startAfterTimestamp) + .via( + SingleAcsSnapshotBulkStorage.asFlow( + config, + acsSnapshotStore, + s3Connection, + loggerFactory, + ) + ) - def mksrc = { - val idx = new AtomicInteger(0) - val base = Source - .unfoldAsync(Start: Position) { - case Start => getAcsSnapshotChunk(migrationId, timestamp, None).map(Some(_)) - case Index(i) => getAcsSnapshotChunk(migrationId, timestamp, Some(i)).map(Some(_)) - case End => Future.successful(None) - } - .via(ZstdGroupedWeight(config.maxFileSize)) - // Add a buffer so that the next object continues accumulating while we write the previous one - .buffer( - 1, - OverflowStrategy.backpressure, - ) - .mapAsync(1) { case ByteStringWithTermination(zstdObj, isLast) => - val objectKey = if (isLast) s"snapshot_${idx}_last.zstd" else s"snapshot_$idx.zstd" - // TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole. - // Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly, - // i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the - // partially written object. - for { - _ <- s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe())) - } yield { - idx.addAndGet(1) - } - } val withKs = base.viaMat(KillSwitches.single)(Keep.right) withKs.watchTermination() { case (ks, done) => (ks: KillSwitch, done) } - } + } + /** wraps mksrc (where the main pipeline logic is implemented) in a retry loop, to retry upon failures. + */ + def getSource + : Source[PekkoUtil.WithKillSwitch[(Long, CantonTimestamp)], (KillSwitch, Future[Done])] = { + // TODO(#3429): once we persist the state, i.e. 
the last dumped snapshot, consider moving from Canton's PekkoUtil.restartSource + // to Pekko's built-in RestartSource (for now, it's convenient to use Canton's ability to track state via lastEmittedElement) // TODO(#3429): tweak the retry parameters here val delay = FiniteDuration(5, "seconds") - val policy = new RetrySourcePolicy[Unit, Int] { + val policy = new RetrySourcePolicy[(Long, CantonTimestamp), (Long, CantonTimestamp)] { override def shouldRetry( - lastState: Unit, - lastEmittedElement: Option[Int], + lastState: (Long, CantonTimestamp), + lastEmittedElement: Option[(Long, CantonTimestamp)], lastFailure: Option[Throwable], - ): Option[(scala.concurrent.duration.FiniteDuration, Unit)] = { + ): Option[(scala.concurrent.duration.FiniteDuration, (Long, CantonTimestamp))] = { lastFailure.map { t => - logger.warn(s"Writing ACS snapshot to bulk storage failed with : ${ErrorUtil - .messageWithStacktrace(t)}, will retry after delay of ${delay}") + logger.warn( + s"Writing ACS snapshot to bulk storage failed with: ${ErrorUtil + .messageWithStacktrace(t)}, will retry after delay of $delay from last successful timestamp $lastEmittedElement" + ) // Always retry (TODO(#3429): consider a max number of retries?) - delay -> () + delay -> lastEmittedElement.fold(lastState)(identity) } } } @@ -125,11 +116,11 @@ class AcsSnapshotBulkStorage( PekkoUtil .restartSource( name = "acs-snapshot-dump", - initial = (), - mkSource = (_: Unit) => mksrc, + initial = getStartTimestamp, + mkSource = (start: (Long, CantonTimestamp)) => mksrc(start), policy = policy, ) - .runWith(Sink.ignore) - }.map(_ => ()) + } + } diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/SingleAcsSnapshotBulkStorage.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/SingleAcsSnapshotBulkStorage.scala new file mode 100644 index 0000000000..ae4f3c3524 --- /dev/null +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/SingleAcsSnapshotBulkStorage.scala @@ -0,0 +1,216 @@ +// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package org.lfdecentralizedtrust.splice.scan.store.bulk + +import scala.concurrent.ExecutionContext +import com.digitalasset.canton.data.CantonTimestamp +import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} +import com.digitalasset.canton.tracing.TraceContext +import com.digitalasset.canton.util.{ErrorUtil, PekkoUtil} +import com.digitalasset.canton.util.PekkoUtil.{RetrySourcePolicy, WithKillSwitch} +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.{KillSwitch, KillSwitches, OverflowStrategy} +import org.apache.pekko.stream.scaladsl.{Flow, Keep, Source} +import org.apache.pekko.util.ByteString +import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings +import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore +import org.lfdecentralizedtrust.splice.store.HardLimit + +import scala.concurrent.Future +import io.circe.syntax.* + +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.duration.FiniteDuration +import Position.* +import org.apache.pekko.{Done, NotUsed} + +object Position { + sealed trait Position + + case object Start extends Position + + case object End extends Position + + final case class Index(value: Long) extends Position +} + +class SingleAcsSnapshotBulkStorage( + val migrationId: Long, + val timestamp: CantonTimestamp, + val config: BulkStorageConfig, + val acsSnapshotStore: AcsSnapshotStore, + val s3Connection: S3BucketConnection, + override val loggerFactory: NamedLoggerFactory, +)(implicit actorSystem: ActorSystem, tc: TraceContext, ec: ExecutionContext) + extends NamedLogging { + + private def getAcsSnapshotChunk( + migrationId: Long, + timestamp: CantonTimestamp, + after: Option[Long], + ): Future[(Position, ByteString)] = { + for { + snapshot <- acsSnapshotStore.queryAcsSnapshot( + migrationId, + snapshot = timestamp, + after, + HardLimit.tryCreate(config.dbReadChunkSize), + Seq.empty, + Seq.empty, + ) + } yield { + val encoded = snapshot.createdEventsInPage.map(event => + CompactJsonScanHttpEncodings().javaToHttpCreatedEvent(event.eventId, event.event) + ) + val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n" + val contractsBytes = ByteString(contractsStr.getBytes(StandardCharsets.UTF_8)) + logger.debug( + s"Read ${encoded.length} contracts from ACS, to a bytestring of size ${contractsBytes.length} bytes" + ) + (snapshot.afterToken.fold(End: Position)(Index(_)), contractsBytes) + } + + } + + private def getSource: Source[(Long, CantonTimestamp), NotUsed] = { + val idx = new AtomicInteger(0) + Source + .unfoldAsync(Start: Position) { + case Start => getAcsSnapshotChunk(migrationId, timestamp, None).map(Some(_)) + case Index(i) => getAcsSnapshotChunk(migrationId, timestamp, Some(i)).map(Some(_)) + case End => Future.successful(None) + } + .via(ZstdGroupedWeight(config.maxFileSize)) + // Add a buffer so that the next object continues accumulating while we write the previous one + .buffer( + 1, + OverflowStrategy.backpressure, + ) + .mapAsync(1) { case ByteStringWithTermination(zstdObj, isLast) => + // TODO(#3429): use actual prefixes for segments, for now we just use the snapshot + val objectKeyPrefix = s"${timestamp.toInstant.toString}" + val objectKey = + if (isLast) s"$objectKeyPrefix/snapshot_${idx}_last.zstd" + else s"$objectKeyPrefix/snapshot_$idx.zstd" + // TODO(#3429): For now, we accumulate the full object in memory, then 
write it as a whole. + // Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly, + // i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the + // partially written object. + for { + _ <- s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe())) + } yield { + idx.addAndGet(1) + objectKey + } + } + // emit a Unit upon completion of the write to S3 + .fold(()) { case ((), _) => () } + // emit back the (migration, timestamp) upon completion + .map(_ => (migrationId, timestamp)) + + } + + // TODO(#3429): I'm no longer sure the retrying source is actually useful, + // we probably want to just rely on the retry of the full stream of ACS snapshot dumps (in AcsSnapshotBulkStorage), + // but keeping it for now (and the corresponding unit test) until that is fully resolved + private def getRetryingSource + : Source[WithKillSwitch[(Long, CantonTimestamp)], (KillSwitch, Future[Done])] = { + + def mksrc = { + val base = getSource + val withKs = base.viaMat(KillSwitches.single)(Keep.right) + withKs.watchTermination() { case (ks, done) => + ( + ks: KillSwitch, + done.map { done => + logger.debug( + s"Finished dumping to bulk storage the ACS snapshot for migration $migrationId, timestamp $timestamp" + ) + done + }, + ) + } + } + + // TODO(#3429): tweak the retry parameters here + val delay = FiniteDuration(5, "seconds") + val policy = new RetrySourcePolicy[Unit, (Long, CantonTimestamp)] { + override def shouldRetry( + lastState: Unit, + lastEmittedElement: Option[(Long, CantonTimestamp)], + lastFailure: Option[Throwable], + ): Option[(scala.concurrent.duration.FiniteDuration, Unit)] = { + lastFailure.map { t => + logger.warn(s"Writing ACS snapshot to bulk storage failed with: ${ErrorUtil + .messageWithStacktrace(t)}, will retry after delay of ${delay}") + // Always retry (TODO(#3429): consider a max number of retries?) + delay -> () + } + } + } + + PekkoUtil + .restartSource( + name = "acs-snapshot-dump", + initial = (), + mkSource = (_: Unit) => mksrc, + policy = policy, + ) + } +} + +object SingleAcsSnapshotBulkStorage { + + /** Returns a Pekko flow that receives (migration, timestamp) elements, each identifying an ACS snapshot to be dumped, + * and dumps each corresponding snapshot to the S3 storage. Every successful dump emits back the (migration, timestamp) + * pair, to indicate the last successfully dumped snapshot. + */ + def asFlow( + config: BulkStorageConfig, + acsSnapshotStore: AcsSnapshotStore, + s3Connection: S3BucketConnection, + loggerFactory: NamedLoggerFactory, + )(implicit + actorSystem: ActorSystem, + tc: TraceContext, + ec: ExecutionContext, + ): Flow[(Long, CantonTimestamp), (Long, CantonTimestamp), NotUsed] = + Flow[(Long, CantonTimestamp)].flatMapConcat { + case (migrationId: Long, timestamp: CantonTimestamp) => + new SingleAcsSnapshotBulkStorage( + migrationId, + timestamp, + config, + acsSnapshotStore, + s3Connection, + loggerFactory, + ).getSource + } + + /** The same flow as a source, currently used only for unit testing. 
+ */ + def asSource( + migrationId: Long, + timestamp: CantonTimestamp, + config: BulkStorageConfig, + acsSnapshotStore: AcsSnapshotStore, + s3Connection: S3BucketConnection, + loggerFactory: NamedLoggerFactory, + )(implicit + actorSystem: ActorSystem, + tc: TraceContext, + ec: ExecutionContext, + ): Source[WithKillSwitch[(Long, CantonTimestamp)], (KillSwitch, Future[Done])] = + new SingleAcsSnapshotBulkStorage( + migrationId, + timestamp, + config, + acsSnapshotStore, + s3Connection, + loggerFactory, + ).getRetryingSource + +} diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotBulkStorageTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotBulkStorageTest.scala deleted file mode 100644 index 7e4e595bed..0000000000 --- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotBulkStorageTest.scala +++ /dev/null @@ -1,167 +0,0 @@ -// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package org.lfdecentralizedtrust.splice.scan.store - -import org.lfdecentralizedtrust.splice.http.v0.definitions as httpApi -import com.digitalasset.canton.data.CantonTimestamp -import com.digitalasset.canton.logging.NamedLoggerFactory -import com.digitalasset.canton.protocol.LfContractId -import com.digitalasset.canton.topology.PartyId -import com.digitalasset.canton.tracing.TraceContext -import com.digitalasset.canton.{HasActorSystem, HasExecutionContext} -import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore.QueryAcsSnapshotResult -import org.lfdecentralizedtrust.splice.scan.store.bulk.{ - AcsSnapshotBulkStorage, - BulkStorageConfig, - S3BucketConnection, -} -import org.lfdecentralizedtrust.splice.store.{HardLimit, Limit, StoreTest} -import org.lfdecentralizedtrust.splice.store.events.SpliceCreatedEvent -import org.lfdecentralizedtrust.splice.util.PackageQualifiedName -import org.mockito.ArgumentMatchers.anyString -import org.mockito.Mockito -import org.mockito.invocation.InvocationOnMock -import software.amazon.awssdk.services.s3.model.ListObjectsRequest - -import java.nio.ByteBuffer -import scala.concurrent.{ExecutionContext, Future} -import scala.jdk.FutureConverters.* -import scala.jdk.CollectionConverters.* - -class AcsSnapshotBulkStorageTest - extends StoreTest - with HasExecutionContext - with HasActorSystem - with HasS3Mock { - - val acsSnapshotSize = 48500 - val bulkStorageTestConfig = BulkStorageConfig( - 1000, - 50000L, - ) - - "AcsSnapshotBulkStorage" should { - "work" in { - withS3Mock { - val store = mockAcsSnapshotStore(acsSnapshotSize) - val timestamp = CantonTimestamp.now() - val s3BucketConnection = getS3BucketConnectionWithInjectedErrors(loggerFactory) - for { - _ <- new AcsSnapshotBulkStorage( - bulkStorageTestConfig, - store, - s3BucketConnection, - loggerFactory, - ).dumpAcsSnapshot(0, timestamp) - - s3Objects <- s3BucketConnection.s3Client - .listObjects( - ListObjectsRequest.builder().bucket("bucket").build() - ) - .asScala - allContracts <- store - .queryAcsSnapshot( - 0, - timestamp, - None, - HardLimit.tryCreate(acsSnapshotSize, acsSnapshotSize), - Seq.empty, - Seq.empty, - ) - .map(_.createdEventsInPage) - } yield { - val objectKeys = s3Objects.contents.asScala.sortBy(_.key()) - objectKeys should have length 6 - objectKeys.take(objectKeys.size - 1).forall { - !_.key().endsWith("_last.zstd") - } - objectKeys.last.key() should endWith("_last.zstd") - - val allContractsFromS3 = 
objectKeys.flatMap( - readUncompressAndDecode( - s3BucketConnection, - io.circe.parser.decode[httpApi.CreatedEvent], - ) - ) - allContractsFromS3.map( - CompactJsonScanHttpEncodingsWithFieldLabels().httpToJavaCreatedEvent - ) should contain theSameElementsInOrderAs allContracts.map(_.event) - } - } - } - } - - def mockAcsSnapshotStore(snapshotSize: Int): AcsSnapshotStore = { - val store = mock[AcsSnapshotStore] - val partyId = mkPartyId("alice") - when( - store.queryAcsSnapshot( - anyLong, - any[CantonTimestamp], - any[Option[Long]], - any[Limit], - any[Seq[PartyId]], - any[Seq[PackageQualifiedName]], - )(any[TraceContext]) - ).thenAnswer { - ( - migration: Long, - timestamp: CantonTimestamp, - after: Option[Long], - limit: Limit, - _: Seq[PartyId], - _: Seq[PackageQualifiedName], - ) => - Future { - val remaining = snapshotSize - after.getOrElse(0L) - val numElems = math.min(limit.limit.toLong, remaining) - val result = QueryAcsSnapshotResult( - migration, - timestamp, - Vector - .range(0, numElems) - .map(i => { - val idx = i + after.getOrElse(0L) - val amt = amulet( - partyId, - BigDecimal(idx), - 0L, - BigDecimal(0.1), - contractId = LfContractId.assertFromString("00" + f"$idx%064x").coid, - ) - SpliceCreatedEvent(s"#event_id_$idx:1", toCreatedEvent(amt)) - }), - if (numElems < remaining) Some(after.getOrElse(0L) + numElems) else None, - ) - result - } - } - store - } - - def getS3BucketConnectionWithInjectedErrors( - loggerFactory: NamedLoggerFactory - ): S3BucketConnection = { - val s3BucketConnection: S3BucketConnection = getS3BucketConnection(loggerFactory) - val s3BucketConnectionWithErrors = Mockito.spy(s3BucketConnection) - var failureCount = 0 - val _ = doAnswer { (invocation: InvocationOnMock) => - val args = invocation.getArguments - args.toList match { - case (key: String) :: _ if key.endsWith("2.zstd") => - if (failureCount < 2) { - failureCount += 1 - Future.failed(new RuntimeException("Simulated S3 write error")) - } else { - invocation.callRealMethod().asInstanceOf[Future[Unit]] - } - case _ => - invocation.callRealMethod().asInstanceOf[Future[Unit]] - } - }.when(s3BucketConnectionWithErrors) - .writeFullObject(anyString(), any[ByteBuffer])(any[TraceContext], any[ExecutionContext]) - s3BucketConnectionWithErrors - } - -} diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorageTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorageTest.scala new file mode 100644 index 0000000000..03f476c2db --- /dev/null +++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorageTest.scala @@ -0,0 +1,257 @@ +// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package org.lfdecentralizedtrust.splice.scan.store.bulk + +import com.digitalasset.canton.data.CantonTimestamp +import com.digitalasset.canton.logging.NamedLoggerFactory +import com.digitalasset.canton.protocol.LfContractId +import com.digitalasset.canton.topology.PartyId +import com.digitalasset.canton.tracing.TraceContext +import com.digitalasset.canton.util.PekkoUtil.WithKillSwitch +import com.digitalasset.canton.{HasActorSystem, HasExecutionContext} +import org.apache.pekko.stream.scaladsl.Sink +import org.apache.pekko.stream.testkit.scaladsl.TestSink +import org.lfdecentralizedtrust.splice.http.v0.definitions as httpApi +import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore +import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore.QueryAcsSnapshotResult +import org.lfdecentralizedtrust.splice.store.events.SpliceCreatedEvent +import org.lfdecentralizedtrust.splice.store.{HardLimit, Limit, StoreTest} +import org.lfdecentralizedtrust.splice.util.PackageQualifiedName +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mockito +import org.mockito.invocation.InvocationOnMock +import software.amazon.awssdk.services.s3.model.ListObjectsRequest + +import java.nio.ByteBuffer +import java.time.Instant +import scala.concurrent.{ExecutionContext, Future} +import scala.jdk.CollectionConverters.* +import scala.jdk.FutureConverters.* +import scala.concurrent.duration.* + +class AcsSnapshotBulkStorageTest + extends StoreTest + with HasExecutionContext + with HasActorSystem + with HasS3Mock { + + val acsSnapshotSize = 48500 + val bulkStorageTestConfig = BulkStorageConfig( + 1000, + 50000L, + ) + + "AcsSnapshotBulkStorage" should { + "successfully dump a single ACS snapshot" in { + withS3Mock { + val store = new MockAcsSnapshotStore().store + val s3BucketConnection = getS3BucketConnectionWithInjectedErrors(loggerFactory) + for { + _ <- SingleAcsSnapshotBulkStorage + .asSource( + 0, + MockAcsSnapshotStore.initialSnapshotTimestamp, + bulkStorageTestConfig, + store, + s3BucketConnection, + loggerFactory, + ) + .runWith(Sink.ignore) + + s3Objects <- s3BucketConnection.s3Client + .listObjects( + ListObjectsRequest.builder().bucket("bucket").build() + ) + .asScala + allContracts <- store + .queryAcsSnapshot( + 0, + MockAcsSnapshotStore.initialSnapshotTimestamp, + None, + HardLimit.tryCreate(acsSnapshotSize, acsSnapshotSize), + Seq.empty, + Seq.empty, + ) + .map(_.createdEventsInPage) + } yield { + val objectKeys = s3Objects.contents.asScala.sortBy(_.key()) + objectKeys should have length 6 + objectKeys.take(objectKeys.size - 1).forall { + !_.key().endsWith("_last.zstd") + } + objectKeys.last.key() should endWith("_last.zstd") + + val allContractsFromS3 = objectKeys.flatMap( + readUncompressAndDecode( + s3BucketConnection, + io.circe.parser.decode[httpApi.CreatedEvent], + ) + ) + allContractsFromS3.map( + CompactJsonScanHttpEncodingsWithFieldLabels().httpToJavaCreatedEvent + ) should contain theSameElementsInOrderAs allContracts.map(_.event) + } + } + } + + "correctly process multiple ACS snapshots" in { + withS3Mock { + val store = new MockAcsSnapshotStore() + val s3BucketConnection = getS3BucketConnectionWithInjectedErrors(loggerFactory) + val probe = new AcsSnapshotBulkStorage( + bulkStorageTestConfig, + store.store, + s3BucketConnection, + loggerFactory, + ).getSource + .runWith(TestSink.probe[WithKillSwitch[(Long, CantonTimestamp)]]) + + clue("Initially, a single snapshot is dumped") { + probe.request(2) + 
probe.expectNext(2.minutes).value shouldBe (0, CantonTimestamp.tryFromInstant( + Instant.ofEpochSecond(10) + )) + probe.expectNoMessage(10.seconds) + } + + clue("Add another snapshot to the store, it is also dumped") { + store.addSnapshot(CantonTimestamp.tryFromInstant(Instant.ofEpochSecond(20))) + val next = probe.expectNext(2.minutes) + next.value shouldBe (0, CantonTimestamp.tryFromInstant( + Instant.ofEpochSecond(20) + )) + probe.expectNoMessage(10.seconds) + next.killSwitch.shutdown() + } + + succeed + } + } + } + + object MockAcsSnapshotStore { + val initialSnapshotTimestamp: CantonTimestamp = + CantonTimestamp.tryFromInstant(Instant.ofEpochSecond(10)) + } + class MockAcsSnapshotStore { + private var snapshots = Seq(MockAcsSnapshotStore.initialSnapshotTimestamp) + val store = mockAcsSnapshotStore(acsSnapshotSize) + + def addSnapshot(timestamp: CantonTimestamp) = { snapshots = snapshots :+ timestamp } + + def mockAcsSnapshotStore(snapshotSize: Int): AcsSnapshotStore = { + val store = mock[AcsSnapshotStore] + val partyId = mkPartyId("alice") + when( + store.queryAcsSnapshot( + anyLong, + any[CantonTimestamp], + any[Option[Long]], + any[Limit], + any[Seq[PartyId]], + any[Seq[PackageQualifiedName]], + )(any[TraceContext]) + ).thenAnswer { + ( + migration: Long, + timestamp: CantonTimestamp, + after: Option[Long], + limit: Limit, + _: Seq[PartyId], + _: Seq[PackageQualifiedName], + ) => + if (snapshots.contains(timestamp)) { + Future { + val remaining = snapshotSize - after.getOrElse(0L) + val numElems = math.min(limit.limit.toLong, remaining) + val result = QueryAcsSnapshotResult( + migration, + timestamp, + Vector + .range(0, numElems) + .map(i => { + val idx = i + after.getOrElse(0L) + val amt = amulet( + partyId, + BigDecimal(idx), + 0L, + BigDecimal(0.1), + contractId = LfContractId.assertFromString("00" + f"$idx%064x").coid, + ) + SpliceCreatedEvent(s"#event_id_$idx:1", toCreatedEvent(amt)) + }), + if (numElems < remaining) Some(after.getOrElse(0L) + numElems) else None, + ) + result + } + } else { + Future.failed( + new RuntimeException( + s"Unexpected timestamp $timestamp. 
Known snapshots are: $snapshots" + ) + ) + } + } + + when( + store.lookupSnapshotAfter( + anyLong, + any[CantonTimestamp], + )(any[TraceContext]) + ).thenAnswer { + ( + _: Long, + timestamp: CantonTimestamp, + ) => + Future.successful { + snapshots + .filter(_ > timestamp) + .sorted + .headOption + .map(next => + AcsSnapshotStore.AcsSnapshot( + // only record time and migration ID are used, everything else is ignored + snapshotRecordTime = next, + migrationId = 0L, + 0L, + 0L, + 0L, + None, + None, + ) + ) + } + + } + + store + } + } + + def getS3BucketConnectionWithInjectedErrors( + loggerFactory: NamedLoggerFactory + ): S3BucketConnection = { + val s3BucketConnection: S3BucketConnection = getS3BucketConnection(loggerFactory) + val s3BucketConnectionWithErrors = Mockito.spy(s3BucketConnection) + var failureCount = 0 + val _ = doAnswer { (invocation: InvocationOnMock) => + val args = invocation.getArguments + args.toList match { + case (key: String) :: _ if key.endsWith("2.zstd") => + if (failureCount < 2) { + failureCount += 1 + Future.failed(new RuntimeException("Simulated S3 write error")) + } else { + failureCount = 0 + invocation.callRealMethod().asInstanceOf[Future[Unit]] + } + case _ => + invocation.callRealMethod().asInstanceOf[Future[Unit]] + } + }.when(s3BucketConnectionWithErrors) + .writeFullObject(anyString(), any[ByteBuffer])(any[TraceContext], any[ExecutionContext]) + s3BucketConnectionWithErrors + } + +} diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/HasS3Mock.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/HasS3Mock.scala similarity index 96% rename from apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/HasS3Mock.scala rename to apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/HasS3Mock.scala index 9e5a46cfe4..4150971c2a 100644 --- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/HasS3Mock.scala +++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/HasS3Mock.scala @@ -1,11 +1,10 @@ -package org.lfdecentralizedtrust.splice.scan.store +package org.lfdecentralizedtrust.splice.scan.store.bulk -import com.digitalasset.canton.{BaseTest, FutureHelpers} import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} +import com.digitalasset.canton.{BaseTest, FutureHelpers} import com.github.luben.zstd.ZstdDirectBufferDecompressingStream import io.netty.buffer.PooledByteBufAllocator import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings -import org.lfdecentralizedtrust.splice.scan.store.bulk.{S3BucketConnection, S3Config} import org.scalatest.EitherValues import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import software.amazon.awssdk.regions.Region diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/UpdateHistoryBulkStorageTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/UpdateHistoryBulkStorageTest.scala similarity index 95% rename from apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/UpdateHistoryBulkStorageTest.scala rename to apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/UpdateHistoryBulkStorageTest.scala index a3878fe5ff..2b7d270214 100644 --- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/UpdateHistoryBulkStorageTest.scala +++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/UpdateHistoryBulkStorageTest.scala 
@@ -1,7 +1,7 @@ // Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -package org.lfdecentralizedtrust.splice.scan.store +package org.lfdecentralizedtrust.splice.scan.store.bulk import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.protocol.LfContractId @@ -9,27 +9,16 @@ import com.digitalasset.canton.tracing.TraceContext import com.digitalasset.canton.{HasActorSystem, HasExecutionContext} import org.lfdecentralizedtrust.splice.environment.ledger.api.TransactionTreeUpdate import org.lfdecentralizedtrust.splice.http.v0.definitions.UpdateHistoryItemV2 -import org.lfdecentralizedtrust.splice.scan.store.bulk.{ - BulkStorageConfig, - Result, - UpdateHistorySegmentBulkStorage, -} import org.lfdecentralizedtrust.splice.store.UpdateHistory.UpdateHistoryResponse -import org.lfdecentralizedtrust.splice.store.{ - HardLimit, - Limit, - StoreTest, - TreeUpdateWithMigrationId, - UpdateHistory, -} +import org.lfdecentralizedtrust.splice.store.* import org.scalatest.concurrent.PatienceConfiguration import software.amazon.awssdk.services.s3.model.ListObjectsRequest import java.time.Instant import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration -import scala.jdk.FutureConverters.* import scala.jdk.CollectionConverters.* +import scala.jdk.FutureConverters.* class UpdateHistoryBulkStorageTest extends StoreTest diff --git a/test-full-class-names-local-net-based.log b/test-full-class-names-local-net-based.log index c7efbc8e7f..875d26b6d5 100644 --- a/test-full-class-names-local-net-based.log +++ b/test-full-class-names-local-net-based.log @@ -1,3 +1,3 @@ org.lfdecentralizedtrust.splice.integration.tests.LocalNetFrontendIntegrationTest -org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotBulkStorageTest -org.lfdecentralizedtrust.splice.scan.store.UpdateHistoryBulkStorageTest +org.lfdecentralizedtrust.splice.scan.store.bulk.AcsSnapshotBulkStorageTest +org.lfdecentralizedtrust.splice.scan.store.bulk.UpdateHistoryBulkStorageTest
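Usage sketch (illustrative, not part of this patch): after this change, AcsSnapshotBulkStorage no longer runs the stream itself (the old dumpAcsSnapshot drained it with Sink.ignore); getSource now returns an infinite, self-retrying source and leaves materialization to the caller. A minimal sketch of that caller-side wiring, assuming only the constructor and getSource signatures shown above; the startAcsSnapshotDump helper and its surrounding setup are hypothetical:

import scala.concurrent.{ExecutionContext, Future}
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.tracing.TraceContext
import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.KillSwitch
import org.apache.pekko.stream.scaladsl.{Keep, Sink}
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
import org.lfdecentralizedtrust.splice.scan.store.bulk.{AcsSnapshotBulkStorage, BulkStorageConfig, S3BucketConnection}

// Hypothetical caller-side wiring: materialize the infinite source once at startup and
// keep the kill switch so the dump loop can be stopped on shutdown.
def startAcsSnapshotDump(
    config: BulkStorageConfig,
    store: AcsSnapshotStore,
    s3Connection: S3BucketConnection,
    loggerFactory: NamedLoggerFactory,
)(implicit
    actorSystem: ActorSystem,
    tc: TraceContext,
    ec: ExecutionContext,
): (KillSwitch, Future[Done]) = {
  val bulkStorage = new AcsSnapshotBulkStorage(config, store, s3Connection, loggerFactory)
  // getSource already retries internally via PekkoUtil.restartSource, so the caller only drains it.
  val ((killSwitch, _), done) =
    bulkStorage.getSource.toMat(Sink.ignore)(Keep.both).run()
  (killSwitch, done)
}

The kill switch is the only handle needed for a clean shutdown; completion of `done` indicates that the (normally infinite) dump loop has terminated.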