Skip to content

Commit 6d6434a

Browse files
committed
update harmonytask Do() to require context
1 parent a3473a4 commit 6d6434a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+101
-168
lines changed

alertmanager/task_alert.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,7 @@ func NewAlertTask(
100100
}
101101
}
102102

103-
func (a *AlertTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
104-
ctx := context.Background()
105-
103+
func (a *AlertTask) Do(ctx context.Context, taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
106104
alMap := make(map[string]*alertOut)
107105

108106
altrs := &alerts{

harmony/harmonytask/harmonytask.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ type TaskInterface interface {
8383
// ONLY be called by harmonytask.
8484
// Indicate if the task no-longer needs scheduling with done=true including
8585
// cases where it's past the deadline.
86-
Do(taskID TaskID, stillOwned func() bool) (done bool, err error)
86+
Do(ctx context.Context, taskID TaskID, stillOwned func() bool) (done bool, err error)
8787

8888
// CanAccept should return if the task can run on this machine. It should
8989
// return null if the task type is not allowed on this machine.
@@ -242,7 +242,7 @@ func New(
242242
// passing a deadline will ignore those still running (to be picked-up later).
243243
func (e *TaskEngine) GracefullyTerminate() {
244244

245-
// call the cancel func to avoid picking up any new tasks. Running tasks have context.Background()
245+
// call the cancel func to avoid picking up any new tasks. Running tasks now inherit e.ctx.
246246
// Call shutdown to stop posting heartbeat to DB.
247247
e.grace()
248248
e.reg.Shutdown()

harmony/harmonytask/task_type_handler.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,10 @@ func (h *taskTypeHandler) considerWork(from string, ids []TaskID) (workAccepted
270270
}
271271
}()
272272

273-
done, doErr = h.Do(tID, func() bool {
273+
taskCtx, taskCancel := context.WithCancel(h.TaskEngine.ctx)
274+
defer taskCancel()
275+
276+
done, doErr = h.Do(taskCtx, tID, func() bool {
274277
if taskhelp.IsBackgroundTask(h.Name) || h.CanYield {
275278
if h.TaskEngine.yieldBackground.Load() {
276279
log.Infow("yielding background task", "name", h.Name, "id", tID)
@@ -279,8 +282,8 @@ func (h *taskTypeHandler) considerWork(from string, ids []TaskID) (workAccepted
279282
}
280283

281284
var owner int
282-
// Background here because we don't want GracefulRestart to block this save.
283-
err := h.TaskEngine.db.QueryRow(context.Background(),
285+
// Keep ownership checks tied to the same task context used by Do().
286+
err := h.TaskEngine.db.QueryRow(taskCtx,
284287
`SELECT owner_id FROM harmony_task WHERE id=$1`, tID).Scan(&owner)
285288
if err != nil {
286289
log.Error("Cannot determine ownership: ", err)

itests/alertnow_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package itests
22

33
import (
4+
"context"
45
"testing"
56

67
"github.com/stretchr/testify/require"
@@ -31,7 +32,7 @@ func TestAlertNow(t *testing.T) {
3132
alertmanager.AlertFuncs = []alertmanager.AlertFunc{alertmanager.NowCheck}
3233
// Create a new alert task
3334
at := alertmanager.NewAlertTask(nil, db, config.CurioAlertingConfig{}, as)
34-
done, err := at.Do(123, func() bool { return true })
35+
done, err := at.Do(context.Background(), 123, func() bool { return true })
3536
require.NoError(t, err)
3637
require.True(t, done)
3738
require.Equal(t, "Machine alertNowMachine: testMessage", tp.output)

tasks/balancemgr/task_balancemgr.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,7 @@ func (b *BalanceMgrTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask
176176
}
177177

178178
// Do implements harmonytask.TaskInterface.
179-
func (b *BalanceMgrTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
180-
ctx := context.Background()
179+
func (b *BalanceMgrTask) Do(ctx context.Context, taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
181180

182181
// select task info
183182
var id int64

tasks/expmgr/task_expmgr.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ func NewExpMgrTask(db *harmonydb.DB, chain api.FullNode, pcs *chainsched.CurioCh
3838
}
3939
}
4040

41-
func (e *ExpMgrTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
42-
ctx := context.Background()
41+
func (e *ExpMgrTask) Do(ctx context.Context, taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
4342

4443
log.Infow("starting expiration manager task", "task_id", taskID)
4544

tasks/f3/f3_task.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ func NewF3Task(db *harmonydb.DB, api F3ParticipationAPI, actors *config.Dynamic[
6666
}
6767
}
6868

69-
func (f *F3Task) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
69+
func (f *F3Task) Do(ctx context.Context, taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
7070
// Ensure that all chain calls are made on the same node (the first call will determine the node)
71-
ctx := deps.OnSingleNode(context.Background())
71+
ctx = deps.OnSingleNode(ctx)
7272

7373
var spID int64
7474
err = f.db.QueryRow(ctx, "SELECT sp_id FROM f3_tasks WHERE task_id = $1", taskID).Scan(&spID)

tasks/gc/pipeline_meta_gc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func NewPipelineGC(db *harmonydb.DB) *PipelineGC {
2424
}
2525
}
2626

27-
func (s *PipelineGC) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
27+
func (s *PipelineGC) Do(ctx context.Context, taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
2828
if err := s.cleanupSealed(); err != nil {
2929
return false, xerrors.Errorf("cleanupSealed: %w", err)
3030
}

tasks/gc/storage_endpoint_gc.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func NewStorageEndpointGC(si paths.SectorIndex, remote *paths.Remote, db *harmon
4242
}
4343
}
4444

45-
func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
45+
func (s *StorageEndpointGC) Do(ctx context.Context, taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
4646
/*
4747
1. Get all storage paths + urls (endpoints)
4848
2. Ping each url, record results
@@ -52,8 +52,6 @@ func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool
5252
4.2.1 in the same transaction remove sector refs to the dead path
5353
*/
5454

55-
ctx := context.Background()
56-
5755
var pathRefs []struct {
5856
StorageID storiface.ID `db:"storage_id"`
5957
Urls string `db:"urls"`

tasks/gc/storage_gc_mark.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ func NewStorageGCMark(si paths.SectorIndex, remote *paths.Remote, db *harmonydb.
5757
}
5858
}
5959

60-
func (s *StorageGCMark) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
61-
ctx := context.Background()
60+
func (s *StorageGCMark) Do(ctx context.Context, taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
6261

6362
/*
6463
CREATE TABLE storage_removal_marks (

0 commit comments

Comments
 (0)