Commit d9c3665
Use the new Armada Scala Client job watcher API method (#95)

Replace the local logic for watching a job with the new `jobWatcher()` method from the Armada Scala Client.

Also add the `-p` option to the `nc` invocation used to check for server reachability/readiness. This fixes a failure when running in C3, where some older versions of `nc` did not honor the given port number; with `-p`, the `nc` listener process we run works with both newer and older versions of `nc`.

Signed-off-by: Rich Scott <richscott@sent.com>

1 parent 607ac5d commit d9c3665
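For context, the shape of the new watch loop is sketched below, based on the usage visible in the ArmadaEventWatcher diff further down. The `jobWatcher()` call and the `id`/`message` fields on each streamed message are taken from that diff; the wrapper method name and the `handle` parameter are illustrative only, not the project's actual code.

```scala
import io.armadaproject.armada.ArmadaClient
import api.event.EventMessage

// Illustrative sketch mirroring the loop in ArmadaEventWatcher after this change.
// jobWatcher(queue, jobSetId, fromMessageId) is assumed to return a blocking
// iterator of stream messages, as used in the diff below.
def watchJobSet(
    client: ArmadaClient,
    queue: String,
    jobSetId: String,
    handle: EventMessage => Unit
): Unit = {
  var lastMessageId = "" // resume point; updated as messages arrive
  val watcher = client.jobWatcher(queue, jobSetId, lastMessageId)
  while (watcher.hasNext) {
    val streamMessage = watcher.next()
    if (streamMessage != null) {
      lastMessageId = streamMessage.id      // remember position in the stream
      streamMessage.message.foreach(handle) // dispatch the wrapped event
    }
  }
}
```

Compared with the previous hand-rolled approach, the caller no longer needs a `ManagedChannel`, an `EventGrpc` blocking stub, or a `WatchRequest`; the Scala client owns the event stream.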

File tree

5 files changed: +10 -51 lines

.github/workflows/build-snapshots.yaml

Lines changed: 2 additions & 2 deletions

@@ -14,7 +14,7 @@ jobs:
       matrix:
         include:
           - scala_version: "2.13.16"
-            spark_version: "4.1.0-SNAPSHOT"
+            spark_version: "4.1.2-SNAPSHOT"
             java_version: "17"
 
     steps:
@@ -25,4 +25,4 @@ jobs:
         with:
           spark_version: ${{ matrix.spark_version }}
           scala_version: ${{ matrix.scala_version }}
-          java_version: ${{ matrix.java_version }}
+          java_version: ${{ matrix.java_version }}

pom.xml

Lines changed: 1 addition & 1 deletion

@@ -153,7 +153,7 @@
     <dependency>
      <groupId>io.armadaproject</groupId>
      <artifactId>armada-scala-client_${scala.binary.version}</artifactId>
-      <version>0.2.0</version>
+      <version>0.3.0</version>
    </dependency>
 
    <!-- Test dependencies -->

src/main/scala/org/apache/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala

Lines changed: 2 additions & 14 deletions

@@ -27,7 +27,7 @@ import api.submit.JobCancelRequest
 import io.armadaproject.armada.ArmadaClient
 import io.grpc.netty.NettyChannelBuilder
 import io.grpc.stub.MetadataUtils
-import io.grpc.{ManagedChannel, ManagedChannelBuilder, Metadata}
+import io.grpc.Metadata
 
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.armada.Config._
@@ -85,7 +85,6 @@ private[spark] class ArmadaClusterManagerBackend(
   }
 
   /** Armada connection details */
-  private var grpcChannel: Option[ManagedChannel] = None
   private var armadaClient: Option[ArmadaClient] = None
   private var eventWatcher: Option[ArmadaEventWatcher] = None
   private var queue: Option[String] = None
@@ -207,10 +206,8 @@ private[spark] class ArmadaClusterManagerBackend(
       channelBuilderWithTls.build()
     }
 
-    grpcChannel = Some(channel)
-
     // Start event watcher
-    val watcher = new ArmadaEventWatcher(channel, q, jsId, this, jobIdToExecutor, client)
+    val watcher = new ArmadaEventWatcher(q, jsId, this, jobIdToExecutor, client)
     eventWatcher = Some(watcher)
     watcher.start()
     logInfo(s"Armada Event Watcher started for queue=$q jobSetId=$jsId at $host:$port")
@@ -262,15 +259,6 @@ private[spark] class ArmadaClusterManagerBackend(
       ThreadUtils.shutdown(executorService)
     }
 
-    // Shutdown gRPC channel
-    Utils.tryLogNonFatalError {
-      grpcChannel.foreach { ch =>
-        ch.shutdown()
-        ch.awaitTermination(5, TimeUnit.SECONDS)
-      }
-    }
-
-    grpcChannel = None
     eventWatcher = None
     armadaClient = None
     executorAllocator = None

src/main/scala/org/apache/spark/scheduler/cluster/armada/ArmadaEventWatcher.scala

Lines changed: 4 additions & 33 deletions

@@ -23,8 +23,6 @@ import scala.concurrent.Await
 import scala.concurrent.duration._
 
 import api.event.{
-  EventGrpc,
-  WatchRequest,
   EventMessage,
   JobSubmittedEvent,
   JobQueuedEvent,
@@ -35,16 +33,14 @@ import api.event.{
   JobCancelledEvent,
   JobPreemptingEvent,
   JobPreemptedEvent,
-  JobTerminatedEvent,
-  JobUnableToScheduleEvent
+  JobTerminatedEvent
 }
 import io.grpc.ManagedChannel
 import org.apache.spark.internal.Logging
 
 /** Keeps track of the armada event stream for the jobset
   */
 private[spark] class ArmadaEventWatcher(
-    grpcChannel: ManagedChannel,
     queue: String,
     jobSetId: String,
     backend: ArmadaClusterManagerBackend,
@@ -89,17 +85,11 @@ private[spark] class ArmadaEventWatcher(
 
     while (running) {
       try {
-        logInfo(s"Connecting to Armada event stream: queue=$queue, jobSetId=$jobSetId")
+        val watcher = submitClient.jobWatcher(queue, jobSetId, lastMessageId)
 
-        val eventStub = EventGrpc.blockingStub(grpcChannel)
-        val watchRequest = WatchRequest(queue = queue, jobSetId = jobSetId, fromId = lastMessageId)
-
-        logInfo("Connected to Armada event stream (blocking)")
-
-        val iterator = eventStub.watch(watchRequest)
-        while (running && iterator.hasNext) {
+        while (running && watcher.hasNext) {
           try {
-            val streamMessage = iterator.next()
+            val streamMessage = watcher.next()
             if (streamMessage != null) {
               lastMessageId = streamMessage.id
               streamMessage.message.foreach(processEventMessage)
@@ -183,9 +173,6 @@ private[spark] class ArmadaEventWatcher(
       case EventMessage.Events.Preempted(event) =>
         handlePreempted(event)
 
-      case EventMessage.Events.UnableToSchedule(event) =>
-        handleUnableToSchedule(event)
-
       case EventMessage.Events.Leased(_) =>
         logInfo("Job leased event received")
 
@@ -311,20 +298,4 @@ private[spark] class ArmadaEventWatcher(
       }
     }
 
-  private def handleUnableToSchedule(event: JobUnableToScheduleEvent): Unit = {
-    val jobId = event.jobId
-    val executorId = jobIdToExecutor.get(jobId)
-    if (executorId != null) {
-      val reason = Option(event.reason).filter(_.nonEmpty).getOrElse("Unable to schedule")
-      val nodeName =
-        Option(event.nodeName).filter(_.nonEmpty).map(n => s" on node $n").getOrElse("")
-      val podName = Option(event.podName).filter(_.nonEmpty).map(p => s" (pod: $p)").getOrElse("")
-      val fullReason = s"$reason$nodeName$podName"
-      logWarning(s"Executor $executorId (job $jobId) unable to schedule: $fullReason")
-      backend.onExecutorUnableToSchedule(jobId, executorId, fullReason)
-    } else {
-      logInfo(s"Received unable to schedule event for unknown job $jobId")
-    }
-  }
-
 }

src/test/scala/org/apache/spark/deploy/armada/submit/ArmadaUtilsSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -43,7 +43,7 @@ private class ArmadaUtilsSuite
   test("Confirm initContainer sh command succeeds with server") {
     // start server
     val serverPort = "54525"
-    val serverCommand = Seq("nc", "-l", serverPort)
+    val serverCommand = Seq("nc", "-l", "-p", serverPort)
     val server = Process.apply(serverCommand).run
 
     // start client

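As a rough illustration of the `nc` change in the test above, the standalone sketch below (not part of the repository) starts a listener the same way the updated test does and then probes it. The `nc -z` reachability probe is an assumption used only for illustration, not the project's actual init-container command.

```scala
import scala.sys.process._

object NcListenerSketch {
  def main(args: Array[String]): Unit = {
    val port = "54525" // same port the test uses; any free port works

    // "-l -p <port>" is the listener form that, per the commit message, works
    // with both traditional (GNU) netcat and newer BSD-style builds.
    val listener = Process(Seq("nc", "-l", "-p", port)).run()

    try {
      Thread.sleep(500) // give the listener a moment to bind
      // Illustrative probe (assumption): "-z" asks nc to connect and exit.
      val exitCode = Seq("nc", "-z", "localhost", port).!
      println(s"port $port reachable: ${exitCode == 0}")
    } finally {
      listener.destroy() // stop the background listener
    }
  }
}
```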