-
Notifications
You must be signed in to change notification settings - Fork 59
Lock acs_snapshot_data while writing #2436
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| // Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| package org.lfdecentralizedtrust.splice.store.db | ||
|
|
||
| /** Registry for advisory lock identifiers used in splice applications. | ||
| */ | ||
| object AdvisoryLockIds { | ||
| // 0x73706c equals ASCII encoded "spl". Modeled after Canton's HaConfig, which uses ASCII "dml". | ||
| private val base: Long = 0x73706c00 | ||
|
|
||
| final val acsSnapshotDataInsert: Long = base + 1 | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,7 +14,7 @@ import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore.{ | |
| } | ||
| import org.lfdecentralizedtrust.splice.store.UpdateHistory.SelectFromCreateEvents | ||
| import org.lfdecentralizedtrust.splice.store.{HardLimit, Limit, LimitHelpers, UpdateHistory} | ||
| import org.lfdecentralizedtrust.splice.store.db.{AcsJdbcTypes, AcsQueries} | ||
| import org.lfdecentralizedtrust.splice.store.db.{AcsJdbcTypes, AcsQueries, AdvisoryLockIds} | ||
| import org.lfdecentralizedtrust.splice.util.{Contract, HoldingsSummary, PackageQualifiedName} | ||
| import com.digitalasset.canton.data.CantonTimestamp | ||
| import com.digitalasset.canton.lifecycle.{CloseContext, FutureUnlessShutdown} | ||
|
|
@@ -25,7 +25,7 @@ import com.digitalasset.canton.resource.DbStorage.Implicits.BuilderChain.toSQLAc | |
| import com.digitalasset.canton.topology.PartyId | ||
| import com.digitalasset.canton.tracing.TraceContext | ||
| import org.lfdecentralizedtrust.splice.store.events.SpliceCreatedEvent | ||
| import slick.dbio.DBIOAction | ||
| import slick.dbio.{DBIOAction, Effect, NoStream} | ||
| import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton | ||
| import slick.jdbc.{GetResult, JdbcProfile} | ||
|
|
||
|
|
@@ -158,12 +158,42 @@ class AcsSnapshotStore( | |
| join creates_to_insert on inserted_rows.create_id = creates_to_insert.row_id | ||
| having min(inserted_rows.row_id) is not null; | ||
| """).toActionBuilder.asUpdate | ||
| storage.update(statement, "insertNewSnapshot") | ||
| storage.queryAndUpdate(withExclusiveSnapshotDataLock(statement), "insertNewSnapshot") | ||
| }.andThen { _ => | ||
| AcsSnapshotStore.PreventConcurrentSnapshotsSemaphore.release() | ||
| } | ||
| } | ||
|
|
||
| /** Wraps the given action in a transaction that holds an exclusive lock on the acs_snapshot_data table. | ||
rautenrieth-da marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * | ||
| * Note: The acs_snapshot_data table must not have interleaved rows from two different acs snapshots. | ||
| * In rare cases, it can happen that the application crashes while writing a snapshot, then | ||
| * restarts and starts writing a different snapshot while the previous statement is still running. | ||
rautenrieth-da marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * | ||
| * The exclusive lock prevents this. | ||
| * We use a transaction-scoped advisory lock, which is released when the transaction ends. | ||
| * Regular locks (e.g. obtained via `LOCK TABLE ... IN EXCLUSIVE MODE`) would conflict with harmless | ||
| * background operations like autovacuum or create index concurrently. | ||
rautenrieth-da marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * | ||
| * In case the application crashes while holding the lock, the server should close the connection | ||
| * and abort the transaction as soon as it detects a disconnect. | ||
| * See [[com.digitalasset.canton.platform.store.backend.postgresql.PostgresDataSourceConfig]] for our connection keepalive settings. | ||
| * With default settings, the server should detect a dead connection within ~15sec. | ||
|
||
| */ | ||
| private def withExclusiveSnapshotDataLock[T, E <: Effect]( | ||
| action: DBIOAction[T, NoStream, E] | ||
| ): DBIOAction[T, NoStream, Effect.Read & Effect.Transactional & E] = | ||
| (for { | ||
| lockResult <- sql"SELECT pg_try_advisory_xact_lock(${AdvisoryLockIds.acsSnapshotDataInsert})" | ||
| .as[Boolean] | ||
| .head | ||
| result <- lockResult match { | ||
| case true => action | ||
| // Lock conflicts should almost never happen. If they do, we fail immediately and rely on the trigger infrastructure to retry and log errors. | ||
rautenrieth-da marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| case false => DBIOAction.failed(new Exception("Failed to acquire exclusive lock")) | ||
| } | ||
| } yield result).transactionally | ||
|
|
||
| def deleteSnapshot( | ||
| snapshot: AcsSnapshot | ||
| )(implicit | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.