Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 103 additions & 35 deletions lib/iris/dashboard/src/components/controller/JobDetail.vue
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
<script setup lang="ts">
import { ref, computed, onMounted, watch } from 'vue'
import { RouterLink } from 'vue-router'
import { controllerRpcCall } from '@/composables/useRpc'
import { controllerRpcCall, useLogServerStatsRpc } from '@/composables/useRpc'
import { useAutoRefresh } from '@/composables/useAutoRefresh'
import { stateToName, stateDisplayName } from '@/types/status'
import type {
JobStatus, TaskStatus, LaunchJobRequest, JobQuery,
GetJobStatusResponse, ListTasksResponse, ListJobsResponse,
ResourceUsage,
} from '@/types/rpc'
import { timestampMs, formatTimestamp, formatDuration, formatRelativeTime, formatBytes, formatCpuMillicores, formatDeviceConfig, bandDisplayName, bandColor } from '@/utils/formatting'
import { decodeArrowIpc } from '@/utils/arrow'
import { getLeafJobName } from '@/utils/jobTree'
import PageShell from '@/components/layout/PageShell.vue'
import StatusBadge from '@/components/shared/StatusBadge.vue'
Expand Down Expand Up @@ -45,8 +45,6 @@ const profilingTaskId = ref<string | null>(null)
const copiedName = ref(false)
const taskSearch = ref('')
const stateFilter = ref('')
const resourceMin = ref<ResourceUsage | null>(null)
const resourceMax = ref<ResourceUsage | null>(null)

type SortColumn = 'task' | 'state' | 'mem' | 'peakMem' | 'cpu' | 'duration'
type SortDir = 'asc' | 'desc'
Expand Down Expand Up @@ -100,6 +98,87 @@ async function fetchChildJobs(parentJobId: string): Promise<JobStatus[]> {
return response.jobs ?? []
}

// --- Per-task resource samples sourced from finelog stats (iris.task) ---
//
// Latest sample per task_id, scoped to this job's tasks. Drives MEM /
// PEAK MEM / CPU columns and their sort comparators. Empty until the
// stats query lands. The controller no longer populates
// TaskStatus.resource_usage, so this is the canonical source.
/**
 * One row of the per-task stats query result, as decoded from the Arrow IPC
 * payload. Field names mirror the SQL column names selected from "iris.task".
 * Every field is optional, so readers must default missing values.
 */
interface TaskStatRow {
// Task identifier; used as the key when rows are indexed by task.
task_id?: string
// Attempt the sample belongs to. Selected by the query.
// NOTE(review): not read by the code visible in this section — confirm whether it should be.
attempt_id?: number
// CPU usage in millicores (rendered through formatCpuMillicores).
cpu_millicores?: number
// Memory usage in MB (rendered with an "MB" suffix / scaled by 1024 * 1024 for bytes).
memory_mb?: number
// Peak memory usage in MB.
memory_peak_mb?: number
}

/**
 * Builds the SQL text for the per-task resource stats query.
 *
 * @param taskIds Task ids to restrict the query to.
 * @returns An empty string when `taskIds` is empty; otherwise a trimmed
 *   SELECT over "iris.task" limited to those ids, keeping one row per
 *   task_id (the one with the greatest `ts`).
 *
 * Each id is embedded as a single-quoted literal with embedded single quotes
 * doubled, because the query is sent as plain text.
 *
 * NOTE(review): the row selection partitions by task_id only, so it does not
 * distinguish between attempts of the same task.
 */
function buildTaskStatsSql(taskIds: readonly string[]): string {
if (taskIds.length === 0) return ''
// QueryRequest has no param binding; manual DuckDB single-quote escape.
const list = taskIds.map(t => `'${t.replace(/'/g, "''")}'`).join(',')
return `
SELECT task_id, attempt_id, cpu_millicores, memory_mb, memory_peak_mb
FROM "iris.task"
WHERE task_id IN (${list})
QUALIFY row_number() OVER (PARTITION BY task_id ORDER BY ts DESC) = 1

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 P2: This drops the attempt dimension when choosing the latest resource row. iris.task rows are per attempt, but the query keeps only the latest row per task_id and taskUsageMap then applies it to whatever the controller says is the current task state. On a retry/preemption, a previous attempt can remain the latest row until the new attempt emits a sample, so a running retry can briefly or permanently show the failed attempt’s MEM/CPU values. Please keep attempt identity in the selection, for example by returning the latest row per (task_id, attempt_id) and only mapping rows whose attempt_id matches TaskStatus.currentAttemptId.

Generated with Codex.

`.trim()
}

// Handle for the stats 'Query' RPC. `taskStatsData` exposes the response
// (typed here as carrying an optional `arrowIpc` payload) and `fetchTaskStats`
// is the composable's `refresh` function under a local name.
// The request is supplied as a function that builds the SQL from the ids in
// `tasks.value` — presumably re-evaluated on each refresh; confirm in useRpc.
const { data: taskStatsData, refresh: fetchTaskStats } = useLogServerStatsRpc<{ arrowIpc?: string }>(
'Query',
() => ({ sql: buildTaskStatsSql(tasks.value.map(t => t.taskId)) }),
)

// Stats rows indexed by task_id. Yields an empty map while no Arrow payload
// is available; rows without a task_id are skipped, and when several rows
// share a task_id the last one decoded wins.
const taskUsageMap = computed<Map<string, TaskStatRow>>(() => {
  const byTaskId = new Map<string, TaskStatRow>()
  const payload = taskStatsData.value?.arrowIpc
  if (payload) {
    const decodedRows = decodeArrowIpc(payload).rows as TaskStatRow[]
    for (const row of decodedRows) {
      if (!row.task_id) continue
      byTaskId.set(row.task_id, row)
    }
  }
  return byTaskId
})

/** Memory (MB) from the stats row stored for `taskId`; 0 when the row or the value is missing. */
function taskMemMb(taskId: string): number {
  const sample = taskUsageMap.value.get(taskId)
  const reported = sample?.memory_mb ?? 0
  return Number(reported)
}
/** Peak memory (MB) from the stats row stored for `taskId`; 0 when the row or the value is missing. */
function taskPeakMemMb(taskId: string): number {
  const sample = taskUsageMap.value.get(taskId)
  const reported = sample?.memory_peak_mb ?? 0
  return Number(reported)
}
/** CPU usage (millicores) from the stats row stored for `taskId`; 0 when the row or the value is missing. */
function taskCpuMillicores(taskId: string): number {
  const sample = taskUsageMap.value.get(taskId)
  const reported = sample?.cpu_millicores ?? 0
  return Number(reported)
}

// Min/max of the latest stat sample across currently-running tasks. Powers
// the "Live Resource Usage" panel; null when no running task has reported.
/**
 * Aggregate of the stats samples of running tasks: lowest/highest CPU and
 * memory readings plus the highest peak-memory reading. Memory values are in
 * MB (the template scales them by 1024 * 1024 before formatting as bytes).
 */
interface RunningResourceRange {
// Smallest cpu_millicores among the samples.
cpuMillicoresMin: number
// Largest cpu_millicores among the samples.
cpuMillicoresMax: number
// Smallest memory_mb among the samples.
memoryMbMin: number
// Largest memory_mb among the samples.
memoryMbMax: number
// Largest memory_peak_mb among the samples; there is no matching minimum.
memoryPeakMbMax: number
}
// Range of the stats samples across tasks whose state name is 'running'.
// Evaluates to null when no running task has a sample in taskUsageMap.
//
// The extremes are folded in a single pass instead of spreading whole arrays
// into Math.min / Math.max: spreading passes every element as a call
// argument and throws a RangeError once the number of samples exceeds the
// engine's argument limit. Missing fields count as 0.
const runningResourceRange = computed<RunningResourceRange | null>(() => {
  const usageByTask = taskUsageMap.value
  let sampleCount = 0
  let cpuMin = Infinity
  let cpuMax = -Infinity
  let memMin = Infinity
  let memMax = -Infinity
  let peakMax = -Infinity

  for (const t of tasks.value) {
    if (stateToName(t.state) !== 'running') continue
    const sample = usageByTask.get(t.taskId)
    if (!sample) continue

    const cpu = Number(sample.cpu_millicores ?? 0)
    const mem = Number(sample.memory_mb ?? 0)
    const peak = Number(sample.memory_peak_mb ?? 0)

    sampleCount += 1
    cpuMin = Math.min(cpuMin, cpu)
    cpuMax = Math.max(cpuMax, cpu)
    memMin = Math.min(memMin, mem)
    memMax = Math.max(memMax, mem)
    peakMax = Math.max(peakMax, peak)
  }

  if (sampleCount === 0) return null
  return {
    cpuMillicoresMin: cpuMin,
    cpuMillicoresMax: cpuMax,
    memoryMbMin: memMin,
    memoryMbMax: memMax,
    memoryPeakMbMax: peakMax,
  }
})

async function fetchData() {
const gen = ++fetchGeneration
error.value = null
Expand All @@ -115,10 +194,15 @@ async function fetchData() {
}
job.value = jobResp.job
jobRequest.value = jobResp.request ?? null
resourceMin.value = jobResp.resourceMin ?? null
resourceMax.value = jobResp.resourceMax ?? null
tasks.value = tasksResp.tasks ?? []

// Refresh stats only once tasks are known so the SQL filter targets the
// current job's tasks. Failures here surface as zero values — never block
// the rest of the page.
if (tasks.value.length > 0) {
void fetchTaskStats()
}

const parentIds = [props.jobId, ...expandedChildJobs.value]
const childEntries = await Promise.all(
parentIds.map(async parentJobId => [parentJobId, await fetchChildJobs(parentJobId)] as const),
Expand Down Expand Up @@ -180,22 +264,6 @@ function taskDuration(t: TaskStatus): string {
return formatDuration(started, ended)
}

function formatMemMb(usage: ResourceUsage | undefined): string {
if (!usage?.memoryMb) return '-'
const mb = parseInt(usage.memoryMb, 10)
return `${mb} MB`
}

function formatPeakMemMb(usage: ResourceUsage | undefined): string {
if (!usage?.memoryPeakMb) return '-'
const mb = parseInt(usage.memoryPeakMb, 10)
return `${mb} MB`
}

function formatCpu(usage: ResourceUsage | undefined): string {
return formatCpuMillicores(usage?.cpuMillicores)
}

function taskIndex(taskId: string): string {
const last = taskId.split('/').pop()
if (!last) return '-'
Expand Down Expand Up @@ -463,13 +531,13 @@ const filteredTasks = computed(() => {
cmp = (STATE_SORT_ORDER[stateToName(a.state)] ?? 99) - (STATE_SORT_ORDER[stateToName(b.state)] ?? 99)
break
case 'mem':
cmp = (parseInt(a.resourceUsage?.memoryMb ?? '0') || 0) - (parseInt(b.resourceUsage?.memoryMb ?? '0') || 0)
cmp = taskMemMb(a.taskId) - taskMemMb(b.taskId)
break
case 'peakMem':
cmp = (parseInt(a.resourceUsage?.memoryPeakMb ?? '0') || 0) - (parseInt(b.resourceUsage?.memoryPeakMb ?? '0') || 0)
cmp = taskPeakMemMb(a.taskId) - taskPeakMemMb(b.taskId)
break
case 'cpu':
cmp = (a.resourceUsage?.cpuMillicores ?? 0) - (b.resourceUsage?.cpuMillicores ?? 0)
cmp = taskCpuMillicores(a.taskId) - taskCpuMillicores(b.taskId)
break
case 'duration':
cmp = taskDurationMs(a) - taskDurationMs(b)
Expand Down Expand Up @@ -730,7 +798,7 @@ async function handleProfile(taskId: string, profilerType: string, format: strin

<!-- Live resource usage (min/max across running tasks) -->
<div
v-if="resourceMin && resourceMax"
v-if="runningResourceRange"
class="mb-6 rounded-lg border border-surface-border bg-surface px-4 py-3"
>
<h3 class="text-xs font-semibold uppercase tracking-wider text-text-secondary mb-2">
Expand All @@ -739,19 +807,19 @@ async function handleProfile(taskId: string, profilerType: string, format: strin
<div class="grid grid-cols-1 sm:grid-cols-3 gap-2 sm:gap-4 text-sm">
<div>
<span class="text-text-muted">CPU:</span>
<span class="font-mono ml-1">{{ formatCpuMillicores(resourceMin.cpuMillicores ?? 0) }}</span>
<span class="font-mono ml-1">{{ formatCpuMillicores(runningResourceRange.cpuMillicoresMin) }}</span>
<span class="text-text-muted mx-1">&ndash;</span>
<span class="font-mono">{{ formatCpuMillicores(resourceMax.cpuMillicores ?? 0) }}</span>
<span class="font-mono">{{ formatCpuMillicores(runningResourceRange.cpuMillicoresMax) }}</span>
</div>
<div>
<span class="text-text-muted">Memory:</span>
<span class="font-mono ml-1">{{ formatBytes((resourceMin.memoryMb ? parseFloat(resourceMin.memoryMb) : 0) * 1024 * 1024) }}</span>
<span class="font-mono ml-1">{{ formatBytes(runningResourceRange.memoryMbMin * 1024 * 1024) }}</span>
<span class="text-text-muted mx-1">&ndash;</span>
<span class="font-mono">{{ formatBytes((resourceMax.memoryMb ? parseFloat(resourceMax.memoryMb) : 0) * 1024 * 1024) }}</span>
<span class="font-mono">{{ formatBytes(runningResourceRange.memoryMbMax * 1024 * 1024) }}</span>
</div>
<div v-if="resourceMax.memoryPeakMb">
<div v-if="runningResourceRange.memoryPeakMbMax">
<span class="text-text-muted">Peak Memory:</span>
<span class="font-mono ml-1">{{ formatBytes((resourceMax.memoryPeakMb ? parseFloat(resourceMax.memoryPeakMb) : 0) * 1024 * 1024) }}</span>
<span class="font-mono ml-1">{{ formatBytes(runningResourceRange.memoryPeakMbMax * 1024 * 1024) }}</span>
</div>
</div>
</div>
Expand Down Expand Up @@ -1137,13 +1205,13 @@ async function handleProfile(taskId: string, profilerType: string, format: strin
<span v-else class="text-text-muted">&mdash;</span>
</td>
<td class="hidden lg:table-cell px-2 sm:px-3 py-2 text-[13px] font-mono">
{{ formatMemMb(task.resourceUsage) }}
{{ taskMemMb(task.taskId) ? `${taskMemMb(task.taskId)} MB` : '-' }}
</td>
<td class="hidden lg:table-cell px-2 sm:px-3 py-2 text-[13px] font-mono">
{{ formatPeakMemMb(task.resourceUsage) }}
{{ taskPeakMemMb(task.taskId) ? `${taskPeakMemMb(task.taskId)} MB` : '-' }}
</td>
<td class="hidden lg:table-cell px-2 sm:px-3 py-2 text-[13px] font-mono">
{{ formatCpu(task.resourceUsage) }}
{{ formatCpuMillicores(taskCpuMillicores(task.taskId)) }}
</td>
<td class="hidden md:table-cell px-2 sm:px-3 py-2 text-[13px] font-mono text-text-secondary">
{{ formatTimestamp(task.startedAt) }}
Expand Down
22 changes: 3 additions & 19 deletions lib/iris/dashboard/src/types/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,16 @@ export interface TaskStatus {
startedAt?: ProtoTimestamp
finishedAt?: ProtoTimestamp
ports?: Record<string, number>
// Worker-resident in-memory snapshot (Worker.GetTaskStatus only). The
// controller-served TaskStatus carries no resourceUsage; query the
// iris.task stats namespace via useLogServerStatsRpc for time series.
resourceUsage?: ResourceUsage
buildMetrics?: BuildMetrics
currentAttemptId?: number
attempts?: TaskAttempt[]
pendingReason?: string
canBeScheduled?: boolean
containerId?: string
resourceHistory?: ResourceUsage[]
statusTextDetailMd?: string
statusTextSummaryMd?: string
}
Expand All @@ -109,7 +111,6 @@ export interface JobStatus {
startedAt?: ProtoTimestamp
finishedAt?: ProtoTimestamp
ports?: Record<string, number>
resourceUsage?: ResourceUsage
statusMessage?: string
buildMetrics?: BuildMetrics
failureCount?: number
Expand Down Expand Up @@ -147,8 +148,6 @@ export interface ListJobsResponse {
export interface GetJobStatusResponse {
job: JobStatus
request?: LaunchJobRequest
resourceMin?: ResourceUsage
resourceMax?: ResourceUsage
}

export interface CommandEntrypoint {
Expand Down Expand Up @@ -240,19 +239,6 @@ export interface ListWorkersResponse {
hasMore: boolean
}

export interface WorkerResourceSnapshot {
timestamp?: ProtoTimestamp
hostCpuPercent?: number
memoryUsedBytes?: string
memoryTotalBytes?: string
diskUsedBytes?: string
diskTotalBytes?: string
runningTaskCount?: number
totalProcessCount?: number
netRecvBps?: string
netSentBps?: string
}

export interface WorkerTaskAttempt {
taskId: string
attempt?: TaskAttempt
Expand All @@ -267,8 +253,6 @@ export interface GetWorkerStatusResponse {
// page render on a slow LogService proxy. Fetched separately via
// LogService.FetchLogs(source="/system/worker/<worker_id>").
recentAttempts?: WorkerTaskAttempt[]
currentResources?: WorkerResourceSnapshot
resourceHistory?: WorkerResourceSnapshot[]
}

// -- Endpoints --
Expand Down
11 changes: 7 additions & 4 deletions lib/iris/src/iris/cluster/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
get_tpu_count,
is_job_finished,
)
from iris.cluster.worker.stats import TASK_STATS_NAMESPACE, IrisTaskStat
from iris.managed_thread import ManagedThread, ThreadContainer, get_thread_container
from iris.rpc import controller_pb2, job_pb2
from iris.rpc.auth import AuthTokenInjector, NullAuthInterceptor, StaticTokenProvider, TokenVerifier
Expand Down Expand Up @@ -1177,11 +1178,13 @@ def __init__(
self._remote_stats_service = StatsServiceProxy(self._log_service_address, interceptors=log_client_interceptors)

# Providers that collect logs outside the worker process push directly
# to the log server via RPC.
# to the log server via RPC. K8s pods have no worker daemon, so the
# provider also writes per-pod resource samples to iris.task itself —
# mirroring what the worker daemon does on the GCE/TPU path.
if isinstance(self._provider, K8sTaskProvider):
self._provider.log_client = LogClient.connect(
self._log_service_address, interceptors=log_client_interceptors
)
k8s_log_client = LogClient.connect(self._log_service_address, interceptors=log_client_interceptors)
self._provider.log_client = k8s_log_client
self._provider.task_stats_table = k8s_log_client.get_table(TASK_STATS_NAMESPACE, IrisTaskStat)

# Controller process logs ship to the log server via RemoteLogHandler.
self._log_client = LogClient.connect(self._log_service_address, interceptors=log_client_interceptors)
Expand Down
20 changes: 6 additions & 14 deletions lib/iris/src/iris/cluster/controller/transitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ class TaskUpdate:
new_state: int
error: str | None = None
exit_code: int | None = None
resource_usage: job_pb2.ResourceUsage | None = None
container_id: str | None = None


Expand All @@ -212,7 +211,6 @@ def task_updates_from_proto(entries) -> list[TaskUpdate]:
new_state=entry.state,
error=entry.error or None,
exit_code=entry.exit_code if entry.HasField("exit_code") else None,
resource_usage=entry.resource_usage if entry.resource_usage.ByteSize() > 0 else None,
container_id=entry.container_id or None,
)
)
Expand Down Expand Up @@ -1379,7 +1377,7 @@ def _apply_task_transitions(
prior_state = task.state

# Fast path: task already in the reported state with no new data to apply.
has_new_data = update.error is not None or update.exit_code is not None or update.resource_usage is not None
has_new_data = update.error is not None or update.exit_code is not None
if update.new_state == prior_state and not has_new_data:
continue

Expand Down Expand Up @@ -1590,15 +1588,11 @@ def apply_task_updates(self, cur: TransactionCursor, req: HeartbeatApplyRequest)
def apply_heartbeats_batch(self, cur: TransactionCursor, requests: list[HeartbeatApplyRequest]) -> list[TxResult]:
"""Apply multiple heartbeats in a single transaction.

Two-pass architecture to minimise SQL round-trips:

1. Bulk-fetch all referenced task rows, classify each update as
*steady-state* (same state, no error/exit_code) or *transition*.
2a. Batch steady-state resource_usage writes via ``executemany``.
2b. Feed only transitions through ``_apply_task_transitions``, which
retains the full state machine (retry, cascade, decommit, etc.).

Worker health updates are also batched via ``executemany``.
Bulk-fetch all referenced task rows, drop steady-state updates whose
only payload would have been (now-vestigial) resource samples, and feed
the remaining state-changing or terminal updates through
``_apply_task_transitions``. Worker health updates are batched via
``executemany``.
"""
_empty = TxResult(tasks_to_kill=set())
results: list[TxResult] = [_empty] * len(requests)
Expand Down Expand Up @@ -1648,8 +1642,6 @@ def apply_heartbeats_batch(self, cur: TransactionCursor, requests: list[Heartbea

if is_state_change or has_terminal_data:
transition_updates.append(update)
# Steady-state resource usage no longer persisted; iris.worker /
# iris.task stats namespaces own per-tick measurements.

if transition_updates:
transition_entries.append(
Expand Down
Loading
Loading