Support independent CPU and GPU workloads on the same node #331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open. Wants to merge 9 commits into base: master.
50 changes: 27 additions & 23 deletions buildSrc/src/main/kotlin/testing-conventions.gradle.kts
@@ -27,6 +27,10 @@ plugins {
tasks.test {
useJUnitPlatform()

minHeapSize = "512m"
maxHeapSize = "3072m"
jvmArgs = listOf("-XX:MaxMetaspaceSize=512m")

reports {
html.required.set(true)
junitXml.required.set(true)
@@ -42,26 +46,26 @@ dependencies {
testRuntimeOnly(versionCatalog["junit.jupiter.engine"])
}

tasks.register<Test>("testsOn18") {
javaLauncher.set(javaToolchains.launcherFor {
languageVersion.set(JavaLanguageVersion.of(18))
})

useJUnitPlatform()

minHeapSize = "512m"
maxHeapSize = "1024m"
jvmArgs = listOf("-XX:MaxMetaspaceSize=512m")
}

tasks.register<Test>("testsOn19") {
javaLauncher.set(javaToolchains.launcherFor {
languageVersion.set(JavaLanguageVersion.of(19))
})

useJUnitPlatform()

minHeapSize = "512m"
maxHeapSize = "1024m"
jvmArgs = listOf("-XX:MaxMetaspaceSize=512m")
}
//tasks.register<Test>("testsOn18") {
// javaLauncher.set(javaToolchains.launcherFor {
// languageVersion.set(JavaLanguageVersion.of(18))
// })
//
// useJUnitPlatform()
//
// minHeapSize = "512m"
// maxHeapSize = "1024m"
// jvmArgs = listOf("-XX:MaxMetaspaceSize=512m")
//}
//
//tasks.register<Test>("testsOn19") {
// javaLauncher.set(javaToolchains.launcherFor {
// languageVersion.set(JavaLanguageVersion.of(19))
// })
//
// useJUnitPlatform()
//
// minHeapSize = "512m"
// maxHeapSize = "1024m"
// jvmArgs = listOf("-XX:MaxMetaspaceSize=512m")
//}
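
For reference, re-enabling a per-JDK test run later only requires reviving one of the commented blocks above and bumping the version. A minimal sketch, where the task name and JDK version 21 are illustrative and not part of this change:

tasks.register<Test>("testsOn21") {
    javaLauncher.set(
        javaToolchains.launcherFor {
            languageVersion.set(JavaLanguageVersion.of(21))
        },
    )

    useJUnitPlatform()

    // Same memory settings the commented-out tasks used.
    minHeapSize = "512m"
    maxHeapSize = "1024m"
    jvmArgs = listOf("-XX:MaxMetaspaceSize=512m")
}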
@@ -58,6 +58,7 @@ public class SimHost(
private val engine: FlowEngine,
private val machineModel: MachineModel,
private val cpuPowerModel: CpuPowerModel,
private val accelPowerModel: CpuPowerModel,
private val embodiedCarbon: Double,
private val expectedLifetime: Double,
private val powerDistributor: FlowDistributor,
@@ -136,6 +137,7 @@ public class SimHost(
this.machineModel,
this.powerDistributor,
this.cpuPowerModel,
this.accelPowerModel,
) { cause ->
hostState = if (cause != null) HostState.ERROR else HostState.DOWN
}
@@ -340,6 +342,11 @@ public class SimHost(
return guest.getCpuStats()
}

public fun getAccelStats(task: ServiceTask): GuestCpuStats {
val guest = requireNotNull(taskToGuestMap[task]) { "Unknown task ${task.name} at host $name" }
return guest.getAccelStats()
}

override fun hashCode(): Int = name.hashCode()

override fun equals(other: Any?): Boolean {
@@ -352,7 +359,11 @@ public class SimHost(
* Convert flavor to machine model.
*/
private fun Flavor.toMachineModel(): MachineModel {
return MachineModel(simMachine!!.machineModel.cpuModel, MemoryUnit("Generic", "Generic", 3200.0, memorySize))
return MachineModel(
simMachine!!.machineModel.cpuModel,
simMachine!!.machineModel.accelModel,
MemoryUnit("Generic", "Generic", 3200.0, memorySize),
)
}

/**
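With getAccelStats(task) available next to getCpuStats(task), a monitoring hook can report CPU and accelerator counters for the same task. A minimal sketch (imports omitted; the function, host, and task are assumptions, and the accelerator stats reuse the GuestCpuStats shape exactly as in this change):

fun reportTaskStats(host: SimHost, task: ServiceTask) {
    val cpu = host.getCpuStats(task)
    val accel = host.getAccelStats(task) // added in this PR, same GuestCpuStats shape

    // usage and demand are reported in MHz, matching the CPU counters.
    println("${task.name}: cpu ${cpu.usage}/${cpu.demand} MHz, accel ${accel.usage}/${accel.demand} MHz")
}
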
@@ -256,6 +256,25 @@ public class Guest(
)
}

/**
* Obtain the GPU statistics of this guest.
*/
public fun getAccelStats(): GuestCpuStats {
virtualMachine!!.updateCounters(this.clock.millis())
val counters = virtualMachine!!.performanceCounters

return GuestCpuStats(
counters.cpuActiveTime / 1000L,
counters.cpuIdleTime / 1000L,
counters.cpuStealTime / 1000L,
counters.cpuLostTime / 1000L,
counters.cpuCapacity,
counters.cpuSupply,
counters.cpuDemand,
counters.cpuSupply / cpuLimit,
)
}

/**
* Helper function to track the uptime and downtime of the guest.
*/
@@ -127,6 +127,7 @@ public class HostsProvisioningStep internal constructor(
engine,
hostSpec.model,
hostSpec.cpuPowerModel,
hostSpec.accelPowerModel,
hostSpec.embodiedCarbon,
hostSpec.expectedLifetime,
hostDistributor,
@@ -107,6 +107,16 @@ public interface TaskTableReader : Exportable {
*/
public val cpuDemand: Double

/**
* The GPU capacity used by this task (in MHz).
*/
public val accelUsage: Double

/**
* The GPU capacity demanded by this task (in MHz).
*/
public val accelDemand: Double

/**
* The duration (in seconds) that a CPU was active in the task.
*/
@@ -143,6 +143,14 @@ public class TaskTableReaderImpl(
get() = _cpuDemand
private var _cpuDemand = 0.0

override val accelUsage: Double
get() = _accelUsage
private var _accelUsage = 0.0

override val accelDemand: Double
get() = _accelDemand
private var _accelDemand = 0.0

override val cpuActiveTime: Long
get() = _cpuActiveTime - previousCpuActiveTime
private var _cpuActiveTime = 0L
@@ -186,6 +194,7 @@ public class TaskTableReaderImpl(
}

val cpuStats = _host?.getCpuStats(task)
val accelStats = _host?.getAccelStats(task)
val sysStats = _host?.getSystemStats(task)

_timestamp = now
@@ -194,6 +203,8 @@ public class TaskTableReaderImpl(
_cpuLimit = cpuStats?.capacity ?: 0.0
_cpuDemand = cpuStats?.demand ?: 0.0
_cpuUsage = cpuStats?.usage ?: 0.0
_accelDemand = accelStats?.demand ?: 0.0
_accelUsage = accelStats?.usage ?: 0.0
_cpuActiveTime = cpuStats?.activeTime ?: _cpuActiveTime
_cpuIdleTime = cpuStats?.idleTime ?: _cpuIdleTime
_cpuStealTime = cpuStats?.stealTime ?: _cpuStealTime
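Because the accelerator columns are filled in the same update as the CPU columns, an exporter that already consumes TaskTableReader can pick them up without further changes. A hypothetical consumer, using the usage/demand properties added above (cpuUsage is assumed to exist alongside cpuDemand, as the implementation suggests):

fun formatTaskRow(reader: TaskTableReader): String =
    "cpu=${reader.cpuUsage}/${reader.cpuDemand} MHz, " +
        "accel=${reader.accelUsage}/${reader.accelDemand} MHz"
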
@@ -165,22 +165,51 @@ private fun HostJSONSpec.toHostSpec(clusterName: String): HostSpec {
)
}

val accelUnits =
if (accel == null) {
List(1) {
CpuModel(
globalCoreId++,
0,
0.0,
)
}
} else {
List(accel.count) {
CpuModel(
globalCoreId++,
accel.coreCount,
accel.coreSpeed.toMHz(),
)
}
}

val unknownMemoryUnit = MemoryUnit(memory.vendor, memory.modelName, memory.memorySpeed.toMHz(), memory.memorySize.toMiB().toLong())
val machineModel =
MachineModel(
units,
accelUnits,
unknownMemoryUnit,
)

val powerModel =
getPowerModel(powerModel.modelType, powerModel.power.toWatts(), powerModel.maxPower.toWatts(), powerModel.idlePower.toWatts())

val accelPowerModel =
getPowerModel(
accelPowerModel.modelType,
accelPowerModel.power.toWatts(),
accelPowerModel.maxPower.toWatts(),
accelPowerModel.idlePower.toWatts(),
)

val hostSpec =
HostSpec(
createUniqueName(this.name, hostNames),
clusterName,
machineModel,
powerModel,
accelPowerModel,
)
return hostSpec
}
@@ -37,6 +37,7 @@ public data class HostSpec(
val clusterName: String,
val model: MachineModel,
val cpuPowerModel: CpuPowerModel,
val accelPowerModel: CpuPowerModel,
val embodiedCarbon: Double = 1000.0,
val expectedLifetime: Double = 5.0,
)
@@ -77,8 +77,10 @@ public data class ClusterJSONSpec(
public data class HostJSONSpec(
val name: String = "Host",
val cpu: CPUJSONSpec,
val accel: CPUJSONSpec? = null,
val memory: MemoryJSONSpec,
val powerModel: PowerModelSpec = PowerModelSpec.DFLT,
val accelPowerModel: PowerModelSpec = PowerModelSpec.NONE,
val count: Int = 1,
)

@@ -139,6 +141,14 @@ public data class PowerModelSpec(
maxPower = Power.ofWatts(400.0),
idlePower = Power.ofWatts(200.0),
)

public val NONE: PowerModelSpec =
PowerModelSpec(
modelType = "constant",
power = Power.ofWatts(0.0),
maxPower = Power.ofWatts(0.0),
idlePower = Power.ofWatts(0.0),
)
}
}

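A host that should account for accelerator power can override the NONE default with an explicit accelPowerModel. A sketch under the assumption that "linear" is one of the model types accepted by getPowerModel; the wattages are illustrative:

val gpuPowerModel =
    PowerModelSpec(
        modelType = "linear", // assumed model type; "constant" is the only one visible in this diff
        power = Power.ofWatts(350.0),
        maxPower = Power.ofWatts(700.0),
        idlePower = Power.ofWatts(50.0),
    )

Hosts that omit accelPowerModel keep the NONE default, a constant 0 W model, so CPU-only hosts contribute no accelerator power draw.
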
@@ -38,6 +38,7 @@ import org.opendc.trace.conv.resourceMemCapacity
import org.opendc.trace.conv.resourceNature
import org.opendc.trace.conv.resourceStateCpuUsage
import org.opendc.trace.conv.resourceStateDuration
import org.opendc.trace.conv.resourceStateIsGpu
import org.opendc.trace.conv.resourceSubmissionTime
import java.io.File
import java.lang.ref.SoftReference
@@ -79,6 +80,7 @@ public class ComputeWorkloadLoader(
val durationCol = reader.resolve(resourceStateDuration)
val coresCol = reader.resolve(resourceCpuCount)
val usageCol = reader.resolve(resourceStateCpuUsage)
val isGpuCol = reader.resolve(resourceStateIsGpu)

val fragments = mutableMapOf<String, Builder>()

@@ -88,12 +90,13 @@ public class ComputeWorkloadLoader(
val durationMs = reader.getDuration(durationCol)!!
val cores = reader.getInt(coresCol)
val cpuUsage = reader.getDouble(usageCol)
val isGpu = reader.getBoolean(isGpuCol)

val builder =
fragments.computeIfAbsent(
id,
) { Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy, id) }
builder.add(durationMs, cpuUsage, cores)
builder.add(durationMs, cpuUsage, cores, isGpu)
}

fragments
@@ -231,10 +234,11 @@ public class ComputeWorkloadLoader(
duration: Duration,
usage: Double,
cores: Int,
isGpu: Boolean,
) {
totalLoad += (usage * duration.toMillis()) / 1000 // avg MHz * duration = MFLOPs

builder.add(duration.toMillis(), usage, cores)
builder.add(duration.toMillis(), usage, cores, isGpu)
}

/**
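On the trace side, each fragment row now carries an isGpu flag resolved from the new resourceStateIsGpu column and is handed straight to the fragment builder. A small sketch of that call with illustrative values, where builder is the per-task Builder from the loop above and Duration is java.time.Duration:

// 5 minutes of 2500 MHz load on 8 cores, marked as accelerator load.
builder.add(Duration.ofMinutes(5), 2500.0, cores = 8, isGpu = true)
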
@@ -120,7 +120,8 @@ public suspend fun ComputeService.replay(
}

val workload = entry.trace
val meta = mutableMapOf<String, Any>("workload" to workload)
// val meta = mutableMapOf<String, Any>("workload" to workload)
val meta = mutableMapOf<String, Any>()

val nature =
if (entry.nature == "deferrable") {