Stop cancelling already-succeeded executor pods on Spark job completion (#99)
Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
src/main/scala/org/apache/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
Since this is a draft PR, I haven't studied all the details, but it looks pretty good to me now. Were you thinking of doing anything else before moving it out of draft status?
@GeorgeJahad, I am thinking of removing the third-party plugins and later adding plugins, or coming up with our own plugins. Otherwise looks good to me.
```scala
private val pendingExecutors = new mutable.HashSet[String]()

/** Tracks executors that have reached a terminal state (succeeded, failed, cancelled) */
private val terminalExecutors: java.util.Set[String] =
```
NIT: I find this method name a bit misleading, "terminal" to me sounds like they are terminating but haven't yet terminated.
To me "terminatedExecutors" seems clearer. Feel free to ignore this comment if you disagree.
To me, terminalExecutors felt intentional, representing all final states (succeeded, failed, cancelled), not only forcefully terminated ones; terminated felt narrower. But happy to rename if terminatedExecutors sounds clearer.
No, if it is just me, then leave it.
```scala
// Configure TLS
val useTls = conf.get(ARMADA_EVENT_WATCHER_USE_TLS)
val channelBuilder = NettyChannelBuilder.forAddress(host, port)

val channelBuilderWithTls = if (useTls) {
  logInfo("Using TLS for event watcher gRPC channel")
  channelBuilder.useTransportSecurity()
} else {
  logInfo("Using plaintext for event watcher gRPC channel")
  channelBuilder.usePlaintext()
}

val channel = token match {
  case Some(t) =>
    val metadata = new Metadata()
    metadata.put(
      Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER),
      "Bearer " + t
    )
    channelBuilderWithTls
      .intercept(MetadataUtils.newAttachHeadersInterceptor(metadata))
      .build()
  case None =>
    channelBuilderWithTls.build()
}
```
I believe this code isn't used anymore and should have been removed when the armada client was updated. If possible, please remove it.
Does the client library now handle event watcher TLS? Because, as far as I remember, without TLS the event watcher won't work in C3.
```scala
/** Mark an executor as having reached a terminal state and clean it from pending set. */
private def markTerminal(executorId: String): Unit = {
  terminalExecutors.add(executorId)
```
This is going to leave a window in which an executor can be in both terminalExecutors and pendingExecutors. Unless there is a good reason why, I would prefer that not to be the case.
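One possible way to close that window is to guard both updates with a single lock so the pending-to-terminal move is atomic. The names (`pendingExecutors`, `terminalExecutors`, `markTerminal`) mirror the PR; everything else below is an illustrative stand-in, not the actual backend code.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Illustrative stand-in for the backend's executor tracking (assumed names).
object ExecutorTracker {
  private val lock = new Object
  private val pendingExecutors = new mutable.HashSet[String]()
  private val terminalExecutors: java.util.Set[String] =
    ConcurrentHashMap.newKeySet[String]()

  def markPending(executorId: String): Unit = lock.synchronized {
    pendingExecutors += executorId
  }

  // Remove from pending before adding to terminal, under one lock,
  // so no observer sees the executor in both sets at once.
  def markTerminal(executorId: String): Unit = lock.synchronized {
    pendingExecutors -= executorId
    terminalExecutors.add(executorId)
  }

  def isTerminal(executorId: String): Boolean =
    terminalExecutors.contains(executorId)

  def pendingCount: Int = lock.synchronized(pendingExecutors.size)
}
```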
```scala
  backend.getPendingExecutorCount shouldBe 1
}

test("thread safety of terminal executor tracking") {
```
The test is a bit confusing; how about a comment like:
"Use multiple threads to terminate half the jobs, then confirm the number of remaining active ones"
```sh
CHANGED_FILES=$(git diff --name-only HEAD 2>/dev/null; git diff --name-only --cached HEAD 2>/dev/null; git ls-files --others --exclude-standard 2>/dev/null)
BUILD_FILES=$(echo "$CHANGED_FILES" | grep -E '\.(scala|java)$|pom\.xml' | head -1)
if [ -z "$BUILD_FILES" ]; then
  echo '{"systemMessage": "Skipped build verification (no code changes detected)"}'
```
I've been running my instance with this hook, and I don't think I've ever seen this message, even though I don't change my files very often.
Do you see it in your runs when you don't change files?
I always see this instead:
Stop says: Build verification passed
Oddly, for me it seems to run a compile no matter what I ask:

```
❯ what is the current load average
● Bash(uptime)
⎿ 07:44:11 up 12:48, 2 users, load average: 0.23, 0.23, 0.09
● Load averages: 0.23, 0.23, 0.09 (1min, 5min, 15min). Pretty idle.
⎿ Stop says: Build verification passed
```
I did update it a bit in a later commit since the initial version. Did you pull the latest one?
- **ArmadaEventWatcher** — Long-lived daemon thread with `volatile running` flag; 5s join timeout on shutdown
- **PodSpecConverter** — Bidirectional Fabric8 <-> Protobuf; hardcodes None/empty for version-incompatible fields (dnsConfig, ephemeralContainers, hostUsers, os, schedulingGates)
- **Config** — 100+ entries via Spark's `ConfigBuilder` API; all prefixed `spark.armada.*`
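The ArmadaEventWatcher lifecycle described in the first bullet can be sketched as follows; the `volatile` flag and 5s join timeout come from the bullet, while the class body itself is hypothetical.

```scala
// Hypothetical minimal sketch: a long-lived daemon thread with a volatile
// running flag, joined with a 5s timeout on shutdown.
class WatcherSketch extends Thread {
  @volatile private var running = true
  setDaemon(true)

  override def run(): Unit = {
    while (running) {
      // a real watcher would poll events here; sleep keeps the sketch cheap
      Thread.sleep(10)
    }
  }

  def shutdown(): Unit = {
    running = false
    join(5000) // wait up to 5s for the loop to exit
  }
}
```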
ArmadaClientApplication class?
Added ArmadaClientApplication.
CLAUDE.md
## Testing Standards

- **Framework:** ScalaTest 3.2.16 (`AnyFunSuite` style exclusively)
- **Mocking:** Mockito 5.12 (`mock(classOf[...])`, `when(...).thenReturn(...)`)
there are a lot of hard coded version numbers of dependencies in this file that have been copied over from the pom file.
Wouldn't it be better to tell claude to read the pom for the versions of these dependencies?
Yeah, referred to pom.xml. If it drifts too much, we can always ask Claude to update the CLAUDE.md.
I've given you a bunch of nits to clean up, but the code is basically ready. I'll approve soon.
This all looks good, but in standup we said we would make the unofficial plugins a recommendation. Once that is done, I'll approve.
Thanks, @GeorgeJahad, for the thorough review. As we continue using Claude, we will likely identify more optimal plugins for this project. For now, these three plugins look good to me, so I recommended them as optional.
GeorgeJahad left a comment:
lgtm! Very good work @sudiptob2!
thanks!
Fixes G-Research/spark#161
## What

Skip cancelling executor jobs that already exited successfully during Spark application shutdown.
## Why

When a Spark job completes, the shutdown sequence cancels all executor jobs in Armada, including ones that already exited successfully. This makes successful pods appear as "Cancelled" in Armada's UI and logs, misrepresenting the actual job outcome.
## Changes

- Track executors that have reached a terminal state (succeeded, failed, cancelled) in a terminalExecutors set, and clean them from the pending set via markTerminal.
- On shutdown, cancel only the executors returned by getActiveExecutorIds, which excludes terminal ones.
## Tests

New unit tests verify that `getActiveExecutorIds` correctly excludes failed, succeeded, cancelled, and unschedulable executors; that terminal executors are cleaned from the pending set; and that terminal tracking is thread-safe.
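As a rough illustration of the behavior under test: only `getActiveExecutorIds` and the state names come from the PR, while the data structures below are stand-ins.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

val registered = Set("exec-1", "exec-2", "exec-3", "exec-4")
val terminal = ConcurrentHashMap.newKeySet[String]()
terminal.add("exec-2") // succeeded
terminal.add("exec-4") // failed

// Shutdown cancels only these; terminal executors are skipped.
def getActiveExecutorIds: Set[String] = registered -- terminal.asScala

assert(getActiveExecutorIds == Set("exec-1", "exec-3"))
```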
## How to verify