diff --git a/src/vt/vrt/collection/balance/temperedlb/tempered_msgs.h b/src/vt/vrt/collection/balance/temperedlb/tempered_msgs.h
index cf9682b938..37fcc1ded1 100644
--- a/src/vt/vrt/collection/balance/temperedlb/tempered_msgs.h
+++ b/src/vt/vrt/collection/balance/temperedlb/tempered_msgs.h
@@ -57,8 +57,11 @@ using BytesType = double;
 struct ClusterInfo {
   LoadType load = 0;
   BytesType bytes = 0;
+  SharedIDType shared_id = -1;
   double intra_send_vol = 0, intra_recv_vol = 0;
-  std::unordered_map<NodeType, double> inter_send_vol, inter_recv_vol;
+  std::unordered_map<SharedIDType, double> inter_cluster_send_vol,
+    inter_cluster_recv_vol;
+  std::unordered_map<elm::ElementIDStruct, double> obj_send_vol, obj_recv_vol;
   NodeType home_node = uninitialized_destination;
   BytesType edge_weight = 0;
   BytesType max_object_working_bytes = 0;
@@ -67,10 +70,35 @@ struct ClusterInfo {
   BytesType max_object_serialized_bytes_outside = 0;
   BytesType cluster_footprint = 0;
 
+  void addInterClusterEdge(bool is_send, SharedIDType id, double volume) {
+    if (is_send) {
+      inter_cluster_send_vol[id] += volume;
+    } else {
+      inter_cluster_recv_vol[id] += volume;
+    }
+  }
+
+  void addIntraVolume(bool is_send, double volume) {
+    if (is_send) {
+      intra_send_vol += volume;
+    } else {
+      intra_recv_vol += volume;
+    }
+  }
+
+  void addObjEdge(bool is_send, elm::ElementIDStruct obj, double volume) {
+    if (is_send) {
+      obj_send_vol[obj] += volume;
+    } else {
+      obj_recv_vol[obj] += volume;
+    }
+  }
+
   template <typename SerializerT>
   void serialize(SerializerT& s) {
-    s | load | bytes | intra_send_vol | intra_recv_vol;
-    s | inter_send_vol | inter_recv_vol;
+    s | load | shared_id | bytes | intra_send_vol | intra_recv_vol;
+    s | inter_cluster_send_vol | inter_cluster_recv_vol;
+    s | obj_send_vol | obj_recv_vol;
     s | home_node | edge_weight;
     s | max_object_working_bytes;
     s | max_object_working_bytes_outside;
@@ -86,6 +114,8 @@ struct NodeInfo {
   double inter_send_vol = 0, inter_recv_vol = 0;
   double intra_send_vol = 0, intra_recv_vol = 0;
   double shared_vol = 0;
+  std::set<SharedIDType> shared_ids;
+  std::set<elm::ElementIDStruct> non_cluster_objs;
 
   template <typename SerializerT>
   void serialize(SerializerT& s) {
@@ -93,6 +123,8 @@ struct NodeInfo {
     s | inter_send_vol | inter_recv_vol;
     s | intra_send_vol | intra_recv_vol;
     s | shared_vol;
+    s | shared_ids;
+    s | non_cluster_objs;
   }
 };
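Note on the new ClusterInfo bookkeeping: each communication edge incident to a cluster is now binned one of three ways, mirroring the classification done in makeClusterSummaryAddEdges later in this diff. A minimal illustrative sketch — the helper name classifySendEdge and the flags dst_in_same_cluster/dst_has_cluster are hypothetical, not part of the patch:

    // Classify one send edge of volume `vol` from a cluster with shared ID
    // `my_id` to destination object `dst`, whose cluster (if any) is `dst_cluster`.
    void classifySendEdge(
      ClusterInfo& info, SharedIDType my_id, bool dst_in_same_cluster,
      bool dst_has_cluster, SharedIDType dst_cluster,
      elm::ElementIDStruct dst, double vol
    ) {
      if (dst_in_same_cluster) {
        // recorded as intra volume and also as a self-keyed inter-cluster edge
        info.addIntraVolume(true, vol);
        info.addInterClusterEdge(true, my_id, vol);
      } else if (dst_has_cluster) {
        // edge to a different cluster, keyed by that cluster's shared ID
        info.addInterClusterEdge(true, dst_cluster, vol);
      } else {
        // edge to an object that is not part of any cluster
        info.addObjEdge(true, dst, vol);
      }
    }

Keying inter-cluster volumes by shared ID rather than by node is what later lets computeWorkAfterClusterSwap reclassify edges when a cluster moves between ranks.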
"alpha", R"( Values: -Defaut: 1.0 +Default: 1.0 Description: α in the work model (load in work model) )" }, @@ -292,7 +292,7 @@ Description: α in the work model (load in work model) "beta", R"( Values: -Defaut: 0.0 +Default: 0.0 Description: β in the work model (inter-node communication in work model) )" }, @@ -300,7 +300,7 @@ Description: β in the work model (inter-node communication in work model) "gamma", R"( Values: -Defaut: 0.0 +Default: 0.0 Description: γ in the work model (intra-node communication in work model) )" }, @@ -308,7 +308,7 @@ Description: γ in the work model (intra-node communication in work model) "delta", R"( Values: -Defaut: 0.0 +Default: 0.0 Description: δ in the work model (shared-memory-edges in work model) )" }, @@ -316,8 +316,17 @@ Description: δ in the work model (shared-memory-edges in work model) "epsilon", R"( Values: -Defaut: infinity +Default: infinity Description: ε in the work model (memory term in work model) +)" + }, + { + "converge_tolerance", + R"( +Values: +Default: 0.01 +Description: The relative tolerance across a window of 8 iterations of work +to define convergence. )" } }; @@ -388,10 +397,13 @@ void TemperedLB::inputParams(balance::ConfigEntry* config) { f_ = static_cast(std::ceil(std::pow(num_nodes, 1.0/k_max_))); } else { // set both the fanout and the rounds - k_max_ = static_cast(std::max(1.0, - std::round(std::sqrt(std::log(num_nodes)/std::log(2.0))) - )); - f_ = static_cast(std::ceil(std::pow(num_nodes, 1.0/k_max_))); + f_ = static_cast(2); + k_max_ = std::max( + static_cast(1), + static_cast( + std::ceil(std::sqrt(std::log(num_nodes)/std::log(2.0))) + ) + ); } } else if (knowledge_ == KnowledgeEnum::Complete) { f_ = num_nodes - 1; @@ -432,6 +444,10 @@ void TemperedLB::inputParams(balance::ConfigEntry* config) { target_pole_ = config->getOrDefault("targetpole", target_pole_); mem_thresh_ = config->getOrDefault("memory_threshold", mem_thresh_); + converge_tolerance_ = config->getOrDefault( + "converge_tolerance", converge_tolerance_ + ); + balance::LBArgsEnumConverter criterion_converter_( "criterion", "CriterionEnum", { {CriterionEnum::Grapevine, "Grapevine"}, @@ -545,9 +561,9 @@ void TemperedLB::runLB(LoadType total_load) { if (theContext()->getNode() == 0) { vt_debug_print( terse, temperedlb, - "TemperedLB::runLB: avg={}, max={}, pole={}, imb={}, load={}, should_lb={}\n", + "TemperedLB::runLB: avg={}, max={}, pole={}, imb={}, load={}, should_lb={}, memory_threshold={}, converge_tolerance={}\n", LoadType(avg), LoadType(max), LoadType(pole), imb, - LoadType(load), should_lb + LoadType(load), should_lb, mem_thresh_, converge_tolerance_ ); if (!should_lb) { @@ -560,18 +576,17 @@ void TemperedLB::runLB(LoadType total_load) { // Perform load rebalancing when deemed necessary if (should_lb) { -#if vt_check_enabled(trace_enabled) - theTrace()->disableTracing(); -#endif + lb_stages_epoch_ = theTerm()->makeEpochCollective("doLBStages"); - runInEpochCollective("doLBStages", [&,this]{ - auto this_node = theContext()->getNode(); - proxy_[this_node].template send<&TemperedLB::doLBStages>(imb); - }); + auto this_node = theContext()->getNode(); + proxy_[this_node].template send<&TemperedLB::doLBStages>(imb); -#if vt_check_enabled(trace_enabled) - theTrace()->enableTracing(); -#endif + // Disable epoch manually so propagation doesn't continue while we know it + // can't finish + theTerm()->finishedEpoch(lb_stages_epoch_); + theTerm()->disableTD(lb_stages_epoch_); + theTerm()->disableTD(vt::term::any_epoch_sentinel); + theTerm()->setLocalTerminated(false, 
@@ -638,8 +653,51 @@ void TemperedLB::readClustersMemoryData() {
     }
   }
 }
 
+void TemperedLB::makeClusterSummaryAddEdges(
+  SharedIDType shared_id, ClusterInfo& info,
+  std::set<ObjIDType> const& cluster_objs,
+  ObjIDType obj, bool is_send,
+  std::vector<std::pair<ObjIDType, double>> const& edges
+) {
+  for (auto const& [send_or_recv_obj, volume] : edges) {
+    vt_debug_print(
+      verbose, temperedlb,
+      "computeClusterSummary: shared_id={} send obj={}, recv_obj={}\n",
+      shared_id,
+      is_send ? obj : send_or_recv_obj,
+      is_send ? send_or_recv_obj : obj
+    );
+
+    if (cluster_objs.find(send_or_recv_obj) != cluster_objs.end()) {
+      // intra-cluster edge
+      info.addIntraVolume(is_send, volume);
+
+      // inter-cluster edge
+      info.addInterClusterEdge(is_send, shared_id, volume);
+    } else if (
+      auto it2 = obj_shared_block_.find(send_or_recv_obj);
+      it2 != obj_shared_block_.end()
+    ) {
+      // inter-cluster edge
+      info.addInterClusterEdge(is_send, it2->second, volume);
+
+      vt_debug_print(
+        verbose, temperedlb,
+        "computeClusterSummary: ADDING inter shared_id={} send obj={}, "
+        "recv_obj={}\n",
+        shared_id,
+        is_send ? obj : send_or_recv_obj,
+        is_send ? send_or_recv_obj : obj
+      );
+
+    } else {
+      // across-object edge not part of a cluster
+      info.addObjEdge(is_send, send_or_recv_obj, volume);
+    }
+  }
+}
+
 ClusterInfo TemperedLB::makeClusterSummary(SharedIDType shared_id) {
-  auto const this_node = theContext()->getNode();
   auto const& [home_node, shared_volume] = shared_block_edge_[shared_id];
   auto const shared_bytes = shared_block_size_[shared_id];
@@ -647,6 +705,7 @@ ClusterInfo TemperedLB::makeClusterSummary(SharedIDType shared_id) {
   info.bytes = shared_bytes;
   info.home_node = home_node;
   info.edge_weight = shared_volume;
+  info.shared_id = shared_id;
 
   std::set<ObjIDType> cluster_objs;
   BytesType max_object_working_bytes = 0;
@@ -712,49 +771,14 @@ ClusterInfo TemperedLB::makeClusterSummary(SharedIDType shared_id) {
   if (info.load != 0) {
     for (auto&& obj : cluster_objs) {
       if (auto it = send_edges_.find(obj); it != send_edges_.end()) {
-        for (auto const& [target, volume] : it->second) {
-          vt_debug_print(
-            verbose, temperedlb,
-            "computeClusterSummary: send obj={}, target={}\n",
-            obj, target
-          );
-
-          if (cluster_objs.find(target) != cluster_objs.end()) {
-            // intra-cluster edge
-            info.intra_send_vol += volume;
-          } else if (
-            cur_objs_.find(target) != cur_objs_.end() or
-            target.isLocatedOnThisNode()
-          ) {
-            // intra-rank edge
-            info.inter_send_vol[this_node] += volume;
-          } else {
-            // inter-rank edge
-            info.inter_send_vol[target.getCurrNode()] += volume;
-          }
-        }
+        makeClusterSummaryAddEdges(
+          shared_id, info, cluster_objs, obj, true, it->second
+        );
       }
       if (auto it = recv_edges_.find(obj); it != recv_edges_.end()) {
-        for (auto const& [target, volume] : it->second) {
-          vt_debug_print(
-            verbose, temperedlb,
-            "computeClusterSummary: recv obj={}, target={}\n",
-            obj, target
-          );
-          if (cluster_objs.find(target) != cluster_objs.end()) {
-            // intra-cluster edge
-            info.intra_recv_vol += volume;
-          } else if (
-            cur_objs_.find(target) != cur_objs_.end() or
-            target.isLocatedOnThisNode()
-          ) {
-            // intra-rank edge
-            info.inter_recv_vol[this_node] += volume;
-          } else {
-            // inter-rank edge
-            info.inter_recv_vol[target.getCurrNode()] += volume;
-          }
-        }
+        makeClusterSummaryAddEdges(
+          shared_id, info, cluster_objs, obj, false, it->second
+        );
       }
     }
   }
@@ -855,6 +879,7 @@ void TemperedLB::workStatsHandler(std::vector<balance::LoadData> const& vec) {
   work_mean_ = work.avg();
   work_max_ = work.max();
   new_work_imbalance_ = work.I();
+  work_stats_handler_ = true;
 }
 
 double TemperedLB::computeWork(
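Several collectives in this diff (workStatsHandler above; loadStatsHandler and others below) move from runInEpochCollective to an explicit completion flag, since doLBStages now runs outside a live collective epoch. The recurring pattern, sketched here with a hypothetical handler/flag pair — someHandler, some_flag_, and data are placeholders:

    // The handler sets the flag when the reduced result arrives; the caller
    // spins the scheduler until then instead of waiting on an epoch.
    some_flag_ = false;
    proxy_.allreduce<&TemperedLB::someHandler, collective::PlusOp>(data);
    theSched()->runSchedulerWhile([this]{ return not some_flag_; });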
@@ -874,7 +899,7 @@ WorkBreakdown TemperedLB::computeWorkBreakdown(
   std::unordered_map<ObjIDType, LoadType> const& objs,
   std::set<ObjIDType> const& exclude,
   std::unordered_map<ObjIDType, LoadType> const& include
-) {
+) const {
   double load = 0;
 
   // Communication bytes sent/recv'ed within the rank
@@ -886,14 +911,9 @@ WorkBreakdown TemperedLB::computeWorkBreakdown(
     if (exclude.find(obj) == exclude.end()) {
       if (auto it = send_edges_.find(obj); it != send_edges_.end()) {
         for (auto const& [target, volume] : it->second) {
-          vt_debug_print(
-            verbose, temperedlb,
-            "computeWorkBreakdown: send obj={}, target={}\n",
-            obj, target
-          );
           if (
-            cur_objs_.find(target) != cur_objs_.end() or
-            target.isLocatedOnThisNode()
+            objs.find(target) != objs.end() or
+            non_cluster_objs_.find(target) != non_cluster_objs_.end()
           ) {
             intra_rank_bytes_sent += volume;
           } else {
@@ -903,14 +923,9 @@ WorkBreakdown TemperedLB::computeWorkBreakdown(
       }
       if (auto it = recv_edges_.find(obj); it != recv_edges_.end()) {
         for (auto const& [target, volume] : it->second) {
-          vt_debug_print(
-            verbose, temperedlb,
-            "computeWorkBreakdown: recv obj={}, target={}\n",
-            obj, target
-          );
           if (
-            cur_objs_.find(target) != cur_objs_.end() or
-            target.isLocatedOnThisNode()
+            objs.find(target) != objs.end() or
+            non_cluster_objs_.find(target) != non_cluster_objs_.end()
          ) {
             intra_rank_bytes_recv += volume;
           } else {
@@ -972,73 +987,184 @@ WorkBreakdown TemperedLB::computeWorkBreakdown(
 double TemperedLB::computeWorkAfterClusterSwap(
   NodeType node, NodeInfo const& info, ClusterInfo const& to_remove,
   ClusterInfo const& to_add
-) {
+) const {
   // Start with the existing work for the node and work backwards to compute the
   // new work with the cluster removed
   double node_work = info.work;
+  double node_intra_send = info.intra_send_vol;
+  double node_intra_recv = info.intra_recv_vol;
+  double node_inter_send = info.inter_send_vol;
+  double node_inter_recv = info.inter_recv_vol;
+
+  vt_debug_print(
+    verbose, temperedlb,
+    "computeWorkAfterClusterSwap: node_work={}, to_remove.load={}, intra={}, "
+    "inter={}\n",
+    node_work,
+    to_remove.load,
+    gamma * std::max(node_intra_send, node_intra_recv),
+    beta * std::max(node_inter_send, node_inter_recv)
+  );
+
   // Remove/add clusters' load factor from work model
   node_work -= alpha * to_remove.load;
   node_work += alpha * to_add.load;
 
-  // Remove/add clusters' intra-comm
-  double const node_intra_send = info.intra_send_vol;
-  double const node_intra_recv = info.intra_recv_vol;
+  // Subtract out these factors to adjust them based on the new situation
   node_work -= gamma * std::max(node_intra_send, node_intra_recv);
-  node_work += gamma * std::max(
-    node_intra_send - to_remove.intra_send_vol + to_add.intra_send_vol,
-    node_intra_recv - to_remove.intra_recv_vol + to_add.intra_recv_vol
+
+  vt_debug_print(
+    verbose, temperedlb,
+    "node_work (after gamma)={}, sub off={} {} {}\n",
+    node_work, std::max(node_inter_send, node_inter_recv),
+    beta, beta * std::max(node_inter_send, node_inter_recv)
   );
 
-  // Uninitialized destination means that the cluster is empty
-  // If to_remove was remote, remove that component from the work
-  if (
-    to_remove.home_node != node and
-    to_remove.home_node != uninitialized_destination
-  ) {
-    node_work -= delta * to_remove.edge_weight;
+  node_work -= beta * std::max(node_inter_send, node_inter_recv);
+
+  auto cur_shared_ids = info.shared_ids;
+
+  // All edges outside the to_remove cluster that are also off the node need to
+  // be removed from the inter-node volume; otherwise they need to be removed
+  // from the intra-node volume.
+  for (auto const& [shared_id, volume] : to_remove.inter_cluster_send_vol) {
+    // Local cluster edge if it's in our shared IDs, otherwise it's remote
+    if (shared_id == to_remove.shared_id) {
+      node_intra_send -= volume;
+    } else if (cur_shared_ids.find(shared_id) != cur_shared_ids.end()) {
+      // Send edge to cluster on this rank
+      node_intra_send -= volume;
+      node_intra_recv -= volume;
+
+      node_inter_recv += volume;
+    } else {
+      // Send edge to cluster *not* on this rank
+      node_inter_send -= volume;
+    }
   }
 
-  // If to_add is now remote, add that component to the work
-  if (
-    to_add.home_node != node and
-    to_add.home_node != uninitialized_destination
-  ) {
-    node_work += delta * to_add.edge_weight;
+  for (auto const& [shared_id, volume] : to_remove.inter_cluster_recv_vol) {
+    // Local cluster edge if it's in our shared IDs, otherwise it's remote
+    if (shared_id == to_remove.shared_id) {
+      node_intra_recv -= volume;
+    } else if (cur_shared_ids.find(shared_id) != cur_shared_ids.end()) {
+      // Recv edge from cluster on this rank
+      node_intra_recv -= volume;
+      node_intra_send -= volume;
+
+      node_inter_send += volume;
+    } else {
+      // Recv edge from cluster *not* on this rank
+      node_inter_recv -= volume;
+    }
   }
 
-  // Update formulae for inter-node communication
-  double node_inter_send = info.inter_send_vol;
-  double node_inter_recv = info.inter_recv_vol;
-  node_work -= beta * std::max(node_inter_send, node_inter_recv);
+  for (auto const& [recv_obj, volume] : to_remove.obj_send_vol) {
+    if (info.non_cluster_objs.find(recv_obj) != info.non_cluster_objs.end()) {
+      node_intra_recv -= volume;
+      node_intra_send -= volume;
 
-  // All edges outside the to_remove cluster that are also off the node need to
-  // be removed from the inter-node volumes
-  for (auto const& [target, volume] : to_remove.inter_send_vol) {
-    if (target != node) {
+      node_inter_recv += volume;
+    } else {
       node_inter_send -= volume;
     }
   }
-  for (auto const& [target, volume] : to_remove.inter_recv_vol) {
-    if (target != node) {
+
+  for (auto const& [send_obj, volume] : to_remove.obj_recv_vol) {
+    if (info.non_cluster_objs.find(send_obj) != info.non_cluster_objs.end()) {
+      node_intra_recv -= volume;
+      node_intra_send -= volume;
+
+      node_inter_send += volume;
+    } else {
       node_inter_recv -= volume;
     }
   }
 
-  // All edges outside the to_add cluster that are now off the node need to
-  // be added from the inter-node volumes
-  for (auto const& [target, volume] : to_add.inter_send_vol) {
-    if (target != node) {
+  // Remove from list of shared IDs for add calculation
+  if (to_remove.shared_id != no_shared_id) {
+    cur_shared_ids.erase(cur_shared_ids.find(to_remove.shared_id));
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+
+  for (auto const& [shared_id, volume] : to_add.inter_cluster_send_vol) {
+    // Local cluster edge if it's in our shared IDs, otherwise it's remote
+    if (shared_id == to_add.shared_id) {
+      node_intra_send += volume;
+    } else if (cur_shared_ids.find(shared_id) != cur_shared_ids.end()) {
+      node_intra_send += volume;
+      node_intra_recv += volume;
+
+      node_inter_recv -= volume;
+    } else {
       node_inter_send += volume;
     }
   }
-  for (auto const& [target, volume] : to_add.inter_recv_vol) {
-    if (target != node) {
+
+  for (auto const& [shared_id, volume] : to_add.inter_cluster_recv_vol) {
+    // Local cluster edge if it's in our shared IDs, otherwise it's remote
+    if (shared_id == to_add.shared_id) {
+      node_intra_recv += volume;
+    } else if (cur_shared_ids.find(shared_id) != cur_shared_ids.end()) {
+      node_intra_recv += volume;
+      node_intra_send += volume;
+
+      node_inter_send -= volume;
+    } else {
+      node_inter_recv += volume;
+    }
+  }
+
+  for (auto const& [recv_obj, volume] : to_add.obj_send_vol) {
+    if (info.non_cluster_objs.find(recv_obj) != info.non_cluster_objs.end()) {
+      node_intra_send += volume;
+      node_intra_recv += volume;
+
+      node_inter_recv -= volume;
+    } else {
+      node_inter_send += volume;
+    }
+  }
+
+  for (auto const& [send_obj, volume] : to_add.obj_recv_vol) {
+    if (info.non_cluster_objs.find(send_obj) != info.non_cluster_objs.end()) {
+      node_intra_recv += volume;
+      node_intra_send += volume;
+
+      node_inter_send -= volume;
+    } else {
       node_inter_recv += volume;
     }
   }
-  node_work += beta * std::max(node_inter_send, node_inter_recv);
 
+  vt_debug_print(
+    verbose, temperedlb,
+    "node_work={}, intra {} {} inter {} {}\n",
+    node_work,
+    node_intra_send, node_intra_recv, node_inter_send, node_inter_recv
+  );
+
+  node_work += gamma * std::max(node_intra_send, node_intra_recv);
+  node_work += beta * std::max(node_inter_send, node_inter_recv);
+
+  // Uninitialized destination means that the cluster is empty
+  // If to_remove was remote, remove that component from the work
+  if (
+    to_remove.home_node != node and
+    to_remove.home_node != uninitialized_destination
+  ) {
+    node_work -= delta * to_remove.edge_weight;
+  }
+
+  // If to_add is now remote, add that component to the work
+  if (
+    to_add.home_node != node and
+    to_add.home_node != uninitialized_destination
+  ) {
+    node_work += delta * to_add.edge_weight;
+  }
 
   return node_work;
 }
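To see how the reclassification loops above work, consider one 10-unit send edge from the to_remove cluster to a peer cluster that stays on this node (the cur_shared_ids branch). Illustrative numbers only:

    // Before the swap the edge is node-local in both directions; after the
    // removal, the peer cluster receives it from off-node instead:
    node_intra_send -= 10;  // this node no longer sends it locally
    node_intra_recv -= 10;  // the peer no longer receives it locally
    node_inter_recv += 10;  // the peer now receives it from another node

The to_add loops apply the mirror-image adjustments, and γ/β are re-applied to the adjusted max(send, recv) volumes at the end.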
@@ -1051,6 +1177,8 @@ void TemperedLB::doLBStages(LoadType start_imb) {
 
   auto this_node = theContext()->getNode();
 
+  double start_time = MPI_Wtime();
+
   // Read in memory information if it's available before we do any trials
   readClustersMemoryData();
@@ -1067,16 +1195,20 @@ void TemperedLB::doLBStages(LoadType start_imb) {
     max_load_over_iters_.clear();
     is_overloaded_ = is_underloaded_ = false;
     ready_to_satisfy_locks_ = false;
+    last_n_work.clear();
 
     LoadType best_imb_this_trial = start_imb + 10;
 
     for (iter_ = 0; iter_ < num_iters_; iter_++) {
       bool first_iter = iter_ == 0;
       iter_time_ = MPI_Wtime();
+      done_with_swaps_ = false;
+      props_done_ = false;
 
       if (first_iter) {
         // Copy this node's object assignments to a local, mutable copy
         cur_objs_.clear();
+        non_cluster_objs_.clear();
         int total_num_objs = 0;
         int num_migratable_objs = 0;
         for (auto obj : *load_model_) {
@@ -1084,6 +1216,11 @@ void TemperedLB::doLBStages(LoadType start_imb) {
           if (obj.isMigratable()) {
             num_migratable_objs++;
             cur_objs_[obj] = getModeledValue(obj);
+            if (obj_shared_block_.find(obj) == obj_shared_block_.end()) {
+              non_cluster_objs_.insert(obj);
+            }
+          } else {
+            non_cluster_objs_.insert(obj);
           }
         }
@@ -1096,10 +1233,7 @@ void TemperedLB::doLBStages(LoadType start_imb) {
         send_edges_.clear();
         recv_edges_.clear();
         bool has_comm = false;
-        auto const& comm = load_model_->getComm(
-          {balance::PhaseOffset::NEXT_PHASE, balance::PhaseOffset::WHOLE_PHASE}
-        );
-        // vt_print(temperedlb, "comm size={} {}\n", comm.size(), typeid(load_model_).name());
+        auto const& comm = *comm_data;
 
         for (auto const& [key, volume] : comm) {
           // vt_print(temperedlb, "Found comm: volume={}\n", volume.bytes);
@@ -1113,9 +1247,13 @@ void TemperedLB::doLBStages(LoadType start_imb) {
             auto const to_obj = key.toObj();
             auto const bytes = volume.bytes;
-            send_edges_[from_obj].emplace_back(to_obj, bytes);
-            recv_edges_[to_obj].emplace_back(from_obj, bytes);
-            has_comm = true;
+            if (from_obj.isMigratable() or to_obj.isMigratable()) {
+              //vt_print(temperedlb, "Found comm: to={}, from={} volume={}\n", to_obj, from_obj, volume.bytes);
+
+              send_edges_[from_obj].emplace_back(to_obj, bytes);
+              recv_edges_[to_obj].emplace_back(from_obj, bytes);
+              has_comm = true;
+            }
           } else if (key.commCategory() == elm::CommCategory::WriteShared) {
             auto const to_node = key.toNode();
             auto const shared_id = key.sharedID();
@@ -1138,8 +1276,19 @@ void TemperedLB::doLBStages(LoadType start_imb) {
       if (has_comm_any_) {
         runInEpochCollective("symmEdges", [&]{
           std::unordered_map<NodeType, EdgeMapType> edges;
+          std::unordered_map<ObjIDType, SharedIDType> obj_cluster_id;
 
           for (auto const& [from_obj, to_edges] : send_edges_) {
+            if (from_obj.getCurrNode() == this_node) {
+              if (
+                auto it = obj_shared_block_.find(from_obj);
+                it != obj_shared_block_.end()
+              ) {
+                obj_cluster_id[from_obj] = it->second;
+              } else {
+                obj_cluster_id[from_obj] = -1;
+              }
+            }
             for (auto const& [to_obj, volume] : to_edges) {
               vt_debug_print(
                 verbose, temperedlb,
@@ -1154,11 +1303,23 @@ void TemperedLB::doLBStages(LoadType start_imb) {
               if (curr_to_node != this_node) {
                 edges[curr_to_node][from_obj].emplace_back(to_obj, volume);
               }
+              if (to_obj.getCurrNode() == this_node) {
+                if (
+                  auto it = obj_shared_block_.find(to_obj);
+                  it != obj_shared_block_.end()
+                ) {
+                  obj_cluster_id[to_obj] = it->second;
+                } else {
+                  obj_cluster_id[to_obj] = -1;
+                }
+              }
             }
           }
 
           for (auto const& [dest_node, edge_map] : edges) {
-            proxy_[dest_node].template send<&TemperedLB::giveEdges>(edge_map);
+            proxy_[dest_node].template send<&TemperedLB::giveEdges>(
+              edge_map, obj_cluster_id
+            );
           }
         });
       }
@@ -1167,16 +1328,15 @@ void TemperedLB::doLBStages(LoadType start_imb) {
       this_new_breakdown_ = computeWorkBreakdown(this_node, cur_objs_);
       this_work = this_new_work_ = this_new_breakdown_.work;
 
-      runInEpochCollective("TemperedLB::doLBStages -> Rank_load_modeled", [=] {
-        // Perform the reduction for Rank_load_modeled -> processor load only
-        proxy_.allreduce<&TemperedLB::workStatsHandler, collective::PlusOp>(
-          std::vector<balance::LoadData>{
-            {balance::LoadData{Statistic::Rank_load_modeled, this_new_load_}},
-            {balance::LoadData{Statistic::Rank_strategy_specific_load_modeled, this_new_work_}}
-          }
-        );
-      });
-
+      work_stats_handler_ = false;
+      // Perform the reduction for Rank_load_modeled -> processor load only
+      proxy_.allreduce<&TemperedLB::workStatsHandler, collective::PlusOp>(
+        std::vector<balance::LoadData>{
+          {balance::LoadData{Statistic::Rank_load_modeled, this_new_load_}},
+          {balance::LoadData{Statistic::Rank_strategy_specific_load_modeled, this_new_work_}}
+        }
+      );
+      theSched()->runSchedulerWhile([this]{ return not work_stats_handler_; });
     } else {
       // Clear out data structures from previous iteration
       selected_.clear();
@@ -1200,7 +1360,7 @@ void TemperedLB::doLBStages(LoadType start_imb) {
       );
 
       if (has_memory_data_) {
-        double const memory_usage = computeMemoryUsage();
+        double memory_usage = computeMemoryUsage();
 
         vt_debug_print(
           normal, temperedlb,
@@ -1216,15 +1376,6 @@ void TemperedLB::doLBStages(LoadType start_imb) {
         if (iter_ == 0) {
           computeClusterSummary();
         }
-
-        // Verbose printing about local clusters
-        for (auto const& [shared_id, cluster_info] : cur_clusters_) {
-          vt_debug_print(
-            verbose, temperedlb,
-            "Local cluster: id={}: {}\n",
-            shared_id, cluster_info
-          );
-        }
       }
 
       if (isOverloaded(this_new_load_)) {
@@ -1245,18 +1396,6 @@ void TemperedLB::doLBStages(LoadType start_imb) {
         vtAbort("TemperedLB:: Unsupported inform type");
       }
 
-      // Some very verbose printing about all remote clusters we know about that
-      // we can shut off later
-      for (auto const& [node, clusters] : other_rank_clusters_) {
-        for (auto const& [shared_id, cluster_info] : clusters) {
-          vt_debug_print(
-            verbose, temperedlb,
-            "Remote cluster: node={}, id={}, {}\n",
-            node, shared_id, cluster_info
-          );
-        }
-      }
-
       // Move remote cluster information to shared_block_size_ so we have all
       // the sizes in the same place
       for (auto const& [node, clusters] : other_rank_clusters_) {
@@ -1282,6 +1421,8 @@ void TemperedLB::doLBStages(LoadType start_imb) {
         vtAbort("TemperedLB:: Unsupported transfer type");
       }
 
+      total_num_iters_++;
+
       vt_debug_print(
         verbose, temperedlb,
         "TemperedLB::doLBStages: (after) running trial={}, iter={}, "
@@ -1298,18 +1439,20 @@ void TemperedLB::doLBStages(LoadType start_imb) {
       ) {
         this_new_breakdown_ = computeWorkBreakdown(this_node, cur_objs_);
         this_new_work_ = this_new_breakdown_.work;
-        runInEpochCollective("TemperedLB::doLBStages -> Rank_load_modeled", [=] {
-          // Perform the reduction for Rank_load_modeled -> processor load only
-          proxy_.allreduce<&TemperedLB::loadStatsHandler, collective::PlusOp>(
-            std::vector<balance::LoadData>{
-              {balance::LoadData{Statistic::Rank_load_modeled, this_new_load_}},
-              {balance::LoadData{Statistic::Rank_strategy_specific_load_modeled, this_new_work_}}
-            }
-          );
-        });
+        load_stats_handler_ = false;
+        // Perform the reduction for Rank_load_modeled -> processor load only
+        proxy_.allreduce<&TemperedLB::loadStatsHandler, collective::PlusOp>(
+          std::vector<balance::LoadData>{
+            {balance::LoadData{Statistic::Rank_load_modeled, this_new_load_}},
+            {balance::LoadData{Statistic::Rank_strategy_specific_load_modeled, this_new_work_}}
+          }
+        );
+        theSched()->runSchedulerWhile([this]{ return not load_stats_handler_; });
       }
 
-      if (rollback_ || (iter_ == num_iters_ - 1)) {
+      bool const converged = checkConvergence();
+
+      if (rollback_ || (iter_ == num_iters_ - 1) || converged) {
         // if known, save the best iteration within any trial so we can roll back
         if (new_imbalance_ < best_imb && new_imbalance_ <= start_imb) {
           best_load = this_new_load_;
@@ -1321,6 +1464,10 @@ void TemperedLB::doLBStages(LoadType start_imb) {
           best_imb_this_trial = new_imbalance_;
         }
       }
+
+      if (converged) {
+        break;
+      }
     }
 
     if (this_node == 0) {
@@ -1333,6 +1480,7 @@ void TemperedLB::doLBStages(LoadType start_imb) {
 
     // Clear out for next try or for not migrating by default
     cur_objs_.clear();
+    non_cluster_objs_.clear();
     send_edges_.clear();
     recv_edges_.clear();
     this_new_load_ = this_load;
@@ -1355,10 +1503,13 @@ void TemperedLB::doLBStages(LoadType start_imb) {
     // Skip this block when not using SwapClusters
     if (transfer_type_ == TransferTypeEnum::SwapClusters) {
       auto remote_block_count = getRemoteBlockCountHere();
-      runInEpochCollective("TemperedLB::doLBStages -> compute unhomed", [=] {
-        proxy_.allreduce<&TemperedLB::remoteBlockCountHandler,
-          collective::PlusOp>(remote_block_count);
-      });
+      compute_unhomed_done_ = false;
+      proxy_.reduce<
+        &TemperedLB::remoteBlockCountHandler, collective::PlusOp
+      >(proxy_[0], remote_block_count);
+      if (this_node == 0) {
+        theSched()->runSchedulerWhile([this]{ return not compute_unhomed_done_; });
+      }
     }
   } else if (this_node == 0) {
     vt_debug_print(
@@ -1367,18 +1518,60 @@ void TemperedLB::doLBStages(LoadType start_imb) {
     );
   }
 
+  double end_time = MPI_Wtime();
+  proxy_.allreduce<&TemperedLB::timeLB,
+    collective::MaxOp>(end_time-start_time);
+
   // Concretize lazy migrations by invoking the BaseLB object migration on new
   // object node assignments
   thunkMigrations();
+
+  theTerm()->setLocalTerminated(true, false);
+  theTerm()->enableTD(lb_stages_epoch_);
+  theTerm()->enableTD(vt::term::any_epoch_sentinel);
+
+  vt::runSchedulerThrough(lb_stages_epoch_);
+}
+
+bool TemperedLB::checkConvergence(std::size_t last_num_iters) const {
+  if (last_n_work.size() >= last_num_iters) {
+    double w_max_i = last_n_work.at(last_n_work.size() - last_num_iters);
+    double w_max_in = last_n_work.at(last_n_work.size() - 1);
+    double w_max_rel_d = (w_max_i - w_max_in) / last_num_iters;
+    if (theContext()->getNode() == 0) {
+      vt_debug_print(
+        terse, temperedlb,
+        "i={} in={} rel={}, tol={}, rel_tol={}\n",
+        w_max_i, w_max_in, w_max_rel_d, converge_tolerance_,
+        converge_tolerance_ * w_max_in
+      );
+    }
+    if (w_max_rel_d < converge_tolerance_ * w_max_in) {
+      return true;
+    }
+  }
+  return false;
+}
+
+void TemperedLB::timeLB(double total_time) {
+  auto const this_node = theContext()->getNode();
+  if (this_node == 0) {
+    vt_print(temperedlb, "total time={}, per iter={}\n", total_time, total_time/total_num_iters_);
+  }
 }
 
-void TemperedLB::giveEdges(EdgeMapType const& edge_map) {
+void TemperedLB::giveEdges(
+  EdgeMapType const& edge_map,
+  std::unordered_map<ObjIDType, SharedIDType> obj_cluster_id
+) {
   for (auto const& [from_obj, to_edges] : edge_map) {
     for (auto const& [to_obj, volume] : to_edges) {
       send_edges_[from_obj].emplace_back(to_obj, volume);
       recv_edges_[to_obj].emplace_back(from_obj, volume);
     }
   }
+  for (auto const& [key, value] : obj_cluster_id) {
+    obj_shared_block_.emplace(key, value);
+  }
 }
 
 void TemperedLB::hasCommAny(bool has_comm_any) {
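A worked example of checkConvergence above, with the default converge_tolerance of 0.01 and the default 8-iteration window (values illustrative):

    // Max work was 105.0 eight iterations ago and is 100.0 now.
    double w_max_rel_d = (105.0 - 100.0) / 8;     // 0.625 average drop per iter
    bool converged = w_max_rel_d < 0.01 * 100.0;  // 0.625 < 1.0 -> converged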
@@ -1394,6 +1587,8 @@ void TemperedLB::loadStatsHandler(std::vector<balance::LoadData> const& vec) {
   work_max_ = work.max();
   new_work_imbalance_ = work.I();
 
+  last_n_work.push_back(work_max_);
+
   max_load_over_iters_.push_back(in.max());
 
   auto this_node = theContext()->getNode();
@@ -1415,6 +1610,7 @@ void TemperedLB::loadStatsHandler(std::vector<balance::LoadData> const& vec) {
       work.I()
     );
   }
+  load_stats_handler_ = true;
 }
 
 void TemperedLB::rejectionStatsHandler(
@@ -1454,6 +1650,11 @@ void TemperedLB::remoteBlockCountHandler(int n_unhomed_blocks) {
       n_unhomed_blocks
     );
   }
+  compute_unhomed_done_ = true;
+}
+
+void TemperedLB::propsDone() {
+  props_done_ = true;
 }
 
 void TemperedLB::informAsync() {
@@ -1478,17 +1679,17 @@ void TemperedLB::informAsync() {
   proxy_.allreduce<&TemperedLB::setupDone>();
   theSched()->runSchedulerWhile([this]{ return not setup_done_; });
 
-  auto propagate_epoch = theTerm()->makeEpochCollective("TemperedLB: informAsync");
-
   // Underloaded start the round
   if (is_underloaded_) {
-    uint8_t k_cur_async = 0;
-    propagateRound(k_cur_async, false, propagate_epoch);
+    runInEpochRooted("informAsync", [&]{
+      uint8_t k_cur_async = 0;
+      propagateRound(k_cur_async, false, no_epoch);
+    });
   }
 
-  theTerm()->finishedEpoch(propagate_epoch);
-
-  vt::runSchedulerThrough(propagate_epoch);
+  props_done_ = false;
+  proxy_.allreduce<&TemperedLB::propsDone>();
+  theSched()->runSchedulerWhile([this]{ return not props_done_; });
 
   if (is_overloaded_) {
     vt_debug_print(
@@ -1639,7 +1840,7 @@ void TemperedLB::propagateRound(uint8_t k_cur, bool sync, EpochType epoch) {
       this_new_load_, this_new_work_,
       this_new_breakdown_.inter_send_vol, this_new_breakdown_.inter_recv_vol,
       this_new_breakdown_.intra_send_vol, this_new_breakdown_.intra_recv_vol,
-      this_new_breakdown_.shared_vol
+      this_new_breakdown_.shared_vol, getSharedBlocksHere(), non_cluster_objs_
     };
     msg->addNodeInfo(this_node, info);
     if (has_memory_data_) {
@@ -1658,7 +1859,7 @@ void TemperedLB::propagateRound(uint8_t k_cur, bool sync, EpochType epoch) {
      this_new_load_, this_new_work_,
      this_new_breakdown_.inter_send_vol, this_new_breakdown_.inter_recv_vol,
      this_new_breakdown_.intra_send_vol, this_new_breakdown_.intra_recv_vol,
-     this_new_breakdown_.shared_vol
+     this_new_breakdown_.shared_vol, getSharedBlocksHere(), non_cluster_objs_
    };
    msg->addNodeInfo(this_node, info);
    if (has_memory_data_) {
@@ -2154,21 +2355,13 @@ void TemperedLB::originalTransfer() {
 
   if (theConfig()->vt_debug_temperedlb) {
     // compute rejection rate because it will be printed
-    runInEpochCollective("TemperedLB::originalTransfer -> compute rejection", [=] {
-      iter_time_ = MPI_Wtime() - iter_time_;
-      proxy_.allreduce<&TemperedLB::rejectionStatsHandler, collective::PlusOp>(
-        n_rejected, n_transfers, 0, 0
-      );
-      proxy_.allreduce<&TemperedLB::maxIterTime, collective::MaxOp>(iter_time_);
-    });
-  }
-}
-
-void TemperedLB::tryLock(NodeType requesting_node, double criterion_value) {
-  try_locks_.emplace(requesting_node, criterion_value);
-
-  if (ready_to_satisfy_locks_ and not is_locked_) {
-    satisfyLockRequest();
+    iter_time_ = MPI_Wtime() - iter_time_;
+    proxy_.reduce<&TemperedLB::rejectionStatsHandler, collective::PlusOp>(
+      proxy_[0], n_rejected, n_transfers, 0, 0
+    );
+    proxy_.reduce<&TemperedLB::maxIterTime, collective::MaxOp>(
+      proxy_[0], iter_time_
+    );
   }
 }
 
@@ -2179,6 +2372,7 @@ auto TemperedLB::removeClusterToSend(
   std::unordered_map<ObjIDType, SharedIDType> give_obj_shared_block;
   std::unordered_map<SharedIDType, BytesType> give_shared_blocks_size;
   std::unordered_map<ObjIDType, BytesType> give_obj_working_bytes;
+  EdgeMapType give_send, give_recv;
 
   vt_debug_print(
     verbose, temperedlb,
@@ -2241,11 +2435,25 @@ auto TemperedLB::removeClusterToSend(
     blocks_here_before.size(), blocks_here_after.size()
   );
 
+  for (auto&& [elm, edge_map] : send_edges_) {
+    if (give_objs.find(elm) != give_objs.end()) {
+      give_send[elm] = edge_map;
+    }
+  }
+
+  for (auto&& [elm, edge_map] : recv_edges_) {
+    if (give_objs.find(elm) != give_objs.end()) {
+      give_recv[elm] = edge_map;
+    }
+  }
+
   return std::make_tuple(
     give_objs,
     give_obj_shared_block,
     give_shared_blocks_size,
-    give_obj_working_bytes
+    give_obj_working_bytes,
+    give_send,
+    give_recv
   );
 }
 
@@ -2280,7 +2488,7 @@ void TemperedLB::considerSwapsAfterLock(MsgSharedPtr<LockedInfoMsg> msg) {
     this_new_load_, this_new_work_,
     this_new_breakdown_.inter_send_vol, this_new_breakdown_.inter_recv_vol,
     this_new_breakdown_.intra_send_vol, this_new_breakdown_.intra_recv_vol,
-    this_new_breakdown_.shared_vol
+    this_new_breakdown_.shared_vol, getSharedBlocksHere(), non_cluster_objs_
   };
 
   auto criterion = [&,this](
@@ -2390,18 +2598,14 @@ void TemperedLB::considerSwapsAfterLock(MsgSharedPtr<LockedInfoMsg> msg) {
     auto const src_shared_id = std::get<0>(best_swap);
     auto const try_shared_id = std::get<1>(best_swap);
 
-    vt_debug_print(
-      normal, temperedlb,
-      "best_c_try={}, swapping {} for {} on rank ={}\n",
-      best_c_try, src_shared_id, try_shared_id, try_rank
-    );
-
     // FIXME C++20: use structured binding
     auto const& give_data = removeClusterToSend(src_shared_id);
     auto const& give_objs = std::get<0>(give_data);
     auto const& give_obj_shared_block = std::get<1>(give_data);
     auto const& give_shared_blocks_size = std::get<2>(give_data);
     auto const& give_obj_working_bytes = std::get<3>(give_data);
+    auto const& give_send = std::get<4>(give_data);
+    auto const& give_recv = std::get<5>(give_data);
 
     runInEpochRooted("giveCluster", [&]{
       vt_debug_print(
@@ -2415,6 +2619,8 @@ void TemperedLB::considerSwapsAfterLock(MsgSharedPtr<LockedInfoMsg> msg) {
         give_objs,
         give_obj_shared_block,
        give_obj_working_bytes,
+        give_send,
+        give_recv,
         try_shared_id
       );
     });
@@ -2430,7 +2636,7 @@ void TemperedLB::considerSwapsAfterLock(MsgSharedPtr<LockedInfoMsg> msg) {
       );
     }
 
-    proxy_[try_rank].template send<&TemperedLB::releaseLock>();
+    proxy_[try_rank].template send<&TemperedLB::releaseLock>(false, this_node, 0.0);
 
     vt_debug_print(
       verbose, temperedlb,
@@ -2446,6 +2652,9 @@ void TemperedLB::considerSwapsAfterLock(MsgSharedPtr<LockedInfoMsg> msg) {
     auto action = pending_actions_.back();
     pending_actions_.pop_back();
     action();
+  } else {
+    // satisfy another lock
+    satisfyLockRequest();
   }
 }
 
@@ -2455,6 +2664,8 @@ void TemperedLB::giveCluster(
   std::unordered_map<ObjIDType, LoadType> const& give_objs,
   std::unordered_map<ObjIDType, SharedIDType> const& give_obj_shared_block,
   std::unordered_map<ObjIDType, BytesType> const& give_obj_working_bytes,
+  EdgeMapType const& give_send,
+  EdgeMapType const& give_recv,
   SharedIDType take_cluster
 ) {
   auto const this_node = theContext()->getNode();
@@ -2477,12 +2688,21 @@ void TemperedLB::giveCluster(
     obj_working_bytes_.emplace(elm);
   }
 
+  for (auto&& [elm, map] : give_send) {
+    send_edges_[elm] = map;
+  }
+  for (auto&& [elm, map] : give_recv) {
+    recv_edges_[elm] = map;
+  }
+
   if (take_cluster != no_shared_id) {
     auto const& [
       take_objs,
       take_obj_shared_block,
       take_shared_blocks_size,
-      take_obj_working_bytes
+      take_obj_working_bytes,
+      take_send,
+      take_recv
     ] = removeClusterToSend(take_cluster);
 
     proxy_[from_rank].template send<&TemperedLB::giveCluster>(
@@ -2491,6 +2711,8 @@ void TemperedLB::giveCluster(
       take_objs,
       take_obj_shared_block,
       take_obj_working_bytes,
+      take_send,
+      take_recv,
       no_shared_id
     );
   }
@@ -2514,11 +2736,25 @@ void TemperedLB::giveCluster(
   );
 }
 
-void TemperedLB::releaseLock() {
+void TemperedLB::tryLock(NodeType requesting_node, double criterion_value) {
+  vt_debug_print(
+    normal, temperedlb,
+    "tryLock: requesting_node={}, c_try={}\n",
+    requesting_node, criterion_value
+  );
+
+  try_locks_.emplace(requesting_node, criterion_value);
+
+  if (ready_to_satisfy_locks_ and not is_locked_) {
+    satisfyLockRequest();
+  }
+}
+
+void TemperedLB::releaseLock(bool try_again, NodeType try_lock_node, double c_try) {
   vt_debug_print(
     normal, temperedlb,
-    "releaseLock: pending size={}\n",
-    pending_actions_.size()
+    "releaseLock: pending size={}, try_again={}, try_lock_node={}, c_try={}\n",
+    pending_actions_.size(), try_again, try_lock_node, c_try
   );
 
   is_locked_ = false;
@@ -2543,22 +2779,22 @@ void TemperedLB::lockObtained(LockedInfoMsg* in_msg) {
     is_locked_, is_swapping_, locking_rank_, msg->locked_node, is_swapping_
   );
 
-  auto cur_epoch = theMsg()->getEpoch();
-  theTerm()->produce(cur_epoch);
+  auto action = [this, msg]{
+    try_locks_pending_--;
+
+    vt_debug_print(
+      normal, temperedlb,
+      "try locks pending={} run action\n", try_locks_pending_
+    );
 
-  auto action = [this, msg, cur_epoch]{
-    theMsg()->pushEpoch(cur_epoch);
     considerSwapsAfterLock(msg);
-    theMsg()->popEpoch(cur_epoch);
-    theTerm()->consume(cur_epoch);
   };
 
   if (is_locked_ && locking_rank_ <= msg->locked_node) {
     cycle_locks_++;
-    proxy_[msg->locked_node].template send<&TemperedLB::releaseLock>();
-    theTerm()->consume(cur_epoch);
-    try_locks_.emplace(msg->locked_node, msg->locked_c_try, 1);
-    //pending_actions_.push_back(action);
+    auto const this_node = theContext()->getNode();
+    proxy_[msg->locked_node].template send<&TemperedLB::releaseLock>(true, this_node, in_msg->locked_c_try);
+    try_locks_pending_--;
   } else if (is_locked_) {
     pending_actions_.push_back(action);
   } else if (is_swapping_) {
@@ -2579,8 +2815,8 @@ void TemperedLB::satisfyLockRequest() {
   // find the best lock to give
   for (auto&& tl : try_locks_) {
     vt_debug_print(
-      verbose, temperedlb,
-      "satisfyLockRequest: node={}, c_try={}, forced_release={}\n",
+      normal, temperedlb,
+      "satisfyLockRequest (iterate): node={}, c_try={}, forced_release={}\n",
      tl.requesting_node, tl.c_try, tl.forced_release
    );
  }
@@ -2603,7 +2839,7 @@ void TemperedLB::satisfyLockRequest() {
 
     vt_debug_print(
       normal, temperedlb,
-      "satisfyLockRequest: locked obtained for node={}\n",
+      "satisfyLockRequest: lock obtained for node={}\n",
       lock.requesting_node
     );
 
@@ -2611,7 +2847,7 @@ void TemperedLB::satisfyLockRequest() {
       this_new_load_, this_new_work_,
       this_new_breakdown_.inter_send_vol, this_new_breakdown_.inter_recv_vol,
       this_new_breakdown_.intra_send_vol, this_new_breakdown_.intra_recv_vol,
-      this_new_breakdown_.shared_vol
+      this_new_breakdown_.shared_vol, getSharedBlocksHere(), non_cluster_objs_
     };
 
     proxy_[lock.requesting_node].template send<&TemperedLB::lockObtained>(
@@ -2634,12 +2870,9 @@ void TemperedLB::swapClusters() {
     this_new_load_, this_new_work_,
     this_new_breakdown_.inter_send_vol, this_new_breakdown_.inter_recv_vol,
     this_new_breakdown_.intra_send_vol, this_new_breakdown_.intra_recv_vol,
-    this_new_breakdown_.shared_vol
+    this_new_breakdown_.shared_vol, getSharedBlocksHere(), non_cluster_objs_
   };
 
-  auto lazy_epoch = theTerm()->makeEpochCollective("TemperedLB: swapClusters");
-  theTerm()->pushEpoch(lazy_epoch);
-
   auto criterion = [&,this](
     auto try_rank, auto try_mem, auto const& src_cluster, auto const& try_cluster
   ) -> double {
@@ -2687,7 +2920,14 @@ void TemperedLB::swapClusters() {
       if (c_try >= 0.0) {
         // Try to obtain lock for feasible swap
        found_potential_good_swap = true;
+
+        vt_debug_print(
+          normal, temperedlb,
+          "try lock rank={}, c_try={}\n", try_rank, c_try
+        );
+
         proxy_[try_rank].template send<&TemperedLB::tryLock>(this_node, c_try);
+        try_locks_pending_++;
         break;
       }
     }
@@ -2699,7 +2939,14 @@ void TemperedLB::swapClusters() {
         if (c_try >= 0.0) {
           // Try to obtain lock for feasible swap
          found_potential_good_swap = true;
+
+          vt_debug_print(
+            normal, temperedlb,
+            "try lock rank={}, c_try={}\n", try_rank, c_try
+          );
+
           proxy_[try_rank].template send<&TemperedLB::tryLock>(this_node, c_try);
+          try_locks_pending_++;
           break;
         }
       } // try_clusters
@@ -2709,24 +2956,24 @@ void TemperedLB::swapClusters() {
     } // cur_clusters_
   } // other_rank_clusters
 
-  // We have to be very careful here since we will allow some reentrancy here.
-  constexpr int turn_scheduler_times = 10;
-  for (int i = 0; i < turn_scheduler_times; i++) {
-    theSched()->runSchedulerOnceImpl();
-  }
-
-  while (not theSched()->workQueueEmpty()) {
-    theSched()->runSchedulerOnceImpl();
-  }
-
   ready_to_satisfy_locks_ = true;
   satisfyLockRequest();
 
-  // Finalize epoch, we have sent our initial round of messages
-  // from here everything is message driven
-  theTerm()->finishedEpoch(lazy_epoch);
-  theTerm()->popEpoch(lazy_epoch);
-  vt::runSchedulerThrough(lazy_epoch);
+  vt_debug_print(
+    normal, temperedlb,
+    "try locks pending={}\n", try_locks_pending_
+  );
+
+  theSched()->runSchedulerWhile([this]{
+    if (not is_locked_) {
+      satisfyLockRequest();
+    }
+    return try_locks_pending_ > 0 or try_locks_.size() > 0;
+  });
+
+  done_with_swaps_ = false;
+  proxy_.allreduce<&TemperedLB::finishedSwaps>();
+  theSched()->runSchedulerWhile([this]{ return not done_with_swaps_; });
 
   vt_debug_print(
     normal, temperedlb,
@@ -2739,16 +2986,20 @@ void TemperedLB::swapClusters() {
   if (theConfig()->vt_debug_temperedlb) {
     int n_rejected = 0;
     auto remote_block_count = getRemoteBlockCountHere();
-    runInEpochCollective("TemperedLB::swapClusters -> compute rejection", [=] {
-      iter_time_ = MPI_Wtime() - iter_time_;
-      proxy_.allreduce<&TemperedLB::rejectionStatsHandler, collective::PlusOp>(
-        n_rejected, n_transfers_swap_, remote_block_count, cycle_locks_
-      );
-      proxy_.allreduce<&TemperedLB::maxIterTime, collective::MaxOp>(iter_time_);
-    });
+    iter_time_ = MPI_Wtime() - iter_time_;
+    proxy_.reduce<&TemperedLB::rejectionStatsHandler, collective::PlusOp>(
+      proxy_[0], n_rejected, n_transfers_swap_, remote_block_count, cycle_locks_
+    );
+    proxy_.reduce<&TemperedLB::maxIterTime, collective::MaxOp>(
+      proxy_[0], iter_time_
+    );
   }
 }
 
+void TemperedLB::finishedSwaps() {
+  done_with_swaps_ = true;
+}
+
 void TemperedLB::thunkMigrations() {
   vt_debug_print(
     normal, temperedlb,
diff --git a/src/vt/vrt/collection/balance/temperedlb/temperedlb.h b/src/vt/vrt/collection/balance/temperedlb/temperedlb.h
index ca30561b4e..f6f73517bd 100644
--- a/src/vt/vrt/collection/balance/temperedlb/temperedlb.h
+++ b/src/vt/vrt/collection/balance/temperedlb/temperedlb.h
@@ -96,7 +96,6 @@ struct TemperedLB : BaseLB {
     LoadType this_new_load, LoadType target_max_load
   );
 
-protected:
   void doLBStages(LoadType start_imb);
   void informAsync();
   void informSync();
@@ -129,12 +128,49 @@ struct TemperedLB : BaseLB {
   void rejectionStatsHandler(
     int n_rejected, int n_transfers, int n_unhomed_blocks, int cycle_count
   );
+  void finishedSwaps();
   void maxIterTime(double max_iter_time);
   void remoteBlockCountHandler(int n_unhomed_blocks);
+  void timeLB(double total_time);
   void thunkMigrations();
-
+  void propsDone();
   void setupDone();
 
+  //////////////////////////////////////////////////////////////////////////////
+  // Setters for the test harness
+  //////////////////////////////////////////////////////////////////////////////
+  void setAlpha(double in_alpha) { alpha = in_alpha; }
+  void setDelta(double in_delta) { delta = in_delta; }
+  void setBeta(double in_beta) { beta = in_beta; }
+  void setGamma(double in_gamma) { gamma = in_gamma; }
+  void setCurObjs(std::unordered_map<ObjIDType, LoadType> const& in_cur_objs) {
+    cur_objs_ = in_cur_objs;
+  }
+  void setRecvEdges(EdgeMapType const& in_recv_edges) {
+    recv_edges_ = in_recv_edges;
+  }
+  void setSendEdges(EdgeMapType const& in_send_edges) {
+    send_edges_ = in_send_edges;
+  }
+
+  void setObjSharedBlock(
+    std::unordered_map<ObjIDType, SharedIDType> const& in_shared
+  ) {
+    obj_shared_block_ = in_shared;
+  }
+  void setSharedSize(
+    std::unordered_map<SharedIDType, BytesType> const& in_size
+  ) {
+    shared_block_size_ = in_size;
+  }
+  void setSharedEdge(
+    std::unordered_map<SharedIDType, std::tuple<NodeType, BytesType>> const&
+      in_shared_edge
+  ) {
+    shared_block_edge_ = in_shared_edge;
+  }
+  //////////////////////////////////////////////////////////////////////////////
+
   /**
    * \brief Read the memory data from the user-defined json blocks into data
    * structures
@@ -178,6 +214,23 @@ struct TemperedLB : BaseLB {
    */
   ClusterInfo makeClusterSummary(SharedIDType shared_id);
 
+  /**
+   * \brief Helper to add edges to a cluster summary
+   *
+   * \param[in] shared_id the shared ID
+   * \param[in] info the cluster info
+   * \param[in] cluster_objs the objects in the cluster
+   * \param[in] obj the sending or receiving object
+   * \param[in] is_send whether it's a send or recv edge
+   * \param[in] edges the edges
+   */
+  void makeClusterSummaryAddEdges(
+    SharedIDType shared_id, ClusterInfo& info,
+    std::set<ObjIDType> const& cluster_objs,
+    ObjIDType obj, bool is_send,
+    std::vector<std::pair<ObjIDType, double>> const& edges
+  );
+
   /**
    * \brief Try to lock a rank
    *
@@ -307,12 +360,12 @@ struct TemperedLB : BaseLB {
     std::unordered_map<ObjIDType, LoadType> const& objs,
     std::set<ObjIDType> const& exclude = {},
     std::unordered_map<ObjIDType, LoadType> const& include = {}
-  );
+  ) const;
 
   double computeWorkAfterClusterSwap(
     NodeType node, NodeInfo const& info, ClusterInfo const& to_remove,
     ClusterInfo const& to_add
-  );
+  ) const;
 
   /**
    * \brief Consider possible swaps with all the up-to-date info from a rank
@@ -324,7 +377,7 @@ struct TemperedLB : BaseLB {
   /**
    * \brief Release a lock on a rank
    */
-  void releaseLock();
+  void releaseLock(bool try_again, NodeType try_lock_node, double c_try);
 
   /**
    * \brief Give a cluster to a rank
@@ -334,6 +387,8 @@ struct TemperedLB : BaseLB {
    * \param[in] give_objs the objects given
    * \param[in] give_obj_shared_block the shared block the objs are part of
    * \param[in] give_obj_working_bytes the working bytes for the objs
+   * \param[in] give_send send edges for the given objects
+   * \param[in] give_recv recv edges for the given objects
    * \param[in] take_cluster (optional) a cluster requested in return
    */
   void giveCluster(
@@ -342,9 +397,20 @@ struct TemperedLB : BaseLB {
     std::unordered_map<ObjIDType, LoadType> const& give_objs,
     std::unordered_map<ObjIDType, SharedIDType> const& give_obj_shared_block,
     std::unordered_map<ObjIDType, BytesType> const& give_obj_working_bytes,
+    EdgeMapType const& give_send,
+    EdgeMapType const& give_recv,
     SharedIDType take_cluster
   );
 
+  /**
+   * \brief Check for work convergence within a threshold
+   *
+   * \param[in] last_num_iters number of iterations to check
+   *
+   * \return whether we have converged
+   */
+  bool checkConvergence(std::size_t last_num_iters = 8) const;
+
   /**
    * \internal \brief Remove a cluster to send. Does all the bookkeeping
    * associated with removing the cluster
@@ -404,6 +470,7 @@ struct TemperedLB : BaseLB {
   std::unordered_set<NodeType> underloaded_ = {};
   std::unordered_set<NodeType> new_underloaded_ = {};
   std::unordered_map<ObjIDType, LoadType> cur_objs_ = {};
+  std::set<ObjIDType> non_cluster_objs_ = {};
   EdgeMapType send_edges_;
   EdgeMapType recv_edges_;
   LoadType this_new_load_ = 0.0;
@@ -438,9 +505,20 @@ struct TemperedLB : BaseLB {
   double iter_time_ = 0.0f;
   /// Whether any node has communication data
   bool has_comm_any_ = false;
+  bool done_with_swaps_ = false;
+  bool props_done_ = false;
+  int try_locks_pending_ = 0;
+  EpochType lb_stages_epoch_ = no_epoch;
+  int total_num_iters_ = 0;
+  bool work_stats_handler_ = false;
+  bool load_stats_handler_ = false;
+  bool compute_unhomed_done_ = false;
 
   void hasCommAny(bool has_comm_any);
-  void giveEdges(EdgeMapType const& edge_map);
+  void giveEdges(
+    EdgeMapType const& edge_map,
+    std::unordered_map<ObjIDType, SharedIDType> obj_cluster_id
+  );
 
   //////////////////////////////////////////////////////////////////////////////
   // All the memory info (may or may not be present)
@@ -459,7 +537,7 @@ struct TemperedLB : BaseLB {
 
     double operator<(TryLock const& other) const {
       // sort in reverse order so the best is first!
-      return c_try > other.c_try;
+      return c_try == other.c_try ? requesting_node < other.requesting_node : c_try > other.c_try;
     }
   };
 
@@ -477,6 +555,8 @@ struct TemperedLB : BaseLB {
     }
   };
 
+  /// Shared ID value used when a cluster does not have a shared ID
+  SharedIDType const no_shared_id = -1;
   /// Whether we have memory information
   bool has_memory_data_ = false;
   /// Working bytes for this rank
@@ -524,6 +604,8 @@ struct TemperedLB : BaseLB {
   /// Ready to satisfy locks
   bool ready_to_satisfy_locks_ = false;
   int consider_swaps_counter_ = 0;
+  std::vector<double> last_n_work;
+  double converge_tolerance_ = 0.01;
 };
 
 }}}} /* end namespace vt::vrt::collection::lb */
diff --git a/tests/unit/lb/test_temperedlb.cc b/tests/unit/lb/test_temperedlb.cc
index b7b7e54029..5f32e428e7 100644
--- a/tests/unit/lb/test_temperedlb.cc
+++ b/tests/unit/lb/test_temperedlb.cc
@@ -61,7 +61,7 @@ std::string writeTemperedLBConfig(
   auto config_file = getUniqueFilename();
   if (this_node == 0) {
     std::ofstream cfg_file_{config_file.c_str(), std::ofstream::out | std::ofstream::trunc};
-    cfg_file_ << "0 TemperedLB iters=10 trials=3 transfer=" << transfer_strategy <<
+    cfg_file_ << "0 TemperedLB converge_tolerance=0.001 iters=10 trials=3 transfer=" << transfer_strategy <<
       " alpha=" << alpha <<
       " beta=" << beta <<
      " gamma=" << gamma <<
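The TryLock::operator< change above matters because try_locks_ is an ordered container: without a tie-break, two requests carrying the same c_try compare equivalent, so their relative order (and, for a unique set, whether both are even kept) is not well defined. A sketch of the effect — field values are illustrative:

    TryLock a{/*requesting_node*/1, /*c_try*/0.5};
    TryLock b{/*requesting_node*/2, /*c_try*/0.5};
    // Previously neither a < b nor b < a held; with the tie-break on
    // requesting_node, the ordering is a strict, deterministic total order.

Best-c_try-first ordering is preserved; requesting_node only breaks ties.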
diff --git a/tests/unit/lb/test_temperedlb_work_calc.cc b/tests/unit/lb/test_temperedlb_work_calc.cc
new file mode 100644
index 0000000000..7fe3dd944a
--- /dev/null
+++ b/tests/unit/lb/test_temperedlb_work_calc.cc
@@ -0,0 +1,477 @@
+/*
+//@HEADER
+// *****************************************************************************
+//
+//                         test_temperedlb_work_calc.cc
+//                       DARMA/vt => Virtual Transport
+//
+// Copyright 2019-2024 National Technology & Engineering Solutions of Sandia, LLC
+// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
+// Government retains certain rights in this software.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// * Redistributions of source code must retain the above copyright notice,
+//   this list of conditions and the following disclaimer.
+//
+// * Redistributions in binary form must reproduce the above copyright notice,
+//   this list of conditions and the following disclaimer in the documentation
+//   and/or other materials provided with the distribution.
+//
+// * Neither the name of the copyright holder nor the names of its
+//   contributors may be used to endorse or promote products derived from this
+//   software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+// Questions? Contact darma@sandia.gov
+//
+// *****************************************************************************
+//@HEADER
+*/
+
+#include <vt/vrt/collection/balance/temperedlb/temperedlb.h>
+#include <cfloat>
+
+#include "test_helpers.h"
+#include "test_parallel_harness.h"
+
+namespace vt::tests::unit::lb {
+
+#if vt_check_enabled(lblite)
+
+using TestTemperedLBWorkCalc = TestParallelHarness;
+
+/**
+ *
+ * This is the graph we are intending to test.
+
+111111111111111111111111111111111111 2222222222222222222222222222222222222222
+1 1 2 2
+1 1 2 2
+1 1 2 ««o12»» 2
+1 c1 c2 1 2««« »» c3 2
+1 ..... ..... 1 ««« »»» ....... 2
+1 . . . . «« 2 .»» ... 2
+1 . . . . «««1 2 . »» . 2
+1 . . . ««« 1 2 . »» . 2
+1 . o1»»»o2«««««««««««««o3«««««««««««««««««««««««««««««««««o4 . 2
+1 . » »». . «« »» . 1 2 . »« . 2
+1 . »» »» « » » . 1 2 . » « . 2
+1 . » . »» «« . » »» 1 2 ..» « ... 2
+1 ....» »o10« .»... » 1 2 » .«.... 2
+1 » » » 1 2 »» « 2
+111111111111»11111111111»1111111»111 2222222222222»2222«222222222222222222222
+ » » » »» «
+ » » »» »» «
+ 3333»33333333»333333333333»3333333333»3333333«3333333
+ 3 » » » »» « 3
+ 3 » »o11« » » « 3
+ 3 » « » »» c5 « 3
+ 3 » « »» ....... « 3
+ 3 » c4« » ». .« 3
+ 3 » .....«. »» . »o7«««o8« . 3
+ 3 »... « ... » . »» « . 3
+ 3 » « »» . » « . 3
+ 3 » « » . . »» «« . 3
+ 3 . »o5»»»»o6»»»»»»»»»»»»»»»o9«« . 3
+ 3 . «««« . .. .. 3
+ 3 . . ....... 3
+ 3 ... ... 3
+ 3 ....... 3
+ 3 3
+ 33333333333333333333333333333333333333333333333333333
+
+**/
+
+auto getObjLoads(
+  std::unordered_map<elm::ElementIDStruct, double> const& map
+) -> double {
+  double load = 0;
+  for (auto const& [_, value] : map) {
+    load += value;
+  }
+  return load;
+}
+
+auto getNodeEdges(
+  std::unordered_map<elm::ElementIDStruct, double> const& objs,
+  std::unordered_map<
+    elm::ElementIDStruct, std::vector<std::pair<elm::ElementIDStruct, double>>
+  > const& edge_map,
+  bool off
+) -> double {
+  double total_bytes = 0;
+  for (auto const& [obj_id, _] : objs) {
+    for (auto const& [obj_id2, vec] : edge_map) {
+      if (obj_id == obj_id2) {
+        for (auto const& [obj_id3, bytes] : vec) {
+          bool edge_off_node = objs.find(obj_id3) == objs.end();
+          if (edge_off_node and off) {
+            total_bytes += bytes;
+          } else if (not edge_off_node and not off) {
+            total_bytes += bytes;
+          }
+        }
+      }
+    }
+  }
+  return total_bytes;
+}
+
+auto computeOffHomeVolume(
+  vt::NodeType node,
+  std::unordered_map<elm::ElementIDStruct, double> const& objs,
+  std::unordered_map<elm::ElementIDStruct, SharedIDType> const& obj_shared_block,
+  std::unordered_map<
+    SharedIDType, std::tuple<NodeType, BytesType>
+  > const& shared_edge
+) -> double {
+  double total_bytes = 0;
+  std::set<SharedIDType> shared_here;
+  for (auto const& [obj_id, _] : objs) {
+    if (auto it = obj_shared_block.find(obj_id); it != obj_shared_block.end()) {
+      auto shared_id = it->second;
+      auto [shared_node, shared_bytes] = shared_edge.find(shared_id)->second;
+      if (shared_node != node) {
+        total_bytes += shared_bytes;
+      }
+    }
+  }
+  return total_bytes;
+}
+
+using TemperedLB = vt::vrt::collection::lb::TemperedLB;
+using ClusterInfo = vt::vrt::collection::lb::ClusterInfo;
+using NodeInfo = vt::vrt::collection::lb::NodeInfo;
+using ObjIDType = vt::vrt::collection::lb::BaseLB::ObjIDType;
+using WorkBreakdown = vt::vrt::collection::lb::WorkBreakdown;
+
+auto getObjLoad(
+  ObjIDType obj_id,
+  std::map<NodeType, std::unordered_map<ObjIDType, double>> cur_objs
+) -> double {
+  for (auto const& [rank, map] : cur_objs) {
+    for (auto const& [other_obj_id, load] : map) {
+      if (other_obj_id == obj_id) {
+        return load;
+      }
+    }
+  }
+  return 0;
+}
+
+auto testClusterSwap(
+  TemperedLB* tlb,
+  NodeType rank,
+  std::unordered_map<NodeType, WorkBreakdown> work_init,
+  std::unordered_map<SharedIDType, ClusterInfo> cluster_info,
+  std::map<NodeType, std::unordered_map<ObjIDType, double>> cur_objs,
+  std::unordered_map<ObjIDType, SharedIDType> obj_shared_block,
+  SharedIDType to_remove, SharedIDType to_add
+) {
+  // Set initial distribution
+  tlb->setCurObjs(cur_objs[rank]);
+  // Get shared blocks for initial distribution
+  auto blocks_here_initial = tlb->getSharedBlocksHere();
+
+  auto cur_objs_add_remove = cur_objs[rank];
+  for (auto const& [obj_id, shared_id] : obj_shared_block) {
+    if (shared_id == to_add) {
+      cur_objs_add_remove[obj_id] = getObjLoad(obj_id, cur_objs);
+    }
+    if (shared_id == to_remove) {
+      cur_objs_add_remove.erase(cur_objs_add_remove.find(obj_id));
+    }
+  }
+
+  std::set<ObjIDType> non_cluster_objs;
+  for (auto const& [elm_id, _] : cur_objs[rank]) {
+    if (obj_shared_block.find(elm_id) == obj_shared_block.end()) {
+      non_cluster_objs.insert(elm_id);
+    }
+  }
+
+  tlb->setCurObjs(cur_objs_add_remove);
+  auto wb2 = tlb->computeWorkBreakdown(rank, cur_objs_add_remove);
+
+  NodeInfo ni{
+    getObjLoads(cur_objs_add_remove),
+    work_init[rank].work,
+    work_init[rank].inter_send_vol, work_init[rank].inter_recv_vol,
+    work_init[rank].intra_send_vol, work_init[rank].intra_recv_vol,
+    work_init[rank].shared_vol,
+    blocks_here_initial,
+    non_cluster_objs
+  };
+
+  ClusterInfo cluster_to_remove =
+    to_remove != -1 ? cluster_info[to_remove] : ClusterInfo{};
+  ClusterInfo cluster_to_add =
+    to_add != -1 ? cluster_info[to_add] : ClusterInfo{};
+
+  auto new_work = tlb->computeWorkAfterClusterSwap(
+    rank, ni, cluster_to_remove, cluster_to_add
+  );
+
+  EXPECT_NEAR(new_work, wb2.work, FLT_EPSILON);
+
+  vt_print(gen, "new_work={}, wb2={}\n", new_work, wb2.work);
+}
+
+
+TEST_F(TestTemperedLBWorkCalc, test_work_calc_1) {
+  using BytesType = vt::vrt::collection::lb::BytesType;
+
+  auto tlb = std::make_unique<TemperedLB>();
+
+  double const alpha = 1.0;
+  double const beta = 0.4;
+  double const gamma = 0.2;
+  double const delta = 0.1;
+
+  tlb->setAlpha(alpha);
+  tlb->setBeta(beta);
+  tlb->setGamma(gamma);
+  tlb->setDelta(delta);
+
+  // Test some arbitrary values of each term to make sure they are computed
+  // correctly
+  double work = tlb->computeWork(1000, 100, 50, 20);
+  EXPECT_EQ(
+    work,
+    alpha * 1000 +
+    beta * 100 +
+    gamma * 50 +
+    delta * 20
+  );
+
+  // rank 1 objects
+  auto o1 = elm::ElmIDBits::createCollectionImpl(true, 1, 1, 1);
+  auto o2 = elm::ElmIDBits::createCollectionImpl(true, 2, 1, 1);
+  auto o3 = elm::ElmIDBits::createCollectionImpl(true, 3, 1, 1);
+  auto o10 = elm::ElmIDBits::createCollectionImpl(true, 10, 1, 1);
+
+  // rank 2 objects
+  auto o4 = elm::ElmIDBits::createCollectionImpl(true, 4, 2, 2);
+  auto o12 = elm::ElmIDBits::createCollectionImpl(true, 12, 2, 2);
+
+  // rank 3 objects
+  auto o5 = elm::ElmIDBits::createCollectionImpl(true, 5, 3, 3);
+  auto o6 = elm::ElmIDBits::createCollectionImpl(true, 6, 3, 3);
+  auto o7 = elm::ElmIDBits::createCollectionImpl(true, 7, 3, 3);
+  auto o8 = elm::ElmIDBits::createCollectionImpl(true, 8, 3, 3);
+  auto o9 = elm::ElmIDBits::createCollectionImpl(true, 9, 3, 3);
+  auto o11 = elm::ElmIDBits::createCollectionImpl(true, 11, 3, 3);
+
+  // clusters
+  std::unordered_map<ObjIDType, SharedIDType> obj_shared_block = {
+    {o1, 1}, {o2, 1},
+    {o3, 2},
+    {o4, 3},
+    {o5, 4}, {o6, 4},
+    {o7, 5}, {o8, 5}, {o9, 5}
+  };
+
+  std::unordered_map<SharedIDType, BytesType> shared_block_size = {
+    {1, 100},
+    {2, 200},
+    {3, 150},
+    {4, 300},
+    {5, 100}
+  };
+
+  std::map<NodeType, std::unordered_map<ObjIDType, double>> cur_objs = {
+    {1,
+     {{o1, 20},
+      {o2, 5},
+      {o3, 10},
+      {o10, 3}
+     }
+    },
+    {2,
+     {{o4, 30},
+      {o12, 9}
+     }
+    },
+    {3,
+     {{o5, 10},
+      {o6, 15},
+      {o7, 3},
+      {o8, 2},
+      {o9, 8},
+      {o11, 1}
+     }
+    }
+  };
+
+  std::unordered_map<SharedIDType, std::tuple<NodeType, BytesType>> shared_edge = {
+    {1, {1, 100}},
+    {2, {1, 200}},
+    {3, {2, 150}},
+    {4, {3, 300}},
+    {5, {3, 100}}
+  };
+
+  std::unordered_map<
+    elm::ElementIDStruct, std::vector<std::pair<elm::ElementIDStruct, double>>
+  > send_edges, recv_edges;
+
+  send_edges = {
+    {o1, {{o2, 10}, {o5, 10}}},
+    {o2, {{o10, 10}}},
+    {o3, {{o2, 10}, {o7, 10}, {o10, 10}, {o11, 10}}},
+    {o4, {{o3, 10}, {o8, 10}}},
+    {o5, {{o6, 10}}},
+    {o6, {{o4, 10}, {o5, 10}, {o9, 10}, {o11, 10}}},
+    {o7, {{o9, 10}}},
+    {o8, {{o7, 10}, {o9, 10}}},
+    {o12, {{o3, 10}, {o4, 10}}}
+  };
+
+  recv_edges = {
+    {o2, {{o1, 10}, {o3, 10}}},
+    {o3, {{o4, 10}, {o12, 10}}},
+    {o4, {{o6, 10}, {o12, 10}}},
+    {o5, {{o1, 10}, {o6, 10}}},
+    {o6, {{o5, 10}}},
+    {o7, {{o3, 10}, {o8, 10}}},
+    {o8, {{o4, 10}}},
+    {o9, {{o6, 10}, {o7, 10}, {o8, 10}}},
+    {o10, {{o2, 10}, {o3, 10}}},
+    {o11, {{o3, 10}, {o6, 10}}}
+  };
+
+  tlb->setObjSharedBlock(obj_shared_block);
+  tlb->setSharedSize(shared_block_size);
+  tlb->setSharedEdge(shared_edge);
+  tlb->setSendEdges(send_edges);
+  tlb->setRecvEdges(recv_edges);
+
+  std::vector<NodeType> ranks = {1, 2, 3};
+  std::unordered_map<NodeType, NodeInfo> node_info;
+  std::unordered_map<SharedIDType, ClusterInfo> cluster_info;
+  std::unordered_map<NodeType, WorkBreakdown> work_init;
+
+  for (auto const& rank : ranks) {
+    tlb->setCurObjs(cur_objs[rank]);
+
+    auto wb = tlb->computeWorkBreakdown(rank, cur_objs[rank]);
+    vt_print(gen, "rank={}, work={}\n", rank, wb.work);
+    work_init[rank] = wb;
+
+    EXPECT_NEAR(
+      wb.work,
+      alpha * getObjLoads(cur_objs[rank]) +
+      beta * std::max(
+        getNodeEdges(cur_objs[rank], send_edges, true),
+        getNodeEdges(cur_objs[rank], recv_edges, true)
+      ) +
+      gamma * std::max(
+        getNodeEdges(cur_objs[rank], send_edges, false),
+        getNodeEdges(cur_objs[rank], recv_edges, false)
+      ) +
+      delta * computeOffHomeVolume(
+        rank, cur_objs[rank], obj_shared_block, shared_edge
+      ),
+      FLT_EPSILON
+    );
+
+    node_info[rank] = NodeInfo{
+      getObjLoads(cur_objs[rank]),
+      wb.work,
+      wb.inter_send_vol, wb.inter_recv_vol,
+      wb.intra_send_vol, wb.intra_recv_vol,
+      wb.shared_vol, tlb->getSharedBlocksHere(), {}
+    };
+
+    for (auto const& shared_id : tlb->getSharedBlocksHere()) {
+      cluster_info[shared_id] = tlb->makeClusterSummary(shared_id);
+    }
+  }
+
+  // Test removal of all clusters that exist on a given rank (one-by-one)
+  for (auto const& rank : ranks) {
+    for (auto const& [shared_id, rank_bytes] : shared_edge) {
+      auto const shared_rank = std::get<0>(rank_bytes);
+      if (rank == shared_rank) {
+        vt_print(
+          temperedlb, "try remove: rank={}, shared_id={}\n", rank, shared_id
+        );
+        testClusterSwap(
+          tlb.get(), rank, work_init, cluster_info, cur_objs, obj_shared_block,
+          shared_id, -1
+        );
+      }
+    }
+  }
+
+  // Test addition of all clusters that don't exist on a given rank
+  for (auto const& rank : ranks) {
+    std::set<SharedIDType> to_try_add;
+    for (auto const& rank2 : ranks) {
+      if (rank != rank2) {
+        for (auto const& [obj, shared_id] : obj_shared_block) {
+          if (std::get<0>(shared_edge[shared_id]) != rank) {
+            to_try_add.insert(shared_id);
+          }
+        }
+      }
+    }
+
+    for (auto const& shared_id : to_try_add) {
+      vt_print(temperedlb, "try add: rank={}, shared_id={}\n", rank, shared_id);
+      testClusterSwap(
+        tlb.get(), rank, work_init, cluster_info, cur_objs, obj_shared_block,
+        -1, shared_id
+      );
+    }
+  }
+
+  // Test cluster swaps
+  for (auto const& rank : ranks) {
+    std::set<SharedIDType> to_try_add;
+    for (auto const& rank2 : ranks) {
+      if (rank != rank2) {
+        for (auto const& [obj, shared_id] : obj_shared_block) {
+          if (std::get<0>(shared_edge[shared_id]) != rank) {
+            to_try_add.insert(shared_id);
+          }
+        }
+      }
+    }
+
+    for (auto const& [shared_id_remove, rank_bytes] : shared_edge) {
+      auto const shared_rank = std::get<0>(rank_bytes);
+      if (rank == shared_rank) {
+        for (auto const& shared_id_add : to_try_add) {
+          vt_print(
+            temperedlb,
+            "try swap: rank={}, shared_id_remove={}, shared_id_add={}\n",
+            rank, shared_id_remove, shared_id_add
+          );
+          testClusterSwap(
+            tlb.get(), rank, work_init, cluster_info, cur_objs, obj_shared_block,
+            shared_id_remove, shared_id_add
+          );
+        }
+      }
+    }
+  }
+
+}
+
+#endif /* vt_check_enabled(lblite) */
+
+} /* end namespace vt::tests::unit::lb */
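The essence of the new test: testClusterSwap computes the post-swap work two independent ways and requires them to agree, which pins down the incremental bookkeeping in computeWorkAfterClusterSwap. Schematically, using the names from the test above:

    // Incremental estimate from the pre-swap NodeInfo plus cluster summaries...
    auto estimated = tlb->computeWorkAfterClusterSwap(
      rank, ni, cluster_to_remove, cluster_to_add
    );
    // ...must match a from-scratch recomputation on the mutated assignment.
    auto actual = tlb->computeWorkBreakdown(rank, cur_objs_add_remove).work;
    EXPECT_NEAR(estimated, actual, FLT_EPSILON);

Running this over every removal, addition, and pairwise swap across the three ranks exercises all branches of the edge-reclassification logic.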