Skip to content

Commit e040089

Browse files
authored
support querying next snapshot after a timestamp (#3552)
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
1 parent 733f29e commit e040089

File tree

7 files changed

+183
-20
lines changed

7 files changed

+183
-20
lines changed

apps/app/src/main/scala/org/lfdecentralizedtrust/splice/console/ScanAppReference.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,16 @@ abstract class ScanAppReference(
394394
)
395395
}
396396

397+
def getDateOfFirstSnapshotAfter(after: CantonTimestamp, migrationId: Long) =
398+
consoleEnvironment.run {
399+
httpCommand(
400+
HttpScanAppClient.GetDateOfFirstSnapshotAfter(
401+
after.toInstant.atOffset(java.time.ZoneOffset.UTC),
402+
migrationId,
403+
)
404+
)
405+
}
406+
397407
def getAcsSnapshotAt(
398408
at: CantonTimestamp,
399409
migrationId: Long,

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanTimeBasedIntegrationTest.scala

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -446,11 +446,24 @@ class ScanTimeBasedIntegrationTest
446446
}
447447
}
448448

449-
val snapshotBefore = sv1ScanBackend.getDateOfMostRecentSnapshotBefore(
450-
getLedgerTime,
451-
migrationId,
449+
val startTime = getLedgerTime
450+
451+
advanceTime(
452+
java.time.Duration
453+
.ofHours(sv1ScanBackend.config.acsSnapshotPeriodHours.toLong)
454+
.plusSeconds(1L)
452455
)
453456

457+
val snapshot1 = eventually() {
458+
val snapshot1 = sv1ScanBackend.getDateOfMostRecentSnapshotBefore(
459+
getLedgerTime,
460+
migrationId,
461+
)
462+
snapshot1 should not be None
463+
snapshot1.value.toInstant shouldBe >(startTime.toInstant)
464+
snapshot1
465+
}
466+
454467
createAnsEntry(
455468
aliceAnsExternalClient,
456469
perTestCaseName("snapshot"),
@@ -469,10 +482,15 @@ class ScanTimeBasedIntegrationTest
469482
getLedgerTime,
470483
migrationId,
471484
)
472-
snapshotBefore should not(be(snapshotAfter))
485+
snapshot1 should not(be(snapshotAfter))
473486
snapshotAfter
474487
}
475488

489+
sv1ScanBackend.getDateOfFirstSnapshotAfter(startTime, 0).value shouldBe snapshot1.value
490+
sv1ScanBackend
491+
.getDateOfFirstSnapshotAfter(CantonTimestamp.tryFromInstant(snapshot1.value.toInstant), 0)
492+
.value shouldBe snapshotAfter.value
493+
476494
val snapshotAfterData = sv1ScanBackend.getAcsSnapshotAt(
477495
CantonTimestamp.assertFromInstant(snapshotAfter.value.toInstant),
478496
migrationId,

apps/scan/src/main/openapi/scan.yaml

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,45 @@ paths:
445445
"500":
446446
$ref: "../../../../common/src/main/openapi/common-external.yaml#/components/responses/500"
447447

448+
/v0/state/acs/snapshot-timestamp-after:
449+
get:
450+
tags: [external, scan]
451+
x-jvm-package: scan
452+
operationId: "getDateOfFirstSnapshotAfter"
453+
description: |
454+
Returns the timestamp of the first snapshot after the given date, for the given migration_id or larger.
455+
parameters:
456+
- name: "after"
457+
in: "query"
458+
required: true
459+
schema:
460+
type: string
461+
format: date-time
462+
description: |
463+
The endpoint will return the record time of the first snapshot after this parameter.
464+
- name: "migration_id"
465+
in: "query"
466+
required: true
467+
schema:
468+
type: integer
469+
format: int64
470+
description: |
471+
The endpoint will return the record time of the first snapshot for this migration id or larger.
472+
responses:
473+
"200":
474+
description: ok
475+
content:
476+
application/json:
477+
schema:
478+
$ref: "#/components/schemas/AcsSnapshotTimestampResponse"
479+
"400":
480+
$ref: "../../../../common/src/main/openapi/common-external.yaml#/components/responses/400"
481+
"404":
482+
$ref: "../../../../common/src/main/openapi/common-external.yaml#/components/responses/404"
483+
"500":
484+
$ref: "../../../../common/src/main/openapi/common-external.yaml#/components/responses/500"
485+
486+
448487
/v0/state/acs:
449488
post:
450489
tags: [external, scan]
@@ -1180,12 +1219,12 @@ paths:
11801219
tags: [deprecated, scan]
11811220
x-jvm-package: scan
11821221
operationId: "getTotalAmuletBalance"
1183-
description: |
1184-
**Deprecated**, use /registry/metadata/v1/instruments/Amulet token standard metadata API endpoint, see
1222+
description: |
1223+
**Deprecated**, use /registry/metadata/v1/instruments/Amulet token standard metadata API endpoint, see
11851224
https://docs.sync.global/app_dev/token_standard/openapi/token_metadata.html.
1186-
1187-
**This endpoint will be removed in a future release**
1188-
1225+
1226+
**This endpoint will be removed in a future release**
1227+
11891228
Get the total balance of Amulet in the network.
11901229
parameters:
11911230
- in: query
@@ -1211,10 +1250,10 @@ paths:
12111250
x-jvm-package: scan
12121251
operationId: "getWalletBalance"
12131252
description: |
1214-
**Deprecated**, use /v0/holdings/summary with /v0/state/acs/snapshot-timestamp instead.
1215-
1253+
**Deprecated**, use /v0/holdings/summary with /v0/state/acs/snapshot-timestamp instead.
1254+
12161255
**This endpoint will be removed in a future release**
1217-
1256+
12181257
Get the Amulet balance for a specific party at the end of a closed round.
12191258
parameters:
12201259
- in: query
@@ -2629,7 +2668,7 @@ components:
26292668
type: string
26302669
format: date-time
26312670
description: |
2632-
The timestamp at which the contract set was active.
2671+
The timestamp at which the contract set was active.
26332672
This needs to be an exact timestamp, i.e.,
26342673
needs to correspond to a timestamp reported by `/v0/state/acs/snapshot-timestamp` if `record_time_match` is set to `exact` (which is the default).
26352674
If `record_time_match` is set to `at_or_before`, this can be any timestamp, and the most recent snapshot at or before the given `record_time` will be returned.
@@ -2684,7 +2723,7 @@ components:
26842723
type: string
26852724
format: date-time
26862725
description: |
2687-
The timestamp at which the contract set was active.
2726+
The timestamp at which the contract set was active.
26882727
This needs to be an exact timestamp, i.e.,
26892728
needs to correspond to a timestamp reported by `/v0/state/acs/snapshot-timestamp` if `record_time_match` is set to `exact` (which is the default).
26902729
If `record_time_match` is set to `at_or_before`, this can be any timestamp, and the most recent snapshot at or before the given `record_time` will be returned.
@@ -2731,7 +2770,7 @@ components:
27312770
type: string
27322771
format: date-time
27332772
description: |
2734-
The timestamp at which the contract set was active.
2773+
The timestamp at which the contract set was active.
27352774
This needs to be an exact timestamp, i.e.,
27362775
needs to correspond to a timestamp reported by `/v0/state/acs/snapshot-timestamp` if `record_time_match` is set to `exact` (which is the default).
27372776
If `record_time_match` is set to `at_or_before`, this can be any timestamp, and the most recent snapshot at or before the given `record_time` will be returned.

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/commands/HttpScanAppClient.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import org.lfdecentralizedtrust.tokenstandard.{
3838
import org.lfdecentralizedtrust.splice.http.v0.scan.{
3939
ForceAcsSnapshotNowResponse,
4040
GetDateOfMostRecentSnapshotBeforeResponse,
41+
GetDateOfFirstSnapshotAfterResponse,
4142
}
4243
import org.lfdecentralizedtrust.splice.scan.admin.http.{
4344
CompactJsonScanHttpEncodings,
@@ -1016,6 +1017,32 @@ object HttpScanAppClient {
10161017
}
10171018
}
10181019

1020+
case class GetDateOfFirstSnapshotAfter(
1021+
after: java.time.OffsetDateTime,
1022+
migrationId: Long,
1023+
) extends InternalBaseCommand[
1024+
http.GetDateOfFirstSnapshotAfterResponse,
1025+
Option[java.time.OffsetDateTime],
1026+
] {
1027+
override def submitRequest(
1028+
client: ScanClient,
1029+
headers: List[HttpHeader],
1030+
): EitherT[Future, Either[Throwable, HttpResponse], GetDateOfFirstSnapshotAfterResponse] =
1031+
client.getDateOfFirstSnapshotAfter(after, migrationId, headers)
1032+
1033+
override protected def handleOk()(implicit
1034+
decoder: TemplateJsonDecoder
1035+
): PartialFunction[GetDateOfFirstSnapshotAfterResponse, Either[
1036+
String,
1037+
Option[java.time.OffsetDateTime],
1038+
]] = {
1039+
case http.GetDateOfFirstSnapshotAfterResponse.OK(value) =>
1040+
Right(Some(value.recordTime))
1041+
case http.GetDateOfFirstSnapshotAfterResponse.NotFound(_) =>
1042+
Right(None)
1043+
}
1044+
}
1045+
10191046
case class GetAcsSnapshotAt(
10201047
at: java.time.OffsetDateTime,
10211048
migrationId: Long,

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1428,6 +1428,31 @@ class HttpScanHandler(
14281428
}
14291429
}
14301430

1431+
override def getDateOfFirstSnapshotAfter(
1432+
respond: ScanResource.GetDateOfFirstSnapshotAfterResponse.type
1433+
)(after: OffsetDateTime, migrationId: Long)(
1434+
extracted: TraceContext
1435+
): Future[ScanResource.GetDateOfFirstSnapshotAfterResponse] = {
1436+
implicit val tc: TraceContext = extracted
1437+
withSpan(s"$workflowId.getDateOfFirstSnapshotAfter") { _ => _ =>
1438+
snapshotStore
1439+
.lookupSnapshotAfter(migrationId, Codec.tryDecode(Codec.OffsetDateTime)(after))
1440+
.map {
1441+
case Some(snapshot) =>
1442+
ScanResource.GetDateOfFirstSnapshotAfterResponseOK(
1443+
definitions
1444+
.AcsSnapshotTimestampResponse(
1445+
Codec.encode(snapshot.snapshotRecordTime)
1446+
)
1447+
)
1448+
case None =>
1449+
ScanResource.GetDateOfFirstSnapshotAfterResponseNotFound(
1450+
definitions.ErrorResponse(s"No snapshots found after $after")
1451+
)
1452+
}
1453+
}
1454+
}
1455+
14311456
override def forceAcsSnapshotNow(
14321457
respond: ScanResource.ForceAcsSnapshotNowResponse.type
14331458
)()(extracted: TraceContext): Future[ScanResource.ForceAcsSnapshotNowResponse] = {

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotStore.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,34 @@ class AcsSnapshotStore(
6868
.value
6969
}
7070

71+
def lookupSnapshotAfter(
72+
migrationId: Long,
73+
after: CantonTimestamp,
74+
)(implicit tc: TraceContext): Future[Option[AcsSnapshot]] = {
75+
76+
val select =
77+
sql"select snapshot_record_time, migration_id, history_id, first_row_id, last_row_id, unlocked_amulet_balance, locked_amulet_balance "
78+
val orderLimit = sql" order by snapshot_record_time asc limit 1 "
79+
val sameMig = select ++ sql""" from acs_snapshot
80+
where snapshot_record_time > $after
81+
and migration_id = $migrationId
82+
and history_id = $historyId """ ++ orderLimit
83+
val largerMig = select ++ sql""" from acs_snapshot
84+
where migration_id > $migrationId
85+
and history_id = $historyId """ ++ orderLimit
86+
87+
val query =
88+
sql"select * from ((" ++ sameMig ++ sql") union all (" ++ largerMig ++ sql")) all_queries order by snapshot_record_time asc limit 1"
89+
90+
storage
91+
.querySingle(
92+
query.toActionBuilder.as[AcsSnapshot].headOption,
93+
"lookupSnapshotAfter",
94+
)
95+
.value
96+
97+
}
98+
7199
def insertNewSnapshot(
72100
lastSnapshot: Option[AcsSnapshot],
73101
migrationId: Long,

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,15 @@ class AcsSnapshotStoreTest
5858
for {
5959
updateHistory <- mkUpdateHistory()
6060
store = mkStore(updateHistory)
61-
result <- store.lookupSnapshotAtOrBefore(DefaultMigrationId, CantonTimestamp.MaxValue)
62-
} yield result should be(None)
61+
resultBefore <- store.lookupSnapshotAtOrBefore(
62+
DefaultMigrationId,
63+
CantonTimestamp.MaxValue,
64+
)
65+
resultAfter <- store.lookupSnapshotAfter(DefaultMigrationId, CantonTimestamp.MinValue)
66+
} yield {
67+
resultBefore should be(None)
68+
resultAfter should be(None)
69+
}
6370
}
6471

6572
"only return the last snapshot of the passed migration id" in {
@@ -95,7 +102,7 @@ class AcsSnapshotStoreTest
95102
} yield result should be(None)
96103
}
97104

98-
"return the latest snapshot before the given timestamp" in {
105+
"return correct snapshots before and after given timestamps" in {
99106
for {
100107
updateHistory <- mkUpdateHistory()
101108
store = mkStore(updateHistory)
@@ -109,8 +116,17 @@ class AcsSnapshotStoreTest
109116
snapshot <- store.insertNewSnapshot(None, DefaultMigrationId, timestamp)
110117
} yield snapshot
111118
}
112-
result <- store.lookupSnapshotAtOrBefore(DefaultMigrationId, timestamp4)
113-
} yield result.map(_.snapshotRecordTime) should be(Some(timestamp3))
119+
resultBefore4 <- store.lookupSnapshotAtOrBefore(DefaultMigrationId, timestamp4)
120+
firstResult <- store.lookupSnapshotAfter(DefaultMigrationId, CantonTimestamp.MinValue)
121+
secondResult <- store.lookupSnapshotAfter(
122+
DefaultMigrationId,
123+
firstResult.value.snapshotRecordTime,
124+
)
125+
} yield {
126+
resultBefore4.map(_.snapshotRecordTime) should be(Some(timestamp3))
127+
firstResult.map(_.snapshotRecordTime) should be(Some(timestamp1))
128+
secondResult.map(_.snapshotRecordTime) should be(Some(timestamp2))
129+
}
114130
}
115131

116132
}

0 commit comments

Comments
 (0)