Skip to content

Commit ba16a10

Browse files
authored
refactor: extract KeyValueStore to common (#3621)
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
1 parent e7a9f36 commit ba16a10

File tree

9 files changed

+288
-176
lines changed

9 files changed

+288
-176
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
ALTER TABLE validator_internal_config
2+
RENAME TO key_value_store;
3+
4+
ALTER TABLE key_value_store
5+
RENAME COLUMN config_key TO key;
6+
7+
ALTER TABLE key_value_store
8+
RENAME COLUMN config_value TO value;
9+
10+
ALTER TABLE key_value_store
11+
RENAME CONSTRAINT uc_validator_internal_config TO uc_key_value_store;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.store
5+
6+
import cats.data.OptionT
7+
import cats.implicits.catsSyntaxOptionId
8+
import com.digitalasset.canton.lifecycle.CloseContext
9+
import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory, NamedLogging}
10+
import com.digitalasset.canton.resource.DbStorage
11+
import com.digitalasset.canton.tracing.TraceContext
12+
import io.circe.{Decoder, Encoder}
13+
import org.lfdecentralizedtrust.splice.store.db.{DbKeyValueStore, StoreDescriptor}
14+
15+
import scala.concurrent.{ExecutionContext, Future}
16+
17+
trait KeyValueStore extends NamedLogging {
18+
19+
def setValue[T](key: String, value: T)(implicit
20+
tc: TraceContext,
21+
encoder: Encoder[T],
22+
): Future[Unit]
23+
24+
def getValue[T](
25+
key: String
26+
)(implicit tc: TraceContext, decoder: Decoder[T]): OptionT[Future, Decoder.Result[T]]
27+
28+
def deleteKey(key: String)(implicit
29+
tc: TraceContext
30+
): Future[Unit]
31+
32+
def readValueAndLogOnDecodingFailure[T: Decoder](
33+
key: String
34+
)(implicit tc: TraceContext, ec: ExecutionContext): OptionT[Future, T] = {
35+
getValue[T](key)
36+
.subflatMap(
37+
_.fold(
38+
{ failure =>
39+
logger.warn(s"Failed to decode $key from the db $failure")
40+
None
41+
},
42+
_.some,
43+
)
44+
)
45+
}
46+
47+
}
48+
49+
object KeyValueStore {
50+
51+
def apply(
52+
descriptor: StoreDescriptor,
53+
storage: DbStorage,
54+
loggerFactory: NamedLoggerFactory,
55+
)(implicit
56+
ec: ExecutionContext,
57+
lc: ErrorLoggingContext,
58+
cc: CloseContext,
59+
tc: TraceContext,
60+
): Future[KeyValueStore] = {
61+
storage match {
62+
case storage: DbStorage =>
63+
DbKeyValueStore(
64+
descriptor,
65+
storage,
66+
loggerFactory,
67+
)
68+
69+
case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")
70+
}
71+
}
72+
}

apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/store/db/DbValidatorInternalStore.scala renamed to apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbKeyValueStore.scala

Lines changed: 22 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,47 @@
11
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package org.lfdecentralizedtrust.splice.validator.store.db
4+
package org.lfdecentralizedtrust.splice.store.db
55

66
import cats.data.OptionT
7+
import com.digitalasset.canton.discard.Implicits.DiscardOps
78
import com.digitalasset.canton.lifecycle.{CloseContext, FutureUnlessShutdown}
89
import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory, NamedLogging}
9-
import com.digitalasset.canton.discard.Implicits.DiscardOps
1010
import com.digitalasset.canton.resource.DbStorage
1111
import com.digitalasset.canton.tracing.TraceContext
12-
import org.lfdecentralizedtrust.splice.util.FutureUnlessShutdownUtil.futureUnlessShutdownToFuture
13-
import com.digitalasset.canton.topology.{ParticipantId, PartyId}
14-
import org.lfdecentralizedtrust.splice.validator.store.ValidatorInternalStore
1512
import io.circe.syntax.EncoderOps
1613
import io.circe.{Decoder, Encoder, Json}
17-
import org.lfdecentralizedtrust.splice.store.db.AcsJdbcTypes
14+
import org.lfdecentralizedtrust.splice.store.KeyValueStore
15+
import org.lfdecentralizedtrust.splice.util.FutureUnlessShutdownUtil.futureUnlessShutdownToFuture
1816
import slick.jdbc.JdbcProfile
1917
import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
18+
2019
import scala.concurrent.{ExecutionContext, Future}
21-
import org.lfdecentralizedtrust.splice.store.db.{StoreDescriptor, StoreDescriptorStore}
2220

23-
class DbValidatorInternalStore private (
21+
class DbKeyValueStore private (
2422
storage: DbStorage,
2523
val storeId: Int,
2624
val loggerFactory: NamedLoggerFactory,
2725
)(implicit
2826
val ec: ExecutionContext,
2927
val loggingContext: ErrorLoggingContext,
3028
val closeContext: CloseContext,
31-
) extends ValidatorInternalStore
29+
) extends KeyValueStore
3230
with AcsJdbcTypes
3331
with NamedLogging {
3432

3533
val profile: JdbcProfile = storage.profile.jdbc
3634

37-
override def setConfig[T](key: String, value: T)(implicit
35+
override def setValue[T](key: String, value: T)(implicit
3836
tc: TraceContext,
3937
encoder: Encoder[T],
4038
): Future[Unit] = {
4139
val jsonValue: Json = value.asJson
4240

43-
val action = sql"""INSERT INTO validator_internal_config (config_key, config_value, store_id)
41+
val action = sql"""INSERT INTO key_value_store (key, value, store_id)
4442
VALUES ($key, $jsonValue, $storeId)
45-
ON CONFLICT (store_id, config_key) DO UPDATE
46-
SET config_value = excluded.config_value""".asUpdate
47-
43+
ON CONFLICT (store_id, key) DO UPDATE
44+
SET value = excluded.value""".asUpdate
4845
val updateAction = storage.update(action, "set-validator-internal-config")
4946

5047
logger.debug(
@@ -53,15 +50,15 @@ class DbValidatorInternalStore private (
5350
updateAction.map(_ => ())
5451
}
5552

56-
override def getConfig[T](
53+
override def getValue[T](
5754
key: String
5855
)(implicit tc: TraceContext, decoder: Decoder[T]): OptionT[Future, Decoder.Result[T]] = {
5956

6057
logger.debug(s"Retrieving config key $key")
6158

62-
val queryAction = sql"""SELECT config_value
63-
FROM validator_internal_config
64-
WHERE config_key = $key AND store_id = $storeId
59+
val queryAction = sql"""SELECT value
60+
FROM key_value_store
61+
WHERE key = $key AND store_id = $storeId
6562
""".as[Json].headOption
6663

6764
val jsonOptionT: OptionT[FutureUnlessShutdown, Json] =
@@ -87,50 +84,38 @@ class DbValidatorInternalStore private (
8784
OptionT(futureUnlessShutdownToFuture(resultOptionT.value))
8885
}
8986

90-
override def deleteConfig(key: String)(implicit
87+
override def deleteKey(key: String)(implicit
9188
tc: TraceContext
9289
): Future[Unit] = {
9390
logger.debug(
9491
s"Deleting config key $key"
9592
)
9693
storage
9794
.update(
98-
sqlu"delete from validator_internal_config WHERE config_key = $key AND store_id = $storeId",
95+
sqlu"delete from key_value_store WHERE key = $key AND store_id = $storeId",
9996
"delete config key",
10097
)
10198
.map(_.discard)
10299
}
103100
}
104101

105-
object DbValidatorInternalStore {
102+
object DbKeyValueStore {
106103

107104
def apply(
108-
participant: ParticipantId,
109-
validatorParty: PartyId,
105+
storeDescriptor: StoreDescriptor,
110106
storage: DbStorage,
111107
loggerFactory: NamedLoggerFactory,
112108
)(implicit
113109
ec: ExecutionContext,
114110
lc: ErrorLoggingContext,
115111
cc: CloseContext,
116112
tc: TraceContext,
117-
): Future[DbValidatorInternalStore] = {
113+
): Future[DbKeyValueStore] = {
118114

119115
StoreDescriptorStore
120-
.getStoreIdForDescriptor(
121-
StoreDescriptor(
122-
version = 2,
123-
name = "DbValidatorInternalConfigStore",
124-
party = validatorParty,
125-
participant = participant,
126-
key = Map(
127-
"validatorParty" -> validatorParty.toProtoPrimitive
128-
),
129-
),
130-
storage,
131-
)
116+
.getStoreIdForDescriptor(storeDescriptor, storage)
132117
.map(storeId => {
133-
new DbValidatorInternalStore(
118+
new DbKeyValueStore(
134119
storage,
135120
storeId,
136121
loggerFactory,
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.store
5+
6+
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
7+
import com.digitalasset.canton.resource.DbStorage
8+
import com.digitalasset.canton.tracing.TraceContext
9+
import com.digitalasset.canton.HasExecutionContext
10+
import org.lfdecentralizedtrust.splice.store.db.{SplicePostgresTest, StoreDescriptor}
11+
import org.scalatest.matchers.should.Matchers
12+
13+
class KeyValueStoreTest
14+
extends StoreTest
15+
with Matchers
16+
with HasExecutionContext
17+
with SplicePostgresTest {
18+
private val storeDescriptor =
19+
StoreDescriptor(
20+
version = 1,
21+
name = "DbMultiDomainAcsStoreTest",
22+
party = dsoParty,
23+
participant = mkParticipantId("participant"),
24+
key = Map(),
25+
)
26+
private val storeDescriptor2 =
27+
StoreDescriptor(
28+
version = 2,
29+
name = "DbMultiDomainAcsStoreTest",
30+
party = dsoParty,
31+
participant = mkParticipantId("participant"),
32+
key = Map(),
33+
)
34+
35+
private def mkStore(descriptor: StoreDescriptor) = KeyValueStore(
36+
descriptor,
37+
storage,
38+
loggerFactory,
39+
)
40+
41+
"KeyValueStore" should {
42+
43+
val configKey = "key1"
44+
val configValue = "jsonPayload1"
45+
val otherKey = "key2"
46+
val otherValue = "jsonPayload2"
47+
48+
"set and get a payload successfully" in {
49+
for {
50+
store <- mkStore(storeDescriptor)
51+
_ <- store.setValue(configKey, configValue)
52+
retrievedValue <- store.getValue[String](configKey).value
53+
} yield {
54+
retrievedValue.value.value shouldBe configValue
55+
}
56+
}
57+
58+
"return None for a non-existent key" in {
59+
for {
60+
store <- mkStore(storeDescriptor)
61+
retrievedValue <- store.getValue[String]("non-existent-key").value
62+
} yield {
63+
retrievedValue shouldBe None
64+
}
65+
}
66+
67+
"update an existing payload" in {
68+
for {
69+
store <- mkStore(storeDescriptor)
70+
_ <- store.setValue(configKey, configValue)
71+
_ <- store.setValue(configKey, otherValue)
72+
retrievedValue <- store.getValue[String](configKey).value
73+
} yield {
74+
retrievedValue.value.value shouldBe otherValue
75+
}
76+
}
77+
78+
"handle multiple different keys independently" in {
79+
for {
80+
store <- mkStore(storeDescriptor)
81+
_ <- store.setValue(configKey, configValue)
82+
_ <- store.setValue(otherKey, otherValue)
83+
84+
configKeyValue <- store.getValue[String](configKey).value
85+
otherKeyValue <- store.getValue[String](otherKey).value
86+
} yield {
87+
configKeyValue.value.value shouldBe configValue
88+
otherKeyValue.value.value shouldBe otherValue
89+
}
90+
}
91+
92+
"delete single key" in {
93+
for {
94+
store <- mkStore(storeDescriptor)
95+
_ <- store.setValue(configKey, configValue)
96+
_ <- store.setValue(otherKey, otherValue)
97+
98+
_ <- store.deleteKey(configKey)
99+
configKeyValue <- store.getValue[String](configKey).value
100+
otherKeyValue <- store.getValue[String](otherKey).value
101+
} yield {
102+
configKeyValue shouldBe None
103+
otherKeyValue.value.value shouldBe otherValue
104+
}
105+
}
106+
107+
"different storeDescriptors should not interfere" in {
108+
for {
109+
store1 <- mkStore(storeDescriptor)
110+
store2 <- mkStore(storeDescriptor2)
111+
_ <- store1.setValue("key", "value1")
112+
_ <- store2.setValue("key", "value2")
113+
_ <- store2.setValue("key2", "something")
114+
115+
k1s1 <- store1.getValue[String]("key").value
116+
k2s1 <- store1.getValue[String]("key2").value
117+
118+
k1s2 <- store2.getValue[String]("key").value
119+
k2s2 <- store2.getValue[String]("key2").value
120+
} yield {
121+
k1s1.value.value shouldBe "value1"
122+
k2s1 shouldBe None
123+
k1s2.value.value shouldBe "value2"
124+
k2s2.value.value shouldBe "something"
125+
}
126+
}
127+
}
128+
129+
override protected def cleanDb(
130+
storage: DbStorage
131+
)(implicit traceContext: TraceContext): FutureUnlessShutdown[?] =
132+
resetAllAppTables(storage)
133+
}

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/SpliceDbTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ trait SpliceDbTest extends DbTest with BeforeAndAfterAll { this: Suite =>
9292
acs_snapshot,
9393
scan_verdict_store,
9494
scan_verdict_transaction_view_store,
95-
validator_internal_config
95+
key_value_store
9696
RESTART IDENTITY CASCADE""".asUpdate
9797
_ <- debugPrintPgActivity()
9898
} yield (),

0 commit comments

Comments
 (0)