From 1b73711aaf2db492c084683f6decb32cb9b3ddd7 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Wed, 26 Feb 2025 21:03:43 +0400 Subject: [PATCH] update metrics --- harmony/harmonydb/sql/20230719-harmony.sql | 4 +- .../sql/20250226-harmonytask-waittime.sql | 1 + harmony/harmonytask/metrics.go | 83 ++++++++++++++++--- harmony/harmonytask/task_type_handler.go | 73 ++++++++++------ 4 files changed, 125 insertions(+), 36 deletions(-) create mode 100644 harmony/harmonydb/sql/20250226-harmonytask-waittime.sql diff --git a/harmony/harmonydb/sql/20230719-harmony.sql b/harmony/harmonydb/sql/20230719-harmony.sql index 5b3d8b265..3c4e192d9 100644 --- a/harmony/harmonydb/sql/20230719-harmony.sql +++ b/harmony/harmonydb/sql/20230719-harmony.sql @@ -18,7 +18,9 @@ CREATE TABLE harmony_task ( added_by INTEGER NOT NULL, previous_task INTEGER, name varchar(16) NOT NULL - -- retries INTEGER NOT NULL DEFAULT 0 -- added later + -- retries INTEGER NOT NULL DEFAULT 0 (added in 20240927-task-retrywait.sql) + -- wait_accumulated INTERVAL DEFAULT '0 seconds' (added in 20250226-harmonytask-waittime.sql) + ); COMMENT ON COLUMN harmony_task.initiated_by IS 'The task ID whose completion occasioned this task.'; COMMENT ON COLUMN harmony_task.owner_id IS 'The foreign key to harmony_machines.'; diff --git a/harmony/harmonydb/sql/20250226-harmonytask-waittime.sql b/harmony/harmonydb/sql/20250226-harmonytask-waittime.sql new file mode 100644 index 000000000..4caa5ba2b --- /dev/null +++ b/harmony/harmonydb/sql/20250226-harmonytask-waittime.sql @@ -0,0 +1 @@ +ALTER TABLE harmony_task ADD COLUMN wait_accumulated INTERVAL DEFAULT '0 seconds'; diff --git a/harmony/harmonytask/metrics.go b/harmony/harmonytask/metrics.go index 76a6b2e5a..73f07b67d 100644 --- a/harmony/harmonytask/metrics.go +++ b/harmony/harmonytask/metrics.go @@ -18,16 +18,21 @@ var ( // TaskMeasures groups all harmonytask metrics. var TaskMeasures = struct { - TasksStarted *stats.Int64Measure - TasksCompleted *stats.Int64Measure - TasksFailed *stats.Int64Measure - TaskDuration promclient.Histogram - ActiveTasks *stats.Int64Measure - CpuUsage *stats.Float64Measure - GpuUsage *stats.Float64Measure - RamUsage *stats.Float64Measure - PollerIterations *stats.Int64Measure - AddedTasks *stats.Int64Measure + TasksStarted *stats.Int64Measure + TasksCompleted *stats.Int64Measure + TasksFailed *stats.Int64Measure + TaskDuration promclient.Histogram + PerTaskDuration *promclient.HistogramVec + PerTaskWaitTime *promclient.HistogramVec + PerTaskFailedDuration *promclient.HistogramVec + TaskWaitTime promclient.Histogram + TaskFailedDuration promclient.Histogram + ActiveTasks *stats.Int64Measure + CpuUsage *stats.Float64Measure + GpuUsage *stats.Float64Measure + RamUsage *stats.Float64Measure + PollerIterations *stats.Int64Measure + AddedTasks *stats.Int64Measure }{ TasksStarted: stats.Int64(pre+"tasks_started", "Total number of tasks started.", stats.UnitDimensionless), TasksCompleted: stats.Int64(pre+"tasks_completed", "Total number of tasks completed successfully.", stats.UnitDimensionless), @@ -37,6 +42,39 @@ var TaskMeasures = struct { Buckets: durationBuckets, Help: "The histogram of task durations in seconds.", }), + PerTaskDuration: promclient.NewHistogramVec( + promclient.HistogramOpts{ + Name: pre + "task_duration_seconds_per_task", + Help: "The histogram of task durations in seconds per task.", + Buckets: durationBuckets, + }, + []string{"task_name"}, // Add task_name as a label + ), + PerTaskWaitTime: promclient.NewHistogramVec( + promclient.HistogramOpts{ + Name: pre + "task_wait_duration_seconds_per_task", + Help: "The histogram of task wait durations in seconds per task.", + Buckets: durationBuckets, + }, + []string{"task_name"}, // Add task_name as a label + ), + TaskWaitTime: promclient.NewHistogram(promclient.HistogramOpts{ + Name: pre + "task_wait_duration_seconds", + Buckets: durationBuckets, + Help: "The histogram of task wait durations in seconds.", + }), + TaskFailedDuration: promclient.NewHistogram(promclient.HistogramOpts{ + Name: pre + "task_failure_duration_seconds", + Buckets: durationBuckets, + Help: "The histogram of task failure durations in seconds.", + }), + PerTaskFailedDuration: promclient.NewHistogramVec( + promclient.HistogramOpts{ + Name: pre + "task_failure_duration_seconds_per_task", + Buckets: durationBuckets, + Help: "The histogram of task failure durations in seconds per task."}, + []string{"task_name"}, + ), ActiveTasks: stats.Int64(pre+"active_tasks", "Current number of active tasks.", stats.UnitDimensionless), CpuUsage: stats.Float64(pre+"cpu_usage", "Percentage of CPU in use.", stats.UnitDimensionless), GpuUsage: stats.Float64(pre+"gpu_usage", "Percentage of GPU in use.", stats.UnitDimensionless), @@ -102,4 +140,29 @@ func init() { if err != nil { panic(err) } + + err = promclient.Register(TaskMeasures.PerTaskDuration) + if err != nil { + panic(err) + } + + err = promclient.Register(TaskMeasures.PerTaskWaitTime) + if err != nil { + panic(err) + } + + err = promclient.Register(TaskMeasures.TaskWaitTime) + if err != nil { + panic(err) + } + + err = promclient.Register(TaskMeasures.PerTaskFailedDuration) + if err != nil { + panic(err) + } + + err = promclient.Register(TaskMeasures.TaskFailedDuration) + if err != nil { + panic(err) + } } diff --git a/harmony/harmonytask/task_type_handler.go b/harmony/harmonytask/task_type_handler.go index 6db86bb23..ec0251483 100644 --- a/harmony/harmonytask/task_type_handler.go +++ b/harmony/harmonytask/task_type_handler.go @@ -236,38 +236,28 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, sectorID *abi.SectorID, w workEnd := time.Now() retryWait := time.Millisecond * 100 - { - // metrics - - _ = stats.RecordWithTags(context.Background(), []tag.Mutator{ - tag.Upsert(taskNameTag, h.Name), - }, TaskMeasures.ActiveTasks.M(int64(h.Max.ActiveThis()))) - - duration := workEnd.Sub(workStart).Seconds() - TaskMeasures.TaskDuration.Observe(duration) - - if done { - _ = stats.RecordWithTags(context.Background(), []tag.Mutator{ - tag.Upsert(taskNameTag, h.Name), - }, TaskMeasures.TasksCompleted.M(1)) - } else { - _ = stats.RecordWithTags(context.Background(), []tag.Mutator{ - tag.Upsert(taskNameTag, h.Name), - }, TaskMeasures.TasksFailed.M(1)) - } - } + var totalWaitTime time.Duration + var failedExecutionTime time.Duration retryRecordCompletion: cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { var postedTime time.Time var retries uint - err := tx.QueryRow(`SELECT posted_time, retries FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime, &retries) + var waitAccumulated time.Duration + err := tx.QueryRow(`SELECT posted_time, wait_accumulated, retries FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime, &waitAccumulated, &retries) if err != nil { return false, fmt.Errorf("could not log completion: %w ", err) } + + currentWait := workStart.Sub(postedTime) + newWaitAccumulated := waitAccumulated + currentWait + + taskExecutionTime := workEnd.Sub(workStart) + result := "unspecified error" if done { + totalWaitTime = newWaitAccumulated _, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID) if err != nil { @@ -281,6 +271,9 @@ retryRecordCompletion: if doErr != nil { result = "error: " + doErr.Error() } + + failedExecutionTime = taskExecutionTime + var deleteTask bool if h.MaxFailures > 0 && retries >= h.MaxFailures-1 { deleteTask = true @@ -292,7 +285,7 @@ retryRecordCompletion: } // Note: Extra Info is left laying around for later review & clean-up } else { - _, err := tx.Exec(`UPDATE harmony_task SET owner_id=NULL, retries=$1, update_time=CURRENT_TIMESTAMP WHERE id=$2`, retries+1, tID) + _, err := tx.Exec(`UPDATE harmony_task SET owner_id=NULL, retries=$1, wait_accumulated=$2, update_time=CURRENT_TIMESTAMP WHERE id=$3`, retries+1, newWaitAccumulated, tID) if err != nil { return false, fmt.Errorf("could not disown failed task: %v %v", tID, err) } @@ -300,9 +293,9 @@ retryRecordCompletion: } var hid int - err = tx.QueryRow(`INSERT INTO harmony_task_history - (task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id`, tID, h.Name, postedTime.UTC(), workStart.UTC(), workEnd.UTC(), done, h.TaskEngine.hostAndPort, result).Scan(&hid) + err = tx.QueryRow(`INSERT INTO harmony_task_history (task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id`, + tID, h.Name, postedTime.UTC(), workStart.UTC(), workEnd.UTC(), done, h.TaskEngine.hostAndPort, result).Scan(&hid) if err != nil { return false, fmt.Errorf("could not write history: %w", err) } @@ -327,6 +320,36 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id`, tID, h.Name, postedTime.U if !cm { log.Error("Committing the task records failed") } + + { + // metrics + + _ = stats.RecordWithTags(context.Background(), []tag.Mutator{ + tag.Upsert(taskNameTag, h.Name), + }, TaskMeasures.ActiveTasks.M(int64(h.Max.ActiveThis()))) + + duration := workEnd.Sub(workStart).Seconds() + TaskMeasures.TaskDuration.Observe(duration) + + if done { + _ = stats.RecordWithTags(context.Background(), []tag.Mutator{ + tag.Upsert(taskNameTag, h.Name), + }, TaskMeasures.TasksCompleted.M(1)) + + // Record total wait time and task duration in Prometheus only for successful tasks + TaskMeasures.PerTaskDuration.WithLabelValues(h.Name).Observe(duration) + + TaskMeasures.PerTaskWaitTime.WithLabelValues(h.Name).Observe(totalWaitTime.Seconds()) + TaskMeasures.TaskWaitTime.Observe(totalWaitTime.Seconds()) + } else { + _ = stats.RecordWithTags(context.Background(), []tag.Mutator{ + tag.Upsert(taskNameTag, h.Name), + }, TaskMeasures.TasksFailed.M(1)) + + TaskMeasures.PerTaskFailedDuration.WithLabelValues(h.Name).Observe(failedExecutionTime.Seconds()) + TaskMeasures.TaskFailedDuration.Observe(failedExecutionTime.Seconds()) + } + } } func (h *taskTypeHandler) AssertMachineHasCapacity() error {