Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@ import com.daml.ledger.api.v2.admin.party_management_service.{
}
import com.daml.ledger.api.v2.interactive.interactive_submission_service.InteractiveSubmissionServiceGrpc
import com.daml.ledger.api.v2.command_service.CommandServiceGrpc
import com.daml.ledger.api.v2.offset_checkpoint.OffsetCheckpoint.toJavaProto
import com.daml.ledger.api.v2.package_reference.PackageReference
import com.daml.ledger.api.v2.package_service.{ListPackagesRequest, PackageServiceGrpc}
import com.daml.ledger.javaapi.data.{Command, CreateUserResponse, ListUserRightsResponse, User}
import com.daml.ledger.javaapi.data.{
Command,
CreateUserResponse,
ListUserRightsResponse,
OffsetCheckpoint,
User,
}
import com.daml.ledger.javaapi.data.codegen.ContractId
import com.daml.ledger.javaapi.data.User.Right
import org.lfdecentralizedtrust.splice.auth.AuthToken
Expand Down Expand Up @@ -819,7 +826,14 @@ object LedgerClient {
)
)

case TU.OffsetCheckpoint(_) => None
case TU.OffsetCheckpoint(offset) =>
Some(
GetTreeUpdatesResponse(
TreeUpdateOrOffsetCheckpoint.Checkpoint(
OffsetCheckpoint.fromProto(toJavaProto(offset))
)
)
)

case TU.Empty => sys.error("uninitialized update service result (update)")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,25 @@ class UpdateHistory(
)
case Some(lastIngestedOffset) =>
if (offset <= lastIngestedOffset) {
logger.warn(
s"Update offset $offset <= last ingested offset $lastIngestedOffset for ${description()}, skipping database actions. " +
"This is expected if the SQL query was automatically retried after a transient database error. " +
"Otherwise, this is unexpected and most likely caused by two identical UpdateIngestionService instances " +
"ingesting into the same logical database."
)
updateOrCheckpoint match {
case _: TreeUpdateOrOffsetCheckpoint.Update =>
logger.warn(
s"Update offset $offset <= last ingested offset $lastIngestedOffset for ${description()}, skipping database actions. " +
"This is expected if the SQL query was automatically retried after a transient database error. " +
"Otherwise, this is unexpected and most likely caused by two identical UpdateIngestionService instances " +
"ingesting into the same logical database."
)
case _: TreeUpdateOrOffsetCheckpoint.Checkpoint =>
// we can receive an offset equal to the last ingested and that can be safely ignore
if (offset < lastIngestedOffset) {
logger.warn(
s"Checkpoint offset $offset < last ingested offset $lastIngestedOffset for ${description()}, skipping database actions. " +
"This is expected if the SQL query was automatically retried after a transient database error. " +
"Otherwise, this is unexpected and most likely caused by two identical UpdateIngestionService instances " +
"ingesting into the same logical database."
)
}
}
DBIO.successful(())
} else {
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ final class DbMultiDomainAcsStore[TXE](
private def ingestUpdateAtOffset[E <: Effect](
offset: Long,
action: DBIOAction[?, NoStream, Effect.Read & Effect.Write],
isOffsetCheckpoint: Boolean = false,
)(implicit
tc: TraceContext
): DBIOAction[Unit, NoStream, Effect.Read & Effect.Write & Effect.Transactional] = {
Expand All @@ -1004,12 +1005,18 @@ final class DbMultiDomainAcsStore[TXE](
action.andThen(updateOffset(offset))
case Some(lastIngestedOffset) =>
if (offset <= lastIngestedOffset) {
logger.warn(
s"Update offset $offset <= last ingested offset $lastIngestedOffset for DbMultiDomainAcsStore(storeId=$acsStoreId), skipping database actions. " +
"This is expected if the SQL query was automatically retried after a transient database error. " +
"Otherwise, this is unexpected and most likely caused by two identical UpdateIngestionService instances " +
"ingesting into the same logical database."
)
/* we can receive an offset equal to the last ingested and that can be safely ignore */
if (isOffsetCheckpoint) {
if (offset < lastIngestedOffset) {
logger.warn(
s"Checkpoint offset $offset < last ingested offset $lastIngestedOffset for DbMultiDomainAcsStore(storeId=$acsStoreId), skipping database actions. This is expected if the SQL query was automatically retried after a transient database error. Otherwise, this is unexpected and most likely caused by two identical UpdateIngestionService instances ingesting into the same logical database."
)
}
} else {
logger.warn(
s"Update offset $offset <= last ingested offset $lastIngestedOffset for DbMultiDomainAcsStore(storeId=$acsStoreId), skipping database actions. This is expected if the SQL query was automatically retried after a transient database error. Otherwise, this is unexpected and most likely caused by two identical UpdateIngestionService instances ingesting into the same logical database."
)
}
DBIO.successful(())
} else {
action.andThen(updateOffset(offset))
Expand Down Expand Up @@ -1183,7 +1190,10 @@ final class DbMultiDomainAcsStore[TXE](
case TreeUpdateOrOffsetCheckpoint.Checkpoint(checkpoint) =>
val offset = checkpoint.getOffset
storage
.queryAndUpdate(ingestUpdateAtOffset(offset, DBIO.unit), "ingestOffsetCheckpoint")
.queryAndUpdate(
ingestUpdateAtOffset(offset, DBIO.unit, isOffsetCheckpoint = true),
"ingestOffsetCheckpoint",
)
.map { _ =>
state
.getAndUpdate(s => s.withUpdate(s.acsSize, offset))
Expand Down