Skip to content

Commit 6feff47

Browse files
authored
[ci] Parse offset checkpoint in get update tree response (#1053)
1 parent 3fce9d9 commit 6feff47

File tree

3 files changed

+52
-15
lines changed

3 files changed

+52
-15
lines changed

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ledger/api/LedgerClient.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,16 @@ import com.daml.ledger.api.v2.admin.party_management_service.{
1919
}
2020
import com.daml.ledger.api.v2.interactive.interactive_submission_service.InteractiveSubmissionServiceGrpc
2121
import com.daml.ledger.api.v2.command_service.CommandServiceGrpc
22+
import com.daml.ledger.api.v2.offset_checkpoint.OffsetCheckpoint.toJavaProto
2223
import com.daml.ledger.api.v2.package_reference.PackageReference
2324
import com.daml.ledger.api.v2.package_service.{ListPackagesRequest, PackageServiceGrpc}
24-
import com.daml.ledger.javaapi.data.{Command, CreateUserResponse, ListUserRightsResponse, User}
25+
import com.daml.ledger.javaapi.data.{
26+
Command,
27+
CreateUserResponse,
28+
ListUserRightsResponse,
29+
OffsetCheckpoint,
30+
User,
31+
}
2532
import com.daml.ledger.javaapi.data.codegen.ContractId
2633
import com.daml.ledger.javaapi.data.User.Right
2734
import org.lfdecentralizedtrust.splice.auth.AuthToken
@@ -819,7 +826,14 @@ object LedgerClient {
819826
)
820827
)
821828

822-
case TU.OffsetCheckpoint(_) => None
829+
case TU.OffsetCheckpoint(offset) =>
830+
Some(
831+
GetTreeUpdatesResponse(
832+
TreeUpdateOrOffsetCheckpoint.Checkpoint(
833+
OffsetCheckpoint.fromProto(toJavaProto(offset))
834+
)
835+
)
836+
)
823837

824838
case TU.Empty => sys.error("uninitialized update service result (update)")
825839
}

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -321,12 +321,25 @@ class UpdateHistory(
321321
)
322322
case Some(lastIngestedOffset) =>
323323
if (offset <= lastIngestedOffset) {
324-
logger.warn(
325-
s"Update offset $offset <= last ingested offset $lastIngestedOffset for ${description()}, skipping database actions. " +
326-
"This is expected if the SQL query was automatically retried after a transient database error. " +
327-
"Otherwise, this is unexpected and most likely caused by two identical UpdateIngestionService instances " +
328-
"ingesting into the same logical database."
329-
)
324+
updateOrCheckpoint match {
325+
case _: TreeUpdateOrOffsetCheckpoint.Update =>
326+
logger.warn(
327+
s"Update offset $offset <= last ingested offset $lastIngestedOffset for ${description()}, skipping database actions. " +
328+
"This is expected if the SQL query was automatically retried after a transient database error. " +
329+
"Otherwise, this is unexpected and most likely caused by two identical UpdateIngestionService instances " +
330+
"ingesting into the same logical database."
331+
)
332+
case _: TreeUpdateOrOffsetCheckpoint.Checkpoint =>
333+
// we can receive an offset equal to the last ingested and that can be safely ignore
334+
if (offset < lastIngestedOffset) {
335+
logger.warn(
336+
s"Checkpoint offset $offset < last ingested offset $lastIngestedOffset for ${description()}, skipping database actions. " +
337+
"This is expected if the SQL query was automatically retried after a transient database error. " +
338+
"Otherwise, this is unexpected and most likely caused by two identical UpdateIngestionService instances " +
339+
"ingesting into the same logical database."
340+
)
341+
}
342+
}
330343
DBIO.successful(())
331344
} else {
332345
logger.debug(

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,7 @@ final class DbMultiDomainAcsStore[TXE](
995995
private def ingestUpdateAtOffset[E <: Effect](
996996
offset: Long,
997997
action: DBIOAction[?, NoStream, Effect.Read & Effect.Write],
998+
isOffsetCheckpoint: Boolean = false,
998999
)(implicit
9991000
tc: TraceContext
10001001
): DBIOAction[Unit, NoStream, Effect.Read & Effect.Write & Effect.Transactional] = {
@@ -1004,12 +1005,18 @@ final class DbMultiDomainAcsStore[TXE](
10041005
action.andThen(updateOffset(offset))
10051006
case Some(lastIngestedOffset) =>
10061007
if (offset <= lastIngestedOffset) {
1007-
logger.warn(
1008-
s"Update offset $offset <= last ingested offset $lastIngestedOffset for DbMultiDomainAcsStore(storeId=$acsStoreId), skipping database actions. " +
1009-
"This is expected if the SQL query was automatically retried after a transient database error. " +
1010-
"Otherwise, this is unexpected and most likely caused by two identical UpdateIngestionService instances " +
1011-
"ingesting into the same logical database."
1012-
)
1008+
/* we can receive an offset equal to the last ingested and that can be safely ignore */
1009+
if (isOffsetCheckpoint) {
1010+
if (offset < lastIngestedOffset) {
1011+
logger.warn(
1012+
s"Checkpoint offset $offset < last ingested offset $lastIngestedOffset for DbMultiDomainAcsStore(storeId=$acsStoreId), skipping database actions. This is expected if the SQL query was automatically retried after a transient database error. Otherwise, this is unexpected and most likely caused by two identical UpdateIngestionService instances ingesting into the same logical database."
1013+
)
1014+
}
1015+
} else {
1016+
logger.warn(
1017+
s"Update offset $offset <= last ingested offset $lastIngestedOffset for DbMultiDomainAcsStore(storeId=$acsStoreId), skipping database actions. This is expected if the SQL query was automatically retried after a transient database error. Otherwise, this is unexpected and most likely caused by two identical UpdateIngestionService instances ingesting into the same logical database."
1018+
)
1019+
}
10131020
DBIO.successful(())
10141021
} else {
10151022
action.andThen(updateOffset(offset))
@@ -1183,7 +1190,10 @@ final class DbMultiDomainAcsStore[TXE](
11831190
case TreeUpdateOrOffsetCheckpoint.Checkpoint(checkpoint) =>
11841191
val offset = checkpoint.getOffset
11851192
storage
1186-
.queryAndUpdate(ingestUpdateAtOffset(offset, DBIO.unit), "ingestOffsetCheckpoint")
1193+
.queryAndUpdate(
1194+
ingestUpdateAtOffset(offset, DBIO.unit, isOffsetCheckpoint = true),
1195+
"ingestOffsetCheckpoint",
1196+
)
11871197
.map { _ =>
11881198
state
11891199
.getAndUpdate(s => s.withUpdate(s.acsSize, offset))

0 commit comments

Comments
 (0)