Skip to content

Commit c3de43e

Browse files
Fix getHoldingsSummary returning bad counts when too many contracts (#1072)
Signed-off-by: Oriol Muñoz <oriol.munoz@digitalasset.com>
1 parent 9d1f7e0 commit c3de43e

File tree

6 files changed

+106
-19
lines changed

6 files changed

+106
-19
lines changed

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/Limit.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,28 @@ trait LimitHelpers { _: NamedLogging =>
8989
}
9090
}
9191

92+
protected final def applyLimitOrFail[CC[_], C](
93+
name: String,
94+
limit: Limit,
95+
result: C & scala.collection.IterableOps[?, CC, C],
96+
): C = {
97+
limit match {
98+
case PageLimit(limit) =>
99+
result.take(limit.intValue())
100+
case HardLimit(limit) =>
101+
val resultSize = result.size
102+
if (resultSize > limit) {
103+
throw io.grpc.Status.FAILED_PRECONDITION
104+
.withDescription(
105+
s"Size of the result exceeded the limit in $name. Result size: ${resultSize.toLong}. Limit: ${limit.toLong}"
106+
)
107+
.asRuntimeException()
108+
} else {
109+
result
110+
}
111+
}
112+
}
113+
92114
protected def sqlLimit(limit: Limit): Int = {
93115
limit match {
94116
case HardLimit(limit) => limit + 1

apps/scan/src/main/openapi/scan.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2619,6 +2619,7 @@ components:
26192619
type: array
26202620
items:
26212621
type: string
2622+
minItems: 1
26222623
description: |
26232624
Filters by contracts in which these party_ids are the owners of the amulets.
26242625
@@ -2644,6 +2645,7 @@ components:
26442645
type: array
26452646
items:
26462647
type: string
2648+
minItems: 1
26472649
description: |
26482650
The owners for which to compute the summary.
26492651
as_of_round:

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
package org.lfdecentralizedtrust.splice.scan.admin.http
55

6-
import cats.data.OptionT
6+
import cats.data.{NonEmptyVector, OptionT}
77
import cats.implicits.toTraverseOps
88
import cats.syntax.either.*
99
import cats.syntax.traverseFilter.*
@@ -1361,7 +1361,7 @@ class HttpScanHandler(
13611361
CantonTimestamp.assertFromInstant(recordTime.toInstant),
13621362
after,
13631363
PageLimit.tryCreate(pageSize),
1364-
ownerPartyIds.map(PartyId.tryFromProtoPrimitive),
1364+
nonEmptyOrFail("ownerPartyIds", ownerPartyIds).map(PartyId.tryFromProtoPrimitive),
13651365
)
13661366
.map { result =>
13671367
ScanResource.GetHoldingsStateAtResponseOK(
@@ -1410,7 +1410,7 @@ class HttpScanHandler(
14101410
.getHoldingsSummary(
14111411
migrationId,
14121412
CantonTimestamp.assertFromInstant(recordTime.toInstant),
1413-
partyIds.map(PartyId.tryFromProtoPrimitive),
1413+
nonEmptyOrFail("partyIds", partyIds).map(PartyId.tryFromProtoPrimitive),
14141414
round,
14151415
)
14161416
} yield ScanResource.GetHoldingsSummaryAtResponse.OK(
@@ -1438,6 +1438,18 @@ class HttpScanHandler(
14381438
}
14391439
}
14401440

1441+
private def nonEmptyOrFail[A](fieldName: String, vec: Vector[A]): NonEmptyVector[A] = {
1442+
NonEmptyVector
1443+
.fromVector(vec)
1444+
.getOrElse(
1445+
throw io.grpc.Status.INVALID_ARGUMENT
1446+
.withDescription(
1447+
s"Expected '$fieldName' to contain at least one item, but contained none."
1448+
)
1449+
.asRuntimeException()
1450+
)
1451+
}
1452+
14411453
override def getAggregatedRounds(respond: ScanResource.GetAggregatedRoundsResponse.type)()(
14421454
extracted: com.digitalasset.canton.tracing.TraceContext
14431455
): Future[ScanResource.GetAggregatedRoundsResponse] = {

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotStore.scala

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package org.lfdecentralizedtrust.splice.scan.store
55

6+
import cats.data.NonEmptyVector
67
import com.daml.ledger.javaapi.data.CreatedEvent
78
import org.lfdecentralizedtrust.splice.codegen.java.splice.amulet.{Amulet, LockedAmulet}
89
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore.{
@@ -240,7 +241,8 @@ class AcsSnapshotStore(
240241
"queryAcsSnapshot.getCreatedEvents",
241242
)
242243
} yield {
243-
val eventsInPage = applyLimit("queryAcsSnapshot", limit, events.map(_._2.toCreatedEvent))
244+
val eventsInPage =
245+
applyLimitOrFail("queryAcsSnapshot", limit, events.map(_._2.toCreatedEvent))
244246
val afterToken = if (eventsInPage.size == limit.limit) events.lastOption.map(_._1) else None
245247
QueryAcsSnapshotResult(
246248
migrationId = migrationId,
@@ -256,18 +258,19 @@ class AcsSnapshotStore(
256258
snapshot: CantonTimestamp,
257259
after: Option[Long],
258260
limit: Limit,
259-
partyIds: Seq[PartyId],
261+
partyIds: NonEmptyVector[PartyId],
260262
)(implicit tc: TraceContext): Future[QueryAcsSnapshotResult] = {
261263
this
262264
.queryAcsSnapshot(
263265
migrationId,
264266
snapshot,
265267
after,
266268
limit,
267-
partyIds,
269+
partyIds.toVector,
268270
AcsSnapshotStore.holdingsTemplates,
269271
)
270272
.map { result =>
273+
val partyIdsSet = partyIds.toVector.toSet
271274
QueryAcsSnapshotResult(
272275
result.migrationId,
273276
result.snapshotRecordTime,
@@ -277,8 +280,10 @@ class AcsSnapshotStore(
277280
.decodeHoldingContract(createdEvent.event)
278281
.fold(
279282
locked =>
280-
partyIds.contains(PartyId.tryFromProtoPrimitive(locked.payload.amulet.owner)),
281-
amulet => partyIds.contains(PartyId.tryFromProtoPrimitive(amulet.payload.owner)),
283+
partyIdsSet
284+
.contains(PartyId.tryFromProtoPrimitive(locked.payload.amulet.owner)),
285+
amulet =>
286+
partyIdsSet.contains(PartyId.tryFromProtoPrimitive(amulet.payload.owner)),
282287
)
283288
},
284289
result.afterToken,
@@ -289,15 +294,15 @@ class AcsSnapshotStore(
289294
def getHoldingsSummary(
290295
migrationId: Long,
291296
recordTime: CantonTimestamp,
292-
partyIds: Seq[PartyId],
297+
partyIds: NonEmptyVector[PartyId],
293298
asOfRound: Long,
294299
)(implicit tc: TraceContext): Future[AcsSnapshotStore.HoldingsSummaryResult] = {
295300
this
296301
.getHoldingsState(
297302
migrationId,
298303
recordTime,
299304
None,
300-
// assumption: the number of contracts is small enough that it will fit in memory
305+
// if the limit is exceeded by the results from the DB, an exception will be thrown
301306
HardLimit.tryCreate(Limit.MaxPageSize),
302307
partyIds,
303308
)

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,28 @@
11
package org.lfdecentralizedtrust.splice.store.db
22

3+
import cats.data.NonEmptyVector
34
import com.daml.ledger.javaapi.data.Unit as damlUnit
45
import com.daml.ledger.javaapi.data.codegen.ContractId
56
import org.lfdecentralizedtrust.splice.environment.DarResources
67
import org.lfdecentralizedtrust.splice.environment.ledger.api.TransactionTreeUpdate
78
import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
89
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
9-
import org.lfdecentralizedtrust.splice.store.{PageLimit, StoreErrors, StoreTest, UpdateHistory}
10+
import org.lfdecentralizedtrust.splice.store.{
11+
HardLimit,
12+
PageLimit,
13+
StoreErrors,
14+
StoreTest,
15+
UpdateHistory,
16+
}
1017
import org.lfdecentralizedtrust.splice.util.{Contract, HoldingsSummary, PackageQualifiedName}
1118
import com.digitalasset.canton.data.CantonTimestamp
1219
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
1320
import com.digitalasset.canton.resource.DbStorage
14-
import com.digitalasset.canton.topology.{SynchronizerId, PartyId}
21+
import com.digitalasset.canton.topology.{PartyId, SynchronizerId}
1522
import com.digitalasset.canton.tracing.TraceContext
1623
import com.digitalasset.canton.util.MonadUtil
1724
import com.digitalasset.canton.{HasActorSystem, HasExecutionContext}
25+
import io.grpc.StatusRuntimeException
1826
import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
1927
import org.scalatest.Succeeded
2028

@@ -490,14 +498,14 @@ class AcsSnapshotStoreTest
490498
timestamp1,
491499
None,
492500
PageLimit.tryCreate(10),
493-
Seq(dsoParty),
501+
NonEmptyVector.of(dsoParty),
494502
)
495503
resultWanteds <- store.getHoldingsState(
496504
DefaultMigrationId,
497505
timestamp1,
498506
None,
499507
PageLimit.tryCreate(10),
500-
Seq(wantedParty1, wantedParty2),
508+
NonEmptyVector.of(wantedParty1, wantedParty2),
501509
)
502510
} yield {
503511
resultDso.createdEventsInPage should be(empty)
@@ -526,14 +534,14 @@ class AcsSnapshotStoreTest
526534
timestamp1,
527535
None,
528536
PageLimit.tryCreate(10),
529-
Seq(owner),
537+
NonEmptyVector.of(owner),
530538
)
531539
resultHolder <- store.getHoldingsState(
532540
DefaultMigrationId,
533541
timestamp1,
534542
None,
535543
PageLimit.tryCreate(10),
536-
Seq(holder),
544+
NonEmptyVector.of(holder),
537545
)
538546
} yield {
539547
resultHolder.createdEventsInPage should be(empty)
@@ -543,6 +551,39 @@ class AcsSnapshotStoreTest
543551
}
544552
}
545553

554+
"fail if too many contracts were returned by HardLimit" in {
555+
val owner = providerParty(1)
556+
val holder = providerParty(2)
557+
val amulet1 = lockedAmulet(owner, 10, 1L, 0.5)
558+
val amulet2 = lockedAmulet(owner, 10, 2L, 0.5)
559+
(for {
560+
updateHistory <- mkUpdateHistory()
561+
store = mkStore(updateHistory)
562+
_ <- ingestCreate(
563+
updateHistory,
564+
amulet1,
565+
timestamp1.minusSeconds(10L),
566+
Seq(owner, holder),
567+
)
568+
_ <- ingestCreate(
569+
updateHistory,
570+
amulet2,
571+
timestamp1.minusSeconds(10L),
572+
Seq(owner, holder),
573+
)
574+
_ <- store.insertNewSnapshot(None, DefaultMigrationId, timestamp1)
575+
_result <- store.getHoldingsState(
576+
DefaultMigrationId,
577+
timestamp1,
578+
None,
579+
HardLimit.tryCreate(1),
580+
NonEmptyVector.of(owner),
581+
)
582+
} yield fail("should not get here, call should've failed")).failed.futureValue shouldBe a[
583+
StatusRuntimeException
584+
]
585+
}
586+
546587
}
547588

548589
"getHoldingsSummary" should {
@@ -581,19 +622,19 @@ class AcsSnapshotStoreTest
581622
summaryAtRound3 <- store.getHoldingsSummary(
582623
DefaultMigrationId,
583624
timestamp1,
584-
Seq(wantedParty1, wantedParty2),
625+
NonEmptyVector.of(wantedParty1, wantedParty2),
585626
asOfRound = 3L,
586627
)
587628
summaryAtRound10 <- store.getHoldingsSummary(
588629
DefaultMigrationId,
589630
timestamp1,
590-
Seq(wantedParty1, wantedParty2),
631+
NonEmptyVector.of(wantedParty1, wantedParty2),
591632
asOfRound = 10L,
592633
)
593634
summaryAtRound100 <- store.getHoldingsSummary(
594635
DefaultMigrationId,
595636
timestamp1,
596-
Seq(wantedParty1, wantedParty2),
637+
NonEmptyVector.of(wantedParty1, wantedParty2),
597638
asOfRound = 100L,
598639
)
599640
} yield {

docs/src/release_notes.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ Upcoming
1919
The `/v2/updates` endpoints no longer return the `offset` field in responses,
2020
and `events_by_id` are now lexicographically ordered by ID for conveniently viewing JSON results.
2121

22+
- Scan
23+
24+
- Fix a bug where the ``/v0/holdings/summary`` endpoint would return incomplete results when the requested parties had more than 1000 holdings.
25+
Additionally, that endpoint and ``/v0/holdings/state`` will now fail if an empty list of parties is provided.
26+
2227
0.4.1
2328
-----
2429

0 commit comments

Comments
 (0)