Skip to content

Commit 66d983c

Browse files
committed
[ci] refactor: extract KeyValueStore to common
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
1 parent 739017f commit 66d983c

File tree

6 files changed

+172
-163
lines changed

6 files changed

+172
-163
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.store
5+
6+
import cats.data.OptionT
7+
import cats.implicits.catsSyntaxOptionId
8+
import com.digitalasset.canton.lifecycle.CloseContext
9+
import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory, NamedLogging}
10+
import com.digitalasset.canton.resource.DbStorage
11+
import com.digitalasset.canton.tracing.TraceContext
12+
import io.circe.{Decoder, Encoder}
13+
import org.lfdecentralizedtrust.splice.store.db.{DbValidatorInternalStore, StoreDescriptor}
14+
15+
import scala.concurrent.{ExecutionContext, Future}
16+
17+
case class KeyValueStoreDbTableConfig(
18+
tableName: String,
19+
keyColumnName: String,
20+
valueColumnName: String,
21+
)
22+
23+
trait KeyValueStore extends NamedLogging {
24+
25+
def setValue[T](key: String, value: T)(implicit
26+
tc: TraceContext,
27+
encoder: Encoder[T],
28+
): Future[Unit]
29+
30+
def getValue[T](
31+
key: String
32+
)(implicit tc: TraceContext, decoder: Decoder[T]): OptionT[Future, Decoder.Result[T]]
33+
34+
def deleteKey(key: String)(implicit
35+
tc: TraceContext
36+
): Future[Unit]
37+
38+
def readValueAndLogOnDecodingFailure[T: Decoder](
39+
key: String
40+
)(implicit tc: TraceContext, ec: ExecutionContext): OptionT[Future, T] = {
41+
getValue[T](key)
42+
.subflatMap(
43+
_.fold(
44+
{ failure =>
45+
logger.warn(s"Failed to decode $key from the db $failure")
46+
None
47+
},
48+
_.some,
49+
)
50+
)
51+
}
52+
53+
}
54+
55+
object KeyValueStore {
56+
57+
def apply(
58+
descriptor: StoreDescriptor,
59+
tableConfig: KeyValueStoreDbTableConfig,
60+
storage: DbStorage,
61+
loggerFactory: NamedLoggerFactory,
62+
)(implicit
63+
ec: ExecutionContext,
64+
lc: ErrorLoggingContext,
65+
cc: CloseContext,
66+
tc: TraceContext,
67+
): Future[KeyValueStore] = {
68+
storage match {
69+
case storage: DbStorage =>
70+
DbValidatorInternalStore(
71+
descriptor,
72+
tableConfig,
73+
storage,
74+
loggerFactory,
75+
)
76+
77+
case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")
78+
}
79+
}
80+
}

apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/store/db/DbValidatorInternalStore.scala renamed to apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbValidatorInternalStore.scala

Lines changed: 24 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,48 @@
11
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package org.lfdecentralizedtrust.splice.validator.store.db
4+
package org.lfdecentralizedtrust.splice.store.db
55

66
import cats.data.OptionT
7+
import com.digitalasset.canton.discard.Implicits.DiscardOps
78
import com.digitalasset.canton.lifecycle.{CloseContext, FutureUnlessShutdown}
89
import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory, NamedLogging}
9-
import com.digitalasset.canton.discard.Implicits.DiscardOps
1010
import com.digitalasset.canton.resource.DbStorage
1111
import com.digitalasset.canton.tracing.TraceContext
12-
import org.lfdecentralizedtrust.splice.util.FutureUnlessShutdownUtil.futureUnlessShutdownToFuture
13-
import com.digitalasset.canton.topology.{ParticipantId, PartyId}
14-
import org.lfdecentralizedtrust.splice.validator.store.ValidatorInternalStore
1512
import io.circe.syntax.EncoderOps
1613
import io.circe.{Decoder, Encoder, Json}
17-
import org.lfdecentralizedtrust.splice.store.db.AcsJdbcTypes
14+
import org.lfdecentralizedtrust.splice.store.{KeyValueStore, KeyValueStoreDbTableConfig}
15+
import org.lfdecentralizedtrust.splice.util.FutureUnlessShutdownUtil.futureUnlessShutdownToFuture
1816
import slick.jdbc.JdbcProfile
1917
import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
18+
2019
import scala.concurrent.{ExecutionContext, Future}
21-
import org.lfdecentralizedtrust.splice.store.db.{StoreDescriptor, StoreDescriptorStore}
2220

2321
class DbValidatorInternalStore private (
2422
storage: DbStorage,
23+
val dbTableConfig: KeyValueStoreDbTableConfig,
2524
val storeId: Int,
2625
val loggerFactory: NamedLoggerFactory,
2726
)(implicit
2827
val ec: ExecutionContext,
2928
val loggingContext: ErrorLoggingContext,
3029
val closeContext: CloseContext,
31-
) extends ValidatorInternalStore
30+
) extends KeyValueStore
3231
with AcsJdbcTypes
3332
with NamedLogging {
3433

3534
val profile: JdbcProfile = storage.profile.jdbc
3635

37-
override def setConfig[T](key: String, value: T)(implicit
38-
tc: TraceContext,
39-
encoder: Encoder[T],
36+
override def setValue[T](key: String, value: T)(implicit
37+
tc: TraceContext,
38+
encoder: Encoder[T],
4039
): Future[Unit] = {
4140
val jsonValue: Json = value.asJson
4241

43-
val action = sql"""INSERT INTO validator_internal_config (config_key, config_value, store_id)
42+
val action = sql"""INSERT INTO ${dbTableConfig.tableName} (${dbTableConfig.keyColumnName}, ${dbTableConfig.valueColumnName}, store_id)
4443
VALUES ($key, $jsonValue, $storeId)
45-
ON CONFLICT (store_id, config_key) DO UPDATE
46-
SET config_value = excluded.config_value""".asUpdate
44+
ON CONFLICT (store_id, ${dbTableConfig.keyColumnName}) DO UPDATE
45+
SET ${dbTableConfig.keyColumnName} = excluded.${dbTableConfig.keyColumnName}""".asUpdate
4746

4847
val updateAction = storage.update(action, "set-validator-internal-config")
4948

@@ -53,15 +52,15 @@ class DbValidatorInternalStore private (
5352
updateAction.map(_ => ())
5453
}
5554

56-
override def getConfig[T](
55+
override def getValue[T](
5756
key: String
5857
)(implicit tc: TraceContext, decoder: Decoder[T]): OptionT[Future, Decoder.Result[T]] = {
5958

6059
logger.debug(s"Retrieving config key $key")
6160

62-
val queryAction = sql"""SELECT config_value
63-
FROM validator_internal_config
64-
WHERE config_key = $key AND store_id = $storeId
61+
val queryAction = sql"""SELECT ${dbTableConfig.valueColumnName}
62+
FROM ${dbTableConfig.tableName}
63+
WHERE ${dbTableConfig.keyColumnName} = $key AND store_id = $storeId
6564
""".as[Json].headOption
6665

6766
val jsonOptionT: OptionT[FutureUnlessShutdown, Json] =
@@ -87,15 +86,15 @@ class DbValidatorInternalStore private (
8786
OptionT(futureUnlessShutdownToFuture(resultOptionT.value))
8887
}
8988

90-
override def deleteConfig(key: String)(implicit
91-
tc: TraceContext
89+
override def deleteKey(key: String)(implicit
90+
tc: TraceContext
9291
): Future[Unit] = {
9392
logger.debug(
9493
s"Deleting config key $key"
9594
)
9695
storage
9796
.update(
98-
sqlu"delete from validator_internal_config WHERE config_key = $key AND store_id = $storeId",
97+
sqlu"delete from ${dbTableConfig.tableName} WHERE ${dbTableConfig.keyColumnName} = $key AND store_id = $storeId",
9998
"delete config key",
10099
)
101100
.map(_.discard)
@@ -105,8 +104,8 @@ class DbValidatorInternalStore private (
105104
object DbValidatorInternalStore {
106105

107106
def apply(
108-
participant: ParticipantId,
109-
validatorParty: PartyId,
107+
storeDescriptor: StoreDescriptor,
108+
dbTableConfig: KeyValueStoreDbTableConfig,
110109
storage: DbStorage,
111110
loggerFactory: NamedLoggerFactory,
112111
)(implicit
@@ -117,21 +116,11 @@ object DbValidatorInternalStore {
117116
): Future[DbValidatorInternalStore] = {
118117

119118
StoreDescriptorStore
120-
.getStoreIdForDescriptor(
121-
StoreDescriptor(
122-
version = 2,
123-
name = "DbValidatorInternalConfigStore",
124-
party = validatorParty,
125-
participant = participant,
126-
key = Map(
127-
"validatorParty" -> validatorParty.toProtoPrimitive
128-
),
129-
),
130-
storage,
131-
)
119+
.getStoreIdForDescriptor(storeDescriptor, storage)
132120
.map(storeId => {
133121
new DbValidatorInternalStore(
134122
storage,
123+
dbTableConfig,
135124
storeId,
136125
loggerFactory,
137126
)

apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/ValidatorApp.scala

Lines changed: 10 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@ import org.apache.pekko.http.scaladsl.server.directives.BasicDirectives
3232
import org.lfdecentralizedtrust.splice.admin.api.TraceContextDirectives.withTraceContext
3333
import org.lfdecentralizedtrust.splice.admin.http.{AdminRoutes, HttpErrorHandler}
3434
import org.lfdecentralizedtrust.splice.auth.*
35-
import org.lfdecentralizedtrust.splice.automation.{
36-
DomainParamsAutomationService,
37-
DomainTimeAutomationService,
38-
}
35+
import org.lfdecentralizedtrust.splice.automation.{DomainParamsAutomationService, DomainTimeAutomationService}
3936
import org.lfdecentralizedtrust.splice.config.{NetworkAppClientConfig, SharedSpliceAppParameters}
4037
import org.lfdecentralizedtrust.splice.environment.*
4138
import org.lfdecentralizedtrust.splice.environment.ledger.api.DedupDuration
@@ -48,19 +45,10 @@ import org.lfdecentralizedtrust.splice.http.v0.validator_admin.ValidatorAdminRes
4845
import org.lfdecentralizedtrust.splice.http.v0.validator_public.ValidatorPublicResource
4946
import org.lfdecentralizedtrust.splice.http.v0.wallet.WalletResource as InternalWalletResource
5047
import org.lfdecentralizedtrust.splice.identities.NodeIdentitiesStore
51-
import org.lfdecentralizedtrust.splice.migration.{
52-
DomainDataRestorer,
53-
DomainMigrationInfo,
54-
MigrationTimeInfo,
55-
ParticipantUsersDataRestorer,
56-
}
48+
import org.lfdecentralizedtrust.splice.migration.{DomainDataRestorer, DomainMigrationInfo, MigrationTimeInfo, ParticipantUsersDataRestorer}
5749
import org.lfdecentralizedtrust.splice.scan.admin.api.client
5850
import org.lfdecentralizedtrust.splice.scan.admin.api.client.BftScanConnection.BftScanClientConfig
59-
import org.lfdecentralizedtrust.splice.scan.admin.api.client.{
60-
BftScanConnection,
61-
MinimalScanConnection,
62-
SingleScanConnection,
63-
}
51+
import org.lfdecentralizedtrust.splice.scan.admin.api.client.{BftScanConnection, MinimalScanConnection, SingleScanConnection}
6452
import org.lfdecentralizedtrust.splice.scan.config.ScanAppClientConfig
6553
import org.lfdecentralizedtrust.splice.setup.{NodeInitializer, ParticipantInitializer}
6654
import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion.SpliceLedgerConnectionPriority
@@ -70,29 +58,14 @@ import org.lfdecentralizedtrust.splice.store.{AppStoreWithIngestion, HistoryMetr
7058
import org.lfdecentralizedtrust.splice.util.*
7159
import org.lfdecentralizedtrust.splice.validator.ValidatorApp.OAuthRealms
7260
import org.lfdecentralizedtrust.splice.validator.admin.http.*
73-
import org.lfdecentralizedtrust.splice.validator.automation.{
74-
ValidatorAutomationService,
75-
ValidatorPackageVettingTrigger,
76-
}
61+
import org.lfdecentralizedtrust.splice.validator.automation.{ValidatorAutomationService, ValidatorPackageVettingTrigger}
7762
import org.lfdecentralizedtrust.splice.validator.config.*
7863
import org.lfdecentralizedtrust.splice.validator.domain.DomainConnector
7964
import org.lfdecentralizedtrust.splice.validator.metrics.ValidatorAppMetrics
80-
import org.lfdecentralizedtrust.splice.validator.migration.{
81-
DomainMigrationDump,
82-
ParticipantPartyMigrator,
83-
}
84-
import org.lfdecentralizedtrust.splice.validator.store.{
85-
ValidatorConfigProvider,
86-
ValidatorInternalStore,
87-
ValidatorStore,
88-
}
65+
import org.lfdecentralizedtrust.splice.validator.migration.{DomainMigrationDump, ParticipantPartyMigrator}
66+
import org.lfdecentralizedtrust.splice.validator.store.{ValidatorConfigProvider, ValidatorInternalStore, ValidatorStore}
8967
import org.lfdecentralizedtrust.splice.validator.util.{ValidatorScanConnection, ValidatorUtil}
90-
import org.lfdecentralizedtrust.splice.wallet.admin.http.{
91-
HttpExternalWalletHandler,
92-
HttpStatusWalletHandler,
93-
HttpWalletHandler,
94-
UserWalletAuthExtractor,
95-
}
68+
import org.lfdecentralizedtrust.splice.wallet.admin.http.{HttpExternalWalletHandler, HttpStatusWalletHandler, HttpWalletHandler, UserWalletAuthExtractor}
9669
import org.lfdecentralizedtrust.splice.wallet.automation.UserWalletAutomationService
9770
import org.lfdecentralizedtrust.splice.wallet.util.ValidatorTopupConfig
9871
import org.lfdecentralizedtrust.splice.wallet.{ExternalPartyWalletManager, UserWalletManager}
@@ -200,9 +173,9 @@ class ValidatorApp(
200173
participantId,
201174
validatorParty = ParticipantPartyMigrator.toPartyId(
202175
config.validatorPartyHint
203-
.getOrElse(
204-
BaseLedgerConnection.sanitizeUserIdToPartyString(config.ledgerApiUser)
205-
),
176+
.getOrElse(
177+
BaseLedgerConnection.sanitizeUserIdToPartyString(config.ledgerApiUser)
178+
),
206179
participantId,
207180
),
208181
storage,

apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/store/ValidatorConfigProvider.scala

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,19 @@
44
package org.lfdecentralizedtrust.splice.validator.store
55

66
import cats.data.OptionT
7-
import cats.implicits.{catsSyntaxOptionId, toBifunctorOps}
7+
import cats.implicits.toBifunctorOps
88
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
99
import com.digitalasset.canton.topology.PartyId
1010
import com.digitalasset.canton.tracing.TraceContext
1111
import io.circe.Codec
12+
1213
import scala.concurrent.Future
1314
import io.circe.generic.semiauto.deriveCodec
14-
import io.circe.Decoder
15+
import org.lfdecentralizedtrust.splice.store.KeyValueStore
16+
1517
import scala.concurrent.ExecutionContext
1618

17-
class ValidatorConfigProvider(config: ValidatorInternalStore, val loggerFactory: NamedLoggerFactory)
19+
class ValidatorConfigProvider(config: KeyValueStore, val loggerFactory: NamedLoggerFactory)
1820
extends NamedLogging {
1921

2022
private val scanInternalConfigKey = "validator_scan_internal_config_key"
@@ -24,50 +26,34 @@ class ValidatorConfigProvider(config: ValidatorInternalStore, val loggerFactory:
2426
final def setScanUrlInternalConfig(
2527
value: Seq[ScanUrlInternalConfig]
2628
)(implicit tc: TraceContext): Future[Unit] = {
27-
config.setConfig(scanInternalConfigKey, value)
29+
config.setValue(scanInternalConfigKey, value)
2830
}
2931

3032
final def getScanUrlInternalConfig(
3133
)(implicit
3234
tc: TraceContext,
3335
ec: ExecutionContext,
3436
): OptionT[Future, Seq[ScanUrlInternalConfig]] = {
35-
readConfigAndLogOnDecodingFailure(scanInternalConfigKey)
37+
config.readValueAndLogOnDecodingFailure(scanInternalConfigKey)
3638
}
3739

3840
def clearPartiesToMigrate()(implicit tc: TraceContext): Future[Unit] = {
3941
logger.info("Clearing parties that were migrated")
40-
config.deleteConfig(migratingPartiesInternalConfigKey)
42+
config.deleteKey(migratingPartiesInternalConfigKey)
4143
}
4244

4345
def getPartiesToMigrate()(implicit
4446
tc: TraceContext,
4547
ec: ExecutionContext,
4648
): OptionT[Future, Set[PartyId]] = {
47-
readConfigAndLogOnDecodingFailure[PartiesBeingMigrated](migratingPartiesInternalConfigKey).map(
49+
config.readValueAndLogOnDecodingFailure[PartiesBeingMigrated](migratingPartiesInternalConfigKey).map(
4850
_.parties
4951
)
5052
}
5153

52-
private def readConfigAndLogOnDecodingFailure[T: Decoder](
53-
key: String
54-
)(implicit tc: TraceContext, ec: ExecutionContext) = {
55-
config
56-
.getConfig[T](key)
57-
.subflatMap(
58-
_.fold(
59-
{ failure =>
60-
logger.warn(s"Failed to decode $key from the db $failure")
61-
None
62-
},
63-
_.some,
64-
)
65-
)
66-
}
67-
6854
def setPartiesToMigrate(parties: Set[PartyId])(implicit tc: TraceContext): Future[Unit] = {
6955
logger.info(s"Storing parties that will be migrated $parties")
70-
config.setConfig(migratingPartiesInternalConfigKey, PartiesBeingMigrated(parties))
56+
config.setValue(migratingPartiesInternalConfigKey, PartiesBeingMigrated(parties))
7157
}
7258
}
7359

0 commit comments

Comments
 (0)