Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improved PSD selection and findURL in market #419

Merged
merged 2 commits into from
Feb 21, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 52 additions & 58 deletions tasks/storage-market/storage_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (d *CurioStorageDealMarket) processMK12Deals(ctx context.Context) {

// Add PSD task - PSD is an exception which is processed for multiple deals at once to save
// gas cost for PSD messages
err = d.addPSDTask(ctx, deals)
err = d.addPSDTask(ctx)
if err != nil {
log.Errorf("%w", err)
}
Expand Down Expand Up @@ -485,7 +485,9 @@ func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, dea
req.Header = headers

// Create a client and make the request
client := &http.Client{}
client := &http.Client{
Timeout: 10 * time.Second,
}
resp, err := client.Do(req)
if err != nil {
return false, xerrors.Errorf("error making GET request: %w", err)
Expand Down Expand Up @@ -533,66 +535,58 @@ func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, dea
return nil
}

func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context, deals []MK12Pipeline) error {
type queue struct {
deals []string
t time.Time
}

dm := make(map[int64]queue)

for _, deal := range deals {
if deal.Started && deal.AfterCommp && deal.PSDWaitTime != nil && !deal.AfterPSD && deal.PSDTaskID == nil {
// Check if the spID is already in the map
if q, exists := dm[deal.SpID]; exists {
// Append the UUID to the deals list
q.deals = append(q.deals, deal.UUID)

// Update the time if the current deal's time is older
if deal.PSDWaitTime.Before(q.t) {
q.t = *deal.PSDWaitTime
}

// Update the map with the new queue
dm[deal.SpID] = q
} else {
// Add a new entry to the map if spID is not present
dm[deal.SpID] = queue{
deals: []string{deal.UUID},
t: *deal.PSDWaitTime,
}
}
}
}

publishPeriod := d.cfg.Market.StorageMarketConfig.MK12.PublishMsgPeriod
func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context) error {
publishPeriod := time.Duration(d.cfg.Market.StorageMarketConfig.MK12.PublishMsgPeriod)
maxDeals := d.cfg.Market.StorageMarketConfig.MK12.MaxDealsPerPublishMsg

for _, q := range dm {
if q.t.Add(time.Duration(publishPeriod)).Before(time.Now()) || uint64(len(q.deals)) > maxDeals {
d.adders[pollerPSD].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
// update
var n int
for _, deal := range q.deals {
u, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET psd_task_id = $1
WHERE uuid = $2 AND started = TRUE AND after_commp = TRUE
AND psd_task_id IS NULL`, id, deal)
if err != nil {
return false, xerrors.Errorf("updating deal pipeline: %w", err)
}
n += u
}
d.adders[pollerPSD].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
n, err := tx.Exec(`WITH grouped_deals AS (
-- Step 1: Group by sp_id and get the earliest psd_wait_time for each sp_id
SELECT
sp_id,
MIN(psd_wait_time) AS earliest_wait_time
FROM market_mk12_deal_pipeline
WHERE started = TRUE
AND after_commp = TRUE
AND psd_task_id IS NULL
AND after_psd = FALSE
GROUP BY sp_id
),
eligible_sp_ids AS (
-- Step 2: Select sp_ids where at least one deal has waited past publishPeriod
SELECT sp_id
FROM grouped_deals
WHERE earliest_wait_time + INTERVAL '1 second' * $1 < NOW()
),
eligible_deals AS (
-- Step 3: Select all deals for those sp_ids, ensuring selection is ordered by earliest psd_wait_time
SELECT d.uuid, d.sp_id, d.psd_wait_time
FROM market_mk12_deal_pipeline d
JOIN eligible_sp_ids e ON d.sp_id = e.sp_id
WHERE d.started = TRUE
AND d.after_commp = TRUE
AND d.psd_task_id IS NULL
AND d.after_psd = FALSE
ORDER BY d.psd_wait_time ASC -- Ensures deals are selected based on the earliest time first
),
deals_to_update AS (
-- Step 4: Select only the first maxDeals deals (ensuring no more than maxDeals are updated)
SELECT uuid
FROM eligible_deals
LIMIT $2
)
-- Step 5: Update only the selected maxDeals
UPDATE market_mk12_deal_pipeline
SET psd_task_id = $3
WHERE uuid IN (SELECT uuid FROM deals_to_update);
`, publishPeriod.Seconds(), maxDeals, id)
if err != nil {
return false, xerrors.Errorf("creating psd tasks: %w", err)
}

if n > 0 {
log.Infof("PSD task created for %d deals %s", n, q.deals)
}
return n > 0, nil
})

return n > 0, nil
})
} else {
log.Infow("PSD task not created as the time is not yet reached", "time", q.t.Add(time.Duration(publishPeriod)), "deals", q.deals)
}
}
return nil
}

Expand Down