@@ -267,7 +267,7 @@ func (d *CurioStorageDealMarket) processMK12Deals(ctx context.Context) {
267
267
268
268
// Add PSD task - PSD is an exception which is processed for multiple deals at once to save
269
269
// gas cost for PSD messages
270
- err = d .addPSDTask (ctx , deals )
270
+ err = d .addPSDTask (ctx )
271
271
if err != nil {
272
272
log .Errorf ("%w" , err )
273
273
}
@@ -511,7 +511,9 @@ func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, dea
511
511
req .Header = headers
512
512
513
513
// Create a client and make the request
514
- client := & http.Client {}
514
+ client := & http.Client {
515
+ Timeout : 10 * time .Second ,
516
+ }
515
517
resp , err := client .Do (req )
516
518
if err != nil {
517
519
return false , xerrors .Errorf ("error making GET request: %w" , err )
@@ -559,66 +561,59 @@ func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, dea
559
561
return nil
560
562
}
561
563
562
- func (d * CurioStorageDealMarket ) addPSDTask (ctx context.Context , deals []MK12Pipeline ) error {
563
- type queue struct {
564
- deals []string
565
- t time.Time
564
+ func (d * CurioStorageDealMarket ) addPSDTask (ctx context.Context ) error {
565
+ publishPeriod := time .Duration (d .cfg .Market .StorageMarketConfig .MK12 .PublishMsgPeriod )
566
+ maxDeals := d .cfg .Market .StorageMarketConfig .MK12 .MaxDealsPerPublishMsg
567
+
568
+ var eligibleSpIDs []struct {
569
+ SpID int64 `db:"sp_id"`
570
+ EarliestWaitTime time.Time `db:"earliest_wait_time"`
566
571
}
567
572
568
- dm := make (map [int64 ]queue )
573
+ err := d .db .Select (ctx , & eligibleSpIDs , `SELECT sp_id, MIN(psd_wait_time) AS earliest_wait_time
574
+ FROM market_mk12_deal_pipeline
575
+ WHERE started = TRUE
576
+ AND after_commp = TRUE
577
+ AND psd_task_id IS NULL
578
+ AND after_psd = FALSE
579
+ GROUP BY sp_id;` )
580
+ if err != nil {
581
+ return xerrors .Errorf ("getting eligible SPs for psd tasks: %w" , err )
582
+ }
569
583
570
- for _ , deal := range deals {
571
- if deal .Started && deal .AfterCommp && deal .PSDWaitTime != nil && ! deal .AfterPSD && deal .PSDTaskID == nil {
572
- // Check if the spID is already in the map
573
- if q , exists := dm [deal .SpID ]; exists {
574
- // Append the UUID to the deals list
575
- q .deals = append (q .deals , deal .UUID )
576
-
577
- // Update the time if the current deal's time is older
578
- if deal .PSDWaitTime .Before (q .t ) {
579
- q .t = * deal .PSDWaitTime
580
- }
584
+ if len (eligibleSpIDs ) == 0 {
585
+ return nil
586
+ }
581
587
582
- // Update the map with the new queue
583
- dm [deal .SpID ] = q
584
- } else {
585
- // Add a new entry to the map if spID is not present
586
- dm [deal .SpID ] = queue {
587
- deals : []string {deal .UUID },
588
- t : * deal .PSDWaitTime ,
589
- }
588
+ for _ , sp := range eligibleSpIDs {
589
+ if sp .EarliestWaitTime .Add (publishPeriod ).Before (time .Now ()) {
590
+ continue
591
+ }
592
+ d .adders [pollerPSD ].Val (ctx )(func (id harmonytask.TaskID , tx * harmonydb.Tx ) (shouldCommit bool , err error ) {
593
+ n , err := tx .Exec (`WITH deals_to_update AS (
594
+ -- Select only deals that have not been assigned yet
595
+ SELECT uuid
596
+ FROM market_mk12_deal_pipeline
597
+ WHERE sp_id = $1
598
+ AND started = TRUE
599
+ AND after_commp = TRUE
600
+ AND psd_task_id IS NULL -- Ensures only unassigned deals are selected
601
+ AND after_psd = FALSE
602
+ ORDER BY psd_wait_time ASC
603
+ LIMIT $2 -- Limit by maxDeals
604
+ )
605
+ UPDATE market_mk12_deal_pipeline
606
+ SET psd_task_id = $3
607
+ WHERE uuid IN (SELECT uuid FROM deals_to_update)
608
+ AND psd_task_id IS NULL; -- Ensures no overwrite in case another node updated
609
+ ` , sp .SpID , maxDeals , id )
610
+ if err != nil {
611
+ return false , xerrors .Errorf ("creating psd task: %w" , err )
590
612
}
591
- }
613
+ return n > 0 , nil
614
+ })
592
615
}
593
616
594
- publishPeriod := d .cfg .Market .StorageMarketConfig .MK12 .PublishMsgPeriod
595
- maxDeals := d .cfg .Market .StorageMarketConfig .MK12 .MaxDealsPerPublishMsg
596
-
597
- for _ , q := range dm {
598
- if q .t .Add (time .Duration (publishPeriod )).Before (time .Now ()) || uint64 (len (q .deals )) > maxDeals {
599
- d .adders [pollerPSD ].Val (ctx )(func (id harmonytask.TaskID , tx * harmonydb.Tx ) (shouldCommit bool , err error ) {
600
- // update
601
- var n int
602
- for _ , deal := range q .deals {
603
- u , err := tx .Exec (`UPDATE market_mk12_deal_pipeline SET psd_task_id = $1
604
- WHERE uuid = $2 AND started = TRUE AND after_commp = TRUE
605
- AND psd_task_id IS NULL` , id , deal )
606
- if err != nil {
607
- return false , xerrors .Errorf ("updating deal pipeline: %w" , err )
608
- }
609
- n += u
610
- }
611
-
612
- if n > 0 {
613
- log .Infof ("PSD task created for %d deals %s" , n , q .deals )
614
- }
615
-
616
- return n > 0 , nil
617
- })
618
- } else {
619
- log .Infow ("PSD task not created as the time is not yet reached" , "time" , q .t .Add (time .Duration (publishPeriod )), "deals" , q .deals )
620
- }
621
- }
622
617
return nil
623
618
}
624
619
0 commit comments