@@ -179,12 +179,14 @@ struct InformationPropagation {
179179};
180180
181181struct TaskClusterSummaryInfo {
182+ TaskClusterSummaryInfo () = default ;
183+
182184 int cluster_id = -1 ;
183185 int num_tasks_ = 0 ;
184186 double cluster_load = 0.0 ;
185187 double cluster_intra_send_bytes = 0.0 ;
186188 double cluster_intra_recv_bytes = 0.0 ;
187- std::unordered_set <model::Edge> inter_edges_;
189+ std::vector <model::Edge> inter_edges_;
188190
189191 // Memory info
190192 std::unordered_map<model::SharedBlockType, model::BytesType> shared_block_bytes_;
@@ -232,7 +234,7 @@ struct TemperedLB : baselb::BaseLB {
232234
233235 void clusterBasedOnCommunication () {
234236 auto & pd = this ->getPhaseData ();
235- clusterer_ = std::make_unique<LeidenCPMStandaloneClusterer>(pd);
237+ clusterer_ = std::make_unique<LeidenCPMStandaloneClusterer>(pd, 80.0 );
236238 clusterer_->compute ();
237239 }
238240
@@ -242,6 +244,101 @@ struct TemperedLB : baselb::BaseLB {
242244 clusterer_->compute ();
243245 }
244246
247+ void doClustering () {
248+ if (config_.cluster_based_on_communication_ || config_.cluster_based_on_shared_blocks_ ) {
249+ if (config_.cluster_based_on_communication_ ) {
250+ clusterBasedOnCommunication ();
251+ } else if (config_.cluster_based_on_shared_blocks_ ) {
252+ clusterBasedOnSharedBlocks ();
253+ }
254+ }
255+ }
256+
257+ void buildClusterSummaries () {
258+ assert (clusterer_ != nullptr && " Clusterer must be initialized to build summaries" );
259+ auto const & pd = this ->getPhaseData ();
260+ int const rank = comm_.getRank ();
261+
262+ // Task -> local cluster id
263+ auto const & t2c = clusterer_->taskToCluster ();
264+
265+ // Prepare summary per local cluster id
266+ std::unordered_map<int , TaskClusterSummaryInfo> summary_by_local;
267+ for (auto const & cl : clusterer_->clusters ()) {
268+ TaskClusterSummaryInfo info;
269+ info.cluster_id = localToGlobalClusterID (cl.id );
270+ info.num_tasks_ = static_cast <int >(cl.members .size ());
271+ info.cluster_load = cl.load ;
272+ summary_by_local.emplace (cl.id , std::move (info));
273+ }
274+
275+ // Walk communications: accumulate intra send/recv; collect broadened inter edges starting in a cluster
276+ for (auto const & e : pd.getCommunications ()) {
277+ // Only consider edges that involve this rank
278+ if (e.getFromRank () != rank && e.getToRank () != rank) continue ;
279+
280+ auto u = e.getFrom ();
281+ auto v = e.getTo ();
282+ if (!pd.hasTask (u) || !pd.hasTask (v)) continue ;
283+
284+ auto itu = t2c.find (u);
285+ auto itv = t2c.find (v);
286+ int cu = (itu != t2c.end ()) ? itu->second : -1 ; // local cluster id or -1 if unclustered
287+ int cv = (itv != t2c.end ()) ? itv->second : -1 ;
288+ auto vol = e.getVolume ();
289+
290+ // Intra-cluster: both endpoints mapped and equal -> accumulate send/recv
291+ if (cu != -1 && cv != -1 && cu == cv) {
292+ auto & sum = summary_by_local.at (cu);
293+ if (e.getFromRank () == rank) {
294+ sum.cluster_intra_send_bytes += vol;
295+ }
296+ if (e.getToRank () == rank) {
297+ sum.cluster_intra_recv_bytes += vol;
298+ }
299+ continue ;
300+ }
301+
302+ // Broadened "inter" edges: if the edge starts in a cluster on this rank, add to that cluster
303+ // Conditions:
304+ // - source endpoint (u) is clustered locally (cu != -1)
305+ // - source is on this rank (e.getFromRank() == rank)
306+ // - destination is:
307+ // * in a different local cluster (cv == -1 or cv != cu), or
308+ // * on a different rank (e.getToRank() != rank)
309+ if (e.getFromRank () == rank && cu != -1 ) {
310+ bool dest_is_external =
311+ (cv == -1 ) || (cv != cu) || (e.getToRank () != rank);
312+ if (dest_is_external) {
313+ summary_by_local.at (cu).inter_edges_ .push_back (e);
314+ }
315+ }
316+
317+ // Enable vice-versa logic to capture edges entering a cluster
318+ // Conditions:
319+ // - destination endpoint (v) is clustered locally (cv != -1)
320+ // - destination is on this rank (e.getToRank() == rank)
321+ // - source is:
322+ // * in a different local cluster (cu == -1 or cu != cv), or
323+ // * on a different rank (e.getFromRank() != rank)
324+ if (e.getToRank () == rank && cv != -1 ) {
325+ bool src_is_external = (cu == -1 ) || (cu != cv) || (e.getFromRank () != rank);
326+ if (src_is_external) {
327+ summary_by_local.at (cv).inter_edges_ .push_back (e);
328+ }
329+ }
330+ }
331+
332+ // Emit summaries
333+ for (auto const & cl : clusterer_->clusters ()) {
334+ auto const & sum = summary_by_local.at (cl.id );
335+ printf (" %d: buildClusterSummaries cluster %d size=%zu load=%.2f intra_send=%.2f intra_recv=%.2f inter_edges=%zu\n " ,
336+ rank, cl.id , cl.members .size (), cl.load ,
337+ sum.cluster_intra_send_bytes , sum.cluster_intra_recv_bytes ,
338+ sum.inter_edges_ .size ());
339+ }
340+ }
341+
245342 void makeCommunicationsSymmetric () {
246343 CommunicationsSymmetrizer<CommT> symm (comm_, this ->getPhaseData ());
247344 symm.run ();
@@ -267,19 +364,24 @@ struct TemperedLB : baselb::BaseLB {
267364 }
268365
269366 void run () {
367+ for (int trial = 0 ; trial < config_.num_trials_ ; ++trial) {
368+ printf (" %d: Starting trial %d/%d\n " , comm_.getRank (), trial + 1 , config_.num_trials_ );
369+ runTrial ();
370+ }
371+ }
372+
373+ void runTrial () {
374+ // Save a clone of the phase data before load balancing
375+ savePhaseData ();
376+
270377 auto total_load = computeLoad ();
271378 printf (" %d: initial total load: %f, num tasks: %zu\n " , comm_.getRank (), total_load, numTasks ());
272379
273380 // Make communications symmetric before distributed decisions
274381 makeCommunicationsSymmetric ();
275382
276- if (config_.cluster_based_on_communication_ || config_.cluster_based_on_shared_blocks_ ) {
277- if (config_.cluster_based_on_communication_ ) {
278- clusterBasedOnCommunication ();
279- } else if (config_.cluster_based_on_shared_blocks_ ) {
280- clusterBasedOnSharedBlocks ();
281- }
282- }
383+ // Run the clustering algorithm if appropiate for the configuration
384+ doClustering ();
283385
284386 // Generate visualization after symmetrization/clustering
285387 visualizeGraph (" temperedlb2" );
@@ -302,7 +404,23 @@ struct TemperedLB : baselb::BaseLB {
302404#else
303405 // Just assume max of 1000 clusters per rank for now, until we have bcast
304406#endif
407+ if (clusterer_ != nullptr ) {
408+ buildClusterSummaries ();
409+ }
305410 }
411+
412+ // Before we restore phase data for the next trial, save the work and task distribution
413+ // @todo: for now, we recompute work from scratch but we probably can use the breakdown
414+ trial_work_distribution_.emplace_back (
415+ WorkModelCalculator::computeWork (
416+ config_.work_model_ ,
417+ WorkModelCalculator::computeWorkBreakdown (this ->getPhaseData ())
418+ ),
419+ this ->getPhaseData ().getTaskIds ()
420+ );
421+
422+ // Restore phase data
423+ restorePhaseData ();
306424 }
307425
308426 Clusterer const * getClusterer () const { return clusterer_.get (); }
@@ -350,6 +468,8 @@ struct TemperedLB : baselb::BaseLB {
350468 std::unique_ptr<Clusterer> clusterer_;
351469 // / @brief Global maximum number of clusters across all ranks
352470 int global_max_clusters_ = 1000 ;
471+ // / @brief Task distribution and work for each trial
472+ std::vector<std::tuple<double , std::unordered_set<model::TaskType>>> trial_work_distribution_;
353473};
354474
355475} /* end namespace vt_lb::algo::temperedlb */
0 commit comments