Skip to content

Commit

Permalink
update metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr committed Feb 26, 2025
1 parent 43a3c22 commit 1b73711
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 36 deletions.
4 changes: 3 additions & 1 deletion harmony/harmonydb/sql/20230719-harmony.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ CREATE TABLE harmony_task (
added_by INTEGER NOT NULL,
previous_task INTEGER,
name varchar(16) NOT NULL
-- retries INTEGER NOT NULL DEFAULT 0 -- added later
-- retries INTEGER NOT NULL DEFAULT 0 (added in 20240927-task-retrywait.sql)
-- wait_accumulated INTERVAL DEFAULT '0 seconds' (added in 20250226-harmonytask-waittime.sql)

);
COMMENT ON COLUMN harmony_task.initiated_by IS 'The task ID whose completion occasioned this task.';
COMMENT ON COLUMN harmony_task.owner_id IS 'The foreign key to harmony_machines.';
Expand Down
1 change: 1 addition & 0 deletions harmony/harmonydb/sql/20250226-harmonytask-waittime.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Accumulates the total time a task has spent waiting to be picked up across all
-- retries (the old wait is folded in each time the task is disowned and re-queued).
-- DEFAULT '0 seconds' so rows that predate this migration start from zero.
ALTER TABLE harmony_task ADD COLUMN wait_accumulated INTERVAL DEFAULT '0 seconds';
83 changes: 73 additions & 10 deletions harmony/harmonytask/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@ var (

// TaskMeasures groups all harmonytask metrics.
var TaskMeasures = struct {
TasksStarted *stats.Int64Measure
TasksCompleted *stats.Int64Measure
TasksFailed *stats.Int64Measure
TaskDuration promclient.Histogram
ActiveTasks *stats.Int64Measure
CpuUsage *stats.Float64Measure
GpuUsage *stats.Float64Measure
RamUsage *stats.Float64Measure
PollerIterations *stats.Int64Measure
AddedTasks *stats.Int64Measure
TasksStarted *stats.Int64Measure
TasksCompleted *stats.Int64Measure
TasksFailed *stats.Int64Measure
TaskDuration promclient.Histogram
PerTaskDuration *promclient.HistogramVec
PerTaskWaitTime *promclient.HistogramVec
PerTaskFailedDuration *promclient.HistogramVec
TaskWaitTime promclient.Histogram
TaskFailedDuration promclient.Histogram
ActiveTasks *stats.Int64Measure
CpuUsage *stats.Float64Measure
GpuUsage *stats.Float64Measure
RamUsage *stats.Float64Measure
PollerIterations *stats.Int64Measure
AddedTasks *stats.Int64Measure
}{
TasksStarted: stats.Int64(pre+"tasks_started", "Total number of tasks started.", stats.UnitDimensionless),
TasksCompleted: stats.Int64(pre+"tasks_completed", "Total number of tasks completed successfully.", stats.UnitDimensionless),
Expand All @@ -37,6 +42,39 @@ var TaskMeasures = struct {
Buckets: durationBuckets,
Help: "The histogram of task durations in seconds.",
}),
PerTaskDuration: promclient.NewHistogramVec(
promclient.HistogramOpts{
Name: pre + "task_duration_seconds_per_task",
Help: "The histogram of task durations in seconds per task.",
Buckets: durationBuckets,
},
[]string{"task_name"}, // Add task_name as a label
),
PerTaskWaitTime: promclient.NewHistogramVec(
promclient.HistogramOpts{
Name: pre + "task_wait_duration_seconds_per_task",
Help: "The histogram of task wait durations in seconds per task.",
Buckets: durationBuckets,
},
[]string{"task_name"}, // Add task_name as a label
),
TaskWaitTime: promclient.NewHistogram(promclient.HistogramOpts{
Name: pre + "task_wait_duration_seconds",
Buckets: durationBuckets,
Help: "The histogram of task wait durations in seconds.",
}),
TaskFailedDuration: promclient.NewHistogram(promclient.HistogramOpts{
Name: pre + "task_failure_duration_seconds",
Buckets: durationBuckets,
Help: "The histogram of task failure durations in seconds.",
}),
PerTaskFailedDuration: promclient.NewHistogramVec(
promclient.HistogramOpts{
Name: pre + "task_failure_duration_seconds_per_task",
Buckets: durationBuckets,
Help: "The histogram of task failure durations in seconds per task."},
[]string{"task_name"},
),
ActiveTasks: stats.Int64(pre+"active_tasks", "Current number of active tasks.", stats.UnitDimensionless),
CpuUsage: stats.Float64(pre+"cpu_usage", "Percentage of CPU in use.", stats.UnitDimensionless),
GpuUsage: stats.Float64(pre+"gpu_usage", "Percentage of GPU in use.", stats.UnitDimensionless),
Expand Down Expand Up @@ -102,4 +140,29 @@ func init() {
if err != nil {
panic(err)
}

err = promclient.Register(TaskMeasures.PerTaskDuration)
if err != nil {
panic(err)
}

err = promclient.Register(TaskMeasures.PerTaskWaitTime)
if err != nil {
panic(err)
}

err = promclient.Register(TaskMeasures.TaskWaitTime)
if err != nil {
panic(err)
}

err = promclient.Register(TaskMeasures.PerTaskFailedDuration)
if err != nil {
panic(err)
}

err = promclient.Register(TaskMeasures.TaskFailedDuration)
if err != nil {
panic(err)
}
}
73 changes: 48 additions & 25 deletions harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,38 +236,28 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, sectorID *abi.SectorID, w
workEnd := time.Now()
retryWait := time.Millisecond * 100

{
// metrics

_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.ActiveTasks.M(int64(h.Max.ActiveThis())))

duration := workEnd.Sub(workStart).Seconds()
TaskMeasures.TaskDuration.Observe(duration)

if done {
_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.TasksCompleted.M(1))
} else {
_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.TasksFailed.M(1))
}
}
var totalWaitTime time.Duration
var failedExecutionTime time.Duration

retryRecordCompletion:
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
var postedTime time.Time
var retries uint
err := tx.QueryRow(`SELECT posted_time, retries FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime, &retries)
var waitAccumulated time.Duration
err := tx.QueryRow(`SELECT posted_time, wait_accumulated, retries FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime, &waitAccumulated, &retries)

if err != nil {
return false, fmt.Errorf("could not log completion: %w ", err)
}

currentWait := workStart.Sub(postedTime)
newWaitAccumulated := waitAccumulated + currentWait

taskExecutionTime := workEnd.Sub(workStart)

result := "unspecified error"
if done {
totalWaitTime = newWaitAccumulated
_, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
if err != nil {

Expand All @@ -281,6 +271,9 @@ retryRecordCompletion:
if doErr != nil {
result = "error: " + doErr.Error()
}

failedExecutionTime = taskExecutionTime

var deleteTask bool
if h.MaxFailures > 0 && retries >= h.MaxFailures-1 {
deleteTask = true
Expand All @@ -292,17 +285,17 @@ retryRecordCompletion:
}
// Note: Extra Info is left laying around for later review & clean-up
} else {
_, err := tx.Exec(`UPDATE harmony_task SET owner_id=NULL, retries=$1, update_time=CURRENT_TIMESTAMP WHERE id=$2`, retries+1, tID)
_, err := tx.Exec(`UPDATE harmony_task SET owner_id=NULL, retries=$1, wait_accumulated=$2, update_time=CURRENT_TIMESTAMP WHERE id=$3`, retries+1, newWaitAccumulated, tID)
if err != nil {
return false, fmt.Errorf("could not disown failed task: %v %v", tID, err)
}
}
}

var hid int
err = tx.QueryRow(`INSERT INTO harmony_task_history
(task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id`, tID, h.Name, postedTime.UTC(), workStart.UTC(), workEnd.UTC(), done, h.TaskEngine.hostAndPort, result).Scan(&hid)
err = tx.QueryRow(`INSERT INTO harmony_task_history (task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id`,
tID, h.Name, postedTime.UTC(), workStart.UTC(), workEnd.UTC(), done, h.TaskEngine.hostAndPort, result).Scan(&hid)
if err != nil {
return false, fmt.Errorf("could not write history: %w", err)
}
Expand All @@ -327,6 +320,36 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id`, tID, h.Name, postedTime.U
if !cm {
log.Error("Committing the task records failed")
}

{
// metrics

_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.ActiveTasks.M(int64(h.Max.ActiveThis())))

duration := workEnd.Sub(workStart).Seconds()
TaskMeasures.TaskDuration.Observe(duration)

if done {
_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.TasksCompleted.M(1))

// Record total wait time and task duration in Prometheus only for successful tasks
TaskMeasures.PerTaskDuration.WithLabelValues(h.Name).Observe(duration)

TaskMeasures.PerTaskWaitTime.WithLabelValues(h.Name).Observe(totalWaitTime.Seconds())
TaskMeasures.TaskWaitTime.Observe(totalWaitTime.Seconds())
} else {
_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.TasksFailed.M(1))

TaskMeasures.PerTaskFailedDuration.WithLabelValues(h.Name).Observe(failedExecutionTime.Seconds())
TaskMeasures.TaskFailedDuration.Observe(failedExecutionTime.Seconds())
}
}
}

func (h *taskTypeHandler) AssertMachineHasCapacity() error {
Expand Down

0 comments on commit 1b73711

Please sign in to comment.