Skip to content

Commit c04209f

Browse files
authored
Support dumping updates to bulk storage (#3495)
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
1 parent 9473c1c commit c04209f

File tree

16 files changed

+541
-162
lines changed

16 files changed

+541
-162
lines changed

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/performance/tests/StoreIngestionPerformanceTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ abstract class StoreIngestionPerformanceTest(
101101
dump.transactions.zipWithIndex.collect {
102102
// deliberately ignoring reassignments
103103
case (UpdateHistoryItemV2.members.UpdateHistoryTransactionV2(update), index) =>
104-
CompactJsonScanHttpEncodings.httpToLapiTransaction(update, index.toLong)
104+
CompactJsonScanHttpEncodings().httpToLapiTransaction(update, index.toLong)
105105
}
106106
}
107107

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/UpdateHistoryTestUtil.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ trait UpdateHistoryTestUtil extends TestCommon {
208208
),
209209
encoding = CompactJson,
210210
)
211-
.map(CompactJsonScanHttpEncodings.httpToLapiUpdate)
211+
.map(CompactJsonScanHttpEncodings().httpToLapiUpdate)
212212
.map(_.update)
213213
.map(dropTrailingNones)
214214

@@ -233,7 +233,7 @@ trait UpdateHistoryTestUtil extends TestCommon {
233233

234234
updatesFromScanApi.headOption.foreach(fromHistory => {
235235
val fromPointwiseLookup =
236-
CompactJsonScanHttpEncodings.httpToLapiUpdate(
236+
CompactJsonScanHttpEncodings().httpToLapiUpdate(
237237
scanClient.getUpdate(fromHistory.update.updateId, encoding = CompactJson)
238238
)
239239
fromPointwiseLookup.update shouldBe fromHistory

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,16 +1031,18 @@ class UpdateHistory(
10311031
private def afterFilters(
10321032
afterO: Option[(Long, CantonTimestamp)],
10331033
includeImportUpdates: Boolean,
1034+
afterIsInclusive: Boolean,
10341035
): NonEmptyList[SQLActionBuilder] = {
1035-
val gt = if (includeImportUpdates) ">=" else ">"
1036+
val gtMin = if (includeImportUpdates) ">=" else ">"
1037+
val gtAfter = if (afterIsInclusive) ">=" else ">"
10361038
afterO match {
10371039
case None =>
1038-
NonEmptyList.of(sql"migration_id >= 0 and record_time #$gt ${CantonTimestamp.MinValue}")
1040+
NonEmptyList.of(sql"migration_id >= 0 and record_time #$gtMin ${CantonTimestamp.MinValue}")
10391041
case Some((afterMigrationId, afterRecordTime)) =>
10401042
// This makes it so that the two queries use updt_hist_tran_hi_mi_rt_di.
10411043
NonEmptyList.of(
1042-
sql"migration_id = ${afterMigrationId} and record_time > ${afterRecordTime} ",
1043-
sql"migration_id > ${afterMigrationId} and record_time #$gt ${CantonTimestamp.MinValue}",
1044+
sql"migration_id = ${afterMigrationId} and record_time #$gtAfter ${afterRecordTime} ",
1045+
sql"migration_id > ${afterMigrationId} and record_time #$gtMin ${CantonTimestamp.MinValue}",
10441046
)
10451047
}
10461048
}
@@ -1075,7 +1077,7 @@ class UpdateHistory(
10751077
private def updatesQuery(
10761078
filters: NonEmptyList[SQLActionBuilder],
10771079
orderBy: SQLActionBuilder,
1078-
limit: PageLimit,
1080+
limit: Limit,
10791081
makeSubQuery: SQLActionBuilder => SQLActionBuilderChain,
10801082
) = {
10811083
if (filters.size == 1) {
@@ -1094,7 +1096,7 @@ class UpdateHistory(
10941096
private def getTxUpdates(
10951097
filters: NonEmptyList[SQLActionBuilder],
10961098
orderBy: SQLActionBuilder,
1097-
limit: PageLimit,
1099+
limit: Limit,
10981100
)(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = {
10991101
def makeSubQuery(afterFilter: SQLActionBuilder): SQLActionBuilderChain = {
11001102
sql"""
@@ -1141,7 +1143,7 @@ class UpdateHistory(
11411143
private def getAssignmentUpdates(
11421144
filters: NonEmptyList[SQLActionBuilder],
11431145
orderBy: SQLActionBuilder,
1144-
limit: PageLimit,
1146+
limit: Limit,
11451147
)(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = {
11461148

11471149
def makeSubQuery(afterFilter: SQLActionBuilder): SQLActionBuilderChain = {
@@ -1188,7 +1190,7 @@ class UpdateHistory(
11881190
private def getUnassignmentUpdates(
11891191
filters: NonEmptyList[SQLActionBuilder],
11901192
orderBy: SQLActionBuilder,
1191-
limit: PageLimit,
1193+
limit: Limit,
11921194
)(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = {
11931195

11941196
def makeSubQuery(afterFilter: SQLActionBuilder): SQLActionBuilderChain = {
@@ -1224,9 +1226,10 @@ class UpdateHistory(
12241226

12251227
def getUpdatesWithoutImportUpdates(
12261228
afterO: Option[(Long, CantonTimestamp)],
1227-
limit: PageLimit,
1229+
limit: Limit,
1230+
afterIsInclusive: Boolean,
12281231
)(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = {
1229-
val filters = afterFilters(afterO, includeImportUpdates = false)
1232+
val filters = afterFilters(afterO, includeImportUpdates = false, afterIsInclusive)
12301233
val orderBy = sql"migration_id, record_time, domain_id"
12311234
for {
12321235
txs <- getTxUpdates(filters, orderBy, limit)
@@ -1237,11 +1240,17 @@ class UpdateHistory(
12371240
}
12381241
}
12391242

1243+
def getUpdatesWithoutImportUpdates(
1244+
afterO: Option[(Long, CantonTimestamp)],
1245+
limit: Limit,
1246+
)(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] =
1247+
getUpdatesWithoutImportUpdates(afterO, limit, false)
1248+
12401249
def getAllUpdates(
12411250
afterO: Option[(Long, CantonTimestamp)],
12421251
limit: PageLimit,
12431252
)(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = {
1244-
val filters = afterFilters(afterO, includeImportUpdates = true)
1253+
val filters = afterFilters(afterO, includeImportUpdates = true, afterIsInclusive = false)
12451254
// With import updates, we have to include the update id to get a deterministic order.
12461255
// We don't have an index for this order, but this is only used in test code and deprecated scan endpoints.
12471256
val orderBy = sql"migration_id, record_time, domain_id, update_id"

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/StoreTest.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -908,6 +908,7 @@ abstract class StoreTest extends AsyncWordSpec with BaseTest {
908908
workflowId: String,
909909
recordTime: Instant = defaultEffectiveAt,
910910
createdEventObservers: Seq[PartyId] = Seq.empty,
911+
updateId: String = nextUpdateId(),
911912
): Transaction = mkCreateTxWithInterfaces(
912913
offset,
913914
createRequests.map(cr =>
@@ -919,6 +920,7 @@ abstract class StoreTest extends AsyncWordSpec with BaseTest {
919920
workflowId,
920921
recordTime,
921922
createdEventObservers,
923+
updateId,
922924
)
923925

924926
protected def mkCreateTxWithInterfaces(
@@ -932,6 +934,7 @@ abstract class StoreTest extends AsyncWordSpec with BaseTest {
932934
workflowId: String,
933935
recordTime: Instant = defaultEffectiveAt,
934936
createdEventObservers: Seq[PartyId] = Seq.empty,
937+
updateId: String = nextUpdateId(),
935938
): Transaction = mkTx(
936939
offset,
937940
createRequests.map[Event] { case (contract, implementedInterfaces, failedInterfaces) =>
@@ -947,6 +950,7 @@ abstract class StoreTest extends AsyncWordSpec with BaseTest {
947950
effectiveAt,
948951
workflowId,
949952
recordTime = recordTime,
953+
updateId = updateId,
950954
)
951955

952956
protected def acs(
@@ -1276,8 +1280,8 @@ abstract class StoreTest extends AsyncWordSpec with BaseTest {
12761280
workflowId: String = "",
12771281
commandId: String = "",
12781282
recordTime: Instant = defaultEffectiveAt,
1283+
updateId: String = nextUpdateId(),
12791284
): Transaction = {
1280-
val updateId = nextUpdateId()
12811285
val eventsWithId = events.zipWithIndex.map { case (e, i) =>
12821286
withNodeId(e, i)
12831287
}

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/commands/HttpScanAppClient.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2201,7 +2201,7 @@ object HttpScanAppClient {
22012201
)
22022202
.setSynchronizerId(disclosedContract.synchronizerId)
22032203
.setTemplateId(
2204-
CompactJsonScanHttpEncodings.parseTemplateId(disclosedContract.templateId).toProto
2204+
CompactJsonScanHttpEncodings().parseTemplateId(disclosedContract.templateId).toProto
22052205
)
22062206
.build()
22072207
}
@@ -2219,7 +2219,7 @@ object HttpScanAppClient {
22192219
)
22202220
.setSynchronizerId(disclosedContract.synchronizerId)
22212221
.setTemplateId(
2222-
CompactJsonScanHttpEncodings.parseTemplateId(disclosedContract.templateId).toProto
2222+
CompactJsonScanHttpEncodings().parseTemplateId(disclosedContract.templateId).toProto
22232223
)
22242224
.build()
22252225
}
@@ -2237,7 +2237,7 @@ object HttpScanAppClient {
22372237
)
22382238
.setSynchronizerId(disclosedContract.synchronizerId)
22392239
.setTemplateId(
2240-
CompactJsonScanHttpEncodings.parseTemplateId(disclosedContract.templateId).toProto
2240+
CompactJsonScanHttpEncodings().parseTemplateId(disclosedContract.templateId).toProto
22412241
)
22422242
.build()
22432243
}

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1550,7 +1550,7 @@ class HttpScanHandler(
15501550
migrationId,
15511551
result.createdEventsInPage
15521552
.map(event =>
1553-
CompactJsonScanHttpEncodings.javaToHttpCreatedEvent(
1553+
CompactJsonScanHttpEncodings().javaToHttpCreatedEvent(
15541554
event.eventId,
15551555
event.event,
15561556
)
@@ -1609,7 +1609,7 @@ class HttpScanHandler(
16091609
migrationId,
16101610
result.createdEventsInPage
16111611
.map(event =>
1612-
CompactJsonScanHttpEncodings.javaToHttpCreatedEvent(
1612+
CompactJsonScanHttpEncodings().javaToHttpCreatedEvent(
16131613
event.eventId,
16141614
event.event,
16151615
)

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/ScanHttpEncodings.scala

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ sealed trait ScanHttpEncodings {
320320
httpToJavaExercisedEvent(nodesWithChildren, exercisedHttp)
321321
}
322322

323-
private def httpToJavaCreatedEvent(http: httpApi.CreatedEvent): javaApi.CreatedEvent = {
323+
def httpToJavaCreatedEvent(http: httpApi.CreatedEvent): javaApi.CreatedEvent = {
324324
val templateId = parseTemplateId(http.templateId)
325325
new javaApi.CreatedEvent(
326326
/*witnessParties = */ java.util.Collections.emptyList(),
@@ -514,7 +514,7 @@ object ScanHttpEncodings {
514514
ScanHttpEncodings.makeConsistentAcrossSvs(update)
515515
}
516516
val encodings: ScanHttpEncodings = encoding match {
517-
case definitions.DamlValueEncoding.members.CompactJson => CompactJsonScanHttpEncodings
517+
case definitions.DamlValueEncoding.members.CompactJson => CompactJsonScanHttpEncodings()
518518
case definitions.DamlValueEncoding.members.ProtobufJson => ProtobufJsonScanHttpEncodings
519519
}
520520
// v0 always returns the update ids as `#` prefixed, as that's the way they were encoded in canton. v1 returns them without the `#`.
@@ -672,8 +672,41 @@ object ScanHttpEncodings {
672672
}
673673
}
674674

675-
// A lossy, but much easier to process, encoding. Should be used for all endpoints not used for backfilling Scan.
676-
case object CompactJsonScanHttpEncodings extends ScanHttpEncodings {
675+
object RemoveFieldLabels {
676+
def record(value: javaApi.DamlRecord): javaApi.DamlRecord = recordWithoutFieldLabels(value)
677+
def value(value: javaApi.Value): javaApi.Value = valueWithoutFieldLabels(value)
678+
679+
/** Recursively removes all field labels from a value.
680+
* ValueJsonCodecCodegen returns values with field labels, but we generally don't store field labels in databases.
681+
* The labels are removed to make values comparable.
682+
*/
683+
private def valueWithoutFieldLabels(value: javaApi.Value): javaApi.Value = {
684+
value match {
685+
case record: javaApi.DamlRecord => recordWithoutFieldLabels(record)
686+
case list: javaApi.DamlList => javaApi.DamlList.of(list.toList(valueWithoutFieldLabels))
687+
case tmap: javaApi.DamlTextMap => javaApi.DamlTextMap.of(tmap.toMap(valueWithoutFieldLabels))
688+
case gmap: javaApi.DamlGenMap =>
689+
javaApi.DamlGenMap.of(gmap.toMap(valueWithoutFieldLabels, valueWithoutFieldLabels))
690+
case opt: javaApi.DamlOptional =>
691+
javaApi.DamlOptional.of(opt.getValue.map(valueWithoutFieldLabels))
692+
case variant: javaApi.Variant =>
693+
new javaApi.Variant(variant.getConstructor, valueWithoutFieldLabels(variant.getValue))
694+
case _ => value
695+
}
696+
}
697+
private def recordWithoutFieldLabels(value: javaApi.DamlRecord): javaApi.DamlRecord = {
698+
val fields = value.getFields.asScala.toList
699+
val fieldsWithoutLabels = fields.map { f =>
700+
new javaApi.DamlRecord.Field(valueWithoutFieldLabels(f.getValue))
701+
}
702+
new javaApi.DamlRecord(fieldsWithoutLabels.asJava)
703+
}
704+
}
705+
706+
case class CompactJsonScanHttpEncodings(
707+
transformValue: javaApi.Value => javaApi.Value,
708+
transformRecord: javaApi.DamlRecord => javaApi.DamlRecord,
709+
) extends ScanHttpEncodings {
677710
import org.lfdecentralizedtrust.splice.util.ValueJsonCodecCodegen
678711
override def encodeContractPayload(
679712
event: javaApi.CreatedEvent
@@ -725,7 +758,7 @@ case object CompactJsonScanHttpEncodings extends ScanHttpEncodings {
725758
throw new RuntimeException(
726759
s"Failed to decode contract payload '${json.noSpaces}': $error"
727760
),
728-
withoutFieldLabels,
761+
transformRecord,
729762
)
730763

731764
override def decodeChoiceArgument(
@@ -741,7 +774,7 @@ case object CompactJsonScanHttpEncodings extends ScanHttpEncodings {
741774
throw new RuntimeException(
742775
s"Failed to decode choice argument '${json.noSpaces}': $error"
743776
),
744-
withoutFieldLabels,
777+
transformValue,
745778
)
746779

747780
override def decodeExerciseResult(
@@ -755,34 +788,13 @@ case object CompactJsonScanHttpEncodings extends ScanHttpEncodings {
755788
.fold(
756789
error =>
757790
throw new RuntimeException(s"Failed to decode choice result '${json.noSpaces}': $error"),
758-
withoutFieldLabels,
791+
transformValue,
759792
)
793+
}
760794

761-
/** Recursively removes all field labels from a value.
762-
* ValueJsonCodecCodegen returns values with field labels, but we generally don't store field labels in databases.
763-
* The labels are removed to make values comparable.
764-
*/
765-
private def withoutFieldLabels(value: javaApi.Value): javaApi.Value = {
766-
value match {
767-
case record: javaApi.DamlRecord => withoutFieldLabels(record)
768-
case list: javaApi.DamlList => javaApi.DamlList.of(list.toList(withoutFieldLabels))
769-
case tmap: javaApi.DamlTextMap => javaApi.DamlTextMap.of(tmap.toMap(withoutFieldLabels))
770-
case gmap: javaApi.DamlGenMap =>
771-
javaApi.DamlGenMap.of(gmap.toMap(withoutFieldLabels, withoutFieldLabels))
772-
case opt: javaApi.DamlOptional =>
773-
javaApi.DamlOptional.of(opt.getValue.map(withoutFieldLabels))
774-
case variant: javaApi.Variant =>
775-
new javaApi.Variant(variant.getConstructor, withoutFieldLabels(variant.getValue))
776-
case _ => value
777-
}
778-
}
779-
private def withoutFieldLabels(value: javaApi.DamlRecord): javaApi.DamlRecord = {
780-
val fields = value.getFields.asScala.toList
781-
val fieldsWithoutLabels = fields.map { f =>
782-
new javaApi.DamlRecord.Field(withoutFieldLabels(f.getValue))
783-
}
784-
new javaApi.DamlRecord(fieldsWithoutLabels.asJava)
785-
}
795+
// A lossy, but much easier to process, encoding. Should be used for all endpoints not used for backfilling Scan.
796+
object CompactJsonScanHttpEncodings {
797+
def apply() = new CompactJsonScanHttpEncodings(RemoveFieldLabels.value, RemoveFieldLabels.record)
786798
}
787799

788800
// A lossless, but harder to process, encoding. Should be used only for backfilling Scan.

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorage.scala

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,6 @@ import scala.concurrent.duration.FiniteDuration
2727

2828
import Position.*
2929

30-
case class BulkStorageConfig(
31-
dbReadChunkSize: Int,
32-
maxFileSize: Long,
33-
)
34-
35-
object BulkStorageConfigs {
36-
val bulkStorageConfigV1 = BulkStorageConfig(
37-
1000,
38-
64L * 1024 * 1024,
39-
)
40-
}
41-
4230
object Position {
4331
sealed trait Position
4432

@@ -73,7 +61,7 @@ class AcsSnapshotBulkStorage(
7361
)
7462
} yield {
7563
val encoded = snapshot.createdEventsInPage.map(event =>
76-
CompactJsonScanHttpEncodings.javaToHttpCreatedEvent(event.eventId, event.event)
64+
CompactJsonScanHttpEncodings().javaToHttpCreatedEvent(event.eventId, event.event)
7765
)
7866
val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n"
7967
val contractsBytes = ByteString(contractsStr.getBytes(StandardCharsets.UTF_8))
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.scan.store.bulk
5+
6+
case class BulkStorageConfig(
7+
dbReadChunkSize: Int,
8+
maxFileSize: Long,
9+
)
10+
11+
object BulkStorageConfigs {
12+
val bulkStorageConfigV1 = BulkStorageConfig(
13+
1000,
14+
64L * 1024 * 1024,
15+
)
16+
}

0 commit comments

Comments
 (0)