Skip to content

Commit ea6c560

Browse files
committed
.
1 parent 3df6898 commit ea6c560

7 files changed

Lines changed: 112 additions & 237 deletions

File tree

core/api/daemon/src/mill/api/daemon/internal/EvaluatorApi.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ trait EvaluatorApi extends AutoCloseable {
5656
private[mill] def withIsFinalDepth(isFinalDepth: Boolean): EvaluatorApi = this
5757
}
5858
object EvaluatorApi {
59-
final case class SelectiveReuseDecision(reusable: Boolean, nextMetadata: String)
59+
private[mill] final case class SelectiveReuseDecision(reusable: Boolean, nextMetadata: String)
6060

6161
trait Result[T] {
6262
def watchable: Seq[Watchable]

core/constants/src/mill/constants/EnvVars.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class EnvVars {
3434
public static final String MILL_BSP_OUTPUT_DIR = "MILL_BSP_OUTPUT_DIR";
3535

3636
/**
37-
* If set to "1", Mill will re-use the regular out/ folder instead of
37+
* If set to "1", Mill will re-use the regular @{Link OutFiles#out} folder instead of
3838
* using a separate one for BSP output.
3939
*/
4040
public static final String MILL_NO_SEPARATE_BSP_OUTPUT_DIR = "MILL_NO_SEPARATE_BSP_OUTPUT_DIR";

core/eval/src/mill/eval/EvaluatorImpl.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ final class EvaluatorImpl(
319319
selectiveExecution: Boolean = false
320320
): Evaluator.Result[T] = {
321321
val selectiveExecutionEnabled = selectiveExecution && !tasks.exists(_.isExclusiveCommand)
322+
322323
val (selectedTasks, selectiveResults, maybeNewMetadata) =
323324
if (!selectiveExecutionEnabled) (tasks, Map.empty, None)
324325
else {

core/exec/src/mill/exec/Execution.scala

Lines changed: 71 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ case class Execution(
4545
) extends GroupExecution with AutoCloseable {
4646

4747
// Track nesting depth of executeTasks calls to only show final status on outermost call
48-
private val executionNestingDepth = new AtomicInteger(0)
48+
val executionNestingDepth = new AtomicInteger(0)
4949

5050
// Lazily computed worker dependency graph, cached for the duration of the execution. It's
5151
// ok to take a snapshot of the cache, since the workerCache entries we may want to remove
@@ -126,70 +126,6 @@ case class Execution(
126126

127127
def withIsFinalDepth(newIsFinalDepth: Boolean) = this.copy(isFinalDepth = newIsFinalDepth)
128128

129-
private def newDownstreamTracker(
130-
indexToTerminal: Array[Task[?]],
131-
interGroupDeps: Map[Task[?], Seq[Task[?]]]
132-
): Execution.LeaseTracker = {
133-
val retainedLeasesByTask =
134-
new ConcurrentHashMap[Task[?], java.util.concurrent.ConcurrentLinkedQueue[
135-
LauncherLocking.Lease
136-
]]()
137-
val pendingCount = new ConcurrentHashMap[Task[?], AtomicInteger]()
138-
for (t <- indexToTerminal) {
139-
pendingCount.put(t, new AtomicInteger(0))
140-
retainedLeasesByTask.put(
141-
t,
142-
new java.util.concurrent.ConcurrentLinkedQueue[LauncherLocking.Lease]()
143-
)
144-
}
145-
for ((_, deps) <- interGroupDeps; dep <- deps) {
146-
val c = pendingCount.get(dep)
147-
if (c != null) c.incrementAndGet()
148-
}
149-
150-
def releaseLeasesFor(task: Task[?]): Unit = {
151-
val q = retainedLeasesByTask.remove(task)
152-
if (q != null) {
153-
var lease = q.poll()
154-
while (lease != null) {
155-
try lease.close()
156-
catch { case _: Throwable => () }
157-
lease = q.poll()
158-
}
159-
}
160-
}
161-
162-
new Execution.LeaseTracker {
163-
override def retain(task: Task[?], lease: LauncherLocking.Lease): Unit = {
164-
val q = retainedLeasesByTask.get(task)
165-
if (q != null) q.add(lease)
166-
else
167-
try lease.close()
168-
catch { case _: Throwable => () }
169-
}
170-
171-
override def onCompleted(terminal: Task[?]): Unit = {
172-
for (dep <- interGroupDeps.getOrElse(terminal, Nil)) {
173-
val c = pendingCount.get(dep)
174-
if (c != null && c.decrementAndGet() == 0) releaseLeasesFor(dep)
175-
}
176-
val ownCount = pendingCount.get(terminal)
177-
if (ownCount != null && ownCount.get() == 0) releaseLeasesFor(terminal)
178-
}
179-
180-
override def drain(): Unit = {
181-
import scala.jdk.CollectionConverters.*
182-
retainedLeasesByTask.values().asScala
183-
.flatMap(q => Iterator.continually(q.poll()).takeWhile(_ != null))
184-
.foreach { lease =>
185-
try lease.close()
186-
catch { case _: Throwable => () }
187-
}
188-
retainedLeasesByTask.clear()
189-
}
190-
}
191-
}
192-
193129
/**
194130
* @param goals The tasks that need to be evaluated
195131
* @param reporter A function that will accept a module id and provide a listener for build problems in that module
@@ -208,7 +144,9 @@ case class Execution(
208144
PathRef.validatedPaths.withValue(new PathRef.ValidatedPaths()) {
209145
execute0(goals, logger, reporter, testReporter, serialCommandExec)
210146
}
211-
} finally executionNestingDepth.decrementAndGet()
147+
} finally {
148+
executionNestingDepth.decrementAndGet()
149+
}
212150
}
213151

214152
private def execute0(
@@ -283,7 +221,7 @@ case class Execution(
283221
downstreamEdges.getOrElse(t, Set())
284222
)
285223

286-
val tracker = newDownstreamTracker(indexToTerminal, interGroupDeps)
224+
val tracker = new Execution.LeaseTracker(indexToTerminal, interGroupDeps)
287225
def onTerminalCompleted(t: Task[?]): Unit = tracker.onCompleted(t)
288226
try {
289227

@@ -379,13 +317,13 @@ case class Execution(
379317
leaseTracker = tracker
380318
)
381319

320+
// Count new failures - if there are upstream failures, tasks should be skipped, not failed
382321
val newFailures = res.newResults.values.count(r => r.asFailing.isDefined)
383322

384-
// Count new failures: tasks with upstream failures should be skipped, not failed.
385323
rootFailedCount.addAndGet(newFailures)
386324
completedCount.incrementAndGet()
387325

388-
// Always show the completed count in the header after a task finishes.
326+
// Always show completed count in header after task finishes
389327
logger.prompt.setPromptHeaderPrefix(formatHeaderPrefix())
390328

391329
if (failFast && res.newResults.values.exists(_.asSuccess.isEmpty))
@@ -411,14 +349,14 @@ case class Execution(
411349
}
412350
}
413351
} catch {
414-
case e: mill.api.daemon.StopWithResponse[?] =>
415-
// Let StopWithResponse propagate: it is a controlled shutdown signal.
416-
throw e
352+
// Let StopWithResponse propagate - it's a controlled shutdown signal
353+
case e: mill.api.daemon.StopWithResponse[?] => throw e
354+
// Wrapping the fatal error in a non-fatal exception, so it would be caught by Scala's Future
355+
// infrastructure, rather than silently terminating the future and leaving downstream Awaits hanging.
417356
case e: Throwable if !mill.api.daemon.internal.NonFatal(e) =>
418-
// Wrap fatal errors so Scala's Future machinery reports them instead of
419-
// silently terminating the future and leaving downstream Awaits hanging.
420357
val nonFatal = new Exception(s"fatal exception occurred: $e", e)
421-
// Preserve the original stack trace, since that points at the real failure.
358+
// Set the stack trace of the non-fatal exception to the original exception's stack trace
359+
// as it actually indicates the location of the error.
422360
nonFatal.setStackTrace(e.getStackTrace)
423361
throw nonFatal
424362
} finally {
@@ -429,17 +367,17 @@ case class Execution(
429367
}
430368

431369
// Make sure we wait for all tasks from this batch to finish before starting the next
432-
// one, so we don't mix up exclusive and non-exclusive tasks running at the same time.
370+
// one, so we don't mix up exclusive and non-exclusive tasks running at the same time
433371
terminals.map(t => (t, Await.result(futures(t), duration.Duration.Inf)))
434372
}
435373

436-
// Run all non-command tasks according to the configured thread count,
437-
// but run exclusive commands in linear order.
438374
val (nonExclusiveTasks, leafExclusiveCommands) = indexToTerminal.partition {
439375
case t: Task.Named[_] => !downstreamOfExclusive.contains(t)
440376
case _ => !serialCommandExec
441377
}
442378

379+
// Run all non-command tasks according to the threads
380+
// given but run the commands in linear order
443381
val nonExclusiveResults = evaluateTerminals(nonExclusiveTasks, exclusive = false)
444382

445383
val exclusiveResults = evaluateTerminals(leafExclusiveCommands, exclusive = true)
@@ -511,10 +449,61 @@ case class Execution(
511449

512450
object Execution {
513451

514-
trait LeaseTracker {
515-
def retain(task: Task[?], lease: LauncherLocking.Lease): Unit
516-
def onCompleted(terminal: Task[?]): Unit
517-
def drain(): Unit
452+
453+
/**
454+
* Tracks per-task read leases on the workspace lock and releases them once
455+
* every downstream task that depends on the holder has completed.
456+
*/
457+
final class LeaseTracker(
458+
indexToTerminal: Array[Task[?]],
459+
interGroupDeps: Map[Task[?], Seq[Task[?]]]
460+
) {
461+
final class State {
462+
val pending = new AtomicInteger(0)
463+
val leases = new java.util.concurrent.ConcurrentLinkedQueue[LauncherLocking.Lease]()
464+
}
465+
466+
val states = new ConcurrentHashMap[Task[?], State]()
467+
for (t <- indexToTerminal) states.put(t, new State)
468+
for ((_, deps) <- interGroupDeps; dep <- deps) {
469+
val s = states.get(dep)
470+
if (s != null) s.pending.incrementAndGet()
471+
}
472+
473+
def closeQuietly(lease: LauncherLocking.Lease): Unit =
474+
try lease.close()
475+
catch { case _: Throwable => () }
476+
477+
def releaseLeasesFor(task: Task[?]): Unit = {
478+
val s = states.remove(task)
479+
if (s != null) {
480+
var lease = s.leases.poll()
481+
while (lease != null) {
482+
closeQuietly(lease)
483+
lease = s.leases.poll()
484+
}
485+
}
486+
}
487+
488+
def retain(task: Task[?], lease: LauncherLocking.Lease): Unit = {
489+
val s = states.get(task)
490+
if (s != null) s.leases.add(lease)
491+
else closeQuietly(lease)
492+
}
493+
494+
def onCompleted(terminal: Task[?]): Unit = {
495+
for (dep <- interGroupDeps.getOrElse(terminal, Nil)) {
496+
val s = states.get(dep)
497+
if (s != null && s.pending.decrementAndGet() == 0) releaseLeasesFor(dep)
498+
}
499+
val own = states.get(terminal)
500+
if (own != null && own.pending.get() == 0) releaseLeasesFor(terminal)
501+
}
502+
503+
def drain(): Unit = {
504+
import scala.jdk.CollectionConverters.*
505+
states.keys().asScala.toList.foreach(releaseLeasesFor)
506+
}
518507
}
519508

520509
/**

core/exec/src/mill/exec/ExecutionContexts.scala

Lines changed: 36 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -11,64 +11,6 @@ import mill.api.Logger
1111
import mill.api.daemon.internal.NonFatal
1212

1313
object ExecutionContexts {
14-
private object RunnablePriority {
15-
private val priorityMethod = new ClassValue[Option[java.lang.reflect.Method]] {
16-
override def computeValue(clazz: Class[?]): Option[java.lang.reflect.Method] =
17-
try {
18-
val method = clazz.getMethod("priority")
19-
method.trySetAccessible()
20-
Some(method)
21-
} catch {
22-
case _: ReflectiveOperationException => None
23-
}
24-
}
25-
26-
def apply(runnable: Runnable): Int =
27-
priorityMethod.get(runnable.getClass) match {
28-
case Some(method) => method.invoke(runnable).asInstanceOf[Int]
29-
case None => 0
30-
}
31-
}
32-
33-
private final class QueuedRunnable(
34-
runnable: Runnable,
35-
val priority: Int,
36-
val submissionIndex: Long
37-
) extends Runnable
38-
with Comparable[QueuedRunnable] {
39-
def run(): Unit = runnable.run()
40-
41-
override def compareTo(other: QueuedRunnable): Int =
42-
priority.compareTo(other.priority) match {
43-
case 0 =>
44-
// Comparable needs a total ordering, so break ties using submission order.
45-
submissionIndex.compareTo(other.submissionIndex)
46-
case n => n
47-
}
48-
}
49-
50-
private final class PriorityThreadPoolExecutor(
51-
threadCount: Int,
52-
threadFactory: ThreadFactory
53-
) extends ThreadPoolExecutor(
54-
threadCount,
55-
threadCount,
56-
60 * 1000,
57-
TimeUnit.SECONDS,
58-
new PriorityBlockingQueue[Runnable](),
59-
threadFactory
60-
) {
61-
private val submissionCount = new java.util.concurrent.atomic.AtomicLong()
62-
63-
override def execute(command: Runnable): Unit =
64-
super.execute(
65-
new QueuedRunnable(
66-
runnable = command,
67-
priority = RunnablePriority(command),
68-
submissionIndex = submissionCount.getAndIncrement()
69-
)
70-
)
71-
}
7214

7315
/**
7416
* Execution context that runs code immediately when scheduled, without
@@ -134,15 +76,29 @@ object ExecutionContexts {
13476
def reportFailure(t: Throwable): Unit = {}
13577
def close(): Unit = executor.shutdown()
13678

79+
val priorityRunnableCount = java.util.concurrent.atomic.AtomicLong()
80+
13781
/**
13882
* Subclass of [[java.lang.Runnable]] that assigns a priority to execute it
13983
*
14084
* Priority 0 is the default priority of all Mill task, priorities <0 can be used to
14185
* prioritize this runnable over most other tasks, while priorities >0 can be used to
14286
* de-prioritize it.
14387
*/
144-
class PriorityRunnable(val priority: Int, run0: () => Unit) extends Runnable {
145-
def run(): Unit = run0()
88+
class PriorityRunnable(val priority: Int, run0: () => Unit) extends Runnable
89+
with Comparable[PriorityRunnable] {
90+
def run() = run0()
91+
val priorityRunnableIndex: Long = priorityRunnableCount.getAndIncrement()
92+
override def compareTo(o: PriorityRunnable): Int = priority.compareTo(o.priority) match {
93+
case 0 =>
94+
// `Comparable` wants a *total* ordering, so we need to use `priorityRunnableIndex`
95+
// to break ties between instances with the same priority. This index is assigned
96+
// when a task is submitted, so it should more or less follow insertion order,
97+
// and is a `Long` which should be big enough never to overflow
98+
assert(this == o || this.priorityRunnableIndex != o.priorityRunnableIndex)
99+
this.priorityRunnableIndex.compareTo(o.priorityRunnableIndex)
100+
case n => n
101+
}
146102
}
147103

148104
/**
@@ -195,15 +151,25 @@ object ExecutionContexts {
195151
def createExecutor(threadCount: Int): ThreadPoolExecutor = {
196152
val executorIndex = executorCounter.incrementAndGet()
197153
val threadCounter = new AtomicInteger
198-
val threadFactory: ThreadFactory = runnable => {
199-
val threadIndex = threadCounter.incrementAndGet()
200-
val t = new Thread(
201-
runnable,
202-
s"execution-contexts-threadpool-$executorIndex-thread-$threadIndex"
203-
)
204-
t.setDaemon(true)
205-
t
206-
}
207-
new PriorityThreadPoolExecutor(threadCount, threadFactory)
154+
new ThreadPoolExecutor(
155+
threadCount,
156+
threadCount,
157+
60 * 1000,
158+
TimeUnit.SECONDS,
159+
// Use a `Deque` rather than a normal `Queue`, with the various `poll`/`take`
160+
// operations reversed, providing elements in a LIFO order. This ensures that
161+
// child `fork.async` tasks always take priority over parent tasks, avoiding
162+
// large numbers of blocked parent tasks from piling up
163+
new PriorityBlockingQueue[Runnable](),
164+
runnable => {
165+
val threadIndex = threadCounter.incrementAndGet()
166+
val t = new Thread(
167+
runnable,
168+
s"execution-contexts-threadpool-$executorIndex-thread-$threadIndex"
169+
)
170+
t.setDaemon(true)
171+
t
172+
}
173+
)
208174
}
209175
}

0 commit comments

Comments
 (0)