Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions internal/cmgr/cmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmgr

import (
"context"
"errors"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -41,8 +42,21 @@ type Cmgr interface {
// Metrics related
QueryNodeMetrics(ctx context.Context, req *ms.QueryNodeMetricsReq) (*ms.QueryNodeMetricsResp, error)
QueryRuleMetrics(ctx context.Context, req *ms.QueryRuleMetricsReq) (*ms.QueryRuleMetricsResp, error)

// Storage health & maintenance. Each call surfaces the local
// SQLite store; on builds without metrics enabled, the underlying
// store is nil and these return ErrMetricsDisabled.
DBHealth(ctx context.Context) (*ms.DBHealth, error)
DBCleanup(ctx context.Context, days int) (*ms.MaintenanceResult, error)
DBVacuum(ctx context.Context) (*ms.MaintenanceResult, error)
DBTruncate(ctx context.Context, confirm string) (*ms.MaintenanceResult, error)
DBResetStats() error
}

// ErrMetricsDisabled is returned by storage-health methods when the
// MetricsStore was never opened (no upstream sync URL configured).
// In this file it is what DBHealth, DBCleanup, DBVacuum, DBTruncate and
// DBResetStats return when the manager's store pointer is nil. It is a
// sentinel value, so callers can test for it with errors.Is.
var ErrMetricsDisabled = errors.New("metrics store disabled")

type cmgrImpl struct {
lock sync.RWMutex
cfg *Config
Expand Down Expand Up @@ -230,3 +244,39 @@ func (cm *cmgrImpl) QueryNodeMetrics(ctx context.Context, req *ms.QueryNodeMetri
// QueryRuleMetrics forwards a rule-metrics query to the metrics store and
// returns its response unchanged.
//
// When the store is nil it returns ErrMetricsDisabled, the same contract
// the DB* maintenance methods follow, rather than calling into a nil
// store.
func (cm *cmgrImpl) QueryRuleMetrics(ctx context.Context, req *ms.QueryRuleMetricsReq) (*ms.QueryRuleMetricsResp, error) {
	if cm.ms == nil {
		return nil, ErrMetricsDisabled
	}
	return cm.ms.QueryRuleMetric(ctx, req)
}

// DBHealth returns the metrics store's health snapshot.
// It yields ErrMetricsDisabled when the manager has no store.
func (cm *cmgrImpl) DBHealth(ctx context.Context) (*ms.DBHealth, error) {
	if store := cm.ms; store != nil {
		return store.Health(ctx)
	}
	return nil, ErrMetricsDisabled
}

// DBCleanup asks the metrics store to delete rows older than days and
// passes the store's result through untouched.
// It yields ErrMetricsDisabled when the manager has no store.
func (cm *cmgrImpl) DBCleanup(ctx context.Context, days int) (*ms.MaintenanceResult, error) {
	store := cm.ms
	if store == nil {
		return nil, ErrMetricsDisabled
	}
	return store.CleanupOlderThan(ctx, days)
}

// DBVacuum runs the metrics store's Vacuum and returns its result.
// It yields ErrMetricsDisabled when the manager has no store.
func (cm *cmgrImpl) DBVacuum(ctx context.Context) (*ms.MaintenanceResult, error) {
	if store := cm.ms; store != nil {
		return store.Vacuum(ctx)
	}
	return nil, ErrMetricsDisabled
}

// DBTruncate hands the confirm phrase to the metrics store's Truncate,
// which decides whether it is acceptable, and returns the store's result.
// It yields ErrMetricsDisabled when the manager has no store.
func (cm *cmgrImpl) DBTruncate(ctx context.Context, confirm string) (*ms.MaintenanceResult, error) {
	store := cm.ms
	if store == nil {
		return nil, ErrMetricsDisabled
	}
	return store.Truncate(ctx, confirm)
}

// DBResetStats resets the metrics store's operation statistics.
// It returns nil on success and ErrMetricsDisabled when the manager has
// no store.
func (cm *cmgrImpl) DBResetStats() error {
	if store := cm.ms; store != nil {
		store.ResetStats()
		return nil
	}
	return ErrMetricsDisabled
}
23 changes: 21 additions & 2 deletions internal/cmgr/ms/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,23 @@ type QueryRuleMetricsResp struct {
}

// AddNodeMetric writes one node-level sample into node_metrics and, on
// success, bumps the cached node row counter.
//
// It returns the ExecContext error unchanged when the write fails; the
// counter is left untouched in that case.
func (ms *MetricsStore) AddNodeMetric(ctx context.Context, m *metric_reader.NodeMetrics) error {
	// track returns a func that is deferred to function exit
	// (presumably it records the call in ms.stats.AddNode — confirm).
	defer track(&ms.stats.AddNode)()
	_, err := ms.db.ExecContext(ctx, `
INSERT OR REPLACE INTO node_metrics (timestamp, cpu_usage, memory_usage, disk_usage, network_in, network_out)
VALUES (?, ?, ?, ?, ?, ?)
`, m.SyncTime.Unix(), m.CpuUsagePercent, m.MemoryUsagePercent, m.DiskUsagePercent, m.NetworkReceiveBytesRate, m.NetworkTransmitBytesRate)
	// The previous revision's unconditional `return err` sat here and made
	// everything below unreachable, so the counter was never updated.
	if err != nil {
		return err
	}
	// INSERT OR REPLACE may collapse duplicates rather than add a row;
	// the count is best-effort and is reconciled by recountRows on
	// next Vacuum / Truncate / restart.
	ms.nodeRows.Add(1)
	return nil
}

func (ms *MetricsStore) AddRuleMetric(ctx context.Context, rm *metric_reader.RuleMetrics) error {
defer track(&ms.stats.AddRule)()
tx, err := ms.db.BeginTx(ctx, nil)
if err != nil {
return err
Expand All @@ -91,19 +100,28 @@ func (ms *MetricsStore) AddRuleMetric(ctx context.Context, rm *metric_reader.Rul
}
defer stmt.Close() //nolint:errcheck

var inserted int64
for remote, pingMetric := range rm.PingMetrics {
_, err := stmt.ExecContext(ctx, rm.SyncTime.Unix(), rm.Label, remote, pingMetric.Latency,
rm.TCPConnectionCount[remote], rm.TCPHandShakeDuration[remote], rm.TCPNetworkTransmitBytes[remote],
rm.UDPConnectionCount[remote], rm.UDPHandShakeDuration[remote], rm.UDPNetworkTransmitBytes[remote])
if err != nil {
return err
}
inserted++
}

return tx.Commit()
if err := tx.Commit(); err != nil {
return err
}
// Same caveat as AddNodeMetric: REPLACE collapses, count is
// best-effort, reconciled on Vacuum / Truncate / restart.
ms.ruleRows.Add(inserted)
return nil
}

func (ms *MetricsStore) QueryNodeMetric(ctx context.Context, req *QueryNodeMetricsReq) (*QueryNodeMetricsResp, error) {
defer track(&ms.stats.QueryNode)()
var (
rows *sql.Rows
err error
Expand Down Expand Up @@ -149,6 +167,7 @@ func (ms *MetricsStore) QueryNodeMetric(ctx context.Context, req *QueryNodeMetri
}

func (ms *MetricsStore) QueryRuleMetric(ctx context.Context, req *QueryRuleMetricsReq) (*QueryRuleMetricsResp, error) {
defer track(&ms.stats.QueryRule)()
// Bucketed mode keeps the last sample per (label, remote) inside each
// step-second window. The bytes columns are monotonic counters, so
// last-of-bucket preserves the deltas the SPA computes — averaging
Expand Down
164 changes: 164 additions & 0 deletions internal/cmgr/ms/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package ms

import (
"context"
"errors"
"os"
"time"
)

// DBHealth is the storage + latency snapshot the Settings page polls.
// Sized small on purpose: Health fills it from cached row counters, one
// file stat, three PRAGMAs and a single MAX() query, so the handler can
// re-run on every refresh without a full COUNT(*) scan against the live
// tables. Field order and JSON tags are the wire shape.
type DBHealth struct {
// FileBytes is the size of the database file as reported by os.Stat;
// it stays 0 when the stat fails.
FileBytes int64 `json:"db_file_bytes"`
// PageCount is the value of PRAGMA page_count.
PageCount int64 `json:"db_page_count"`
// PageSize is the value of PRAGMA page_size.
PageSize int64 `json:"db_page_size"`
// FreelistPages is the value of PRAGMA freelist_count.
FreelistPages int64 `json:"db_freelist_pages"`
// NodeMetricsRows is the cached node_metrics row counter. It is
// best-effort between recounts, not a live COUNT(*).
NodeMetricsRows int64 `json:"node_metrics_rows"`
// RuleMetricsRows is the cached rule_metrics row counter, with the same
// best-effort caveat.
RuleMetricsRows int64 `json:"rule_metrics_rows"`
// LastRuleWriteTs is MAX(timestamp) over rule_metrics, or 0 when the
// table is empty.
LastRuleWriteTs int64 `json:"last_rule_write_ts"`
// Stats is the snapshot returned by the store's stats tracker, keyed by
// string (presumably the operation name — confirm against Snapshot).
Stats map[string]OpStatsSnapshot `json:"stats"`
}

// Health assembles a DBHealth snapshot for the store.
//
// Row counts and latency stats come from in-memory state; the file size
// comes from a stat of the database path (a failed stat leaves FileBytes
// at 0 and is not reported). The remaining fields are read with four
// single-row queries, issued in a fixed order; the first one that fails
// aborts the call and its error is returned as-is with a nil snapshot.
func (ms *MetricsStore) Health(ctx context.Context) (*DBHealth, error) {
	snap := &DBHealth{
		NodeMetricsRows: ms.nodeRows.Load(),
		RuleMetricsRows: ms.ruleRows.Load(),
		Stats:           ms.stats.Snapshot(),
	}

	info, statErr := os.Stat(ms.dbPath)
	if statErr == nil {
		snap.FileBytes = info.Size()
	}

	// Each probe is one scalar query scanned straight into its field.
	probes := []struct {
		query string
		dest  *int64
	}{
		{"PRAGMA page_count", &snap.PageCount},
		{"PRAGMA page_size", &snap.PageSize},
		{"PRAGMA freelist_count", &snap.FreelistPages},
		// COALESCE keeps the result an integer on an empty table; a 0
		// value therefore means "never written".
		{"SELECT COALESCE(MAX(timestamp), 0) FROM rule_metrics", &snap.LastRuleWriteTs},
	}
	for _, p := range probes {
		if err := ms.db.QueryRowContext(ctx, p.query).Scan(p.dest); err != nil {
			return nil, err
		}
	}
	return snap, nil
}

// MaintenanceResult is the common shape returned by every maintenance
// op. Fields not relevant to a given op are left zero — Vacuum doesn't
// fill in NodeDeleted, Cleanup doesn't fill in BytesBefore, etc.
// Every field except DurationMs is tagged omitempty, so zero values are
// dropped from the JSON output.
type MaintenanceResult struct {
// NodeDeleted is the number of node_metrics rows removed (set by
// CleanupOlderThan and Truncate).
NodeDeleted int64 `json:"node_deleted,omitempty"`
// RuleDeleted is the number of rule_metrics rows removed (set by
// CleanupOlderThan and Truncate).
RuleDeleted int64 `json:"rule_deleted,omitempty"`
// BytesBefore is the database file size before the op (set by Vacuum
// and Truncate).
BytesBefore int64 `json:"bytes_before,omitempty"`
// BytesAfter is the database file size after the op (set by Vacuum and
// Truncate).
BytesAfter int64 `json:"bytes_after,omitempty"`
// DurationMs is the elapsed wall-clock time of the op in milliseconds;
// always emitted.
DurationMs int64 `json:"duration_ms"`
}

// CleanupOlderThan deletes rows older than `days` from both metrics
// tables and reports how many rows each table lost plus the elapsed time.
//
// days <= 0 falls back to defaultRetentionDays. The cutoff is computed
// in calendar days back from the moment the call starts.
//
// An already-cancelled or expired ctx makes the call return ctx.Err()
// without touching the database. deleteOlderThan takes no context, so
// cancellation cannot be observed once the delete is under way.
func (ms *MetricsStore) CleanupOlderThan(ctx context.Context, days int) (*MaintenanceResult, error) {
	defer track(&ms.stats.Cleanup)()
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if days <= 0 {
		days = defaultRetentionDays
	}
	// One clock read serves both the cutoff and the duration measurement.
	start := time.Now()
	cutoff := start.AddDate(0, 0, -days).Unix()
	nodeDel, ruleDel, err := ms.deleteOlderThan(cutoff)
	if err != nil {
		return nil, err
	}
	return &MaintenanceResult{
		NodeDeleted: nodeDel,
		RuleDeleted: ruleDel,
		DurationMs:  time.Since(start).Milliseconds(),
	}, nil
}

// Vacuum runs SQLite's VACUUM to reclaim free pages, then recounts the
// cached row counters and logs the size change.
//
// Per the original author's note, other queries are blocked while VACUUM
// runs: negligible on a small database, potentially multiple seconds
// once the file grows past roughly 1GB (the SPA's confirm copy warns
// about this).
//
// The result carries the file size before and after plus the elapsed
// milliseconds. An error from VACUUM or from the recount is returned
// as-is with a nil result.
func (ms *MetricsStore) Vacuum(ctx context.Context) (*MaintenanceResult, error) {
	defer track(&ms.stats.Vacuum)()

	began := time.Now()
	sizeBefore := ms.dbFileSize()

	_, err := ms.db.ExecContext(ctx, "VACUUM")
	if err != nil {
		return nil, err
	}
	sizeAfter := ms.dbFileSize()

	err = ms.recountRows()
	if err != nil {
		return nil, err
	}

	ms.l.Infof("vacuum: %d -> %d bytes in %s", sizeBefore, sizeAfter, time.Since(began))

	res := &MaintenanceResult{
		BytesBefore: sizeBefore,
		BytesAfter:  sizeAfter,
	}
	res.DurationMs = time.Since(began).Milliseconds()
	return res, nil
}

// truncateConfirm is the exact phrase Truncate requires. It is a string
// rather than a boolean on purpose: a defaulted JSON field (`{}` → false)
// must never pass, only an explicit, typed phrase does.
const truncateConfirm = "yes I am sure"

// ErrTruncateNotConfirmed is what Truncate returns when the caller's
// confirm value is not exactly truncateConfirm. Per the original note,
// the HTTP handler maps it to a 400 so that a missing form value can
// never wipe live data. The message is built from truncateConfirm so the
// two cannot drift apart.
var ErrTruncateNotConfirmed = errors.New(`truncate requires confirm="` + truncateConfirm + `"`)

// Truncate empties both metrics tables and then runs VACUUM to reclaim
// the freed pages.
//
// confirm must equal truncateConfirm exactly; otherwise nothing is
// touched and ErrTruncateNotConfirmed is returned.
//
// Both DELETEs run inside one transaction so a failure on the second
// table cannot leave the store half-wiped. VACUUM runs after the commit
// because SQLite cannot vacuum inside an open transaction.
//
// NodeDeleted / RuleDeleted are taken from the DELETE statements'
// RowsAffected. The cached row counters are used only as a fallback,
// because they are best-effort between recounts.
//
// The result also carries the file size before and after and the elapsed
// milliseconds. Any database error is returned as-is with a nil result.
func (ms *MetricsStore) Truncate(ctx context.Context, confirm string) (*MaintenanceResult, error) {
	if confirm != truncateConfirm {
		return nil, ErrTruncateNotConfirmed
	}
	defer track(&ms.stats.Truncate)()
	start := time.Now()
	before := ms.dbFileSize()
	// Fallback values in case the driver cannot report affected rows.
	nodeDeleted := ms.nodeRows.Load()
	ruleDeleted := ms.ruleRows.Load()

	tx, err := ms.db.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback() //nolint:errcheck // no-op once Commit has succeeded

	nodeRes, err := tx.ExecContext(ctx, "DELETE FROM node_metrics")
	if err != nil {
		return nil, err
	}
	ruleRes, err := tx.ExecContext(ctx, "DELETE FROM rule_metrics")
	if err != nil {
		return nil, err
	}
	if err := tx.Commit(); err != nil {
		return nil, err
	}
	if n, err := nodeRes.RowsAffected(); err == nil {
		nodeDeleted = n
	}
	if n, err := ruleRes.RowsAffected(); err == nil {
		ruleDeleted = n
	}

	_, vacuumErr := ms.db.ExecContext(ctx, "VACUUM")
	// The rows are gone whether or not VACUUM succeeded, so refresh the
	// cached counters before reporting any VACUUM failure.
	recountErr := ms.recountRows()
	if vacuumErr != nil {
		return nil, vacuumErr
	}
	if recountErr != nil {
		return nil, recountErr
	}

	after := ms.dbFileSize()
	ms.l.Warnf("truncate: deleted node=%d rule=%d, %d -> %d bytes",
		nodeDeleted, ruleDeleted, before, after)
	return &MaintenanceResult{
		NodeDeleted: nodeDeleted,
		RuleDeleted: ruleDeleted,
		BytesBefore: before,
		BytesAfter:  after,
		DurationMs:  time.Since(start).Milliseconds(),
	}, nil
}

// ResetStats zeroes every opStats counter. Operator escape hatch when a
// one-off latency spike (e.g. cold start, paused process) has poisoned
// the running max and the page is hard to read.
// It delegates to the store's stats tracker and returns nothing; no
// database access happens in this method itself.
func (ms *MetricsStore) ResetStats() {
ms.stats.Reset()
}

// dbFileSize reports the size in bytes of the file at ms.dbPath.
// A failed stat is not reported; it yields 0 instead.
func (ms *MetricsStore) dbFileSize() int64 {
	info, err := os.Stat(ms.dbPath)
	if err != nil {
		return 0
	}
	return info.Size()
}
Loading
Loading