Skip to content

Commit

Permalink
updated sealing states
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr committed Feb 19, 2025
1 parent 271bc8a commit ff52404
Showing 1 changed file with 40 additions and 122 deletions.
162 changes: 40 additions & 122 deletions market/mk12/cidgravity.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
"github.com/filecoin-project/go-state-types/builtin/v9/market"
"github.com/filecoin-project/go-state-types/crypto"

"github.com/filecoin-project/curio/build"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
)

const FormatVersion = "1.0"

// CidGravityPayload defines the structure of the JSON payload for the POST request
type CidGravityPayload struct {
Agent string `json:"Agent"`
Expand Down Expand Up @@ -69,8 +69,8 @@ type CidGravityPayload struct {
Sealing int `json:"Sealing"`
} `json:"DealStagingStates"`
Pipeline struct {
IsSnap bool `json:"IsSnap"`
States []SealingStates `json:"States"`
IsSnap bool `json:"IsSnap"`
States SealingStates `json:"States"`
}
}

Expand All @@ -83,10 +83,36 @@ type CidGravityPayload struct {
}

type SealingStates struct {
Name string `json:"Name"`
Pending int64 `json:"Pending"`
Running int64 `json:"Running"`
Failed int64 `json:"Failed"`
SDRPending int64 `db:"sdr_pending" json:"SDRPending,omitempty"`
SDRRunning int64 `db:"sdr_running" json:"SDRRunning,omitempty"`
SDRFailed int64 `db:"sdr_failed" json:"SDRFailed,omitempty"`
TreesPending int64 `db:"trees_pending" json:"TreesPending,omitempty"`
TreesRunning int64 `db:"trees_running" json:"TreesRunning,omitempty"`
TreesFailed int64 `db:"trees_failed" json:"TreesFailed,omitempty"`
PrecommitMsgPending int64 `db:"precommit_msg_pending" json:"PrecommitMsgPending,omitempty"`
PrecommitMsgRunning int64 `db:"precommit_msg_running" json:"PrecommitMsgRunning,omitempty"`
PrecommitMsgFailed int64 `db:"precommit_msg_failed" json:"PrecommitMsgFailed,omitempty"`
WaitSeedPending int64 `db:"wait_seed_pending" json:"WaitSeedPending,omitempty"`
WaitSeedRunning int64 `db:"wait_seed_running" json:"WaitSeedRunning,omitempty"`
WaitSeedFailed int64 `db:"wait_seed_failed" json:"WaitSeedFailed,omitempty"`
PoRepPending int64 `db:"porep_pending" json:"PoRepPending,omitempty"`
PoRepRunning int64 `db:"porep_running" json:"PoRepRunning,omitempty"`
PoRepFailed int64 `db:"porep_failed" json:"PoRepFailed,omitempty"`
CommitMsgPending int64 `db:"commit_msg_pending" json:"CommitMsgPending,omitempty"`
CommitMsgRunning int64 `db:"commit_msg_running" json:"CommitMsgRunning,omitempty"`
CommitMsgFailed int64 `db:"commit_msg_failed" json:"CommitMsgFailed,omitempty"`
EncodeRunning int64 `db:"encode_running" json:"EncodeRunning,omitempty"`
EncodePending int64 `db:"encode_pending" json:"EncodePending,omitempty"`
EncodeFailed int64 `db:"encode_failed" json:"EncodeFailed,omitempty"`
ProveRunning int64 `db:"prove_running" json:"ProveRunning,omitempty"`
ProvePending int64 `db:"prove_pending" json:"ProvePending,omitempty"`
ProveFailed int64 `db:"prove_failed" json:"ProveFailed,omitempty"`
SubmitRunning int64 `db:"submit_running" json:"SubmitRunning,omitempty"`
SubmitPending int64 `db:"submit_pending" json:"SubmitPending,omitempty"`
SubmitFailed int64 `db:"submit_failed" json:"SubmitFailed,omitempty"`
MoveStorageRunning int64 `db:"move_storage_running" json:"MoveStorageRunning,omitempty"`
MoveStoragePending int64 `db:"move_storage_pending" json:"MoveStoragePending,omitempty"`
MoveStorageFailed int64 `db:"move_storage_failed" json:"MoveStorageFailed,omitempty"`
}

type cidGravityResponse struct {
Expand Down Expand Up @@ -203,7 +229,7 @@ func (m *MK12) cidGravityCheck(ctx context.Context, deal *ProviderDealState) (bo
func (m *MK12) prepareCidGravityPayload(ctx context.Context, deal *ProviderDealState) ([]byte, error) {
data := CidGravityPayload{
Agent: agentName,
FormatVersion: build.BuildVersion,
FormatVersion: FormatVersion,
DealType: "storage",
}

Expand Down Expand Up @@ -299,25 +325,7 @@ func (m *MK12) prepareCidGravityPayload(ctx context.Context, deal *ProviderDealS
data.SealingPipelineState.Pipeline.IsSnap = m.cfg.Ingest.DoSnap

if m.cfg.Ingest.DoSnap {
var cts []struct {
Total int64 `db:"total"`

EncodePending int64 `db:"encode_pending"`
EncodeRunning int64 `db:"encode_running"`
EncodeFailed int64 `db:"encode_failed"`

ProvePending int64 `db:"prove_pending"`
ProveRunning int64 `db:"prove_running"`
ProveFailed int64 `db:"prove_failed"`

SubmitPending int64 `db:"submit_pending"`
SubmitRunning int64 `db:"submit_running"`
SubmitFailed int64 `db:"submit_failed"`

MoveStoragePending int64 `db:"move_storage_pending"`
MoveStorageRunning int64 `db:"move_storage_running"`
MoveStorageFailed int64 `db:"move_storage_failed"`
}
var cts []SealingStates

err = m.db.Select(ctx, &cts, `WITH pipeline_data AS (
SELECT sp.*,
Expand All @@ -340,8 +348,6 @@ func (m *MK12) prepareCidGravityPayload(ctx context.Context, deal *ProviderDealS
WHERE after_move_storage = false
)
SELECT
COUNT(*) AS total,
-- Encode stage
COUNT(*) FILTER (WHERE after_encode = false AND task_id_encode IS NOT NULL AND encode_owner IS NULL) AS encode_pending,
COUNT(*) FILTER (WHERE after_encode = false AND task_id_encode IS NOT NULL AND encode_owner IS NOT NULL) AS encode_running,
Expand Down Expand Up @@ -372,57 +378,11 @@ func (m *MK12) prepareCidGravityPayload(ctx context.Context, deal *ProviderDealS
return nil, xerrors.Errorf("expected 1 row, got 0")
}

ct := cts[0]

data.SealingPipelineState.Pipeline.States = []SealingStates{
{
Name: "Encode",
Running: ct.EncodeRunning,
Pending: ct.EncodePending,
Failed: ct.EncodeFailed,
},
{
Name: "ProveUpdate",
Running: ct.ProveRunning,
Pending: ct.ProvePending,
Failed: ct.ProveFailed,
},
{
Name: "SubmitUpdate",
Running: ct.SubmitRunning,
Pending: ct.SubmitPending,
Failed: ct.SubmitFailed,
},
{
Name: "MoveStorage",
Running: ct.MoveStorageRunning,
Pending: ct.MoveStoragePending,
Failed: ct.MoveStorageFailed,
},
}
data.SealingPipelineState.Pipeline.States = cts[0]

} else {
var cts []struct {
Total int64 `db:"total"`

SDRPending int64 `db:"sdr_pending"`
SDRRunning int64 `db:"sdr_running"`
SDRFailed int64 `db:"sdr_failed"`
TreesPending int64 `db:"trees_pending"`
TreesRunning int64 `db:"trees_running"`
TreesFailed int64 `db:"trees_failed"`
PrecommitMsgPending int64 `db:"precommit_msg_pending"`
PrecommitMsgRunning int64 `db:"precommit_msg_running"`
PrecommitMsgFailed int64 `db:"precommit_msg_failed"`
WaitSeedPending int64 `db:"wait_seed_pending"`
WaitSeedRunning int64 `db:"wait_seed_running"`
PoRepPending int64 `db:"porep_pending"`
PoRepRunning int64 `db:"porep_running"`
PoRepFailed int64 `db:"porep_failed"`
CommitMsgPending int64 `db:"commit_msg_pending"`
CommitMsgRunning int64 `db:"commit_msg_running"`
CommitMsgFailed int64 `db:"commit_msg_failed"`
}

var cts []SealingStates

err = m.db.Select(ctx, &cts, `WITH pipeline_data AS (
SELECT
Expand Down Expand Up @@ -476,9 +436,6 @@ func (m *MK12) prepareCidGravityPayload(ctx context.Context, deal *ProviderDealS
FROM pipeline_data
)
SELECT
-- Total active pipelines: those not done and not failed
COUNT(*) FILTER (WHERE NOT at_done AND NOT at_failed) AS total,
-- SDR stage pending/running
COUNT(*) FILTER (WHERE at_sdr AND task_id_sdr IS NOT NULL AND sdr_owner IS NULL) AS sdr_pending,
COUNT(*) FILTER (WHERE at_sdr AND task_id_sdr IS NOT NULL AND sdr_owner IS NOT NULL) AS sdr_running,
Expand Down Expand Up @@ -536,46 +493,7 @@ func (m *MK12) prepareCidGravityPayload(ctx context.Context, deal *ProviderDealS
return nil, xerrors.Errorf("expected 1 row, got 0")
}

ct := cts[0]

data.SealingPipelineState.Pipeline.States = []SealingStates{
{
Name: "SDR",
Running: ct.SDRRunning,
Pending: ct.SDRPending,
Failed: ct.SDRFailed,
},
{
Name: "Trees",
Running: ct.TreesRunning,
Pending: ct.TreesPending,
Failed: ct.TreesFailed,
},
{
Name: "PrecommitMsg",
Running: ct.PrecommitMsgRunning,
Pending: ct.PrecommitMsgPending,
Failed: ct.PrecommitMsgFailed,
},
{
Name: "WaitSeed",
Running: ct.WaitSeedRunning,
Pending: ct.WaitSeedPending,
Failed: 0,
},
{
Name: "PoRep",
Running: ct.PoRepRunning,
Pending: ct.PoRepPending,
Failed: ct.PoRepFailed,
},
{
Name: "CommitMsg",
Running: ct.CommitMsgRunning,
Pending: ct.CommitMsgPending,
Failed: ct.CommitMsgFailed,
},
}
data.SealingPipelineState.Pipeline.States = cts[0]
}

return json.Marshal(data)
Expand Down

0 comments on commit ff52404

Please sign in to comment.