Skip to content

fix: wdPost #499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion harmony/harmonydb/harmonydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (db *DB) upgrade() error {
}
_, err = db.Exec(context.Background(), rawStringOnly(megaSql))
if err != nil {
msg := fmt.Sprintf("Could not upgrade! %s", err.Error())
msg := fmt.Sprintf("Could not upgrade (%s)! %s", name, err.Error())
logger.Error(msg)
return xerrors.New(msg) // makes devs' lives easier by placing message at the end.
}
Expand Down
1 change: 1 addition & 0 deletions harmony/harmonydb/sql/20240930-pdp.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
ALTER TABLE parked_pieces ADD COLUMN long_term BOOLEAN NOT NULL DEFAULT FALSE;

ALTER TABLE parked_pieces DROP CONSTRAINT IF EXISTS parked_pieces_piece_cid_key;
ALTER TABLE parked_pieces DROP CONSTRAINT IF EXISTS parked_pieces_piece_cid_cleanup_task_id_key;
ALTER TABLE parked_pieces ADD CONSTRAINT parked_pieces_piece_cid_cleanup_task_id_key UNIQUE (piece_cid, piece_padded_size, long_term, cleanup_task_id);

ALTER TABLE parked_piece_refs ADD COLUMN long_term BOOLEAN NOT NULL DEFAULT FALSE;
Expand Down
10 changes: 5 additions & 5 deletions tasks/window/compute_do.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (t *WdPostTask) generateWindowPoSt(ctx context.Context, ppt abi.RegisteredP

postChallenges, err := ffi.GeneratePoStFallbackSectorChallenges(ppt, minerID, randomness, sectorNums)
if err != nil {
return nil, nil, xerrors.Errorf("generating fallback challenges: %v", err)
return nil, nil, xerrors.Errorf("generating fallback challenges: %w", err)
}

proofList := make([]ffi.PartitionProof, partitionCount)
Expand Down Expand Up @@ -426,12 +426,12 @@ func (t *WdPostTask) generateWindowPoSt(ctx context.Context, ppt abi.RegisteredP
sk := pr.Skipped

if err != nil || len(sk) > 0 {
log.Errorf("generateWindowPost part:%d, skipped:%d, sectors: %d, err: %+v", partIdx, len(sk), len(sectors), err)
log.Errorf("generateWindowPost part:%d, skipped:%d, sectors: %d, err: %s", partIdx, len(sk), len(sectors), err)
flk.Lock()
skipped = append(skipped, sk...)

if err != nil {
retErr = multierr.Append(retErr, xerrors.Errorf("partitionIndex:%d err:%+v", partIdx, err))
retErr = multierr.Append(retErr, xerrors.Errorf("partitionIndex: %d err: %w", partIdx, err))
}
flk.Unlock()
}
Expand All @@ -448,7 +448,7 @@ func (t *WdPostTask) generateWindowPoSt(ctx context.Context, ppt abi.RegisteredP

postProofs, err := ffi.MergeWindowPoStPartitionProofs(ppt, proofList)
if err != nil {
return nil, skipped, xerrors.Errorf("merge windowPoSt partition proofs: %v", err)
return nil, skipped, xerrors.Errorf("merge windowPoSt partition proofs: %w", err)
}

out = append(out, *postProofs)
Expand All @@ -470,7 +470,7 @@ func (t *WdPostTask) GenerateWindowPoStAdv(ctx context.Context, ppt abi.Register
select {
case t.parallel <- struct{}{}:
case <-ctx.Done():
return storiface.WindowPoStResult{}, xerrors.Errorf("context error waiting on challengeThrottle")
return storiface.WindowPoStResult{}, xerrors.Errorf("context error waiting on challengeThrottle: %w", ctx.Err())
}
}

Expand Down
53 changes: 29 additions & 24 deletions tasks/window/compute_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,13 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
&spID, &pps, &dlIdx, &partIdx,
)
if err != nil {
log.Errorf("WdPostTask.Do() failed to queryRow: %w", err)
log.Errorf("WdPostTask.Do() failed to queryRow: %s", err.Error())
return false, err
}

head, err := t.api.ChainHead(context.Background())
if err != nil {
log.Errorf("WdPostTask.Do() for SP %d on Deadline %d and Partition %d failed to get chain head: %w", spID, dlIdx, partIdx, err)
log.Errorf("WdPostTask.Do() for SP %d on Deadline %d and Partition %d failed to get chain head: %s", spID, dlIdx, partIdx, err.Error())
return false, xerrors.Errorf("SP %d on Deadline %d and Partition %d getting chain head: %w", spID, dlIdx, partIdx, err)
}

Expand All @@ -157,15 +157,15 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
testTask = new(int)
err := t.db.QueryRow(context.Background(), `SELECT COUNT(*) FROM harmony_test WHERE task_id = $1`, taskID).Scan(testTask)
if err != nil {
log.Errorf("WdPostTask.Do() for SP %d on Deadline %d and Partition %d failed to queryRow: %w", spID, dlIdx, partIdx, err)
log.Errorf("WdPostTask.Do() for SP %d on Deadline %d and Partition %d failed to queryRow: %s", spID, dlIdx, partIdx, err.Error())
return false
}

return *testTask > 0
}

if deadline.PeriodElapsed() && !isTestTask() {
log.Errorf("WdPost SP %d on Deadline %d and Partition %d removed stale task: %v %v", spID, dlIdx, partIdx, taskID, deadline)
log.Errorf("WdPost SP %d on Deadline %d and Partition %d removed stale task %d: %v", spID, dlIdx, partIdx, taskID, deadline)
return true, nil
}

Expand All @@ -178,13 +178,13 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done

maddr, err := address.NewIDAddress(spID)
if err != nil {
log.Errorf("WdPostTask.Do() failed to NewIDAddress: %w", err)
log.Errorf("WdPostTask.Do() failed to NewIDAddress: %s", err.Error())
return false, xerrors.Errorf("SP %d on Deadline %d and Partition %d getting miner address: %w", spID, dlIdx, partIdx, err)
}

ts, err := t.api.ChainGetTipSetAfterHeight(context.Background(), deadline.Challenge, head.Key())
if err != nil {
log.Errorf("WdPostTask.Do() SP %d on Deadline %d and Partition %d failed to ChainGetTipSetAfterHeight: %w", spID, dlIdx, partIdx, err)
log.Errorf("WdPostTask.Do() SP %d on Deadline %d and Partition %d failed to ChainGetTipSetAfterHeight: %s", spID, dlIdx, partIdx, err.Error())
return false, xerrors.Errorf("SP %d on Deadline %d and Partition %d getting tipset: %w", spID, dlIdx, partIdx, err)
}

Expand All @@ -194,49 +194,55 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
defer close(finish)

// Monitor the current height to cancel the task with correct error
go func(stAPI WDPoStAPI, cancel context.CancelCauseFunc, finish chan struct{}) {
go func(ctx context.Context, stAPI WDPoStAPI, cancel context.CancelCauseFunc, finish chan struct{}) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done(): // Avoids a goroutine leak if anything goes wrong with the finish channel
return
case <-finish:
return
case <-ticker.C:
h, err := stAPI.ChainHead(context.Background())
if err != nil {
log.Warnf("WdPostTask.Do() SP %d on Deadline %d and Partition %d failed to get chain head: %w", spID, dlIdx, partIdx, err)
failed_at := time.Now()
for time.Now().After(failed_at.Add(daemonFailureGracePeriod)) { // In case daemon not reachable temporarily, allow 5 minutes grace period
log.Warnf("WdPostTask.Do() SP %d on Deadline %d and Partition %d failed to get chain head: %s", spID, dlIdx, partIdx, err.Error())
failedAt := time.Now()
for time.Now().After(failedAt.Add(daemonFailureGracePeriod)) { // In case daemon not reachable temporarily, allow 5 minutes grace period
h, err = stAPI.ChainHead(context.Background())
if err == nil {
break
}
time.Sleep(2 * time.Second)
}
if err != nil {
log.Errorf("WdPostTask.Do() SP %d on Deadline %d and Partition %d failed to get chain head, cancelling context: %s", spID, dlIdx, partIdx, err.Error())
cancel(xerrors.Errorf("WdPostTask.Do() SP %d on Deadline %d and Partition %d failed to get chain head: %w", spID, dlIdx, partIdx, err))
return
}
}
if h.Height() > deadline.Challenge {
cancel(xerrors.Errorf("WdPostTask.Do() SP %d on Deadline %d and Partition %d cancelling context as head %d is greater then deadline close %d", spID, dlIdx, partIdx, h.Height(), deadline.Close))
return
if !isTestTask() {
log.Errorf("WdPostTask.Do() SP %d on Deadline %d and Partition %d deadline closed at %d, cancelling context", spID, dlIdx, partIdx, h.Height())
cancel(xerrors.Errorf("WdPostTask.Do() SP %d on Deadline %d and Partition %d cancelling context as head %d is greater then deadline close %d", spID, dlIdx, partIdx, h.Height(), deadline.Close))
return
}
}
}
}
}(t.api, cancel, finish)
}(ctx, t.api, cancel, finish)

postOut, err := t.DoPartition(ctx, ts, maddr, deadline, partIdx, isTestTask())
if err != nil {
if errors.Is(err, context.Canceled) {
return false, context.Cause(ctx)
return false, context.Cause(ctx) // Let's not return true here just in case. This will cause a retry and if deadline is truly closed then deadline check will mark this as complete
}
if errors.Is(err, errEmptyPartition) {
log.Warnf("WdPostTask.Do() SP %d on Deadline %d and Partition %d failed to doPartition: %w", spID, dlIdx, partIdx, err)
return false, nil
log.Warnf("WdPostTask.Do() SP %d on Deadline %d and Partition %d failed to doPartition: %s", spID, dlIdx, partIdx, err.Error())
return true, nil
}
log.Errorf("WdPostTask.Do() SP %d on Deadline %d and Partition %d failed to doPartition: %w", spID, dlIdx, partIdx, err)
log.Errorf("WdPostTask.Do() SP %d on Deadline %d and Partition %d failed to doPartition: %s", spID, dlIdx, partIdx, err.Error())
return false, xerrors.Errorf("SP %d on Deadline %d and Partition %d doing PoSt: %w", spID, dlIdx, partIdx, err)
}

Expand All @@ -261,12 +267,11 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
if err != nil {
return false, xerrors.Errorf("marshaling message: %w", err)
}
ctx := context.Background()
_, err = t.db.Exec(ctx, `UPDATE harmony_test SET result=$1 WHERE task_id=$2`, string(data), taskID)
_, err = t.db.Exec(context.Background(), `UPDATE harmony_test SET result=$1 WHERE task_id=$2`, string(data), taskID)
if err != nil {
return false, xerrors.Errorf("updating harmony_test: %w", err)
}
log.Infof("SKIPPED sending test message to chain. SELECT * FROM harmony_test WHERE task_id= %v", taskID)
log.Infof("SKIPPED sending test message to chain. SELECT * FROM harmony_test WHERE task_id= %d", taskID)
return true, nil // nothing committed
}
// Insert into wdpost_proofs table
Expand All @@ -290,11 +295,11 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
)

if err != nil {
log.Errorf("WdPostTask.Do() SP %s on Deadline %d and Partition %d failed to insert into wdpost_proofs: %w", spID, dlIdx, partIdx, err)
log.Errorf("WdPostTask.Do() SP %s on Deadline %d and Partition %d failed to insert into wdpost_proofs: %s", spID, dlIdx, partIdx, err.Error())
return false, xerrors.Errorf("SP %d on Deadline %d and Partition %d inserting into wdpost_proofs: %w", spID, dlIdx, partIdx, err)
}
if n != 1 {
log.Errorf("WdPostTask.Do() SP %s on Deadline %d and Partition %d failed to insert into wdpost_proofs: %w", spID, dlIdx, partIdx, err)
log.Errorf("WdPostTask.Do() SP %s on Deadline %d and Partition %d failed to insert into wdpost_proofs: %s", spID, dlIdx, partIdx, err.Error())
return false, xerrors.Errorf("SP %d on Deadline %d and Partition %d inserting into wdpost_proofs: %w", spID, dlIdx, partIdx, err)
}

Expand Down Expand Up @@ -392,7 +397,7 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
FROM harmony_task_history
WHERE task_id = $1 AND result = false`, d.TaskID).Scan(&r)
if err != nil {
log.Errorf("WdPostTask.CanAccept() failed to queryRow: %v", err)
log.Errorf("WdPostTask.CanAccept() failed to queryRow: %s", err.Error())
}
return r < 2
})
Expand Down Expand Up @@ -433,7 +438,7 @@ func (t *WdPostTask) GetSpid(db *harmonydb.DB, taskID int64) string {
var spid string
err := db.QueryRow(context.Background(), `SELECT sp_id FROM wdpost_partition_tasks WHERE task_id = $1`, taskID).Scan(&spid)
if err != nil {
log.Errorf("getting spid: %s", err)
log.Errorf("getting spid: %s", err.Error())
return ""
}
return spid
Expand Down