Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: mark deal sealed in a DB transaction #434

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ jobs:
- run: git submodule update --init
- install-ubuntu-deps
- run: make deps
- run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.60.1
- run:
name: Lint
command: |
Expand Down
72 changes: 41 additions & 31 deletions tasks/seal/poller_commit_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,53 +168,63 @@ func (s *SealPoller) sendCommitBatch(ctx context.Context, spid int64, sectors []

func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) error {
if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() {
var execResult []dbExecResult

err := s.db.Select(ctx, &execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
comm, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var execResult []dbExecResult
err = tx.Select(&execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
FROM sectors_sdr_pipeline spipeline
JOIN message_waits ON spipeline.commit_msg_cid = message_waits.signed_message_cid
WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch IS NOT NULL`, task.SpID, task.SectorNumber)
if err != nil {
log.Errorw("failed to query message_waits", "error", err)
}

if len(execResult) > 0 {
maddr, err := address.NewIDAddress(uint64(task.SpID))
if err != nil {
return err
return false, xerrors.Errorf("failed to query message_waits: %w", err)
}

if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok {
if err := s.pollCommitMsgFail(ctx, maddr, task, execResult[0]); err != nil {
return err
if len(execResult) > 0 {
maddr, err := address.NewIDAddress(uint64(task.SpID))
if err != nil {
return false, xerrors.Errorf("failed to create miner address: %w", err)
}
}

si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("get sector info: %w", err)
}
if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok {
if err := s.pollCommitMsgFail(ctx, maddr, task, execResult[0]); err != nil {
return false, xerrors.Errorf("failed to handle commit message failure: %w", err)
}
}

if si == nil {
log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID)
// todo handdle missing sector info (not found after cron)
} else {
// yay!
si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("get sector info: %w", err)
}

if si == nil {
log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID)
return false, nil
} else {
// yay!

_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
_, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET
after_commit_msg_success = TRUE, commit_msg_tsk = $1
WHERE sp_id = $2 AND sector_number = $3 AND after_commit_msg_success = FALSE`,
execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber)
if err != nil {
return xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}

n, err := s.db.Exec(ctx, `UPDATE market_mk12_deal_pipeline SET sealed = TRUE WHERE sp_id = $1 AND sector = $2 AND sealed = FALSE`, task.SpID, task.SectorNumber)
if err != nil {
return xerrors.Errorf("update market_mk12_deal_pipeline: %w", err)
n, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET sealed = TRUE WHERE sp_id = $1 AND sector = $2 AND sealed = FALSE`, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update market_mk12_deal_pipeline: %w", err)
}
log.Debugw("marked deals as sealed", "sp", task.SpID, "sector", task.SectorNumber, "count", n)
return true, nil
}
log.Debugw("marked deals as sealed", "sp", task.SpID, "sector", task.SectorNumber, "count", n)
}
return false, nil
}, harmonydb.OptionRetry())
if err != nil {
return xerrors.Errorf("failed to commit transaction: %w", err)
}
if !comm {
return xerrors.Errorf("failed to commit transaction")
}
}

Expand Down
47 changes: 29 additions & 18 deletions tasks/snap/task_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,27 +688,38 @@ func (s *SubmitTask) schedule(ctx context.Context, addTaskFunc harmonytask.AddTa
})
}

// update landed
var tasks []struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
}
comm, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// update landed
var tasks []struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
}

err = tx.Select(&tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE after_encode = TRUE AND after_prove = TRUE AND after_prove_msg_success = FALSE AND after_submit = TRUE`)
if err != nil {
return false, xerrors.Errorf("getting tasks: %w", err)
}

err := s.db.Select(ctx, &tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE after_encode = TRUE AND after_prove = TRUE AND after_prove_msg_success = FALSE AND after_submit = TRUE`)
for _, t := range tasks {
if err := s.updateLanded(ctx, tx, t.SpID, t.SectorNumber); err != nil {
log.Errorw("updating landed", "sp", t.SpID, "sector", t.SectorNumber, "err", err)
return false, xerrors.Errorf("updating landed for sp %d, sector %d: %w", t.SpID, t.SectorNumber, err)
}
}
return true, nil
}, harmonydb.OptionRetry())
if err != nil {
return xerrors.Errorf("getting tasks: %w", err)
return xerrors.Errorf("updating landed: %w", err)
}

for _, t := range tasks {
if err := s.updateLanded(ctx, t.SpID, t.SectorNumber); err != nil {
log.Errorw("updating landed", "sp", t.SpID, "sector", t.SectorNumber, "err", err)
}
if !comm {
return xerrors.Errorf("updating landed: transaction failed")
}

return nil
}

func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) error {
func (s *SubmitTask) updateLanded(ctx context.Context, tx *harmonydb.Tx, spId, sectorNum int64) error {
var execResult []struct {
ProveMsgCID string `db:"prove_msg_cid"`
UpdateSealedCID string `db:"update_sealed_cid"`
Expand All @@ -719,7 +730,7 @@ func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) er
ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"`
}

err := s.db.Select(ctx, &execResult, `SELECT spipeline.prove_msg_cid, spipeline.update_sealed_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
err := tx.Select(&execResult, `SELECT spipeline.prove_msg_cid, spipeline.update_sealed_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
FROM sectors_snap_pipeline spipeline
JOIN message_waits ON spipeline.prove_msg_cid = message_waits.signed_message_cid
WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch IS NOT NULL`, spId, sectorNum)
Expand All @@ -739,7 +750,7 @@ func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) er
fallthrough
case exitcode.SysErrOutOfGas:
// just retry
n, err := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
n, err := tx.Exec(`UPDATE sectors_snap_pipeline SET
after_prove_msg_success = FALSE, after_submit = FALSE
WHERE sp_id = $2 AND sector_number = $3 AND after_prove_msg_success = FALSE AND after_submit = TRUE`,
execResult[0].ExecutedTskCID, spId, sectorNum)
Expand Down Expand Up @@ -773,23 +784,23 @@ func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) er

if si == nil {
log.Errorw("todo handle missing sector info (not found after cron)", "sp", spId, "sector", sectorNum, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID)
// todo handdle missing sector info (not found after cron)
return xerrors.Errorf("sector info not found after prove message SP %d, sector %d, exec_epoch %d, exec_tskcid %s, msg_cid %s", spId, sectorNum, execResult[0].ExecutedTskEpoch, execResult[0].ExecutedTskCID, execResult[0].ExecutedMsgCID)
} else {
if si.SealedCID.String() != execResult[0].UpdateSealedCID {
log.Errorw("sector sealed CID mismatch after update?!", "sp", spId, "sector", sectorNum, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID)
return nil
return xerrors.Errorf("sector sealed CID mismatch after update?! SP %d, sector %d, exec_epoch %d, exec_tskcid %s, msg_cid %s", spId, sectorNum, execResult[0].ExecutedTskEpoch, execResult[0].ExecutedTskCID, execResult[0].ExecutedMsgCID)
}
// yay!

_, err := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
_, err := tx.Exec(`UPDATE sectors_snap_pipeline SET
after_prove_msg_success = TRUE, prove_msg_tsk = $1
WHERE sp_id = $2 AND sector_number = $3 AND after_prove_msg_success = FALSE`,
execResult[0].ExecutedTskCID, spId, sectorNum)
if err != nil {
return xerrors.Errorf("update sectors_snap_pipeline: %w", err)
}

n, err := s.db.Exec(ctx, `UPDATE market_mk12_deal_pipeline SET sealed = TRUE WHERE sp_id = $1 AND sector = $2 AND sealed = FALSE`, spId, sectorNum)
n, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET sealed = TRUE WHERE sp_id = $1 AND sector = $2 AND sealed = FALSE`, spId, sectorNum)
if err != nil {
return xerrors.Errorf("update market_mk12_deal_pipeline: %w", err)
}
Expand Down