Skip to content

Commit 04e6f6c

Browse files
committed
[ci] add support for key-value store in Scan
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
1 parent bb81cad commit 04e6f6c

File tree

4 files changed

+141
-0
lines changed

4 files changed

+141
-0
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.scan.store
5+
6+
import com.digitalasset.canton.data.CantonTimestamp
7+
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
8+
import io.circe.Codec
9+
import io.circe.generic.semiauto.deriveCodec
10+
import org.lfdecentralizedtrust.splice.store.KeyValueStore
11+
import cats.data.OptionT
12+
import cats.implicits.toBifunctorOps
13+
import com.digitalasset.canton.tracing.TraceContext
14+
import org.lfdecentralizedtrust.splice.scan.store.ScanKeyValueProvider.*
15+
16+
import scala.concurrent.{ExecutionContext, Future}
17+
18+
class ScanKeyValueProvider(val store: KeyValueStore, val loggerFactory: NamedLoggerFactory)
19+
extends NamedLogging {
20+
21+
private val latestAcsSnapshotInBulkStorageKey = "latest_acs_snapshot_in_bulk_storage"
22+
23+
final def setLatestAcsSnapshotsInBulkStorage(
24+
timestamp: CantonTimestamp,
25+
migrationId: Long,
26+
)(implicit tc: TraceContext): Future[Unit] = store.setValue(
27+
latestAcsSnapshotInBulkStorageKey,
28+
AcsSnapshotTimestampMigration(timestamp, migrationId),
29+
)
30+
31+
final def getLatestAcsSnapshotInBulkStorage()(implicit
32+
tc: TraceContext,
33+
ec: ExecutionContext,
34+
): OptionT[Future, (CantonTimestamp, Long)] = {
35+
val result: OptionT[Future, AcsSnapshotTimestampMigration] = store.readValueAndLogOnDecodingFailure(latestAcsSnapshotInBulkStorageKey)
36+
result.map(result => (result.timestamp, result.migrationId))
37+
}
38+
}
39+
40+
object ScanKeyValueProvider {
41+
final case class AcsSnapshotTimestampMigration(
42+
timestamp: CantonTimestamp,
43+
migrationId: Long,
44+
)
45+
implicit val timestampCodec: Codec[CantonTimestamp] =
46+
Codec
47+
.from[Long](implicitly, implicitly)
48+
.iemap(timestamp => CantonTimestamp.fromProtoPrimitive(timestamp).leftMap(_.message))(
49+
_.toProtoPrimitive
50+
)
51+
implicit val acsSnapshotTimestampMigrationCodec: Codec[AcsSnapshotTimestampMigration] =
52+
deriveCodec[AcsSnapshotTimestampMigration]
53+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package org.lfdecentralizedtrust.splice.scan.store
2+
3+
import com.digitalasset.canton.lifecycle.CloseContext
4+
import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory}
5+
import com.digitalasset.canton.resource.DbStorage
6+
import com.digitalasset.canton.topology.{ParticipantId, PartyId}
7+
import com.digitalasset.canton.tracing.TraceContext
8+
import org.lfdecentralizedtrust.splice.store.KeyValueStore
9+
import org.lfdecentralizedtrust.splice.store.db.StoreDescriptor
10+
11+
import scala.concurrent.{ExecutionContext, Future}
12+
13+
object ScanKeyValueStore {
14+
def apply(
15+
dsoParty: PartyId,
16+
participantId: ParticipantId,
17+
storage: DbStorage,
18+
loggerFactory: NamedLoggerFactory,
19+
)(implicit
20+
ec: ExecutionContext,
21+
lc: ErrorLoggingContext,
22+
cc: CloseContext,
23+
tc: TraceContext,
24+
): Future[KeyValueStore] = {
25+
KeyValueStore(
26+
StoreDescriptor(
27+
version = 1,
28+
name = "ScanKeyValueStore",
29+
party = dsoParty,
30+
participant = participantId,
31+
key = Map(
32+
"dsoParty" -> dsoParty.toProtoPrimitive
33+
),
34+
),
35+
storage,
36+
loggerFactory,
37+
)
38+
}
39+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.scan.store
5+
6+
import com.digitalasset.canton.HasExecutionContext
7+
import com.digitalasset.canton.data.CantonTimestamp
8+
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
9+
import com.digitalasset.canton.resource.DbStorage
10+
import com.digitalasset.canton.tracing.TraceContext
11+
import org.lfdecentralizedtrust.splice.store.StoreTest
12+
import org.lfdecentralizedtrust.splice.store.db.SplicePostgresTest
13+
import org.scalatest.matchers.should.Matchers
14+
15+
import scala.concurrent.Future
16+
17+
class ScanKeyValueProviderTest
18+
extends StoreTest
19+
with Matchers
20+
with HasExecutionContext
21+
with SplicePostgresTest {
22+
"ScanKeyValueProvider" should {
23+
"set and get acs snapshots timestamps" in {
24+
val ts = CantonTimestamp.now()
25+
val migrationId = 7L
26+
for {
27+
provider <- mkProvider
28+
_ <- provider.setLatestAcsSnapshotsInBulkStorage(ts, migrationId)
29+
readBack <- provider.getLatestAcsSnapshotInBulkStorage().value
30+
} yield {
31+
readBack.value shouldBe (ts, migrationId)
32+
}
33+
}
34+
}
35+
36+
def mkProvider: Future[ScanKeyValueProvider] = {
37+
ScanKeyValueStore(
38+
dsoParty = dsoParty,
39+
participantId = mkParticipantId("participant"),
40+
storage,
41+
loggerFactory,
42+
).map(new ScanKeyValueProvider(_, loggerFactory))
43+
}
44+
45+
override protected def cleanDb(
46+
storage: DbStorage
47+
)(implicit traceContext: TraceContext): FutureUnlessShutdown[?] = resetAllAppTables(storage)
48+
}

test-full-class-names-non-integration.log

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ org.lfdecentralizedtrust.splice.scan.admin.api.client.BftScanConnectionTest
1212
org.lfdecentralizedtrust.splice.scan.admin.http.ScanHttpEncodingsTest
1313
org.lfdecentralizedtrust.splice.scan.automation.AcsSnapshotTriggerTest
1414
org.lfdecentralizedtrust.splice.scan.store.ScanEventStoreTest
15+
org.lfdecentralizedtrust.splice.scan.store.ScanKeyValueProviderTest
1516
org.lfdecentralizedtrust.splice.store.DomainTimeStoreTest
1617
org.lfdecentralizedtrust.splice.store.InMemorySynchronizerStoreTest
1718
org.lfdecentralizedtrust.splice.store.ScanHistoryBackfillingTest

0 commit comments

Comments
 (0)