Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.store.db

/** Registry for PostgreSQL advisory lock identifiers used by splice applications.
  *
  * All ids share a common high-byte prefix so they cannot collide with advisory
  * locks taken by other components on the same database.
  */
object AdvisoryLockIds {
  // Prefix 0x73706c is "spl" in ASCII, shifted left one byte to leave room for
  // individual lock ids. Modeled after Canton's HaConfig, which uses ASCII "dml".
  private val spliceLockIdPrefix: Long = 0x73706c00L

  // Guards inserts into the acs_snapshot_data table (see AcsSnapshotStore).
  final val acsSnapshotDataInsert: Long = spliceLockIdPrefix + 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore.{
}
import org.lfdecentralizedtrust.splice.store.UpdateHistory.SelectFromCreateEvents
import org.lfdecentralizedtrust.splice.store.{HardLimit, Limit, LimitHelpers, UpdateHistory}
import org.lfdecentralizedtrust.splice.store.db.{AcsJdbcTypes, AcsQueries}
import org.lfdecentralizedtrust.splice.store.db.{AcsJdbcTypes, AcsQueries, AdvisoryLockIds}
import org.lfdecentralizedtrust.splice.util.{Contract, HoldingsSummary, PackageQualifiedName}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.lifecycle.{CloseContext, FutureUnlessShutdown}
Expand All @@ -25,7 +25,7 @@ import com.digitalasset.canton.resource.DbStorage.Implicits.BuilderChain.toSQLAc
import com.digitalasset.canton.topology.PartyId
import com.digitalasset.canton.tracing.TraceContext
import org.lfdecentralizedtrust.splice.store.events.SpliceCreatedEvent
import slick.dbio.DBIOAction
import slick.dbio.{DBIOAction, Effect, NoStream}
import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
import slick.jdbc.{GetResult, JdbcProfile}

Expand Down Expand Up @@ -158,12 +158,41 @@ class AcsSnapshotStore(
join creates_to_insert on inserted_rows.create_id = creates_to_insert.row_id
having min(inserted_rows.row_id) is not null;
""").toActionBuilder.asUpdate
storage.update(statement, "insertNewSnapshot")
storage.queryAndUpdate(withExclusiveSnapshotDataLock(statement), "insertNewSnapshot")
}.andThen { _ =>
AcsSnapshotStore.PreventConcurrentSnapshotsSemaphore.release()
}
}

/** Wraps the given action in a transaction that holds an exclusive lock on the acs_snapshot_data table.
 *
 * Note: The acs_snapshot_data table must not have interleaved rows from two different acs snapshots.
 * In rare cases, it can happen that the application crashes while writing a snapshot, then
 * restarts and starts writing a different snapshot while the previous statement is still running.
 *
 * The exclusive lock prevents this.
 * We use a transaction-scoped advisory lock, which is released when the transaction ends.
 * Regular locks (e.g. obtained via `LOCK TABLE ... IN EXCLUSIVE MODE`) would conflict with harmless
 * background operations like autovacuum or create index concurrently.
 *
 * In case the application crashes while holding the lock, the server _should_ close the connection
 * and abort the transaction as soon as it detects a disconnect.
 * TODO(#2488): Verify that the server indeed closes connections in a reasonable time.
 *
 * @param action the database action to run while the advisory lock is held
 * @return the action preceded by a non-blocking lock acquisition, wrapped in a single transaction;
 *         fails with an `Exception` if the lock is already held by another session
 */
private def withExclusiveSnapshotDataLock[T, E <: Effect](
    action: DBIOAction[T, NoStream, E]
): DBIOAction[T, NoStream, Effect.Read & Effect.Transactional & E] =
  (for {
    // pg_try_advisory_xact_lock is non-blocking: it returns false immediately instead of
    // waiting, so a concurrent writer surfaces as a fast failure rather than a stuck query.
    lockAcquired <- sql"SELECT pg_try_advisory_xact_lock(${AdvisoryLockIds.acsSnapshotDataInsert})"
      .as[Boolean]
      .head
    result <-
      // Prefer if/else over pattern matching on a Boolean.
      if (lockAcquired) action
      else
        // Lock conflicts should almost never happen. If they do, we fail immediately and rely
        // on the trigger infrastructure to retry and log errors.
        DBIOAction.failed(new Exception("Failed to acquire exclusive lock"))
  } yield result).transactionally

def deleteSnapshot(
snapshot: AcsSnapshot
)(implicit
Expand Down
Loading