Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improved PSD selection and findURL in market #419

Merged
merged 2 commits into from
Feb 21, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 50 additions & 55 deletions tasks/storage-market/storage_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (d *CurioStorageDealMarket) processMK12Deals(ctx context.Context) {

// Add PSD task - PSD is an exception which is processed for multiple deals at once to save
// gas cost for PSD messages
err = d.addPSDTask(ctx, deals)
err = d.addPSDTask(ctx)
if err != nil {
log.Errorf("%w", err)
}
Expand Down Expand Up @@ -485,7 +485,9 @@ func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, dea
req.Header = headers

// Create a client and make the request
client := &http.Client{}
client := &http.Client{
Timeout: 10 * time.Second,
}
resp, err := client.Do(req)
if err != nil {
return false, xerrors.Errorf("error making GET request: %w", err)
Expand Down Expand Up @@ -533,66 +535,59 @@ func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, dea
return nil
}

func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context, deals []MK12Pipeline) error {
type queue struct {
deals []string
t time.Time
func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context) error {
publishPeriod := time.Duration(d.cfg.Market.StorageMarketConfig.MK12.PublishMsgPeriod)
maxDeals := d.cfg.Market.StorageMarketConfig.MK12.MaxDealsPerPublishMsg

var eligibleSpIDs []struct {
SpID int64 `db:"sp_id"`
EarliestWaitTime time.Time `db:"earliest_wait_time"`
}

dm := make(map[int64]queue)
err := d.db.Select(ctx, &eligibleSpIDs, `SELECT sp_id, MIN(psd_wait_time) AS earliest_wait_time
FROM market_mk12_deal_pipeline
WHERE started = TRUE
AND after_commp = TRUE
AND psd_task_id IS NULL
AND after_psd = FALSE
GROUP BY sp_id;`)
if err != nil {
return xerrors.Errorf("getting eligible SPs for psd tasks: %w", err)
}

for _, deal := range deals {
if deal.Started && deal.AfterCommp && deal.PSDWaitTime != nil && !deal.AfterPSD && deal.PSDTaskID == nil {
// Check if the spID is already in the map
if q, exists := dm[deal.SpID]; exists {
// Append the UUID to the deals list
q.deals = append(q.deals, deal.UUID)

// Update the time if the current deal's time is older
if deal.PSDWaitTime.Before(q.t) {
q.t = *deal.PSDWaitTime
}
if len(eligibleSpIDs) == 0 {
return nil
}

// Update the map with the new queue
dm[deal.SpID] = q
} else {
// Add a new entry to the map if spID is not present
dm[deal.SpID] = queue{
deals: []string{deal.UUID},
t: *deal.PSDWaitTime,
}
for _, sp := range eligibleSpIDs {
if sp.EarliestWaitTime.Add(publishPeriod).Before(time.Now()) {
continue
}
d.adders[pollerPSD].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
n, err := tx.Exec(`WITH deals_to_update AS (
-- Select only deals that have not been assigned yet
SELECT uuid
FROM market_mk12_deal_pipeline
WHERE sp_id = $1
AND started = TRUE
AND after_commp = TRUE
AND psd_task_id IS NULL -- Ensures only unassigned deals are selected
AND after_psd = FALSE
ORDER BY psd_wait_time ASC
LIMIT $2 -- Limit by maxDeals
)
UPDATE market_mk12_deal_pipeline
SET psd_task_id = $3
WHERE uuid IN (SELECT uuid FROM deals_to_update)
AND psd_task_id IS NULL; -- Ensures no overwrite in case another node updated
`, sp.SpID, maxDeals, id)
if err != nil {
return false, xerrors.Errorf("creating psd task: %w", err)
}
}
return n > 0, nil
})
}

publishPeriod := d.cfg.Market.StorageMarketConfig.MK12.PublishMsgPeriod
maxDeals := d.cfg.Market.StorageMarketConfig.MK12.MaxDealsPerPublishMsg

for _, q := range dm {
if q.t.Add(time.Duration(publishPeriod)).Before(time.Now()) || uint64(len(q.deals)) > maxDeals {
d.adders[pollerPSD].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
// update
var n int
for _, deal := range q.deals {
u, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET psd_task_id = $1
WHERE uuid = $2 AND started = TRUE AND after_commp = TRUE
AND psd_task_id IS NULL`, id, deal)
if err != nil {
return false, xerrors.Errorf("updating deal pipeline: %w", err)
}
n += u
}

if n > 0 {
log.Infof("PSD task created for %d deals %s", n, q.deals)
}

return n > 0, nil
})
} else {
log.Infow("PSD task not created as the time is not yet reached", "time", q.t.Add(time.Duration(publishPeriod)), "deals", q.deals)
}
}
return nil
}

Expand Down
Loading