update metrics #424

Open · wants to merge 1 commit into base: main
4 changes: 3 additions & 1 deletion harmony/harmonydb/sql/20230719-harmony.sql
@@ -18,7 +18,9 @@ CREATE TABLE harmony_task (
added_by INTEGER NOT NULL,
previous_task INTEGER,
name varchar(16) NOT NULL
-- retries INTEGER NOT NULL DEFAULT 0 -- added later
-- retries INTEGER NOT NULL DEFAULT 0 (added in 20240927-task-retrywait.sql)
-- wait_accumulated INTERVAL DEFAULT '0 seconds' (added in 20250226-harmonytask-waittime.sql)

);
COMMENT ON COLUMN harmony_task.initiated_by IS 'The task ID whose completion occasioned this task.';
COMMENT ON COLUMN harmony_task.owner_id IS 'The foreign key to harmony_machines.';
1 change: 1 addition & 0 deletions harmony/harmonydb/sql/20250226-harmonytask-waittime.sql
@@ -0,0 +1 @@
ALTER TABLE harmony_task ADD COLUMN wait_accumulated INTERVAL DEFAULT '0 seconds';
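
For context only (not part of this change), the new column makes queue latency inspectable directly in SQL. A rough sketch of such a query, assuming the harmony_task columns referenced elsewhere in this PR (posted_time, owner_id):

-- Illustrative only: approximate total wait for tasks still sitting in the queue.
SELECT name, id, wait_accumulated + (CURRENT_TIMESTAMP - posted_time) AS waited_so_far
FROM harmony_task
WHERE owner_id IS NULL
ORDER BY waited_so_far DESC;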
83 changes: 73 additions & 10 deletions harmony/harmonytask/metrics.go
@@ -18,16 +18,21 @@ var (

// TaskMeasures groups all harmonytask metrics.
var TaskMeasures = struct {
TasksStarted *stats.Int64Measure
TasksCompleted *stats.Int64Measure
TasksFailed *stats.Int64Measure
TaskDuration promclient.Histogram
ActiveTasks *stats.Int64Measure
CpuUsage *stats.Float64Measure
GpuUsage *stats.Float64Measure
RamUsage *stats.Float64Measure
PollerIterations *stats.Int64Measure
AddedTasks *stats.Int64Measure
TasksStarted *stats.Int64Measure
TasksCompleted *stats.Int64Measure
TasksFailed *stats.Int64Measure
TaskDuration promclient.Histogram
PerTaskDuration *promclient.HistogramVec
PerTaskWaitTime *promclient.HistogramVec
PerTaskFailedDuration *promclient.HistogramVec
TaskWaitTime promclient.Histogram
TaskFailedDuration promclient.Histogram
ActiveTasks *stats.Int64Measure
CpuUsage *stats.Float64Measure
GpuUsage *stats.Float64Measure
RamUsage *stats.Float64Measure
PollerIterations *stats.Int64Measure
AddedTasks *stats.Int64Measure
}{
TasksStarted: stats.Int64(pre+"tasks_started", "Total number of tasks started.", stats.UnitDimensionless),
TasksCompleted: stats.Int64(pre+"tasks_completed", "Total number of tasks completed successfully.", stats.UnitDimensionless),
@@ -37,6 +42,39 @@ var TaskMeasures = struct {
Buckets: durationBuckets,
Help: "The histogram of task durations in seconds.",
}),
PerTaskDuration: promclient.NewHistogramVec(
promclient.HistogramOpts{
Name: pre + "task_duration_seconds_per_task",
Help: "The histogram of task durations in seconds per task.",
Buckets: durationBuckets,
},
[]string{"task_name"}, // Add task_name as a label
),
PerTaskWaitTime: promclient.NewHistogramVec(
promclient.HistogramOpts{
Name: pre + "task_wait_duration_seconds_per_task",
Help: "The histogram of task wait durations in seconds per task.",
Buckets: durationBuckets,
},
[]string{"task_name"}, // Add task_name as a label
),
TaskWaitTime: promclient.NewHistogram(promclient.HistogramOpts{
Name: pre + "task_wait_duration_seconds",
Buckets: durationBuckets,
Help: "The histogram of task wait durations in seconds.",
}),
TaskFailedDuration: promclient.NewHistogram(promclient.HistogramOpts{
Name: pre + "task_failure_duration_seconds",
Buckets: durationBuckets,
Help: "The histogram of task failure durations in seconds.",
}),
PerTaskFailedDuration: promclient.NewHistogramVec(
promclient.HistogramOpts{
Name: pre + "task_failure_duration_seconds_per_task",
Buckets: durationBuckets,
Help: "The histogram of task failure durations in seconds per task.",
},
[]string{"task_name"},
),
ActiveTasks: stats.Int64(pre+"active_tasks", "Current number of active tasks.", stats.UnitDimensionless),
CpuUsage: stats.Float64(pre+"cpu_usage", "Percentage of CPU in use.", stats.UnitDimensionless),
GpuUsage: stats.Float64(pre+"gpu_usage", "Percentage of GPU in use.", stats.UnitDimensionless),
@@ -102,4 +140,29 @@ func init() {
if err != nil {
panic(err)
}

err = promclient.Register(TaskMeasures.PerTaskDuration)
if err != nil {
panic(err)
}

err = promclient.Register(TaskMeasures.PerTaskWaitTime)
if err != nil {
panic(err)
}

err = promclient.Register(TaskMeasures.TaskWaitTime)
if err != nil {
panic(err)
}

err = promclient.Register(TaskMeasures.PerTaskFailedDuration)
if err != nil {
panic(err)
}

err = promclient.Register(TaskMeasures.TaskFailedDuration)
if err != nil {
panic(err)
}
}
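
For readers unfamiliar with labelled histograms, here is a minimal, self-contained sketch of the pattern the new *Vec metrics rely on, assuming promclient is the standard prometheus/client_golang package; the metric name, task name, and values below are made up for illustration:

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// One histogram series per distinct task_name label value, sharing the same buckets.
	perTask := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "demo_task_duration_seconds_per_task",
		Help:    "Task durations in seconds, partitioned by task name.",
		Buckets: prometheus.DefBuckets,
	}, []string{"task_name"})
	prometheus.MustRegister(perTask)

	// Each observation lands in the series selected by its label value.
	perTask.WithLabelValues("ExampleTask").Observe((42 * time.Second).Seconds())
	fmt.Println("recorded one observation for task_name=ExampleTask")
}

The PR's own wiring in task_type_handler.go below follows the same WithLabelValues(h.Name).Observe(...) shape.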
73 changes: 48 additions & 25 deletions harmony/harmonytask/task_type_handler.go
@@ -236,38 +236,28 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, sectorID *abi.SectorID, w
workEnd := time.Now()
retryWait := time.Millisecond * 100

{
// metrics

_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.ActiveTasks.M(int64(h.Max.ActiveThis())))

duration := workEnd.Sub(workStart).Seconds()
TaskMeasures.TaskDuration.Observe(duration)

if done {
_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.TasksCompleted.M(1))
} else {
_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.TasksFailed.M(1))
}
}
var totalWaitTime time.Duration
var failedExecutionTime time.Duration

retryRecordCompletion:
cm, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
var postedTime time.Time
var retries uint
err := tx.QueryRow(`SELECT posted_time, retries FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime, &retries)
var waitAccumulated time.Duration
err := tx.QueryRow(`SELECT posted_time, wait_accumulated, retries FROM harmony_task WHERE id=$1`, tID).Scan(&postedTime, &waitAccumulated, &retries)

if err != nil {
return false, fmt.Errorf("could not log completion: %w ", err)
}

currentWait := workStart.Sub(postedTime)
newWaitAccumulated := waitAccumulated + currentWait

taskExecutionTime := workEnd.Sub(workStart)

result := "unspecified error"
if done {
totalWaitTime = newWaitAccumulated
_, err = tx.Exec("DELETE FROM harmony_task WHERE id=$1", tID)
if err != nil {

@@ -281,6 +271,9 @@
if doErr != nil {
result = "error: " + doErr.Error()
}

failedExecutionTime = taskExecutionTime

var deleteTask bool
if h.MaxFailures > 0 && retries >= h.MaxFailures-1 {
deleteTask = true
@@ -292,17 +285,17 @@
}
// Note: Extra Info is left laying around for later review & clean-up
} else {
_, err := tx.Exec(`UPDATE harmony_task SET owner_id=NULL, retries=$1, update_time=CURRENT_TIMESTAMP WHERE id=$2`, retries+1, tID)
_, err := tx.Exec(`UPDATE harmony_task SET owner_id=NULL, retries=$1, wait_accumulated=$2, update_time=CURRENT_TIMESTAMP WHERE id=$3`, retries+1, newWaitAccumulated, tID)
if err != nil {
return false, fmt.Errorf("could not disown failed task: %v %v", tID, err)
}
}
}

var hid int
err = tx.QueryRow(`INSERT INTO harmony_task_history
(task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id`, tID, h.Name, postedTime.UTC(), workStart.UTC(), workEnd.UTC(), done, h.TaskEngine.hostAndPort, result).Scan(&hid)
err = tx.QueryRow(`INSERT INTO harmony_task_history (task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id`,
tID, h.Name, postedTime.UTC(), workStart.UTC(), workEnd.UTC(), done, h.TaskEngine.hostAndPort, result).Scan(&hid)
if err != nil {
return false, fmt.Errorf("could not write history: %w", err)
}
@@ -327,6 +320,36 @@
if !cm {
log.Error("Committing the task records failed")
}

{
// metrics

_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.ActiveTasks.M(int64(h.Max.ActiveThis())))

duration := workEnd.Sub(workStart).Seconds()
TaskMeasures.TaskDuration.Observe(duration)

if done {
_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.TasksCompleted.M(1))

// Record total wait time and task duration in Prometheus only for successful tasks
TaskMeasures.PerTaskDuration.WithLabelValues(h.Name).Observe(duration)

TaskMeasures.PerTaskWaitTime.WithLabelValues(h.Name).Observe(totalWaitTime.Seconds())
TaskMeasures.TaskWaitTime.Observe(totalWaitTime.Seconds())
} else {
_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.TasksFailed.M(1))

TaskMeasures.PerTaskFailedDuration.WithLabelValues(h.Name).Observe(failedExecutionTime.Seconds())
TaskMeasures.TaskFailedDuration.Observe(failedExecutionTime.Seconds())
}
}
}

func (h *taskTypeHandler) AssertMachineHasCapacity() error {
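
To make the wait-time bookkeeping in recordCompletion concrete: each attempt adds the gap between the task's posted_time and workStart to the running wait_accumulated total. On failure the new total is written back to the row and the attempt's execution time feeds the failure-duration histograms; on success the accumulated total feeds the wait-time histograms and the row is deleted. A tiny sketch of the arithmetic with made-up values:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Made-up values, mirroring the accumulation in recordCompletion above.
	postedTime := time.Date(2025, 2, 26, 12, 0, 0, 0, time.UTC)
	workStart := postedTime.Add(90 * time.Second) // this attempt sat queued for 90s
	waitAccumulated := 30 * time.Second           // carried over from an earlier failed attempt

	currentWait := workStart.Sub(postedTime)
	newWaitAccumulated := waitAccumulated + currentWait

	// On success, this total is what TaskWaitTime and PerTaskWaitTime observe.
	fmt.Println(newWaitAccumulated.Seconds()) // 120
}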