Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import slick.jdbc.{GetResult, JdbcProfile}
import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
import com.digitalasset.canton.resource.DbStorage.Implicits.BuilderChain.toSQLActionBuilderChain
import com.digitalasset.canton.resource.DbStorage.SQLActionBuilderChain
import com.digitalasset.canton.util.MonadUtil
import org.lfdecentralizedtrust.splice.store.events.SpliceCreatedEvent
import org.lfdecentralizedtrust.splice.store.ImportUpdatesBackfilling.{
DestinationImportUpdates,
Expand Down Expand Up @@ -713,9 +714,11 @@ class UpdateHistory(
)
migrationsWithCorruptSnapshots()
.flatMap { migrations =>
Future.sequence(migrations.map { migrationId =>
deleteAcsSnapshotsAfter(historyId, migrationId, CantonTimestamp.MinValue)
})
// Using sequential traverse to avoid running many expensive delete queries in parallel
MonadUtil
.sequentialTraverse(migrations.toSeq) { migrationId =>
deleteAcsSnapshotsAfter(historyId, migrationId, CantonTimestamp.MinValue)
}
}
.andThen { case _ =>
logger.info(s"Finished checking for corrupt ACS snapshots")
Expand Down Expand Up @@ -819,11 +822,14 @@ class UpdateHistory(
where history_id = $historyId and migration_id = $migrationId and snapshot_record_time > $recordTime
returning first_row_id, last_row_id
""".asUpdateReturning[(Long, Long)]
expectedDataRows = dataToDelete.foldLeft(0L)((total, r) => total + (r._2 - r._1 + 1))
mergedDataToDelete = mergeRowRanges(dataToDelete)
expectedDataRows = mergedDataToDelete.foldLeft(0L)((total, r) => total + (r._2 - r._1 + 1))
_ = logger.info(
s"Deleted ${dataToDelete.size} rows from acs_snapshot, expecting to delete $expectedDataRows rows from acs_snapshot_data"
s"Deleted ${dataToDelete.size} rows from acs_snapshot, " +
s"expecting to delete $expectedDataRows rows from acs_snapshot_data" +
s" in ${mergedDataToDelete.size} ranges."
)
deletedDataRows <- DBIO.traverse(dataToDelete) { case (first_row, last_row) =>
deletedDataRows <- DBIO.traverse(mergedDataToDelete) { case (first_row, last_row) =>
sqlu"""
delete from acs_snapshot_data
where row_id between $first_row and $last_row
Expand Down Expand Up @@ -2378,6 +2384,21 @@ object UpdateHistory {
// so we read them back as an arbitrary value.
private def missingString: String = ""
private def missingStringSeq: Seq[String] = Seq.empty

private[store] def mergeRowRanges(
rowRanges: Seq[(Long, Long)]
): Seq[(Long, Long)] = {
rowRanges
.sortBy(_._1)
.foldLeft(Seq.empty[(Long, Long)]) {
case (Nil, current) =>
Seq(current)
case (acc :+ last, current) if last._2 + 1 == current._1 =>
acc :+ (last._1, current._2)
case (acc, current) =>
acc :+ current
}
}
}

final case class TreeUpdateWithMigrationId(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,5 +690,37 @@ class UpdateHistoryTest extends UpdateHistoryTestBase {
}
}
}

"mergeRowRanges" should {
"merge empty sequence" in {
val result = UpdateHistory.mergeRowRanges(Seq.empty)
result shouldBe Seq.empty
}
"merge sequence with one element" in {
val result = UpdateHistory.mergeRowRanges(Seq((1L, 2L)))
result shouldBe Seq((1L, 2L))
}
"merge sequence with two adjacent ranges" in {
val result = UpdateHistory.mergeRowRanges(Seq((1L, 2L), (3L, 4L)))
result shouldBe Seq((1L, 4L))
}
"merge sequence with two adjacent ranges in reverse order" in {
val result = UpdateHistory.mergeRowRanges(Seq((3L, 4L), (1L, 2L)))
result shouldBe Seq((1L, 4L))
}
"merge sequence with two non-adjacent ranges" in {
val result = UpdateHistory.mergeRowRanges(Seq((1L, 2L), (4L, 5L)))
result shouldBe Seq((1L, 2L), (4L, 5L))
}
"merge sequence with many adjacent ranges" in {
val result = UpdateHistory.mergeRowRanges(Seq((5L, 6L), (1L, 2L), (3L, 4L), (7L, 8L)))
result shouldBe Seq((1L, 8L))
}
"merge sequence with several groups of ranges" in {
val result =
UpdateHistory.mergeRowRanges(Seq((6L, 7L), (1L, 2L), (9L, 10L), (3L, 4L), (11L, 12L)))
result shouldBe Seq((1L, 4L), (6L, 7L), (9L, 12L))
}
}
}
}