diff --git a/.circleci/config.yml b/.circleci/config.yml index 5ba7bdaf6..47ac112a3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -280,6 +280,8 @@ jobs: - run: git submodule update --init - install-ubuntu-deps - run: make deps + - run: | + curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.60.1 - run: name: Lint command: | diff --git a/tasks/seal/poller_commit_msg.go b/tasks/seal/poller_commit_msg.go index 4bb628177..437471b8f 100644 --- a/tasks/seal/poller_commit_msg.go +++ b/tasks/seal/poller_commit_msg.go @@ -168,53 +168,63 @@ func (s *SealPoller) sendCommitBatch(ctx context.Context, spid int64, sectors [] func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) error { if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() { - var execResult []dbExecResult - err := s.db.Select(ctx, &execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used + comm, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + var execResult []dbExecResult + err = tx.Select(&execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used FROM sectors_sdr_pipeline spipeline JOIN message_waits ON spipeline.commit_msg_cid = message_waits.signed_message_cid WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch IS NOT NULL`, task.SpID, task.SectorNumber) - if err != nil { - log.Errorw("failed to query message_waits", "error", err) - } - - if len(execResult) > 0 { - maddr, err := address.NewIDAddress(uint64(task.SpID)) if err != nil { - return err + return false, xerrors.Errorf("failed to query message_waits: %w", err) } - if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok { - if err := s.pollCommitMsgFail(ctx, maddr, task, execResult[0]); err != nil { - return err + if len(execResult) > 0 { + maddr, err := address.NewIDAddress(uint64(task.SpID)) + if err != nil { + return false, xerrors.Errorf("failed to create miner address: %w", err) } - } - si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) - if err != nil { - return xerrors.Errorf("get sector info: %w", err) - } + if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok { + if err := s.pollCommitMsgFail(ctx, maddr, task, execResult[0]); err != nil { + return false, xerrors.Errorf("failed to handle commit message failure: %w", err) + } + } - if si == nil { - log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID) - // todo handdle missing sector info (not found after cron) - } else { - // yay! + si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK) + if err != nil { + return false, xerrors.Errorf("get sector info: %w", err) + } + + if si == nil { + log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID) + return false, nil + } else { + // yay! - _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET + _, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET after_commit_msg_success = TRUE, commit_msg_tsk = $1 WHERE sp_id = $2 AND sector_number = $3 AND after_commit_msg_success = FALSE`, - execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) - if err != nil { - return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) - } + execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } - n, err := s.db.Exec(ctx, `UPDATE market_mk12_deal_pipeline SET sealed = TRUE WHERE sp_id = $1 AND sector = $2 AND sealed = FALSE`, task.SpID, task.SectorNumber) - if err != nil { - return xerrors.Errorf("update market_mk12_deal_pipeline: %w", err) + n, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET sealed = TRUE WHERE sp_id = $1 AND sector = $2 AND sealed = FALSE`, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update market_mk12_deal_pipeline: %w", err) + } + log.Debugw("marked deals as sealed", "sp", task.SpID, "sector", task.SectorNumber, "count", n) + return true, nil } - log.Debugw("marked deals as sealed", "sp", task.SpID, "sector", task.SectorNumber, "count", n) } + return false, nil + }, harmonydb.OptionRetry()) + if err != nil { + return xerrors.Errorf("failed to commit transaction: %w", err) + } + if !comm { + return xerrors.Errorf("failed to commit transaction") } } diff --git a/tasks/snap/task_submit.go b/tasks/snap/task_submit.go index cfb60afbe..7ead070a8 100644 --- a/tasks/snap/task_submit.go +++ b/tasks/snap/task_submit.go @@ -688,27 +688,38 @@ func (s *SubmitTask) schedule(ctx context.Context, addTaskFunc harmonytask.AddTa }) } - // update landed - var tasks []struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_number"` - } + comm, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + // update landed + var tasks []struct { + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + } + + err = tx.Select(&tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE after_encode = TRUE AND after_prove = TRUE AND after_prove_msg_success = FALSE AND after_submit = TRUE`) + if err != nil { + return false, xerrors.Errorf("getting tasks: %w", err) + } - err := s.db.Select(ctx, &tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE after_encode = TRUE AND after_prove = TRUE AND after_prove_msg_success = FALSE AND after_submit = TRUE`) + for _, t := range tasks { + if err := s.updateLanded(ctx, tx, t.SpID, t.SectorNumber); err != nil { + log.Errorw("updating landed", "sp", t.SpID, "sector", t.SectorNumber, "err", err) + return false, xerrors.Errorf("updating landed for sp %d, sector %d: %w", t.SpID, t.SectorNumber, err) + } + } + return true, nil + }, harmonydb.OptionRetry()) if err != nil { - return xerrors.Errorf("getting tasks: %w", err) + return xerrors.Errorf("updating landed: %w", err) } - for _, t := range tasks { - if err := s.updateLanded(ctx, t.SpID, t.SectorNumber); err != nil { - log.Errorw("updating landed", "sp", t.SpID, "sector", t.SectorNumber, "err", err) - } + if !comm { + return xerrors.Errorf("updating landed: transaction failed") } return nil } -func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) error { +func (s *SubmitTask) updateLanded(ctx context.Context, tx *harmonydb.Tx, spId, sectorNum int64) error { var execResult []struct { ProveMsgCID string `db:"prove_msg_cid"` UpdateSealedCID string `db:"update_sealed_cid"` @@ -719,7 +730,7 @@ func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) er ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"` } - err := s.db.Select(ctx, &execResult, `SELECT spipeline.prove_msg_cid, spipeline.update_sealed_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used + err := tx.Select(&execResult, `SELECT spipeline.prove_msg_cid, spipeline.update_sealed_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used FROM sectors_snap_pipeline spipeline JOIN message_waits ON spipeline.prove_msg_cid = message_waits.signed_message_cid WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch IS NOT NULL`, spId, sectorNum) @@ -739,7 +750,7 @@ func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) er fallthrough case exitcode.SysErrOutOfGas: // just retry - n, err := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET + n, err := tx.Exec(`UPDATE sectors_snap_pipeline SET after_prove_msg_success = FALSE, after_submit = FALSE WHERE sp_id = $2 AND sector_number = $3 AND after_prove_msg_success = FALSE AND after_submit = TRUE`, execResult[0].ExecutedTskCID, spId, sectorNum) @@ -773,15 +784,15 @@ func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) er if si == nil { log.Errorw("todo handle missing sector info (not found after cron)", "sp", spId, "sector", sectorNum, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID) - // todo handdle missing sector info (not found after cron) + return xerrors.Errorf("sector info not found after prove message SP %d, sector %d, exec_epoch %d, exec_tskcid %s, msg_cid %s", spId, sectorNum, execResult[0].ExecutedTskEpoch, execResult[0].ExecutedTskCID, execResult[0].ExecutedMsgCID) } else { if si.SealedCID.String() != execResult[0].UpdateSealedCID { log.Errorw("sector sealed CID mismatch after update?!", "sp", spId, "sector", sectorNum, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID) - return nil + return xerrors.Errorf("sector sealed CID mismatch after update?! SP %d, sector %d, exec_epoch %d, exec_tskcid %s, msg_cid %s", spId, sectorNum, execResult[0].ExecutedTskEpoch, execResult[0].ExecutedTskCID, execResult[0].ExecutedMsgCID) } // yay! - _, err := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET + _, err := tx.Exec(`UPDATE sectors_snap_pipeline SET after_prove_msg_success = TRUE, prove_msg_tsk = $1 WHERE sp_id = $2 AND sector_number = $3 AND after_prove_msg_success = FALSE`, execResult[0].ExecutedTskCID, spId, sectorNum) @@ -789,7 +800,7 @@ func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) er return xerrors.Errorf("update sectors_snap_pipeline: %w", err) } - n, err := s.db.Exec(ctx, `UPDATE market_mk12_deal_pipeline SET sealed = TRUE WHERE sp_id = $1 AND sector = $2 AND sealed = FALSE`, spId, sectorNum) + n, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET sealed = TRUE WHERE sp_id = $1 AND sector = $2 AND sealed = FALSE`, spId, sectorNum) if err != nil { return xerrors.Errorf("update market_mk12_deal_pipeline: %w", err) }