diff --git a/tasks/storage-market/storage_market.go b/tasks/storage-market/storage_market.go index 3654ee56b..ab59145f3 100644 --- a/tasks/storage-market/storage_market.go +++ b/tasks/storage-market/storage_market.go @@ -267,7 +267,7 @@ func (d *CurioStorageDealMarket) processMK12Deals(ctx context.Context) { // Add PSD task - PSD is an exception which is processed for multiple deals at once to save // gas cost for PSD messages - err = d.addPSDTask(ctx, deals) + err = d.addPSDTask(ctx) if err != nil { log.Errorf("%w", err) } @@ -511,7 +511,9 @@ func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, dea req.Header = headers // Create a client and make the request - client := &http.Client{} + client := &http.Client{ + Timeout: 10 * time.Second, + } resp, err := client.Do(req) if err != nil { return false, xerrors.Errorf("error making GET request: %w", err) @@ -559,66 +561,59 @@ func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, dea return nil } -func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context, deals []MK12Pipeline) error { - type queue struct { - deals []string - t time.Time +func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context) error { + publishPeriod := time.Duration(d.cfg.Market.StorageMarketConfig.MK12.PublishMsgPeriod) + maxDeals := d.cfg.Market.StorageMarketConfig.MK12.MaxDealsPerPublishMsg + + var eligibleSpIDs []struct { + SpID int64 `db:"sp_id"` + EarliestWaitTime time.Time `db:"earliest_wait_time"` } - dm := make(map[int64]queue) + err := d.db.Select(ctx, &eligibleSpIDs, `SELECT sp_id, MIN(psd_wait_time) AS earliest_wait_time + FROM market_mk12_deal_pipeline + WHERE started = TRUE + AND after_commp = TRUE + AND psd_task_id IS NULL + AND after_psd = FALSE + GROUP BY sp_id;`) + if err != nil { + return xerrors.Errorf("getting eligible SPs for psd tasks: %w", err) + } - for _, deal := range deals { - if deal.Started && deal.AfterCommp && deal.PSDWaitTime != nil && !deal.AfterPSD && deal.PSDTaskID == nil { - // Check if the spID is already in the map - if q, exists := dm[deal.SpID]; exists { - // Append the UUID to the deals list - q.deals = append(q.deals, deal.UUID) - - // Update the time if the current deal's time is older - if deal.PSDWaitTime.Before(q.t) { - q.t = *deal.PSDWaitTime - } + if len(eligibleSpIDs) == 0 { + return nil + } - // Update the map with the new queue - dm[deal.SpID] = q - } else { - // Add a new entry to the map if spID is not present - dm[deal.SpID] = queue{ - deals: []string{deal.UUID}, - t: *deal.PSDWaitTime, - } + for _, sp := range eligibleSpIDs { + if sp.EarliestWaitTime.Add(publishPeriod).Before(time.Now()) { + continue + } + d.adders[pollerPSD].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { + n, err := tx.Exec(`WITH deals_to_update AS ( + -- Select only deals that have not been assigned yet + SELECT uuid + FROM market_mk12_deal_pipeline + WHERE sp_id = $1 + AND started = TRUE + AND after_commp = TRUE + AND psd_task_id IS NULL -- Ensures only unassigned deals are selected + AND after_psd = FALSE + ORDER BY psd_wait_time ASC + LIMIT $2 -- Limit by maxDeals + ) + UPDATE market_mk12_deal_pipeline + SET psd_task_id = $3 + WHERE uuid IN (SELECT uuid FROM deals_to_update) + AND psd_task_id IS NULL; -- Ensures no overwrite in case another node updated + `, sp.SpID, maxDeals, id) + if err != nil { + return false, xerrors.Errorf("creating psd task: %w", err) } - } + return n > 0, nil + }) } - publishPeriod := d.cfg.Market.StorageMarketConfig.MK12.PublishMsgPeriod - maxDeals := d.cfg.Market.StorageMarketConfig.MK12.MaxDealsPerPublishMsg - - for _, q := range dm { - if q.t.Add(time.Duration(publishPeriod)).Before(time.Now()) || uint64(len(q.deals)) > maxDeals { - d.adders[pollerPSD].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { - // update - var n int - for _, deal := range q.deals { - u, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET psd_task_id = $1 - WHERE uuid = $2 AND started = TRUE AND after_commp = TRUE - AND psd_task_id IS NULL`, id, deal) - if err != nil { - return false, xerrors.Errorf("updating deal pipeline: %w", err) - } - n += u - } - - if n > 0 { - log.Infof("PSD task created for %d deals %s", n, q.deals) - } - - return n > 0, nil - }) - } else { - log.Infow("PSD task not created as the time is not yet reached", "time", q.t.Add(time.Duration(publishPeriod)), "deals", q.deals) - } - } return nil }