5050
5151from .lbsTransferStrategyBase import TransferStrategyBase
5252from ..Model .lbsRank import Rank
53+ from ..Model .lbsObject import Object
5354from ..Model .lbsPhase import Phase
5455
5556
@@ -100,7 +101,7 @@ def __init__(self, criterion, parameters: dict, lgr: Logger):
100101 self .__n_sub_skipped , self .__n_sub_transfers , self .__n_sub_tries = 0 , 0 , 0
101102
102103 def __build_rank_clusters (self , rank : Rank , with_nullset ) -> dict :
103- """Cluster migratiable objects by shared block ID when available."""
104+ """Cluster migratable objects by shared block ID when available."""
104105 # Iterate over all migratable objects on rank
105106 clusters = {None : []} if with_nullset else {}
106107 for o in rank .get_migratable_objects ():
@@ -117,6 +118,104 @@ def __build_rank_clusters(self, rank: Rank, with_nullset) -> dict:
117118 k : clusters [k ]
118119 for k in random .sample (list (clusters .keys ()), len (clusters ))}
119120
def __build_rank_subclusters_heuristic(self, r_src: Rank, r_try: Rank, ave_load: float) -> list:
    """Build a limited list of candidate subclusters to move from r_src to r_try.

    This is a cheap heuristic: at most one subcluster is produced per
    cluster instead of enumerating combinations of objects.

    :param r_src: rank whose migratable clusters are considered.
    :param r_try: candidate destination rank.
    :param ave_load: average rank load used as the balancing reference.
    :return: list of subclusters (each a tuple of objects), ordered by
        increasing total load; empty when r_src has no migratable clusters.
    """
    # Bail out early if no clusters are available
    if not (clusters := self.__build_rank_clusters(r_src, False)):
        self._logger.info(f"No migratable clusters on rank {r_src.get_id()}")
        return []

    # Excess of the source above, and deficit of the target below, the average
    amount_over_average = r_src.get_load() - ave_load
    amount_under_average = ave_load - r_try.get_load()

    # Map each retained subcluster (tuple of objects) to its total load
    subclusters = {}
    for objects in clusters.values():
        # Only split clusters whose whole load is at least as large as both
        # the source excess and the target deficit; lighter ones are ignored
        cluster_load = sum((o.get_load() for o in objects), 0.0)
        if cluster_load < amount_over_average or cluster_load < amount_under_average:
            continue

        # Greedily take objects from lightest to heaviest, stopping right
        # after the running load exceeds the target deficit (the object that
        # crosses the deficit is kept in the subcluster)
        load_sum = 0.0
        cluster_set = []
        for o in sorted(objects, key=lambda obj: obj.get_load()):
            load_sum += o.get_load()
            cluster_set.append(o)
            if load_sum > amount_under_average:
                break

        subclusters[tuple(cluster_set)] = load_sum

    # Return subclusters ordered by increasing load
    return sorted(subclusters, key=subclusters.get)

177+
def __transfer_subclusters_heuristic(self, phase: Phase, r_src: Rank, targets: set, ave_load: float, max_load: float) -> None:
    """Perform feasible heuristic subcluster transfers from given rank to possible targets.

    For every target whose load is below the average, candidate subclusters
    are built for that (source, target) pair and tried in order; the first
    one with a positive criterion value is transferred, after which the
    next target is examined.

    :param phase: phase used to carry out the object transfers.
    :param r_src: source rank; nothing is done unless its load exceeds ave_load.
    :param targets: candidate destination ranks.
    :param ave_load: average rank load.
    :param max_load: maximum rank load, used by the subclustering threshold filter.
    """
    # Only do this if this rank is above the mean
    # NOTE(review): this is evaluated once, before any transfer; the source
    # load is not re-checked against the mean after a transfer to an earlier
    # target -- confirm that this is intended.
    if not r_src.get_load() > ave_load:
        return

    for r_try in targets:
        # Only consider transferring here if this rank is under the mean load
        if not r_try.get_load() < ave_load:
            continue

        for o_src in self.__build_rank_subclusters_heuristic(r_src, r_try, ave_load):
            c_try = self._criterion.compute(r_src, o_src, r_try)

            # Additional filters prior to subclustering
            if c_try <= self.__subclustering_minimum_improvement * r_src.get_load() or \
                    r_src.get_load() < self.__subclustering_threshold * max_load:
                continue

            # Each subcluster is evaluated against a single target, so the
            # tried rank and its criterion value are used directly; there is
            # no best-destination search to perform here.
            self.__n_sub_tries += 1
            if c_try > 0.0:
                # Transfer subcluster and move on to the next target
                self._logger.info(
                    f"Transferring subcluster with {len(o_src)} objects to rank {r_try.get_id()}")
                self._n_transfers += phase.transfer_objects(
                    r_src, o_src, r_try)
                self.__n_sub_transfers += 1
                break

            # Reject subcluster transfer
            self._n_rejects += len(o_src)

218+
120219 def __build_rank_subclusters (self , r_src : Rank ) -> set :
121220 """Build subclusters to bring rank closest and above average load."""
122221
@@ -146,11 +245,11 @@ def __build_rank_subclusters(self, r_src: Rank) -> set:
146245 for p in range (1 , n_o + 1 )) if self ._deterministic_transfer else (
147246 tuple (random .sample (v , p ))
148247 for p in nr .binomial (n_o , 0.5 , min (n_o , self .__max_subclusters )))):
248+
149249 # Reject subclusters overshooting within relative tolerance
150250 reach_load = src_load - sum (o .get_load () for o in c )
151251 if reach_load < (1.0 - self .__cluster_swap_rtol ) * self ._average_load :
152252 continue
153-
154253 # Retain subclusters with their respective distance and cluster
155254 subclusters [c ] = reach_load
156255
@@ -282,7 +381,7 @@ def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float):
282381
283382 if not self .__disable_subclustering :
284383 # Perform feasible subcluster swaps from given rank to possible targets
285- self .__transfer_subclusters (phase , r_src , targets , ave_load , max_load )
384+ self .__transfer_subclusters_heuristic (phase , r_src , targets , ave_load , max_load )
286385
287386 # Report on new load and exit from rank
288387 self ._logger .debug (
@@ -297,7 +396,7 @@ def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float):
297396 # Iterate over ranks
298397 for r_src , targets in rank_targets .items ():
299398 # Perform feasible subcluster swaps from given rank to possible targets
300- self .__transfer_subclusters (phase , r_src , targets , ave_load , max_load )
399+ self .__transfer_subclusters_heuristic (phase , r_src , targets , ave_load , max_load )
301400
302401 # Report on new load and exit from rank
303402 self ._logger .debug (
0 commit comments