diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e008127f9..1fcbe9189 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -61,10 +61,10 @@ jobs: ./**/build/distributions/**/*.zip retention-days: 5 - name: Upload code coverage + if: matrix.os == 'ubuntu-22.04' uses: codecov/codecov-action@v5 with: - token: ${{ secrets.CODECOV_TOKEN }} - files: ./build/reports/jacoco/codeCoverageReport/codeCoverageReport.xml + files: /home/runner/work/opendc/opendc/build/reports/jacoco/codeCoverageReport/codeCoverageReport.xml build-docker: name: Build Docker Images runs-on: ubuntu-22.04 diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java index eddde87e7..672b21c1a 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceFlavor.java @@ -40,7 +40,8 @@ public final class ServiceFlavor implements Flavor { private final long memorySize; private final Map meta; - ServiceFlavor(ComputeService service, UUID uid, String name, int coreCount, long memorySize, Map meta) { + public ServiceFlavor( + ComputeService service, UUID uid, String name, int coreCount, long memorySize, Map meta) { this.service = service; this.uid = uid; this.name = name; diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java index 06d6535de..30b17f7d9 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java @@ -64,7 +64,7 @@ public class ServiceTask { private int numFailures = 0; - ServiceTask( + public ServiceTask( ComputeService service, UUID uid, String name, diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt index 132ad227a..6ac04ffc1 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/host/SimHost.kt @@ -42,15 +42,18 @@ import java.time.Instant import java.time.InstantSource /** - * A [SimHost] implementation that simulates virtual machines on a physical machine. + * The `SimHost` class represents a simulated host that operates as a hypervisor for running virtualized tasks. + * It manages its state, running tasks, associated virtual machines, and system metrics, while providing host + * lifecycle management and resource usage computation. * - * @param name The name of the host. - * @param clock The (virtual) clock used to track time. - * @param graph The Flow Graph that the Host is part of - * @param machineModel The static model of the host - * @param cpuPowerModel The power model of the host - * @param powerDistributor The power distributor to which the host is connected - * @constructor Create empty Sim host + * @constructor Initializes the SimHost with required parameters. 
+ * @param name The unique name of the simulated host. + * @param clusterName The cluster name to which this host belongs. + * @param clock The clock source to simulate time within the system. + * @param graph The data-flow graph used by the simulation for task operations. + * @param machineModel The hardware model attributes of the host. + * @param cpuPowerModel The CPU power characteristics of the host. + * @param powerDistributor The power distributor managing system energy allocations. */ public class SimHost( private val name: String, @@ -191,8 +194,32 @@ public class SimHost( return this.guests } + /** + * Calculates the total memory used by the currently running tasks on the host. + * + * Iterates through the tasks mapped to guests in `taskToGuestMap`. For tasks that are in the + * `TaskState.RUNNING` state, their memory consumption is summed up. + * + * @return Total memory used by tasks currently in the RUNNING state, in bytes. + */ + private fun usedMemoryByRunningTasks(): Long { + var usedMemory: Long = 0 + for (vm in this.taskToGuestMap) { + if (vm.value.state == TaskState.RUNNING) { + usedMemory += vm.key.flavor.memorySize + } + } + return usedMemory + } + + /** + * Determines if the given task can fit on this host based on its resource requirements. + * + * @param task The task to be checked. It includes the resource requirements such as memory size and core count. + * @return True if the task can fit on this host considering available memory, CPU cores, and machine model compatibility; false otherwise. + */ public fun canFit(task: ServiceTask): Boolean { - val sufficientMemory = model.memoryCapacity >= task.flavor.memorySize + val sufficientMemory = (model.memoryCapacity - this.usedMemoryByRunningTasks()) >= task.flavor.memorySize val enoughCpus = model.coreCount >= task.flavor.coreCount val canFit = simMachine!!.canFit(task.flavor.toMachineModel()) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt index 29e9d541a..cec664b68 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeMonitorProvisioningStep.kt @@ -25,6 +25,7 @@ package org.opendc.compute.simulator.provisioner import org.opendc.compute.simulator.service.ComputeService import org.opendc.compute.simulator.telemetry.ComputeMetricReader import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.OutputFiles import java.time.Duration /** @@ -36,6 +37,14 @@ public class ComputeMonitorProvisioningStep( private val monitor: ComputeMonitor, private val exportInterval: Duration, private val startTime: Duration = Duration.ofMillis(0), + private val filesToExport: Map = + mapOf( + OutputFiles.HOST to true, + OutputFiles.TASK to true, + OutputFiles.SERVICE to true, + OutputFiles.POWER_SOURCE to true, + OutputFiles.BATTERY to true, + ), ) : ProvisioningStep { override fun apply(ctx: ProvisioningContext): AutoCloseable { val service = @@ -49,6 +58,7 @@ public class ComputeMonitorProvisioningStep( monitor, exportInterval, startTime, + filesToExport, ) return metricReader } diff --git 
a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt index 7d9cae600..c72e8944d 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/provisioner/ComputeSteps.kt @@ -26,6 +26,7 @@ package org.opendc.compute.simulator.provisioner import org.opendc.compute.simulator.scheduler.ComputeScheduler import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.OutputFiles import org.opendc.compute.topology.specs.ClusterSpec import org.opendc.compute.topology.specs.HostSpec import java.time.Duration @@ -59,8 +60,16 @@ public fun registerComputeMonitor( monitor: ComputeMonitor, exportInterval: Duration = Duration.ofMinutes(5), startTime: Duration = Duration.ofMillis(0), + filesToExport: Map = + mapOf( + OutputFiles.HOST to true, + OutputFiles.TASK to true, + OutputFiles.SERVICE to true, + OutputFiles.POWER_SOURCE to true, + OutputFiles.BATTERY to true, + ), ): ProvisioningStep { - return ComputeMonitorProvisioningStep(serviceDomain, monitor, exportInterval, startTime) + return ComputeMonitorProvisioningStep(serviceDomain, monitor, exportInterval, startTime, filesToExport) } /** diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt index 10bc889b1..91748454c 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/ComputeMetricReader.kt @@ -56,6 +56,14 @@ public class ComputeMetricReader( private val monitor: ComputeMonitor, private val exportInterval: Duration = Duration.ofMinutes(5), private val startTime: Duration = Duration.ofMillis(0), + private val toMonitor: Map = + mapOf( + OutputFiles.HOST to true, + OutputFiles.TASK to true, + OutputFiles.POWER_SOURCE to true, + OutputFiles.BATTERY to true, + OutputFiles.SERVICE to true, + ), ) : AutoCloseable { private val logger = KotlinLogging.logger {} private val scope = CoroutineScope(dispatcher.asCoroutineDispatcher()) @@ -116,31 +124,35 @@ public class ComputeMetricReader( try { val now = this.clock.instant() - for (host in this.service.hosts) { - val reader = - this.hostTableReaders.computeIfAbsent(host) { - HostTableReaderImpl( - it, - startTime, - ) - } - reader.record(now) - this.monitor.record(reader.copy()) - reader.reset() + if (toMonitor[OutputFiles.HOST] == true) { + for (host in this.service.hosts) { + val reader = + this.hostTableReaders.computeIfAbsent(host) { + HostTableReaderImpl( + it, + startTime, + ) + } + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } } - for (task in this.service.tasks) { - val reader = - this.taskTableReaders.computeIfAbsent(task) { - TaskTableReaderImpl( - service, - it, - startTime, - ) - } - reader.record(now) - this.monitor.record(reader.copy()) - reader.reset() + if (toMonitor[OutputFiles.TASK] == true) { + for (task in this.service.tasks) { + val reader = + this.taskTableReaders.computeIfAbsent(task) { + TaskTableReaderImpl( + service, 
+ it, + startTime, + ) + } + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } } for (task in this.service.tasksToRemove) { @@ -148,36 +160,42 @@ public class ComputeMetricReader( } this.service.clearTasksToRemove() - for (simPowerSource in this.service.powerSources) { - val reader = - this.powerSourceTableReaders.computeIfAbsent(simPowerSource) { - PowerSourceTableReaderImpl( - it, - startTime, - ) - } - - reader.record(now) - this.monitor.record(reader.copy()) - reader.reset() + if (toMonitor[OutputFiles.POWER_SOURCE] == true) { + for (simPowerSource in this.service.powerSources) { + val reader = + this.powerSourceTableReaders.computeIfAbsent(simPowerSource) { + PowerSourceTableReaderImpl( + it, + startTime, + ) + } + + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } } - for (simBattery in this.service.batteries) { - val reader = - this.batteryTableReaders.computeIfAbsent(simBattery) { - BatteryTableReaderImpl( - it, - startTime, - ) - } - - reader.record(now) - this.monitor.record(reader.copy()) - reader.reset() + if (toMonitor[OutputFiles.BATTERY] == true) { + for (simBattery in this.service.batteries) { + val reader = + this.batteryTableReaders.computeIfAbsent(simBattery) { + BatteryTableReaderImpl( + it, + startTime, + ) + } + + reader.record(now) + this.monitor.record(reader.copy()) + reader.reset() + } } - this.serviceTableReader.record(now) - monitor.record(this.serviceTableReader.copy()) + if (toMonitor[OutputFiles.SERVICE] == true) { + this.serviceTableReader.record(now) + monitor.record(this.serviceTableReader.copy()) + } if (loggCounter >= 100) { var loggString = "\n\t\t\t\t\tMetrics after ${now.toEpochMilli() / 1000 / 60 / 60} hours:\n" diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/OutputFiles.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/OutputFiles.kt new file mode 100644 index 000000000..6b747a94f --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/OutputFiles.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package org.opendc.compute.simulator.telemetry + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +@Serializable +public enum class OutputFiles { + @SerialName("host") + HOST, + + @SerialName("task") + TASK, + + @SerialName("powerSource") + POWER_SOURCE, + + @SerialName("battery") + BATTERY, + + @SerialName("service") + SERVICE, +} diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt index 7d2b93633..a626c41b2 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/ParquetComputeMonitor.kt @@ -23,6 +23,7 @@ package org.opendc.compute.simulator.telemetry.parquet import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.OutputFiles import org.opendc.compute.simulator.telemetry.table.battery.BatteryTableReader import org.opendc.compute.simulator.telemetry.table.host.HostTableReader import org.opendc.compute.simulator.telemetry.table.powerSource.PowerSourceTableReader @@ -37,38 +38,38 @@ import java.io.File * A [ComputeMonitor] that logs the events to a Parquet file. */ public class ParquetComputeMonitor( - private val hostExporter: Exporter, - private val taskExporter: Exporter, - private val powerSourceExporter: Exporter, - private val batteryExporter: Exporter, - private val serviceExporter: Exporter, + private val hostExporter: Exporter?, + private val taskExporter: Exporter?, + private val powerSourceExporter: Exporter?, + private val batteryExporter: Exporter?, + private val serviceExporter: Exporter?, ) : ComputeMonitor, AutoCloseable { override fun record(reader: HostTableReader) { - hostExporter.write(reader) + hostExporter?.write(reader) } override fun record(reader: TaskTableReader) { - taskExporter.write(reader) + taskExporter?.write(reader) } override fun record(reader: PowerSourceTableReader) { - powerSourceExporter.write(reader) + powerSourceExporter?.write(reader) } override fun record(reader: BatteryTableReader) { - batteryExporter.write(reader) + batteryExporter?.write(reader) } override fun record(reader: ServiceTableReader) { - serviceExporter.write(reader) + serviceExporter?.write(reader) } override fun close() { - hostExporter.close() - taskExporter.close() - powerSourceExporter.close() - batteryExporter.close() - serviceExporter.close() + hostExporter?.close() + taskExporter?.close() + powerSourceExporter?.close() + batteryExporter?.close() + serviceExporter?.close() } public companion object { @@ -83,12 +84,14 @@ public class ParquetComputeMonitor( base: File, partition: String, bufferSize: Int, + filesToExport: Map, computeExportConfig: ComputeExportConfig, ): ParquetComputeMonitor = invoke( base = base, partition = partition, bufferSize = bufferSize, + filesToExport = filesToExport, hostExportColumns = computeExportConfig.hostExportColumns, taskExportColumns = computeExportConfig.taskExportColumns, powerSourceExportColumns = computeExportConfig.powerSourceExportColumns, @@ -109,6 +112,7 @@ public class ParquetComputeMonitor( base: File, partition: String, bufferSize: Int, + filesToExport: Map, hostExportColumns: Collection>? = null, taskExportColumns: Collection>? 
= null, powerSourceExportColumns: Collection>? = null, @@ -118,37 +122,72 @@ public class ParquetComputeMonitor( // Loads the fields in case they need to be retrieved if optional params are omitted. ComputeExportConfig.loadDfltColumns() - return ParquetComputeMonitor( - hostExporter = + val hostExporter = + if (filesToExport[OutputFiles.HOST] == true) { Exporter( outputFile = File(base, "$partition/host.parquet").also { it.parentFile.mkdirs() }, columns = hostExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, - ), - taskExporter = + ) + } else { + null + } + + val taskExporter = + if (filesToExport[OutputFiles.TASK] == true) { Exporter( outputFile = File(base, "$partition/task.parquet").also { it.parentFile.mkdirs() }, columns = taskExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, - ), - powerSourceExporter = + ) + } else { + null + } + + val powerSourceExporter = + if (filesToExport[OutputFiles.POWER_SOURCE] == true) { Exporter( outputFile = File(base, "$partition/powerSource.parquet").also { it.parentFile.mkdirs() }, columns = powerSourceExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, - ), - batteryExporter = + ) + } else { + null + } + + val batteryExporter = + if (filesToExport[OutputFiles.BATTERY] == true) { Exporter( outputFile = File(base, "$partition/battery.parquet").also { it.parentFile.mkdirs() }, columns = batteryExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, - ), - serviceExporter = + ) + } else { + null + } + + val serviceExporter = + if (filesToExport[OutputFiles.SERVICE] == true) { Exporter( outputFile = File(base, "$partition/service.parquet").also { it.parentFile.mkdirs() }, columns = serviceExportColumns ?: Exportable.getAllLoadedColumns(), bufferSize = bufferSize, - ), + ) + } else { + null + } + + return ParquetComputeMonitor( + hostExporter = + hostExporter, + taskExporter = + taskExporter, + powerSourceExporter = + powerSourceExporter, + batteryExporter = + batteryExporter, + serviceExporter = + serviceExporter, ) } } diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/host/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/host/SimHostTest.kt new file mode 100644 index 000000000..ba68540e8 --- /dev/null +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/host/SimHostTest.kt @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.compute.simulator.host + +import io.mockk.Runs +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.common.Dispatcher +import org.opendc.compute.api.TaskState +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.service.ServiceFlavor +import org.opendc.compute.simulator.service.ServiceTask +import org.opendc.simulator.compute.cpu.CpuPowerModels +import org.opendc.simulator.compute.models.CpuModel +import org.opendc.simulator.compute.models.MachineModel +import org.opendc.simulator.compute.models.MemoryUnit +import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.compute.workload.Workload +import org.opendc.simulator.engine.engine.FlowEngine +import org.opendc.simulator.engine.graph.FlowDistributor +import org.opendc.simulator.engine.graph.FlowGraph +import java.time.Clock +import java.time.InstantSource +import java.util.UUID + +/** + * Test suite for the [SimHost]. + */ +internal class SimHostTest { + /** + * Initialize a SimHost for testing purposes. The dispatcher and the FlowGraph are mocked. + */ + private fun initSimHost(): SimHost { + val cpuModel = CpuModel(1, 8, 3000.0, "TEST", "TEST", "x86") + val memoryUnit = MemoryUnit("TEST", "TEST", 1000.0, 320000) + val machineModel = MachineModel(cpuModel, memoryUnit) + val dispatcher = mockk() + every { dispatcher.getTimeSource() } returns InstantSource.system() + every { dispatcher.schedule(any(), any()) } returns Unit + val engine = FlowEngine.create(dispatcher) + val engine2 = FlowEngine.create(dispatcher) + val flowGraph = mockk() + every { flowGraph.engine } returns engine + every { flowGraph.addNode(any()) } just Runs + every { flowGraph.removeNode(any()) } just Runs + every { flowGraph.addEdge(any(), any()) } returns null + val flowGraph2 = FlowGraph(engine2) + val clock = Clock.systemUTC() + val cpuPowerModel = CpuPowerModels.constant(500.0) + val flowDistributor = FlowDistributor(flowGraph2) + val host = SimHost("test", "C1", clock, flowGraph, machineModel, cpuPowerModel, flowDistributor) + return host + } + + /** + * Initialize Workload + */ + private fun initVmWorkload( + coreCount: Int = 1, + memorySize: Long = 1024, + ): ServiceTask { + val computeService = mockk() + every { computeService.clock } returns InstantSource.system() + val vmID = UUID.randomUUID() + val vmName = "name_of_a_vm" + val meta = mockk>() + val serviceFlavor = ServiceFlavor(computeService, vmID, vmName, coreCount, memorySize, meta) + val workload = mockk() + val simWorkload = mockk() + every { workload.startWorkload(any(), any()) } returns simWorkload + val serviceTask = ServiceTask(computeService, vmID, vmName, serviceFlavor, workload, meta) + return serviceTask + } + + @Test + fun testInitSimHost() { + val host = initSimHost() + assertEquals("C1", host.getClusterName()) + assertEquals( + "SimHost[uid=test,name=test,model=HostModel[cpuCapacity=24000.0, coreCount=8, memoryCapacity=320000]]", + host.toString(), + ) + 
assertEquals("test", host.getName()) + assertEquals("HostModel[cpuCapacity=24000.0, coreCount=8, memoryCapacity=320000]", host.getModel().toString()) + assertFalse(host.equals("TEST-TEST")) + } + + @Test + fun testGetState() { + val host = initSimHost() + assertEquals(HostState.UP, host.getState()) + } + + @Test + fun testCloseState() { + val host = initSimHost() + host.close() + assertEquals(HostState.DOWN, host.getState()) + } + + @Test + fun testFail() { + val host = initSimHost() + val serviceTask = initVmWorkload() + host.spawn(serviceTask) + host.start(serviceTask) + assertTrue(host.contains(serviceTask)) + val hostStats = host.getSystemStats() + val vms = host.getGuests() + val vm = vms[0] + assertEquals(hostStats.guestsRunning, 1) + host.fail() + assertEquals(HostState.ERROR, host.getState()) + assertEquals(TaskState.FAILED, vm.state) + } + + @Test + fun testRecoverAfterFail() { + val host = initSimHost() + val serviceTask = initVmWorkload() + host.spawn(serviceTask) + host.start(serviceTask) + assertTrue(host.contains(serviceTask)) + val hostStats = host.getSystemStats() + val vms = host.getGuests() + val vm = vms[0] + assertEquals(hostStats.guestsRunning, 1) + host.fail() + assertEquals(HostState.ERROR, host.getState()) + assertEquals(TaskState.FAILED, vm.state) + host.recover() + assertEquals(HostState.UP, host.getState()) + val hostStatsAfterRecover = host.getSystemStats() + assertEquals(hostStatsAfterRecover.guestsRunning, 0) + } + + @Test + fun testGetSystemStats() { + val host = initSimHost() + val hostStats = host.getSystemStats() + assertEquals(hostStats.powerDraw, 0.0) + assertEquals(hostStats.energyUsage, 0.0) + assertEquals(hostStats.guestsTerminated, 0) + assertEquals(hostStats.guestsRunning, 0) + assertEquals(hostStats.guestsError, 0) + assertEquals(hostStats.guestsInvalid, 0) + } + + @Test + fun testFitVmWorkloadOnSimHostWorkloadFits() { + val host = initSimHost() + val serviceTask = initVmWorkload() + val fit = host.canFit(serviceTask) + assertTrue(fit) + } + + @Test + fun testSpawnVmWorkload() { + val host = initSimHost() + val serviceTask = initVmWorkload() + host.spawn(serviceTask) + assertTrue(host.contains(serviceTask)) + val hostStats = host.getSystemStats() + assertEquals(hostStats.guestsRunning, 1) + } + + @Test + fun testFitVmWorkloadOnSimHostWorkloadDoesNotFit() { + val host = initSimHost() + val serviceTask = initVmWorkload(12) + val fit = host.canFit(serviceTask) + assertFalse(fit) + } + + @Test + fun testFitVmWorkloadOnSimHostSecondWorkloadDoesNotFit() { + val host = initSimHost() + val serviceTask1 = initVmWorkload() + val serviceTask2 = initVmWorkload(8, 320000) + assertTrue(host.canFit(serviceTask1)) + assertTrue(host.canFit(serviceTask2)) + host.spawn(serviceTask1) + host.start(serviceTask1) + assertFalse(host.canFit(serviceTask2)) + val exception = + assertThrows { + host.spawn(serviceTask2) + } + assertEquals(exception.message, "Task does not fit") + } + + @Test + fun testHostCpuStats() { + val host = initSimHost() + val serviceTask = initVmWorkload() + host.spawn(serviceTask) + host.start(serviceTask) + assertTrue(host.contains(serviceTask)) + val hostStats = host.getSystemStats() + assertEquals(hostStats.guestsRunning, 1) + val cpuStats = host.getCpuStats() + assertEquals(24000.0, cpuStats.capacity) + assertEquals(0, cpuStats.activeTime) + assertEquals(0, cpuStats.stealTime) + assertEquals(0, cpuStats.lostTime) + assertEquals(0.0, cpuStats.demand) + assertEquals(0.0, cpuStats.usage) + assertEquals(0.0, cpuStats.utilization) + 
assertTrue(cpuStats.idleTime > 1) + } + + @Test + fun testRemoveTask() { + val host = initSimHost() + val serviceTask = initVmWorkload() + host.spawn(serviceTask) + host.start(serviceTask) + assertTrue(host.contains(serviceTask)) + var hostStats = host.getSystemStats() + assertEquals(hostStats.guestsRunning, 1) + host.removeTask(serviceTask) + hostStats = host.getSystemStats() + assertEquals(hostStats.guestsRunning, 0) + } + + @Test + fun testStopTask() { + val host = initSimHost() + val serviceTask = initVmWorkload() + host.spawn(serviceTask) + host.start(serviceTask) + assertTrue(host.contains(serviceTask)) + var hostStats = host.getSystemStats() + val vms = host.getGuests() + val vm = vms[0] + assertEquals(hostStats.guestsRunning, 1) + host.stop(serviceTask) + hostStats = host.getSystemStats() + assertEquals(hostStats.guestsRunning, 0) + assertEquals(TaskState.COMPLETED, vm.state) + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/ExperimentFactories.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/ExperimentFactories.kt index 5d158ea31..ecb0c69f4 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/ExperimentFactories.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/ExperimentFactories.kt @@ -75,7 +75,6 @@ public fun getExperiment(experimentSpec: ExperimentSpec): List { outputFolder = outputFolder, runs = experimentSpec.runs, initialSeed = experimentSpec.initialSeed, - computeExportConfig = scenarioSpec.computeExportConfig, topologySpec = scenarioSpec.topology, workloadSpec = scenarioSpec.workload, allocationPolicySpec = scenarioSpec.allocationPolicy, diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/Scenario.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/Scenario.kt index a99bd0611..64ff51ad0 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/Scenario.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/Scenario.kt @@ -22,7 +22,6 @@ package org.opendc.experiments.base.experiment -import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig import org.opendc.experiments.base.experiment.specs.AllocationPolicySpec import org.opendc.experiments.base.experiment.specs.CheckpointModelSpec import org.opendc.experiments.base.experiment.specs.ExportModelSpec @@ -50,7 +49,6 @@ public data class Scenario( val outputFolder: String = "output", val runs: Int = 1, val initialSeed: Int = 0, - val computeExportConfig: ComputeExportConfig, val topologySpec: ScenarioTopologySpec, val workloadSpec: WorkloadSpec, val allocationPolicySpec: AllocationPolicySpec, diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ExperimentSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ExperimentSpec.kt index 6d8c8ebf1..91d0b986e 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ExperimentSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ExperimentSpec.kt @@ -96,7 +96,6 @@ public data class 
ExperimentSpec( id, name, outputFolder, - computeExportConfig = computeExportConfig, topologyList[(i / topologyDiv) % topologyList.size], workloadList[(i / workloadDiv) % workloadList.size], allocationPolicyList[(i / allocationDiv) % allocationPolicyList.size], diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ExportModelSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ExportModelSpec.kt index 62f1ea4b8..a345fcb92 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ExportModelSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ExportModelSpec.kt @@ -23,6 +23,8 @@ package org.opendc.experiments.base.experiment.specs import kotlinx.serialization.Serializable +import org.opendc.compute.simulator.telemetry.OutputFiles +import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig /** * specification describing how the results should be exported @@ -32,8 +34,17 @@ import kotlinx.serialization.Serializable @Serializable public data class ExportModelSpec( val exportInterval: Long = 5 * 60, + val computeExportConfig: ComputeExportConfig = ComputeExportConfig.ALL_COLUMNS, + val filesToExport: List = OutputFiles.entries.toList(), + var filesToExportDict: MutableMap = OutputFiles.entries.associateWith { false }.toMutableMap(), ) { init { require(exportInterval > 0) { "The Export interval has to be higher than 0" } + + // Create a dictionary with each output file to false. + // Set each file in [filesToExport] to true in the dictionary. + for (file in filesToExport) { + filesToExportDict[file] = true + } } } diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt index b41eb37bb..eb0d71ed9 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/ScenarioSpec.kt @@ -23,14 +23,12 @@ package org.opendc.experiments.base.experiment.specs import kotlinx.serialization.Serializable -import org.opendc.compute.simulator.telemetry.parquet.ComputeExportConfig @Serializable public data class ScenarioSpec( var id: Int = -1, var name: String = "", val outputFolder: String = "output", - val computeExportConfig: ComputeExportConfig, val topology: ScenarioTopologySpec, val workload: WorkloadSpec, val allocationPolicy: AllocationPolicySpec = AllocationPolicySpec(), diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index 7cbce23a1..683bd5b66 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -147,10 +147,12 @@ public fun addExportModel( File("${scenario.outputFolder}/raw-output/$index"), "seed=$seed", bufferSize = 4096, - computeExportConfig = scenario.computeExportConfig, + 
scenario.exportModelSpec.filesToExportDict, + computeExportConfig = scenario.exportModelSpec.computeExportConfig, ), Duration.ofSeconds(scenario.exportModelSpec.exportInterval), startTime, + scenario.exportModelSpec.filesToExportDict, ), ) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java index b6d939c9b..46354d4c9 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ -124,6 +124,10 @@ public long onUpdate(long now) { long remainingDuration = this.scalingPolicy.getRemainingDuration( this.cpuFreqDemand, this.newCpuFreqSupplied, this.remainingWork); + if (remainingDuration == 0.0) { + this.remainingWork = 0.0; + } + return now + remainingDuration; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index ff7ff1996..dcbd79bbb 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -39,8 +39,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu private double currentIncomingSupply; // The current supply provided by the supplier private boolean outgoingDemandUpdateNeeded = false; - private final Set updatedDemands = - new HashSet<>(); // Array of consumers that updated their demand in this cycle + private Set updatedDemands = new HashSet<>(); // Array of consumers that updated their demand in this cycle private boolean overloaded = false; @@ -209,14 +208,18 @@ public void removeConsumerEdge(FlowEdge consumerEdge) { other.setConsumerIndex(other.getConsumerIndex() - 1); } - for (int idx_other : this.updatedDemands) { + HashSet newUpdatedDemands = new HashSet<>(); + for (int idx_other : this.updatedDemands) { if (idx_other > idx) { - this.updatedDemands.remove(idx_other); - this.updatedDemands.add(idx_other - 1); + newUpdatedDemands.add(idx_other - 1); + } else { + newUpdatedDemands.add(idx_other); } } + this.updatedDemands = newUpdatedDemands; + this.outgoingDemandUpdateNeeded = true; this.invalidate(); } diff --git a/site/docs/documentation/Input/Experiment.md b/site/docs/documentation/Input/Experiment.md index c8b96d1f0..a4212ddf6 100644 --- a/site/docs/documentation/Input/Experiment.md +++ b/site/docs/documentation/Input/Experiment.md @@ -24,7 +24,6 @@ This means that if all list based values have a single value, only one Scenario | initialSeed | integer | no | 0 | Seed used for random number generation to ensure reproducibility. | | runs | integer | no | 1 | Number of times the scenario should be run. | | exportModels | List[[ExportModel](#exportmodel)] | no | Default | Specifications for exporting data from the simulation. | -| computeExportConfig | [ComputeExportConfig](#checkpointmodel) | no | Default | The features that should be exported during the simulation | | maxNumFailures | List[integer] | no | [10] | The max number of times a task can fail before being terminated. 
| | topologies | List[[Topology](#topology)] | yes | N/A | List of topologies used in the scenario. | | workloads | List[[Workload](#workload)] | yes | N/A | List of workloads to be executed within the scenario. | @@ -39,9 +38,12 @@ type of each of these fields. ### ExportModel -| Variable | Type | Required? | Default | Description | -|----------------|-------|-----------|---------|---------------------------------------------| -| exportInterval | Int64 | no | 300 | The duration between two exports in seconds | +| Variable | Type | Required? | Default | Description | +|---------------------|-----------------------------------------|-----------|-----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| exportInterval | Int64 | no | 300 | The duration between two exports in seconds | +| computeExportConfig | [ComputeExportConfig](#checkpointmodel) | no | Default | The features that should be exported during the simulation | +| filesToExport | List[string] | no | all files | List of the files that should be exported during simulation. The elements should be picked from the set ("host", "task", "powerSource", "battery", "service") | + ### ComputeExportConfig diff --git a/site/docs/documentation/Input/Workload.md b/site/docs/documentation/Input/Workload.md index 5f2e61aed..b0a45942a 100644 --- a/site/docs/documentation/Input/Workload.md +++ b/site/docs/documentation/Input/Workload.md @@ -1,20 +1,20 @@ -OpenDC works with two types of traces that describe the servers that need to be run. Both traces have to be provided as +OpenDC works with two types of traces that describe the tasks that need to be run. Both traces have to be provided as parquet files. #### Task -The meta trace provides an overview of the servers: +The meta trace provides an overview of the tasks: -| Metric | Datatype | Unit | Summary | -|-----------------|----------|----------|--------------------------------------------------| -| id | string | | The id of the server | -| submission_time | int64 | datetime | The submission time of the server | -| duration | int64 | datetime | The finish time of the submission | -| cpu_count | int32 | count | The number of CPUs required to run this server | -| cpu_capacity | float64 | MHz | The amount of CPU required to run this server | -| mem_capacity | int64 | MB | The amount of memory required to run this server | +| Metric | Datatype | Unit | Summary | +|-----------------|----------|----------|------------------------------------------------| +| id | string | | The id of the server | +| submission_time | int64 | datetime | The submission time of the server | +| duration | int64 | datetime | The finish time of the submission | +| cpu_count | int32 | count | The number of CPUs required to run this task | +| cpu_capacity | float64 | MHz | The amount of CPU required to run this task | +| mem_capacity | int64 | MB | The amount of memory required to run this task | #### Fragment -The Fragment file provides information about the computational demand of each server over time: +The Fragment file provides information about the computational demand of each task over time: | Metric | Datatype | Unit | Summary | |-----------|------------|---------------|---------------------------------------------|
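The revised `SimHost.canFit` above subtracts the memory claimed by guests in the `RUNNING` state before admitting a new task. A minimal, self-contained Kotlin sketch of that accounting follows; `Host`, `Guest`, and `GuestState` are illustrative stand-ins, and the machine-model compatibility check done by the real `simMachine.canFit(...)` is omitted.

```kotlin
enum class GuestState { PROVISIONING, RUNNING, TERMINATED }

data class Guest(val state: GuestState, val memorySize: Long)

class Host(val memoryCapacity: Long, val coreCount: Int, private val guests: List<Guest>) {
    // Memory already claimed by guests whose task is currently RUNNING.
    private fun usedMemoryByRunningTasks(): Long =
        guests.filter { it.state == GuestState.RUNNING }.sumOf { it.memorySize }

    // A task fits only if the free memory (capacity minus running guests)
    // and the host's core count are sufficient.
    fun canFit(requiredMemory: Long, requiredCores: Int): Boolean {
        val sufficientMemory = (memoryCapacity - usedMemoryByRunningTasks()) >= requiredMemory
        val enoughCpus = coreCount >= requiredCores
        return sufficientMemory && enoughCpus
    }
}

fun main() {
    val host = Host(
        memoryCapacity = 320_000,
        coreCount = 8,
        guests = listOf(Guest(GuestState.RUNNING, memorySize = 300_000)),
    )
    println(host.canFit(requiredMemory = 64_000, requiredCores = 2)) // false: only 20_000 free
    println(host.canFit(requiredMemory = 10_000, requiredCores = 2)) // true
}
```

This is the behavior exercised by `testFitVmWorkloadOnSimHostSecondWorkloadDoesNotFit` in the new `SimHostTest`.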
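The new `OutputFiles` enum and the `filesToExport`/`toMonitor` map decide which telemetry categories `ComputeMetricReader` records on each export tick. A standalone sketch of that gating, with plain lambdas standing in for the host/task/service table readers:

```kotlin
// Mirrors the enum added in this change (serialization annotations omitted).
enum class OutputFiles { HOST, TASK, POWER_SOURCE, BATTERY, SERVICE }

fun exportOnce(
    toMonitor: Map<OutputFiles, Boolean>,
    recordHosts: () -> Unit,
    recordTasks: () -> Unit,
    recordService: () -> Unit,
) {
    // A category is exported only when its flag is explicitly true; a missing
    // key behaves like false, matching the `toMonitor[...] == true` checks.
    if (toMonitor[OutputFiles.HOST] == true) recordHosts()
    if (toMonitor[OutputFiles.TASK] == true) recordTasks()
    if (toMonitor[OutputFiles.SERVICE] == true) recordService()
}

fun main() {
    val flags = mapOf(OutputFiles.HOST to true, OutputFiles.TASK to false)
    exportOnce(
        toMonitor = flags,
        recordHosts = { println("host rows written") },
        recordTasks = { println("task rows written") },      // skipped: flag is false
        recordService = { println("service rows written") }, // skipped: key is absent
    )
}
```

In the real API, `registerComputeMonitor(...)` forwards the same map through `ComputeMonitorProvisioningStep` into the reader, and every file defaults to enabled when the parameter is omitted.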
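`ParquetComputeMonitor` now holds nullable exporters: one is constructed only when its `OutputFiles` entry is enabled, and writes and closes go through safe calls so a disabled output is simply a no-op. A small sketch of the pattern, with a plain-text `LineExporter` standing in for the parquet `Exporter`:

```kotlin
import java.io.Closeable
import java.io.File

// Stand-in for the parquet Exporter used by the real monitor.
class LineExporter(private val file: File) : Closeable {
    fun write(row: String) {
        file.appendText(row + "\n")
    }

    override fun close() {
        println("closed ${file.name}")
    }
}

class SelectiveMonitor(baseDir: File, enableHost: Boolean, enableTask: Boolean) : Closeable {
    // Exporters exist only for enabled outputs.
    private val hostExporter: LineExporter? =
        if (enableHost) LineExporter(File(baseDir, "host.txt")) else null
    private val taskExporter: LineExporter? =
        if (enableTask) LineExporter(File(baseDir, "task.txt")) else null

    fun recordHost(row: String) = hostExporter?.write(row) // no-op when disabled
    fun recordTask(row: String) = taskExporter?.write(row)

    override fun close() {
        hostExporter?.close()
        taskExporter?.close()
    }
}

fun main() {
    val dir = File("out").apply { mkdirs() }
    SelectiveMonitor(dir, enableHost = true, enableTask = false).use { m ->
        m.recordHost("host,0.5") // written to out/host.txt
        m.recordTask("task,0.1") // silently skipped: task export disabled
    }
}
```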
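On the experiment-spec side, `ExportModelSpec` gains a `filesToExport` list whose serialized names match the `@SerialName` values of `OutputFiles` ("host", "task", "powerSource", "battery", "service"), and its `init` block turns that list into the boolean map handed to the monitor. A sketch of how such a field deserializes; `ExportSpec` is a simplified stand-in for the real `ExportModelSpec`:

```kotlin
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json

// Mirrors the serial names of the OutputFiles enum added in this change.
@Serializable
enum class OutputFiles {
    @SerialName("host") HOST,
    @SerialName("task") TASK,
    @SerialName("powerSource") POWER_SOURCE,
    @SerialName("battery") BATTERY,
    @SerialName("service") SERVICE,
}

// Simplified stand-in: export interval in seconds plus the files to export,
// defaulting to all of them.
@Serializable
data class ExportSpec(
    val exportInterval: Long = 5 * 60,
    val filesToExport: List<OutputFiles> = OutputFiles.entries.toList(),
)

fun main() {
    // An exportModels entry that only requests host and task output.
    val spec = Json.decodeFromString(
        ExportSpec.serializer(),
        """{ "exportInterval": 600, "filesToExport": ["host", "task"] }""",
    )

    // The Map<OutputFiles, Boolean> form used further down the pipeline.
    val flags = OutputFiles.entries.associateWith { it in spec.filesToExport }
    println(flags) // HOST=true, TASK=true, the rest false
}
```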
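Finally, the `FlowDistributor` change rebuilds `updatedDemands` when a consumer edge is removed, instead of removing and re-adding indices while iterating the same `HashSet`, which can fail with a `ConcurrentModificationException`. A Kotlin restatement of just the re-indexing step (the real code additionally flags `outgoingDemandUpdateNeeded`):

```kotlin
// After the consumer at removedIdx is dropped, consumer indices above it shift
// down by one; entries at or below it are kept unchanged, as in the new code.
fun reindexUpdatedDemands(updatedDemands: Set<Int>, removedIdx: Int): Set<Int> {
    val reindexed = HashSet<Int>()
    for (idx in updatedDemands) {
        if (idx > removedIdx) reindexed.add(idx - 1) else reindexed.add(idx)
    }
    return reindexed
}

fun main() {
    // Consumers 1, 3 and 4 had pending demand updates; consumer 2 is removed.
    println(reindexUpdatedDemands(setOf(1, 3, 4), removedIdx = 2)) // contains 1, 2, 3
}
```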