Merged
4 changes: 3 additions & 1 deletion infra/status-page/README.md
@@ -200,7 +200,9 @@ down by in-flight builds.
  The Probes panel renders the synthetic-canary telemetry the
  `infra/probes/` daemon writes to the finelog `infra.canary.metrics`
  namespace (one flat `{metric, value, labels, collected_at}` row per
- sample). Two bounded DuckDB queries run against the **active
+ sample). Two bounded SQL queries (Apache DataFusion, finelog's read
+ engine — note: no JSON functions, so labels are decoded app-side) run
+ against the **active
  environment's** finelog log-server through its `StatsService.Query`
  Connect RPC — the same JSON-over-HTTP shape the controller's
  `ExecuteRawQuery` uses, except the result is an Arrow IPC stream, which
30 changes: 19 additions & 11 deletions infra/status-page/server/sources/probes.ts
@@ -102,10 +102,11 @@ interface MetricRow {
  collected_ms: number;
  }

- // Health-check rows: the probe label is projected into its own column by
- // checksSql, so there is no labels blob to decode here.
+ // Health-check rows: same flat shape as MetricRow. The probe name lives in
+ // the labels blob (`{"probe": "..."}`) and is decoded in JS — finelog's
+ // DataFusion engine has no JSON functions to slice it server-side.
  interface CheckRow {
- probe: string;
+ labels: string;
  metric: string;
  value: number;
  collected_ms: number;
@@ -115,28 +116,33 @@ function emptyProvisioning(): ProvisioningSnapshot {
  return { windowHours: null, collectedAt: null, fleet: null, pools: [] };
  }

+ // SQL is Apache DataFusion (finelog's read engine), NOT DuckDB: no JSON
+ // functions (labels are decoded in JS), and timestamps are read out as epoch
+ // millis via arrow_cast(...,'Int64') so Arrow hands back a plain integer. The
+ // labels blob doubles as the per-probe partition key — each health check
+ // emits exactly `{"probe": "<name>"}`, so one labels value maps to one probe.
  const checksSql = (cutoff: string) => `
  WITH recent AS (
  SELECT
- json_extract_string(labels, '$.probe') AS probe,
+ labels,
  metric,
  value,
  collected_at,
  ROW_NUMBER() OVER (
- PARTITION BY json_extract_string(labels, '$.probe'), metric
+ PARTITION BY labels, metric
  ORDER BY collected_at DESC
  ) AS rn
  FROM "${METRICS_NAMESPACE}"
  WHERE metric IN ('${METRIC_UP}', '${METRIC_LATENCY_MS}')
  AND collected_at >= TIMESTAMP '${cutoff}'
  )
- SELECT probe, metric, value::DOUBLE AS value, epoch_ms(collected_at)::BIGINT AS collected_ms
+ SELECT labels, metric, value, arrow_cast(collected_at, 'Int64') AS collected_ms


**P2:** Convert DataFusion timestamps down to milliseconds

When this query runs against finelog, `collected_at` is stored as Arrow `Timestamp(Microsecond, None)` (`lib/finelog/rust/src/store/schema.rs` documents the microsecond storage unit), so `arrow_cast(collected_at, 'Int64')` yields epoch microseconds, not epoch milliseconds. The parsed rows are later passed directly to `new Date(...)`, so every probe and provisioning freshness timestamp will be about 1000x too large and will render as a far-future date or a negative relative time. The same cast in `provisioningSql` has the same effect.
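
One possible shape for the fix, as a minimal sketch rather than the PR's actual change: scale the value down on the JS side before it reaches `new Date(...)`. The helper name `microsToMillis` is hypothetical, and the sketch assumes the cast really does return epoch microseconds as described above, surfaced either as a `number` or as a `bigint` depending on how the Arrow Int64 column is decoded.

```typescript
// Hypothetical helper, not part of the PR. Assumes the input is epoch
// microseconds, as produced by arrow_cast(collected_at, 'Int64') on a
// Timestamp(Microsecond, None) column.
const MICROS_PER_MILLI = 1000;

function microsToMillis(micros: number | bigint): number {
  if (typeof micros === "bigint") {
    // Divide while still a bigint so the narrowing to Number happens on the
    // smaller millisecond value. bigint division truncates toward zero.
    return Number(micros / BigInt(MICROS_PER_MILLI));
  }
  // Math.trunc matches the bigint branch: both truncate toward zero.
  return Math.trunc(micros / MICROS_PER_MILLI);
}

// 1_700_000_000 seconds after the epoch, expressed in microseconds.
const collectedMicros = 1_700_000_000_000_000;
const collectedMs = microsToMillis(collectedMicros);
console.log(new Date(collectedMs).toISOString()); // 2023-11-14T22:13:20.000Z
```

Applied in `asCheckRows` and `asMetricRows`, a conversion like this would keep `collected_ms` truthful to its name without touching the SQL. Dividing inside the query instead is another option, provided the engine's integer division behaves as expected.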


  FROM recent
- WHERE rn = 1 AND probe IS NOT NULL
+ WHERE rn = 1
  `;

  const provisioningSql = (cutoff: string) => `
- SELECT metric, value::DOUBLE AS value, labels, epoch_ms(collected_at)::BIGINT AS collected_ms
+ SELECT metric, value, labels, arrow_cast(collected_at, 'Int64') AS collected_ms
  FROM "${METRICS_NAMESPACE}"
  WHERE metric LIKE '${PROVISION_PREFIX}%'
  AND collected_at >= TIMESTAMP '${cutoff}'
@@ -157,7 +163,7 @@ function asMetricRows(rows: Record<string, unknown>[]): MetricRow[] {

  function asCheckRows(rows: Record<string, unknown>[]): CheckRow[] {
  return rows.map((r) => ({
- probe: String(r.probe),
+ labels: String(r.labels ?? "{}"),
  metric: String(r.metric),
  value: Number(r.value),
  collected_ms: Number(r.collected_ms),
@@ -172,8 +178,10 @@ function parseChecks(rows: CheckRow[]): ProbeCheck[] {
  const up = new Map<string, { value: number; collectedMs: number }>();
  const latency = new Map<string, number>();
  for (const row of rows) {
- if (row.metric === METRIC_UP) up.set(row.probe, { value: row.value, collectedMs: row.collected_ms });
- else if (row.metric === METRIC_LATENCY_MS) latency.set(row.probe, row.value);
+ const probe = safeLabels(row.labels).probe;
+ if (!probe) continue;
+ if (row.metric === METRIC_UP) up.set(probe, { value: row.value, collectedMs: row.collected_ms });
+ else if (row.metric === METRIC_LATENCY_MS) latency.set(probe, row.value);
  }
  return [...up.entries()]
  .map(([probe, { value, collectedMs }]) => ({