Skip to content

Commit

Permalink
mark deal sealed in a DB transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr committed Mar 7, 2025
1 parent 395bc47 commit 7c075d3
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 49 deletions.
72 changes: 41 additions & 31 deletions tasks/seal/poller_commit_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,53 +168,63 @@ func (s *SealPoller) sendCommitBatch(ctx context.Context, spid int64, sectors []

func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) error {
if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() {
var execResult []dbExecResult

err := s.db.Select(ctx, &execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
comm, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var execResult []dbExecResult
err = tx.Select(&execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
FROM sectors_sdr_pipeline spipeline
JOIN message_waits ON spipeline.commit_msg_cid = message_waits.signed_message_cid
WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch IS NOT NULL`, task.SpID, task.SectorNumber)
if err != nil {
log.Errorw("failed to query message_waits", "error", err)
}

if len(execResult) > 0 {
maddr, err := address.NewIDAddress(uint64(task.SpID))
if err != nil {
return err
return false, xerrors.Errorf("failed to query message_waits: %w", err)
}

if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok {
if err := s.pollCommitMsgFail(ctx, maddr, task, execResult[0]); err != nil {
return err
if len(execResult) > 0 {
maddr, err := address.NewIDAddress(uint64(task.SpID))
if err != nil {
return false, xerrors.Errorf("failed to create miner address: %w", err)
}
}

si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("get sector info: %w", err)
}
if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok {
if err := s.pollCommitMsgFail(ctx, maddr, task, execResult[0]); err != nil {
return false, xerrors.Errorf("failed to handle commit message failure: %w", err)
}
}

if si == nil {
log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID)
// todo handle missing sector info (not found after cron)
} else {
// yay!
si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("get sector info: %w", err)
}

if si == nil {
log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID)
return false, nil
} else {
// yay!

_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
_, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET
after_commit_msg_success = TRUE, commit_msg_tsk = $1
WHERE sp_id = $2 AND sector_number = $3 AND after_commit_msg_success = FALSE`,
execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber)
if err != nil {
return xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}

n, err := s.db.Exec(ctx, `UPDATE market_mk12_deal_pipeline SET sealed = TRUE WHERE sp_id = $1 AND sector = $2 AND sealed = FALSE`, task.SpID, task.SectorNumber)
if err != nil {
return xerrors.Errorf("update market_mk12_deal_pipeline: %w", err)
n, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET sealed = TRUE WHERE sp_id = $1 AND sector = $2 AND sealed = FALSE`, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update market_mk12_deal_pipeline: %w", err)
}
log.Debugw("marked deals as sealed", "sp", task.SpID, "sector", task.SectorNumber, "count", n)
return true, nil
}
log.Debugw("marked deals as sealed", "sp", task.SpID, "sector", task.SectorNumber, "count", n)
}
return false, nil
}, harmonydb.OptionRetry())
if err != nil {
return xerrors.Errorf("failed to commit transaction: %w", err)
}
if !comm {
return xerrors.Errorf("failed to commit transaction")
}
}

Expand Down
47 changes: 29 additions & 18 deletions tasks/snap/task_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,27 +688,38 @@ func (s *SubmitTask) schedule(ctx context.Context, addTaskFunc harmonytask.AddTa
})
}

// update landed
var tasks []struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
}
comm, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// update landed
var tasks []struct {
SpID int64 `db:"sp_id"`
SectorNumber int64 `db:"sector_number"`
}

err = tx.Select(&tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE after_encode = TRUE AND after_prove = TRUE AND after_prove_msg_success = FALSE AND after_submit = TRUE`)
if err != nil {
return false, xerrors.Errorf("getting tasks: %w", err)
}

err := s.db.Select(ctx, &tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE after_encode = TRUE AND after_prove = TRUE AND after_prove_msg_success = FALSE AND after_submit = TRUE`)
for _, t := range tasks {
if err := s.updateLanded(ctx, tx, t.SpID, t.SectorNumber); err != nil {
log.Errorw("updating landed", "sp", t.SpID, "sector", t.SectorNumber, "err", err)
return false, xerrors.Errorf("updating landed for sp %d, sector %d: %w", t.SpID, t.SectorNumber, err)
}
}
return true, nil
}, harmonydb.OptionRetry())
if err != nil {
return xerrors.Errorf("getting tasks: %w", err)
return xerrors.Errorf("updating landed: %w", err)
}

for _, t := range tasks {
if err := s.updateLanded(ctx, t.SpID, t.SectorNumber); err != nil {
log.Errorw("updating landed", "sp", t.SpID, "sector", t.SectorNumber, "err", err)
}
if !comm {
return xerrors.Errorf("updating landed: transaction failed")
}

return nil
}

func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) error {
func (s *SubmitTask) updateLanded(ctx context.Context, tx *harmonydb.Tx, spId, sectorNum int64) error {
var execResult []struct {
ProveMsgCID string `db:"prove_msg_cid"`
UpdateSealedCID string `db:"update_sealed_cid"`
Expand All @@ -719,7 +730,7 @@ func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) er
ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"`
}

err := s.db.Select(ctx, &execResult, `SELECT spipeline.prove_msg_cid, spipeline.update_sealed_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
err := tx.Select(&execResult, `SELECT spipeline.prove_msg_cid, spipeline.update_sealed_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
FROM sectors_snap_pipeline spipeline
JOIN message_waits ON spipeline.prove_msg_cid = message_waits.signed_message_cid
WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch IS NOT NULL`, spId, sectorNum)
Expand All @@ -739,7 +750,7 @@ func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) er
fallthrough
case exitcode.SysErrOutOfGas:
// just retry
n, err := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
n, err := tx.Exec(`UPDATE sectors_snap_pipeline SET
after_prove_msg_success = FALSE, after_submit = FALSE
WHERE sp_id = $2 AND sector_number = $3 AND after_prove_msg_success = FALSE AND after_submit = TRUE`,
execResult[0].ExecutedTskCID, spId, sectorNum)
Expand Down Expand Up @@ -773,23 +784,23 @@ func (s *SubmitTask) updateLanded(ctx context.Context, spId, sectorNum int64) er

if si == nil {
log.Errorw("todo handle missing sector info (not found after cron)", "sp", spId, "sector", sectorNum, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID)
// todo handle missing sector info (not found after cron)
return xerrors.Errorf("sector info not found after prove message SP %d, sector %d, exec_epoch %d, exec_tskcid %s, msg_cid %s", spId, sectorNum, execResult[0].ExecutedTskEpoch, execResult[0].ExecutedTskCID, execResult[0].ExecutedMsgCID)
} else {
if si.SealedCID.String() != execResult[0].UpdateSealedCID {
log.Errorw("sector sealed CID mismatch after update?!", "sp", spId, "sector", sectorNum, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID)
return nil
return xerrors.Errorf("sector sealed CID mismatch after update?! SP %d, sector %d, exec_epoch %d, exec_tskcid %s, msg_cid %s", spId, sectorNum, execResult[0].ExecutedTskEpoch, execResult[0].ExecutedTskCID, execResult[0].ExecutedMsgCID)
}
// yay!

_, err := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
_, err := tx.Exec(`UPDATE sectors_snap_pipeline SET
after_prove_msg_success = TRUE, prove_msg_tsk = $1
WHERE sp_id = $2 AND sector_number = $3 AND after_prove_msg_success = FALSE`,
execResult[0].ExecutedTskCID, spId, sectorNum)
if err != nil {
return xerrors.Errorf("update sectors_snap_pipeline: %w", err)
}

n, err := s.db.Exec(ctx, `UPDATE market_mk12_deal_pipeline SET sealed = TRUE WHERE sp_id = $1 AND sector = $2 AND sealed = FALSE`, spId, sectorNum)
n, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET sealed = TRUE WHERE sp_id = $1 AND sector = $2 AND sealed = FALSE`, spId, sectorNum)
if err != nil {
return xerrors.Errorf("update market_mk12_deal_pipeline: %w", err)
}
Expand Down

0 comments on commit 7c075d3

Please sign in to comment.