Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 91 additions & 3 deletions lib/iris/dashboard/src/components/controller/TaskDetail.vue
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import { useAutoRefresh } from '@/composables/useAutoRefresh'
import { stateToName } from '@/types/status'
import type {
TaskStatus,
TaskStatsSnapshot,
GetTaskStatusResponse,
} from '@/types/rpc'
import { timestampMs, formatBytes, formatCpuMillicores, formatDuration, formatRelativeTime } from '@/utils/formatting'
import { timestampMs, formatBytes, formatRate, formatCpuMillicores, formatDuration, formatRelativeTime } from '@/utils/formatting'

import { controllerRpcCall } from '@/composables/useRpc'
import { useProfileAction } from '@/composables/useProfileAction'
Expand Down Expand Up @@ -77,6 +78,45 @@ const memHistory = computed(() =>
(task.value?.resourceHistory ?? []).map(r => r.memoryMb ? parseFloat(r.memoryMb) : 0)
)

const statsHistory = computed(() => task.value?.taskStatsHistory ?? [])

const latestStats = computed(() => statsHistory.value[statsHistory.value.length - 1] ?? null)

// Compute per-second throughput from cumulative deltas, then smooth with a
// 5-point centred moving average to reduce reporting-interval spikes.
function throughputSeries(field: 'itemsProcessed' | 'bytesProcessed'): number[] {
const snaps = statsHistory.value
if (snaps.length < 2) return []
const raw: number[] = []
for (let i = 1; i < snaps.length; i++) {
const prev = snaps[i - 1]
const curr = snaps[i]
const dtMs = parseFloat(curr.timestampMs ?? '0') - parseFloat(prev.timestampMs ?? '0')
if (dtMs <= 0) continue
const dv = parseFloat(curr[field] ?? '0') - parseFloat(prev[field] ?? '0')
raw.push(Math.max(0, dv / (dtMs / 1000)))
}
const window = 5
return raw.map((_, i) => {
const lo = Math.max(0, i - Math.floor(window / 2))
const hi = Math.min(raw.length, lo + window)
const slice = raw.slice(lo, hi)
return slice.reduce((a, b) => a + b, 0) / slice.length
})
}

const itemsThroughput = computed(() => throughputSeries('itemsProcessed'))
const bytesThroughput = computed(() => throughputSeries('bytesProcessed'))

const itemsThroughputMax = computed(() => {
const m = Math.max(...itemsThroughput.value, 0)
return m > 0 ? m.toFixed(1) + '/s' : '0/s'
})
const bytesThroughputMax = computed(() => {
const m = Math.max(...bytesThroughput.value, 0)
return m > 0 ? formatRate(m) : '0 B/s'
})

// Use job-level resource limits for gauge totals when available.
const cpuTotal = computed(() => {
const jobCpu = (jobResources.value?.cpuMillicores ?? 0) / 1000
Expand Down Expand Up @@ -276,16 +316,64 @@ watch(() => props.taskId, async () => {
<div v-if="cpuHistory.length > 1" class="grid grid-cols-2 gap-4 mb-6">
<div class="rounded-lg border border-surface-border bg-surface p-3">
<div class="text-xs text-text-secondary mb-2">CPU %</div>
<Sparkline :data="cpuHistory" :width="200" :height="40" color="var(--color-accent, #2563eb)" />
<Sparkline :data="cpuHistory" :width="200" :height="40" fill color="var(--color-accent, #2563eb)" />
<div class="text-xs font-mono text-text-muted mt-1">{{ cpuUsed.toFixed(0) }}%</div>
</div>
<div class="rounded-lg border border-surface-border bg-surface p-3">
<div class="text-xs text-text-secondary mb-2">Memory (MB)</div>
<Sparkline :data="memHistory" :width="200" :height="40" color="var(--color-status-purple, #8b5cf6)" />
<Sparkline :data="memHistory" :width="200" :height="40" fill color="var(--color-status-purple, #8b5cf6)" />
<div class="text-xs font-mono text-text-muted mt-1">{{ memUsedMb.toFixed(0) }} MB</div>
</div>
</div>

<!-- Task stats -->
<div v-if="statsHistory.length > 0" class="grid grid-cols-2 gap-4 mb-6">
<!-- Left: status text + latest values -->
<div class="rounded-lg border border-surface-border bg-surface p-3 space-y-2">
<div v-if="task.statusMessage" class="text-xs text-text font-mono whitespace-pre-wrap break-all">{{ task.statusMessage }}</div>
<div class="text-xs text-text-muted space-y-1 mt-2">
<div>Items processed: <span class="font-mono text-text">{{ latestStats ? parseInt(latestStats.itemsProcessed ?? '0').toLocaleString() : '—' }}</span></div>
<div>Bytes processed: <span class="font-mono text-text">{{ latestStats ? formatBytes(parseFloat(latestStats.bytesProcessed ?? '0')) : '—' }}</span></div>
</div>
</div>

<!-- Right: throughput charts -->
<div class="space-y-3">
<div class="rounded-lg border border-surface-border bg-surface p-3">
<div class="text-xs text-text-secondary mb-2">Items / sec</div>
<Sparkline
v-if="itemsThroughput.length > 1"
:data="itemsThroughput"
:width="200"
:height="40"
fill
color="var(--color-accent, #2563eb)"
show-y-axis
:y-axis-top-label="itemsThroughputMax"
/>
<div class="text-xs font-mono text-text-muted mt-1">
{{ itemsThroughput.length ? itemsThroughput[itemsThroughput.length - 1].toFixed(1) + ' items/s' : '—' }}
</div>
</div>
<div class="rounded-lg border border-surface-border bg-surface p-3">
<div class="text-xs text-text-secondary mb-2">Bytes / sec</div>
<Sparkline
v-if="bytesThroughput.length > 1"
:data="bytesThroughput"
:width="200"
:height="40"
fill
color="var(--color-status-purple, #8b5cf6)"
show-y-axis
:y-axis-top-label="bytesThroughputMax"
/>
<div class="text-xs font-mono text-text-muted mt-1">
{{ bytesThroughput.length ? formatRate(bytesThroughput[bytesThroughput.length - 1]) : '—' }}
</div>
</div>
</div>
</div>

<!-- Error display -->
<div
v-if="task.error"
Expand Down
98 changes: 70 additions & 28 deletions lib/iris/dashboard/src/components/shared/Sparkline.vue
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,32 @@ const props = withDefaults(defineProps<{
width?: number
height?: number
fillColor?: string
/** Stretch the SVG to 100% of its container width. */
fill?: boolean
/** Show a max-value line and label on the right. */
showYAxis?: boolean
/** Label for the max tick. If omitted, the raw max value is shown. */
yAxisTopLabel?: string
}>(), {
color: 'var(--color-accent, #2563eb)',
width: 64,
height: 20,
})

const PAD = 1
/** Extra top padding reserved for the y-axis label. */
const LABEL_H = 11

const topPad = computed(() => props.showYAxis ? PAD + LABEL_H : PAD)

const dataMax = computed(() => {
if (!props.data || props.data.length < 1) return 0
return Math.max(...props.data)
})

const topLabel = computed(() =>
props.yAxisTopLabel ?? dataMax.value.toFixed(1)
)

/**
* Build the SVG polyline points string from the data array.
Expand All @@ -24,16 +43,15 @@ const points = computed(() => {
if (!props.data || props.data.length < 1) return ''

const data = props.data.length === 1 ? [props.data[0], props.data[0]] : props.data
const max = Math.max(...data)

const max = dataMax.value
const innerW = props.width - 2 * PAD
const innerH = props.height - 2 * PAD
const innerH = props.height - topPad.value - PAD

return data.map((v, i) => {
const x = PAD + (i / (data.length - 1)) * innerW
const y = max === 0
? PAD + innerH
: PAD + innerH - (Math.min(v, max) / max) * innerH
? topPad.value + innerH
: topPad.value + innerH - (Math.min(v, max) / max) * innerH
return `${x.toFixed(1)},${y.toFixed(1)}`
}).join(' ')
})
Expand All @@ -42,36 +60,60 @@ const points = computed(() => {
const areaPoints = computed(() => {
if (!points.value) return ''
const innerW = props.width - 2 * PAD
const innerH = props.height - 2 * PAD
const bottomRight = `${(PAD + innerW).toFixed(1)},${(PAD + innerH).toFixed(1)}`
const bottomLeft = `${PAD.toFixed(1)},${(PAD + innerH).toFixed(1)}`
const innerH = props.height - topPad.value - PAD
const bottomRight = `${(PAD + innerW).toFixed(1)},${(topPad.value + innerH).toFixed(1)}`
const bottomLeft = `${PAD.toFixed(1)},${(topPad.value + innerH).toFixed(1)}`
return `${points.value} ${bottomRight} ${bottomLeft}`
})

const hasData = computed(() => props.data && props.data.length >= 1)
</script>

<template>
<svg
<div
v-if="hasData"
class="sparkline"
:width="width"
:height="height"
:viewBox="`0 0 ${width} ${height}`"
preserveAspectRatio="none"
class="relative"
:class="fill ? 'w-full' : 'inline-block'"
:style="fill ? undefined : { width: `${width}px`, height: `${height}px` }"
>
<polygon
v-if="fillColor"
:points="areaPoints"
:fill="fillColor"
/>
<polyline
fill="none"
:stroke="color"
stroke-width="1.5"
stroke-linecap="round"
stroke-linejoin="round"
:points="points"
/>
</svg>
<svg
class="sparkline block"
:class="fill ? 'w-full' : ''"
:width="fill ? undefined : width"
:height="height"
:viewBox="`0 0 ${width} ${height}`"
preserveAspectRatio="none"
>
<!-- Y-axis: thin line across the top of the data area -->
<line
v-if="showYAxis"
:x1="PAD" :y1="topPad"
:x2="width - PAD" :y2="topPad"
stroke="currentColor"
stroke-width="0.5"
opacity="0.25"
/>

<polygon
v-if="fillColor"
:points="areaPoints"
:fill="fillColor"
/>
<polyline
fill="none"
:stroke="color"
stroke-width="1.5"
stroke-linecap="round"
stroke-linejoin="round"
:points="points"
/>
</svg>

<!-- Label rendered as HTML so it isn't stretched by the SVG scaling -->
<span
v-if="showYAxis"
class="absolute right-1 text-[9px] opacity-60 leading-none pointer-events-none"
:style="{ top: `${PAD}px` }"
>{{ topLabel }}</span>
</div>
</template>
8 changes: 8 additions & 0 deletions lib/iris/dashboard/src/types/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ export interface TaskAttempt {
isWorkerFailure?: boolean
}

export interface TaskStatsSnapshot {
itemsProcessed?: string
bytesProcessed?: string
timestampMs?: string
}

export interface TaskStatus {
taskId: string
state: string
Expand All @@ -95,6 +101,8 @@ export interface TaskStatus {
canBeScheduled?: boolean
containerId?: string
resourceHistory?: ResourceUsage[]
statusMessage?: string
taskStatsHistory?: TaskStatsSnapshot[]
}

// -- Jobs --
Expand Down
17 changes: 17 additions & 0 deletions lib/iris/src/iris/cluster/controller/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ def task_to_proto(task: TaskDetailRow, worker_address: str = "") -> job_pb2.Task
proto.finished_at.CopyFrom(timestamp_to_proto(current_attempt.finished_at))
if task.container_id:
proto.container_id = task.container_id
if task.status_message:
proto.status_message = task.status_message
# For pending tasks with prior terminal attempts, surface retry context.
if task.state == job_pb2.TASK_STATE_PENDING and task.attempts and task.attempts[-1].state in TERMINAL_TASK_STATES:
last = task.attempts[-1]
Expand Down Expand Up @@ -1494,6 +1496,12 @@ def get_task_status(
"WHERE trh.task_id = ? AND trh.attempt_id = ? ORDER BY trh.id DESC LIMIT ?",
(task_id.to_wire(), task.current_attempt_id, TASK_RESOURCE_HISTORY_RETENTION),
)
stats_rows = q.raw(
"SELECT tsh.items_processed, tsh.bytes_processed, tsh.timestamp_ms "
"FROM task_stats_history tsh "
"WHERE tsh.task_id = ? ORDER BY tsh.id DESC LIMIT ?",
(task_id.to_wire(), TASK_RESOURCE_HISTORY_RETENTION),
)
jc_row = q.raw(
"SELECT jc.res_cpu_millicores, jc.res_memory_bytes, jc.res_disk_bytes, jc.res_device_json "
"FROM job_config jc WHERE jc.job_id = ?",
Expand All @@ -1509,6 +1517,15 @@ def get_task_status(
memory_peak_mb=r.memory_peak_mb,
)
)
for r in reversed(stats_rows):
proto.task_stats_history.append(
job_pb2.TaskStatsSnapshot(
items_processed=r.items_processed,
bytes_processed=r.bytes_processed,
timestamp_ms=r.timestamp_ms,
)
)

# Populate resource_usage from the latest history entry (newest is first before reversal).
if history_rows:
latest = history_rows[0]
Expand Down
12 changes: 12 additions & 0 deletions lib/iris/src/iris/rpc/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,18 @@ message TaskStatus {
string container_id = 19; // Platform container identifier (Docker container ID, K8s pod name)
// Historical resource snapshots (oldest first). Populated only in GetTaskStatus detail view.
repeated ResourceUsage resource_history = 20;

// Human-readable status string reported by the task (e.g. Zephyr stage progress).
string status_message = 21;

// Historical stats snapshots (oldest first). Populated only in GetTaskStatus detail view.
repeated TaskStatsSnapshot task_stats_history = 22;
}

message TaskStatsSnapshot {
int64 items_processed = 1;
int64 bytes_processed = 2;
int64 timestamp_ms = 3;
}

// Record of a single task execution attempt
Expand Down
158 changes: 80 additions & 78 deletions lib/iris/src/iris/rpc/job_pb2.py

Large diffs are not rendered by default.

18 changes: 16 additions & 2 deletions lib/iris/src/iris/rpc/job_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ class GetProcessStatusResponse(_message.Message):
def __init__(self, process_info: _Optional[_Union[ProcessInfo, _Mapping]] = ..., log_entries: _Optional[_Iterable[_Union[_logging_pb2.LogEntry, _Mapping]]] = ...) -> None: ...

class TaskStatus(_message.Message):
__slots__ = ("task_id", "state", "worker_id", "worker_address", "exit_code", "error", "started_at", "finished_at", "ports", "resource_usage", "build_metrics", "current_attempt_id", "attempts", "pending_reason", "can_be_scheduled", "container_id", "resource_history")
__slots__ = ("task_id", "state", "worker_id", "worker_address", "exit_code", "error", "started_at", "finished_at", "ports", "resource_usage", "build_metrics", "current_attempt_id", "attempts", "pending_reason", "can_be_scheduled", "container_id", "resource_history", "status_message", "task_stats_history")
class PortsEntry(_message.Message):
__slots__ = ("key", "value")
KEY_FIELD_NUMBER: _ClassVar[int]
Expand All @@ -366,6 +366,8 @@ class TaskStatus(_message.Message):
CAN_BE_SCHEDULED_FIELD_NUMBER: _ClassVar[int]
CONTAINER_ID_FIELD_NUMBER: _ClassVar[int]
RESOURCE_HISTORY_FIELD_NUMBER: _ClassVar[int]
STATUS_MESSAGE_FIELD_NUMBER: _ClassVar[int]
TASK_STATS_HISTORY_FIELD_NUMBER: _ClassVar[int]
task_id: str
state: TaskState
worker_id: str
Expand All @@ -383,7 +385,19 @@ class TaskStatus(_message.Message):
can_be_scheduled: bool
container_id: str
resource_history: _containers.RepeatedCompositeFieldContainer[ResourceUsage]
def __init__(self, task_id: _Optional[str] = ..., state: _Optional[_Union[TaskState, str]] = ..., worker_id: _Optional[str] = ..., worker_address: _Optional[str] = ..., exit_code: _Optional[int] = ..., error: _Optional[str] = ..., started_at: _Optional[_Union[_time_pb2.Timestamp, _Mapping]] = ..., finished_at: _Optional[_Union[_time_pb2.Timestamp, _Mapping]] = ..., ports: _Optional[_Mapping[str, int]] = ..., resource_usage: _Optional[_Union[ResourceUsage, _Mapping]] = ..., build_metrics: _Optional[_Union[BuildMetrics, _Mapping]] = ..., current_attempt_id: _Optional[int] = ..., attempts: _Optional[_Iterable[_Union[TaskAttempt, _Mapping]]] = ..., pending_reason: _Optional[str] = ..., can_be_scheduled: _Optional[bool] = ..., container_id: _Optional[str] = ..., resource_history: _Optional[_Iterable[_Union[ResourceUsage, _Mapping]]] = ...) -> None: ...
status_message: str
task_stats_history: _containers.RepeatedCompositeFieldContainer[TaskStatsSnapshot]
def __init__(self, task_id: _Optional[str] = ..., state: _Optional[_Union[TaskState, str]] = ..., worker_id: _Optional[str] = ..., worker_address: _Optional[str] = ..., exit_code: _Optional[int] = ..., error: _Optional[str] = ..., started_at: _Optional[_Union[_time_pb2.Timestamp, _Mapping]] = ..., finished_at: _Optional[_Union[_time_pb2.Timestamp, _Mapping]] = ..., ports: _Optional[_Mapping[str, int]] = ..., resource_usage: _Optional[_Union[ResourceUsage, _Mapping]] = ..., build_metrics: _Optional[_Union[BuildMetrics, _Mapping]] = ..., current_attempt_id: _Optional[int] = ..., attempts: _Optional[_Iterable[_Union[TaskAttempt, _Mapping]]] = ..., pending_reason: _Optional[str] = ..., can_be_scheduled: _Optional[bool] = ..., container_id: _Optional[str] = ..., resource_history: _Optional[_Iterable[_Union[ResourceUsage, _Mapping]]] = ..., status_message: _Optional[str] = ..., task_stats_history: _Optional[_Iterable[_Union[TaskStatsSnapshot, _Mapping]]] = ...) -> None: ...

class TaskStatsSnapshot(_message.Message):
__slots__ = ("items_processed", "bytes_processed", "timestamp_ms")
ITEMS_PROCESSED_FIELD_NUMBER: _ClassVar[int]
BYTES_PROCESSED_FIELD_NUMBER: _ClassVar[int]
TIMESTAMP_MS_FIELD_NUMBER: _ClassVar[int]
items_processed: int
bytes_processed: int
timestamp_ms: int
def __init__(self, items_processed: _Optional[int] = ..., bytes_processed: _Optional[int] = ..., timestamp_ms: _Optional[int] = ...) -> None: ...

class TaskAttempt(_message.Message):
__slots__ = ("attempt_id", "worker_id", "state", "exit_code", "error", "started_at", "finished_at", "is_worker_failure")
Expand Down
Loading
Loading