Skip to content

Commit 3c93319

Browse files
committed
wip
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
1 parent bb07b6e commit 3c93319

File tree

4 files changed

+20
-34
lines changed

4 files changed

+20
-34
lines changed

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorage.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import com.digitalasset.canton.data.CantonTimestamp
88
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
99
import com.digitalasset.canton.tracing.TraceContext
1010
import com.digitalasset.canton.util.{ErrorUtil, PekkoUtil}
11-
import com.digitalasset.canton.util.PekkoUtil.RetrySourcePolicy
11+
import com.digitalasset.canton.util.PekkoUtil.{RetrySourcePolicy, WithKillSwitch}
1212
import org.apache.pekko.actor.ActorSystem
1313
import org.apache.pekko.stream.{KillSwitch, KillSwitches, OverflowStrategy}
14-
import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source}
14+
import org.apache.pekko.stream.scaladsl.{Keep, Source}
1515
import org.apache.pekko.util.ByteString
1616
import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings
1717
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
@@ -24,8 +24,8 @@ import java.nio.ByteBuffer
2424
import java.nio.charset.StandardCharsets
2525
import java.util.concurrent.atomic.AtomicInteger
2626
import scala.concurrent.duration.FiniteDuration
27-
2827
import Position.*
28+
import org.apache.pekko.Done
2929

3030
object Position {
3131
sealed trait Position
@@ -73,7 +73,10 @@ class AcsSnapshotBulkStorage(
7373

7474
}
7575

76-
def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {
76+
private[bulk] def getSingleAcsSnapshotDumpSource(
77+
migrationId: Long,
78+
timestamp: CantonTimestamp,
79+
): Source[WithKillSwitch[Int], (KillSwitch, Future[Done])] = {
7780

7881
def mksrc = {
7982
val idx = new AtomicInteger(0)
@@ -129,7 +132,5 @@ class AcsSnapshotBulkStorage(
129132
mkSource = (_: Unit) => mksrc,
130133
policy = policy,
131134
)
132-
.runWith(Sink.ignore)
133-
134-
}.map(_ => ())
135+
}
135136
}

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotBulkStorageTest.scala renamed to apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorageTest.scala

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,20 @@
11
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package org.lfdecentralizedtrust.splice.scan.store
4+
package org.lfdecentralizedtrust.splice.scan.store.bulk
55

6-
import org.lfdecentralizedtrust.splice.http.v0.definitions as httpApi
76
import com.digitalasset.canton.data.CantonTimestamp
87
import com.digitalasset.canton.logging.NamedLoggerFactory
98
import com.digitalasset.canton.protocol.LfContractId
109
import com.digitalasset.canton.topology.PartyId
1110
import com.digitalasset.canton.tracing.TraceContext
1211
import com.digitalasset.canton.{HasActorSystem, HasExecutionContext}
12+
import org.apache.pekko.stream.scaladsl.Sink
13+
import org.lfdecentralizedtrust.splice.http.v0.definitions as httpApi
14+
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
1315
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore.QueryAcsSnapshotResult
14-
import org.lfdecentralizedtrust.splice.scan.store.bulk.{
15-
AcsSnapshotBulkStorage,
16-
BulkStorageConfig,
17-
S3BucketConnection,
18-
}
19-
import org.lfdecentralizedtrust.splice.store.{HardLimit, Limit, StoreTest}
2016
import org.lfdecentralizedtrust.splice.store.events.SpliceCreatedEvent
17+
import org.lfdecentralizedtrust.splice.store.{HardLimit, Limit, StoreTest}
2118
import org.lfdecentralizedtrust.splice.util.PackageQualifiedName
2219
import org.mockito.ArgumentMatchers.anyString
2320
import org.mockito.Mockito
@@ -26,8 +23,8 @@ import software.amazon.awssdk.services.s3.model.ListObjectsRequest
2623

2724
import java.nio.ByteBuffer
2825
import scala.concurrent.{ExecutionContext, Future}
29-
import scala.jdk.FutureConverters.*
3026
import scala.jdk.CollectionConverters.*
27+
import scala.jdk.FutureConverters.*
3128

3229
class AcsSnapshotBulkStorageTest
3330
extends StoreTest
@@ -53,7 +50,7 @@ class AcsSnapshotBulkStorageTest
5350
store,
5451
s3BucketConnection,
5552
loggerFactory,
56-
).dumpAcsSnapshot(0, timestamp)
53+
).getSingleAcsSnapshotDumpSource(0, timestamp).runWith(Sink.ignore)
5754

5855
s3Objects <- s3BucketConnection.s3Client
5956
.listObjects(

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/HasS3Mock.scala renamed to apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/HasS3Mock.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
package org.lfdecentralizedtrust.splice.scan.store
1+
package org.lfdecentralizedtrust.splice.scan.store.bulk
22

3-
import com.digitalasset.canton.{BaseTest, FutureHelpers}
43
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
4+
import com.digitalasset.canton.{BaseTest, FutureHelpers}
55
import com.github.luben.zstd.ZstdDirectBufferDecompressingStream
66
import io.netty.buffer.PooledByteBufAllocator
77
import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings
8-
import org.lfdecentralizedtrust.splice.scan.store.bulk.{S3BucketConnection, S3Config}
98
import org.scalatest.EitherValues
109
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
1110
import software.amazon.awssdk.regions.Region

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/UpdateHistoryBulkStorageTest.scala renamed to apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/UpdateHistoryBulkStorageTest.scala

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,24 @@
11
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package org.lfdecentralizedtrust.splice.scan.store
4+
package org.lfdecentralizedtrust.splice.scan.store.bulk
55

66
import com.digitalasset.canton.data.CantonTimestamp
77
import com.digitalasset.canton.protocol.LfContractId
88
import com.digitalasset.canton.tracing.TraceContext
99
import com.digitalasset.canton.{HasActorSystem, HasExecutionContext}
1010
import org.lfdecentralizedtrust.splice.environment.ledger.api.TransactionTreeUpdate
1111
import org.lfdecentralizedtrust.splice.http.v0.definitions.UpdateHistoryItemV2
12-
import org.lfdecentralizedtrust.splice.scan.store.bulk.{
13-
BulkStorageConfig,
14-
Result,
15-
UpdateHistorySegmentBulkStorage,
16-
}
1712
import org.lfdecentralizedtrust.splice.store.UpdateHistory.UpdateHistoryResponse
18-
import org.lfdecentralizedtrust.splice.store.{
19-
HardLimit,
20-
Limit,
21-
StoreTest,
22-
TreeUpdateWithMigrationId,
23-
UpdateHistory,
24-
}
13+
import org.lfdecentralizedtrust.splice.store.*
2514
import org.scalatest.concurrent.PatienceConfiguration
2615
import software.amazon.awssdk.services.s3.model.ListObjectsRequest
2716

2817
import java.time.Instant
2918
import scala.concurrent.Future
3019
import scala.concurrent.duration.FiniteDuration
31-
import scala.jdk.FutureConverters.*
3220
import scala.jdk.CollectionConverters.*
21+
import scala.jdk.FutureConverters.*
3322

3423
class UpdateHistoryBulkStorageTest
3524
extends StoreTest

0 commit comments

Comments
 (0)