Skip to content

Commit 7f0e956

Browse files
[release-line-3.4] Update 2025-12-30.21 (#418)
Reference commit: 54b52623bc
1 parent 95f0b34 commit 7f0e956

File tree

108 files changed

+1586
-1545
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

108 files changed

+1586
-1545
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright (c) 2025 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package com.digitalasset.canton.util
5+
6+
import java.util.concurrent.locks.ReentrantLock
7+
8+
/** Lock to be used instead of blocking synchronized
9+
*
10+
* The fork join pool is leaking threads potentially with every invocation of blocking. Therefore,
11+
* only invoke it if really necessary.
12+
*/
13+
class Mutex {
14+
private val lock = new ReentrantLock()
15+
16+
@deprecated("use harmonize, not synchronize", since = "3.4")
17+
def synchronized[T](body: => T): T = sys.error("use harmonize to distinguish")
18+
19+
def exclusive[T](f: => T): T = {
20+
def runAndUnlock() =
21+
try {
22+
f
23+
} finally {
24+
lock.unlock()
25+
}
26+
if (lock.tryLock()) {
27+
runAndUnlock()
28+
} else {
29+
// this may trigger a ForkJoinPool.managedBlock
30+
scala.concurrent.blocking {
31+
lock.lock()
32+
runAndUnlock()
33+
}
34+
}
35+
}
36+
37+
}
38+
39+
object Mutex {
40+
def apply(): Mutex = new Mutex
41+
}

community/app-base/src/main/scala/com/digitalasset/canton/admin/api/client/commands/AdminCommand.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ import com.digitalasset.canton.admin.api.client.commands.GrpcAdminCommand.{
1010
}
1111
import com.digitalasset.canton.config.NonNegativeDuration
1212
import com.digitalasset.canton.discard.Implicits.DiscardOps
13+
import com.digitalasset.canton.util.Mutex
1314
import io.grpc.stub.{AbstractStub, StreamObserver}
1415
import io.grpc.{Context, ManagedChannel, Status, StatusException, StatusRuntimeException}
1516

1617
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
1718
import scala.collection.mutable.ListBuffer
1819
import scala.concurrent.duration.FiniteDuration
19-
import scala.concurrent.{Future, Promise, blocking}
20+
import scala.concurrent.{Future, Promise}
2021

2122
trait AdminCommand[Req, Res, Result] {
2223

@@ -104,9 +105,10 @@ object GrpcAdminCommand {
104105
): Future[Seq[Result]] = {
105106
val promise = Promise[Seq[Result]]()
106107
val buffer = ListBuffer[Result]()
108+
val lock = new Mutex()
107109
val context = Context.ROOT.withCancellation()
108110

109-
def success(): Unit = blocking(buffer.synchronized {
111+
def success(): Unit = (lock.exclusive {
110112
promise.trySuccess(buffer.toList).discard[Boolean]
111113
context.close()
112114
})
@@ -117,7 +119,7 @@ object GrpcAdminCommand {
117119
new StreamObserver[Response]() {
118120
override def onNext(value: Response): Unit = {
119121
val extracted = extract(value)
120-
blocking(buffer.synchronized {
122+
(lock.exclusive {
121123
if (buffer.lengthCompare(expected) < 0) {
122124
buffer ++= extracted
123125
if (buffer.lengthCompare(expected) >= 0) {

community/app-base/src/main/scala/com/digitalasset/canton/console/AmmoniteCacheLock.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ import ammonite.runtime.Storage
77
import cats.syntax.either.*
88
import com.digitalasset.canton.logging.TracedLogger
99
import com.digitalasset.canton.tracing.TraceContext
10+
import com.digitalasset.canton.util.Mutex
1011

1112
import java.io.{File, RandomAccessFile}
1213
import java.nio.channels.OverlappingFileLockException
1314
import java.nio.file.Files
14-
import scala.concurrent.blocking
1515
import scala.util.control.NonFatal
1616

1717
trait AmmoniteCacheLock {
@@ -22,6 +22,8 @@ trait AmmoniteCacheLock {
2222

2323
object AmmoniteCacheLock {
2424

25+
private val lock = new Mutex()
26+
2527
// Don't change this to lazy val, as the underlying InMemory storage is not thread safe.
2628
def InMemory: AmmoniteCacheLock = new AmmoniteCacheLock {
2729
override def release(): Unit = ()
@@ -56,7 +58,7 @@ object AmmoniteCacheLock {
5658
lock <- go(0)
5759
} yield lock
5860
attempt match {
59-
case Right(lock) => lock
61+
case Right(cacheLock) => cacheLock
6062
case Left(err) =>
6163
logger.warn(
6264
s"Failed to acquire ammonite cache directory due to $err. Will use in-memory instead."
@@ -87,7 +89,7 @@ object AmmoniteCacheLock {
8789

8890
private def acquireLock(logger: TracedLogger, path: os.Path, isRepl: Boolean)(implicit
8991
traceContext: TraceContext
90-
): Either[Throwable, Option[AmmoniteCacheLock]] = blocking(synchronized {
92+
): Either[Throwable, Option[AmmoniteCacheLock]] = (lock.exclusive {
9193
try {
9294
val myLockFile = path / "lock"
9395
logger.debug(s"Attempting to obtain lock $myLockFile")
@@ -97,12 +99,12 @@ object AmmoniteCacheLock {
9799
logger.debug(s"Failed to acquire lock for $myLockFile")
98100
out.close()
99101
Right(None)
100-
case Some(lock) =>
102+
case Some(channelLock) =>
101103
Right(Some(new AmmoniteCacheLock {
102104
override def release(): Unit =
103105
try {
104106
logger.debug(s"Releasing lock $myLockFile...")
105-
lock.release()
107+
channelLock.release()
106108
out.close()
107109
if (!Files.deleteIfExists(myLockFile.toNIO)) { // throws when the file exists but cannot be deleted
108110
logger.warn(s"Failed to delete lock file $myLockFile because it did not exist")

community/app-base/src/main/scala/com/digitalasset/canton/console/GrpcAdminCommandRunner.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ import com.digitalasset.canton.networking.grpc.{
2929
GrpcManagedChannel,
3030
}
3131
import com.digitalasset.canton.tracing.{Spanning, TraceContext}
32+
import com.digitalasset.canton.util.Mutex
3233
import io.opentelemetry.api.trace.Tracer
3334

3435
import scala.collection.concurrent.TrieMap
3536
import scala.concurrent.duration.{Duration, FiniteDuration}
36-
import scala.concurrent.{ExecutionContextExecutor, Future, blocking}
37+
import scala.concurrent.{ExecutionContextExecutor, Future}
3738

3839
/** Attempt to run a grpc admin-api command against whatever is pointed at in the config
3940
* @param commandTimeouts
@@ -58,6 +59,7 @@ class GrpcAdminCommandRunner(
5859
loggerFactory,
5960
)
6061
private val channels = TrieMap[(String, String, Port), GrpcManagedChannel]()
62+
private val lock = new Mutex()
6163

6264
def runCommandAsync[Result](
6365
instanceName: String,
@@ -151,7 +153,7 @@ class GrpcAdminCommandRunner(
151153
instanceName: String,
152154
clientConfig: ClientConfig,
153155
): GrpcManagedChannel =
154-
blocking(synchronized {
156+
(lock.exclusive {
155157
val addr = (instanceName, clientConfig.address, clientConfig.port)
156158
channels.getOrElseUpdate(
157159
addr,
@@ -168,7 +170,7 @@ class GrpcAdminCommandRunner(
168170
override def onFirstClose(): Unit =
169171
closeChannels()
170172

171-
def closeChannels(): Unit = blocking(synchronized {
173+
def closeChannels(): Unit = (lock.exclusive {
172174
channels.values.foreach(_.close())
173175
channels.clear()
174176
})

community/app-base/src/main/scala/com/digitalasset/canton/environment/Environment.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import com.digitalasset.canton.time.*
4343
import com.digitalasset.canton.tracing.TraceContext.withNewTraceContext
4444
import com.digitalasset.canton.tracing.{NoTracing, TraceContext, TracerProvider}
4545
import com.digitalasset.canton.util.FutureInstances.parallelFuture
46-
import com.digitalasset.canton.util.{MonadUtil, PekkoUtil, SingleUseCell}
46+
import com.digitalasset.canton.util.{MonadUtil, Mutex, PekkoUtil, SingleUseCell}
4747
import com.google.common.annotations.VisibleForTesting
4848
import io.circe.Encoder
4949
import io.circe.generic.semiauto.deriveEncoder
@@ -53,8 +53,8 @@ import org.slf4j.bridge.SLF4JBridgeHandler
5353
import java.util.concurrent.ScheduledExecutorService
5454
import java.util.concurrent.atomic.AtomicReference
5555
import scala.collection.mutable.ListBuffer
56+
import scala.concurrent.Future
5657
import scala.concurrent.duration.DurationInt
57-
import scala.concurrent.{Future, blocking}
5858
import scala.util.control.NonFatal
5959

6060
/** Holds all significant resources held by this process.
@@ -209,6 +209,8 @@ class Environment(
209209
private val deadlockConfig = config.monitoring.deadlockDetection
210210
protected def timeouts: ProcessingTimeout = config.parameters.timeouts.processing
211211

212+
private val lock = new Mutex()
213+
212214
protected val futureSupervisor =
213215
if (config.monitoring.logging.logSlowFutures)
214216
new FutureSupervisor.Impl(timeouts.slowFutureWarn, loggerFactory)
@@ -567,7 +569,7 @@ class Environment(
567569

568570
def addUserCloseable(closeable: AutoCloseable): Unit = userCloseables.append(closeable)
569571

570-
override def close(): Unit = blocking(this.synchronized {
572+
override def close(): Unit = (lock.exclusive {
571573
val closeActorSystem: AutoCloseable =
572574
LifeCycle.toCloseableActorSystem(actorSystem, logger, timeouts)
573575

community/app-base/src/main/scala/com/digitalasset/canton/environment/Nodes.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ import com.digitalasset.canton.synchronizer.sequencer.config.{
3131
}
3232
import com.digitalasset.canton.synchronizer.sequencer.{SequencerNode, SequencerNodeBootstrap}
3333
import com.digitalasset.canton.tracing.TraceContext
34+
import com.digitalasset.canton.util.Mutex
3435
import com.digitalasset.canton.util.Thereafter.syntax.*
3536

3637
import scala.collection.concurrent.TrieMap
37-
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}
38+
import scala.concurrent.{ExecutionContext, Future, Promise}
3839

3940
/** Group of CantonNodes of the same type (mediators, participants, sequencers). */
4041
trait Nodes[+Node <: CantonNode, +NodeBootstrap <: CantonNodeBootstrap[Node]]
@@ -128,6 +129,7 @@ class ManagedNodes[
128129
with HasCloseContext
129130
with FlagCloseableAsync {
130131

132+
private val lock = new Mutex()
131133
private val nodes = TrieMap[InstanceName, ManagedNodeStage[NodeBootstrap]]()
132134
override def names(): Seq[InstanceName] = configs.keys.toSeq
133135

@@ -259,8 +261,8 @@ class ManagedNodes[
259261
params = parametersFor(name)
260262
} yield (config, params)
261263

262-
override def migrateDatabase(name: InstanceName): Either[StartupError, Unit] = blocking(
263-
synchronized {
264+
override def migrateDatabase(name: InstanceName): Either[StartupError, Unit] = (
265+
lock.exclusive {
264266
for {
265267
cAndP <- configAndParams(name)
266268
(config, params) = cAndP
@@ -269,8 +271,8 @@ class ManagedNodes[
269271
}
270272
)
271273

272-
override def repairDatabaseMigration(name: InstanceName): Either[StartupError, Unit] = blocking(
273-
synchronized {
274+
override def repairDatabaseMigration(name: InstanceName): Either[StartupError, Unit] = (
275+
lock.exclusive {
274276
for {
275277
cAndP <- configAndParams(name)
276278
(config, params) = cAndP

community/app-base/src/main/scala/com/digitalasset/canton/metrics/CsvReporter.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package com.digitalasset.canton.metrics
66
import com.digitalasset.canton.data.CantonTimestamp
77
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
88
import com.digitalasset.canton.tracing.NoTracing
9+
import com.digitalasset.canton.util.Mutex
910
import io.opentelemetry.sdk.common.CompletableResultCode
1011
import io.opentelemetry.sdk.metrics.InstrumentType
1112
import io.opentelemetry.sdk.metrics.`export`.MetricExporter
@@ -15,7 +16,6 @@ import java.io.{BufferedWriter, File, FileWriter}
1516
import java.util
1617
import java.util.concurrent.atomic.AtomicBoolean
1718
import scala.collection.concurrent.TrieMap
18-
import scala.concurrent.blocking
1919
import scala.jdk.CollectionConverters.*
2020
import scala.util.{Failure, Success, Try}
2121

@@ -26,7 +26,7 @@ class CsvReporter(config: MetricsReporterConfig.Csv, val loggerFactory: NamedLog
2626

2727
private val running = new AtomicBoolean(true)
2828
private val files = new TrieMap[String, (FileWriter, BufferedWriter)]
29-
private val lock = new Object()
29+
private val lock = new Mutex()
3030

3131
def getAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality =
3232
AggregationTemporality.CUMULATIVE
@@ -52,8 +52,8 @@ class CsvReporter(config: MetricsReporterConfig.Csv, val loggerFactory: NamedLog
5252
}
5353

5454
def `export`(metrics: util.Collection[MetricData]): CompletableResultCode = {
55-
blocking {
56-
lock.synchronized {
55+
{
56+
lock.exclusive {
5757
if (running.get()) {
5858
val ts = CantonTimestamp.now()
5959
MetricValue.allFromMetricData(metrics.asScala).foreach { case (value, metadata) =>

community/app/src/test/scala/com/digitalasset/canton/integration/tests/examples/ExampleIntegrationTest.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@ import com.digitalasset.canton.integration.{
1414
IsolatedEnvironments,
1515
}
1616
import com.digitalasset.canton.logging.NamedLogging
17-
import com.digitalasset.canton.util.ConcurrentBufferedLogger
1817
import com.digitalasset.canton.util.ShowUtil.*
19-
20-
import scala.concurrent.blocking
18+
import com.digitalasset.canton.util.{ConcurrentBufferedLogger, Mutex}
2119

2220
abstract class ExampleIntegrationTest(configPaths: File*)
2321
extends BaseIntegrationTest
@@ -64,6 +62,7 @@ trait HasConsoleScriptRunner { this: NamedLogging =>
6462
}
6563

6664
object `ExampleIntegrationTest` {
65+
private val lock = new Mutex()
6766
lazy val dockerImagesPath: File = "docker" / "canton" / "images"
6867
lazy val examplesPath: File = "community" / "app" / "src" / "pack" / "examples"
6968
lazy val simpleTopology: File = examplesPath / "01-simple-topology"
@@ -75,7 +74,8 @@ object `ExampleIntegrationTest` {
7574
lazy val advancedConfTestEnv: File =
7675
"community" / "app" / "src" / "test" / "resources" / "advancedConfDef.env"
7776
lazy val bftSequencerConfigurationFolder: File = examplesPath / "11-bft-sequencer"
78-
def ensureSystemProperties(kvs: (String, String)*): Unit = blocking(synchronized {
77+
78+
def ensureSystemProperties(kvs: (String, String)*): Unit = (lock.exclusive {
7979
kvs.foreach { case (key, value) =>
8080
Option(System.getProperty(key)) match {
8181
case Some(oldValue) =>

community/app/src/test/scala/com/digitalasset/canton/util/ReleaseUtils.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ import com.digitalasset.canton.version.{
1919

2020
import java.nio.file.{Files, Paths}
2121
import scala.collection.concurrent.TrieMap
22-
import scala.concurrent.{ExecutionContext, Future, blocking}
22+
import scala.concurrent.{ExecutionContext, Future}
2323

2424
/** A collection of small utilities for tests that have no obvious home */
25+
@SuppressWarnings(Array("com.digitalasset.canton.RequireBlocking"))
2526
object ReleaseUtils {
2627
final case class TestedRelease(
2728
releaseVersion: ReleaseVersion,
@@ -124,6 +125,7 @@ object ReleaseUtils {
124125
* synchronize between different requests for the same release.
125126
*/
126127
private val releasesRetrieval: TrieMap[ReleaseVersion, Future[String]] = TrieMap.empty
128+
private val lock = new Mutex()
127129

128130
/** If the .tar.gz corresponding to release is not found locally, attempts to download it from
129131
* artifactory. Then, extract the .tar.gz file.
@@ -136,15 +138,13 @@ object ReleaseUtils {
136138
def retrieve(
137139
release: ReleaseVersion
138140
)(implicit elc: ErrorLoggingContext, ec: ExecutionContext): Future[String] =
139-
blocking {
140-
synchronized {
141-
releasesRetrieval.get(release) match {
142-
case Some(releaseRetrieval) => releaseRetrieval
143-
case None =>
144-
val releaseRetrieval = Future(downloadAndExtract(release))
145-
releasesRetrieval.put(release, releaseRetrieval).discard
146-
releaseRetrieval
147-
}
141+
lock.exclusive {
142+
releasesRetrieval.get(release) match {
143+
case Some(releaseRetrieval) => releaseRetrieval
144+
case None =>
145+
val releaseRetrieval = Future(downloadAndExtract(release))
146+
releasesRetrieval.put(release, releaseRetrieval).discard
147+
releaseRetrieval
148148
}
149149
}
150150

community/base/src/main/scala/com/digitalasset/canton/auth/AsyncForwardingListener.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33

44
package com.digitalasset.canton.auth
55

6+
import com.digitalasset.canton.util.Mutex
67
import io.grpc.ServerCall
78

89
import scala.collection.mutable
9-
import scala.concurrent.blocking
1010

1111
/** This listener buffers all messages until `setNextListener` is called, at which point all
1212
* buffered messages are sent to the given listener. From then on, all future messages are sent
@@ -18,18 +18,18 @@ import scala.concurrent.blocking
1818
@SuppressWarnings(Array("org.wartremover.warts.Var"))
1919
abstract class AsyncForwardingListener[ReqT] extends ServerCall.Listener[ReqT] {
2020
protected type Listener = ServerCall.Listener[ReqT]
21-
private[this] val lock = new Object
21+
private[this] val lock = new Mutex()
2222
private[this] val stash: mutable.ListBuffer[Listener => Unit] = new mutable.ListBuffer
2323
private[this] var nextListener: Option[Listener] = None
2424

25-
private[this] def enqueueOrProcess(msg: Listener => Unit): Unit = blocking(lock.synchronized {
25+
private[this] def enqueueOrProcess(msg: Listener => Unit): Unit = (lock.exclusive {
2626
val _ = nextListener.fold {
2727
val _ = stash.append(msg)
2828
()
2929
}(msg)
3030
})
3131

32-
protected def setNextListener(listener: Listener): Unit = blocking(lock.synchronized {
32+
protected def setNextListener(listener: Listener): Unit = (lock.exclusive {
3333
nextListener = Some(listener)
3434
stash.foreach(msg => msg(listener))
3535
})

0 commit comments

Comments
 (0)