@@ -39,6 +39,7 @@ import (
3939 commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
4040 commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
4141 dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
42+ managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"
4243
4344 logger "d7y.io/dragonfly/v2/internal/dflog"
4445 internaljob "d7y.io/dragonfly/v2/internal/job"
@@ -256,15 +257,10 @@ func (j *job) preheatSinglePeer(ctx context.Context, taskID string, req *interna
256257// If all the seed peers download task failed, return error. If some of the seed peers download task failed, return success tasks and failure tasks.
257258// Notify the client that the preheat is successful.
258259func (j * job ) preheatAllSeedPeers (ctx context.Context , taskID string , req * internaljob.PreheatRequest , log * logger.SugaredLoggerOnWith ) (* internaljob.PreheatResponse , error ) {
259- // If seed peer is disabled, return error.
260- if ! j .config .SeedPeer .Enable {
261- return nil , fmt .Errorf ("cluster %d scheduler %s has disabled seed peer" , j .config .Manager .SchedulerClusterID , j .config .Server .AdvertiseIP )
262- }
263-
264260 // If scheduler has no available seed peer, return error.
265- seedPeers := j .resource . SeedPeer (). Client (). SeedPeers ( )
266- if len ( seedPeers ) == 0 {
267- return nil , fmt . Errorf ( "cluster %d scheduler %s has no available seed peer" , j . config . Manager . SchedulerClusterID , j . config . Server . AdvertiseIP )
261+ seedPeers , err := j .selectSeedPeers ( req . Percentage , log )
262+ if err != nil {
263+ return nil , err
268264 }
269265
270266 var (
@@ -401,14 +397,40 @@ func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *inter
401397 return nil , fmt .Errorf ("all peers preheat failed: %s" , msg )
402398}
403399
400+ // selectSeedPeers selects seed peers by percentage.
401+ func (j * job ) selectSeedPeers (percentage * uint8 , log * logger.SugaredLoggerOnWith ) ([]* managerv2.SeedPeer , error ) {
402+ if ! j .config .SeedPeer .Enable {
403+ return nil , fmt .Errorf ("cluster %d scheduler %s has disabled seed peer" , j .config .Manager .SchedulerClusterID , j .config .Server .AdvertiseIP )
404+ }
405+
406+ seedPeers := j .resource .SeedPeer ().Client ().SeedPeers ()
407+ if len (seedPeers ) == 0 {
408+ return nil , fmt .Errorf ("cluster %d scheduler %s has no available seed peer" , j .config .Manager .SchedulerClusterID , j .config .Server .AdvertiseIP )
409+ }
410+
411+ if percentage == nil {
412+ log .Infof ("percentage is nil, select all seed peers, length is %d" , len (seedPeers ))
413+ return seedPeers , nil
414+ }
415+
416+ count := (len (seedPeers ) * int (* percentage )) / 100
417+
418+ // Ensure at least one peer is selected if percentage > 0.
419+ if count == 0 && * percentage > 0 {
420+ count = 1
421+ }
422+
423+ log .Infof ("select %d seed peers from %d seed peers, percentage is %d" , count , len (seedPeers ), * percentage )
424+ return seedPeers [:count ], nil
425+ }
426+
404427// preheatAllPeers preheats job by all peers, only suoported by v2 protocol. Scheduler will trigger all peers to download task.
405428// If all the peers download task failed, return error. If some of the peers download task failed, return success tasks and
406429// failure tasks. Notify the client that the preheat is successful.
407430func (j * job ) preheatAllPeers (ctx context.Context , taskID string , req * internaljob.PreheatRequest , log * logger.SugaredLoggerOnWith ) (* internaljob.PreheatResponse , error ) {
408- // If scheduler has no available peer, return error.
409- peers := j .resource .HostManager ().LoadAll ()
410- if len (peers ) == 0 {
411- return nil , fmt .Errorf ("cluster %d scheduler %s has no available peer" , j .config .Manager .SchedulerClusterID , j .config .Server .AdvertiseIP )
431+ peers , err := j .selectPeers (req .Percentage , log )
432+ if err != nil {
433+ return nil , err
412434 }
413435
414436 var (
@@ -545,6 +567,29 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
545567 return nil , fmt .Errorf ("all peers preheat failed: %s" , msg )
546568}
547569
570+ // selectPeers selects peers by percentage.
571+ func (j * job ) selectPeers (percentage * uint8 , log * logger.SugaredLoggerOnWith ) ([]* resource.Host , error ) {
572+ peers := j .resource .HostManager ().LoadAll ()
573+ if len (peers ) == 0 {
574+ return nil , fmt .Errorf ("cluster %d scheduler %s has no available peer" , j .config .Manager .SchedulerClusterID , j .config .Server .AdvertiseIP )
575+ }
576+
577+ if percentage == nil {
578+ log .Infof ("percentage is nil, select all peers, length is %d" , len (peers ))
579+ return peers , nil
580+ }
581+
582+ count := (len (peers ) * int (* percentage )) / 100
583+
584+ // Ensure at least one peer is selected if percentage > 0.
585+ if count == 0 && * percentage > 0 {
586+ count = 1
587+ }
588+
589+ log .Infof ("select %d peers from %d peers, percentage is %d" , count , len (peers ), * percentage )
590+ return peers [:count ], nil
591+ }
592+
548593// preheatV1 preheats job by v1 grpc protocol.
549594func (j * job ) preheatV1 (ctx context.Context , taskID string , req * internaljob.PreheatRequest , log * logger.SugaredLoggerOnWith ) (* internaljob.PreheatResponse , error ) {
550595 urlMeta := & commonv1.UrlMeta {
0 commit comments