@@ -539,53 +539,54 @@ func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context) error {
539
539
publishPeriod := time .Duration (d .cfg .Market .StorageMarketConfig .MK12 .PublishMsgPeriod )
540
540
maxDeals := d .cfg .Market .StorageMarketConfig .MK12 .MaxDealsPerPublishMsg
541
541
542
- d . adders [ pollerPSD ]. Val ( ctx )( func ( id harmonytask. TaskID , tx * harmonydb. Tx ) ( shouldCommit bool , err error ) {
543
- n , err := tx . Exec ( `WITH grouped_deals AS (
544
- -- Step 1: Group by sp_id and get the earliest psd_wait_time for each sp_id
545
- SELECT
546
- sp_id,
547
- MIN(psd_wait_time) AS earliest_wait_time
542
+ var eligibleSpIDs [] struct {
543
+ SpID int64 `db:"sp_id"`
544
+ EarliestWaitTime time. Time `db:"earliest_wait_time"`
545
+ }
546
+
547
+ err := d . db . Select ( ctx , & eligibleSpIDs , `SELECT sp_id, MIN(psd_wait_time) AS earliest_wait_time
548
548
FROM market_mk12_deal_pipeline
549
549
WHERE started = TRUE
550
550
AND after_commp = TRUE
551
551
AND psd_task_id IS NULL
552
552
AND after_psd = FALSE
553
- GROUP BY sp_id
554
- ),
555
- eligible_sp_ids AS (
556
- -- Step 2: Select sp_ids where at least one deal has waited past publishPeriod
557
- SELECT sp_id
558
- FROM grouped_deals
559
- WHERE earliest_wait_time + INTERVAL '1 second' * $1 < NOW()
560
- ),
561
- eligible_deals AS (
562
- -- Step 3: Select all deals for those sp_ids, ensuring selection is ordered by earliest psd_wait_time
563
- SELECT d.uuid, d.sp_id, d.psd_wait_time
564
- FROM market_mk12_deal_pipeline d
565
- JOIN eligible_sp_ids e ON d.sp_id = e.sp_id
566
- WHERE d.started = TRUE
567
- AND d.after_commp = TRUE
568
- AND d.psd_task_id IS NULL
569
- AND d.after_psd = FALSE
570
- ORDER BY d.psd_wait_time ASC -- Ensures deals are selected based on the earliest time first
571
- ),
572
- deals_to_update AS (
573
- -- Step 4: Select only the first maxDeals deals (ensuring no more than maxDeals are updated)
574
- SELECT uuid
575
- FROM eligible_deals
576
- LIMIT $2
577
- )
578
- -- Step 5: Update only the selected maxDeals
579
- UPDATE market_mk12_deal_pipeline
580
- SET psd_task_id = $3
581
- WHERE uuid IN (SELECT uuid FROM deals_to_update);
582
- ` , publishPeriod .Seconds (), maxDeals , id )
583
- if err != nil {
584
- return false , xerrors .Errorf ("creating psd tasks: %w" , err )
585
- }
553
+ GROUP BY sp_id;` )
554
+ if err != nil {
555
+ return xerrors .Errorf ("getting eligible SPs for psd tasks: %w" , err )
556
+ }
586
557
587
- return n > 0 , nil
588
- })
558
+ if len (eligibleSpIDs ) == 0 {
559
+ return nil
560
+ }
561
+
562
+ for _ , sp := range eligibleSpIDs {
563
+ if sp .EarliestWaitTime .Add (publishPeriod ).Before (time .Now ()) {
564
+ continue
565
+ }
566
+ d .adders [pollerPSD ].Val (ctx )(func (id harmonytask.TaskID , tx * harmonydb.Tx ) (shouldCommit bool , err error ) {
567
+ n , err := tx .Exec (`WITH deals_to_update AS (
568
+ -- Select only deals that have not been assigned yet
569
+ SELECT uuid
570
+ FROM market_mk12_deal_pipeline
571
+ WHERE sp_id = $1
572
+ AND started = TRUE
573
+ AND after_commp = TRUE
574
+ AND psd_task_id IS NULL -- Ensures only unassigned deals are selected
575
+ AND after_psd = FALSE
576
+ ORDER BY psd_wait_time ASC
577
+ LIMIT $2 -- Limit by maxDeals
578
+ )
579
+ UPDATE market_mk12_deal_pipeline
580
+ SET psd_task_id = $3
581
+ WHERE uuid IN (SELECT uuid FROM deals_to_update)
582
+ AND psd_task_id IS NULL; -- Ensures no overwrite in case another node updated
583
+ ` , sp .SpID , maxDeals , id )
584
+ if err != nil {
585
+ return false , xerrors .Errorf ("creating psd task: %w" , err )
586
+ }
587
+ return n > 0 , nil
588
+ })
589
+ }
589
590
590
591
return nil
591
592
}
0 commit comments