Skip to content

Commit 1af3be2

Browse files
committed
Add experimental slow rank mitigation
1 parent 0ab147b commit 1af3be2

File tree

5 files changed

+341
-5
lines changed

5 files changed

+341
-5
lines changed

detection/core/SlowNodeDetector.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ def __parseOutput(self):
9090
"""Parses text output from slow_node.cc"""
9191
self.__rank_times, \
9292
self.__rank_breakdowns, \
93-
self.__rank_to_node_map = parseOutput(self.__filepath, self.__benchmark, self.__datatype)
93+
self.__rank_to_node_map, \
94+
self.__rank_info = parseOutput(self.__filepath, self.__benchmark, self.__datatype)
9495

9596
self.__num_ranks = len(self.__rank_times)
9697

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
import os
2+
import numpy as np
3+
import matplotlib.pyplot as plt
4+
5+
from detection.utils.Parse import parseSensors, parseOutput
6+
from detection.utils.Plot import plotData, plotDroppedNodes
7+
from detection.utils.Time import timeFtn
8+
9+
10+
class SlowRankMitigator:
11+
"""
12+
The SlowRankMitigator analyzes the output from the `slow_node` executable
13+
and outputs relevant information related to the processing speed and temperature
14+
of the ranks used during execution.
15+
16+
There are two main methods of the SlowRankMitigator:
17+
18+
detect(): This will print out information regarding slow and/or over-heated
19+
ranks, along with the sockets and nodes they reside on.
20+
21+
createHostfile(): This will generate a `hostfile.txt` with all "good" nodes.
22+
This file can be used in future jobs to ensure that slow nodes are
23+
avoided. Importantly, nodes are only omitted from the hostfile if
24+
the number of slow ranks on that node surpasses the size of a socket.
25+
26+
Optional: Use `-N` argument to specify the number of nodes that should be
27+
included in the hostfile.
28+
29+
The following terminology will be used through the SlowRankMitigator:
30+
31+
Rank: An MPI process
32+
Core: Processing unit on a socket
33+
Socket: Collection of cores on a node
34+
Node: Computing unit in a cluster
35+
"""
36+
37+
def __init__(
38+
self, path, sensors, num_nodes, pct, weight, benchmark, type, spn, rpn, plot_rank_breakdowns):
39+
# Create empty dicts for storing data
40+
self.__rank_times = {}
41+
self.__rank_breakdowns = {}
42+
self.__rank_to_node_map = {} # Maps each rank to the name of its corresponding node
43+
self.__rank_info = {}
44+
45+
# Initialize variables
46+
self.__filepath = path
47+
self.__num_nodes = int(num_nodes) if num_nodes is not None else None
48+
self.__threshold_pct = float(pct)
49+
self.__weight = float(weight)
50+
self.__benchmark = benchmark
51+
self.__datatype = type
52+
self.__spn = int(spn)
53+
self.__rpn = int(rpn)
54+
self.__rps = self.__rpn / self.__spn
55+
self.__plot_rank_breakdowns = plot_rank_breakdowns
56+
self.__num_ranks = 0
57+
58+
# Initialize outliers
59+
self.__slow_ranks = {}
60+
self.__slow_rank_slowdowns = {}
61+
self.__slow_node_names = []
62+
63+
# Initialize (and create) directories
64+
self.__output_dir = os.path.join(
65+
os.path.dirname(path),
66+
"output")
67+
self.__plots_dir = os.path.join(
68+
self.__output_dir,
69+
"plots")
70+
os.makedirs(self.__plots_dir, exist_ok=True)
71+
72+
73+
###########################################################################
74+
## Utilities
75+
76+
def __s(self, lst: list):
77+
"""Helper function for the print statements."""
78+
return "s" if len(lst) != 1 else ""
79+
80+
81+
###########################################################################
82+
## Parsing
83+
84+
def __parseOutput(self):
85+
"""Parses text output from slow_node.cc"""
86+
self.__rank_times, \
87+
self.__rank_breakdowns, \
88+
self.__rank_to_node_map, \
89+
self.__rank_info = parseOutput(self.__filepath, self.__benchmark, self.__datatype)
90+
91+
self.__num_ranks = len(self.__rank_times)
92+
93+
###########################################################################
94+
## Secondary analytical functions
95+
96+
def __getNumberOfSlowRanksOnNode(self, node_name):
97+
"""
98+
Returns the number of ranks in self.__slow_ranks that
99+
belong to the given node.
100+
"""
101+
return sum(1 for r_id in self.__slow_ranks if self.__rank_to_node_map[r_id] == node_name)
102+
103+
def __isSlowNode(self, node_name):
104+
"""
105+
Returns True if all of the ranks on one socket of the node
106+
are considered slow.
107+
108+
For example, if there are two sockets per node, and half of
109+
the ranks on a node are "slow," the function will return True.
110+
"""
111+
# Exit early if possible
112+
if len(self.__slow_ranks) < self.__rps:
113+
return False
114+
115+
# Determine how many slow ranks are on this node
116+
n_slow_ranks = self.__getNumberOfSlowRanksOnNode(node_name)
117+
118+
return n_slow_ranks >= self.__rps
119+
120+
def __sortNodesByExecutionTime(self, nodes: list):
121+
"""
122+
Takes in a list of node names and sorts them based on total execution time.
123+
The fastest nodes will be first, and the slowest will be last.
124+
"""
125+
node_times = {}
126+
for r, n in self.__rank_to_node_map.items():
127+
if n in nodes:
128+
if n not in node_times:
129+
node_times[n] = 0.0
130+
node_times[n] += self.__rank_times[r]
131+
# Alternative:
132+
# return sorted(nodes, key=lambda n: self.__getNumberOfSlowRanksOnNode(n))
133+
return sorted(node_times, key=lambda t: node_times[t])
134+
135+
def __sortNodesByMaxRankExecutionTime(self, nodes: list):
136+
"""
137+
Takes in a list of node names and sorts them based on total execution time.
138+
The fastest nodes will be first, and the slowest will be last.
139+
"""
140+
node_times = {}
141+
for r, n in self.__rank_to_node_map.items():
142+
if n in nodes:
143+
if n not in node_times:
144+
node_times[n] = 0.0
145+
if self.__rank_times[r] > node_times[n]:
146+
node_times[n] = self.__rank_times[r]
147+
# Alternative:
148+
# return sorted(nodes, key=lambda n: self.__getNumberOfSlowRanksOnNode(n))
149+
return sorted(node_times, key=lambda t: node_times[t])
150+
151+
def __findHighOutliers(self, data):
152+
"""
153+
Finds data points that are some percentage (given by self.__threshold_pct)
154+
higher than the mean of the data.
155+
"""
156+
avg = np.mean(data)
157+
threshold = avg * (1.0 + self.__threshold_pct)
158+
outliers = [elt for elt in data if elt > threshold]
159+
diffs = [t / avg for t in outliers]
160+
assert len(outliers) == len(diffs) # sanity check
161+
return outliers, diffs
162+
163+
###########################################################################
164+
## Primary analytical functions
165+
166+
def __analyzeAcrossRanks(self):
167+
"""
168+
Compares the total execution time across all ranks to
169+
find any slow (self.__threshold_pct slower than the mean) ranks.
170+
"""
171+
rank_ids, total_times = zip(*self.__rank_times.items())
172+
outliers, slowdowns = self.__findHighOutliers(total_times)
173+
174+
plotData(rank_ids, total_times,
175+
"Across-Rank Comparison", "Rank ID",
176+
self.__plots_dir, self.__threshold_pct,
177+
outliers)
178+
179+
for r_id, time in self.__rank_times.items():
180+
if time in outliers:
181+
self.__slow_ranks[r_id] = time
182+
self.__slow_rank_slowdowns[r_id] = slowdowns[outliers.index(time)]
183+
184+
for r_id in self.__slow_ranks.keys():
185+
node_name = self.__rank_to_node_map[r_id]
186+
if self.__isSlowNode(node_name) and node_name not in self.__slow_node_names:
187+
self.__slow_node_names.append(node_name)
188+
189+
###########################################################################
190+
## Public getters
191+
192+
def getSlowRanks(self) -> dict:
193+
"""Return map of slow rank IDs to their times."""
194+
return self.__slow_ranks
195+
196+
def getSlowNodes(self) -> list:
197+
"""Return list of slow node names."""
198+
return self.__slow_node_names
199+
200+
###########################################################################
201+
## Public functions
202+
203+
def detect(self, print_results=True):
204+
"""
205+
Main function of the SlowRankMitigator class.
206+
Parses the output file from the slow_node executable
207+
and identifies any slow ranks or iterations.
208+
209+
Plots are generated in the same directory as the output
210+
file.
211+
"""
212+
timeFtn(self.__parseOutput)
213+
timeFtn(self.__analyzeAcrossRanks)
214+
215+
# Gather results
216+
rank_ids, total_times = zip(*self.__rank_times.items())
217+
slow_rank_ids = sorted(list(self.__slow_ranks.keys()), reverse=True, key=lambda r: self.__slow_rank_slowdowns[r])
218+
219+
# Print results
220+
if print_results:
221+
s = self.__s(slow_rank_ids)
222+
n = len(str(abs(int(self.__num_ranks))))
223+
print(f"\nPrinting analysis from {self.__benchmark}_{self.__datatype} benchmark...")
224+
print("\n----------------------------------------------------------")
225+
print("Across-Rank Analysis")
226+
print()
227+
print(f" {len(slow_rank_ids)} Outlier Rank{s} (at least {self.__threshold_pct:.0%} slower than the mean): {slow_rank_ids}")
228+
if len(slow_rank_ids) > 0:
229+
print()
230+
print(f" Slowdown % (Relative to Average) and Node for Slow Rank{s}:")
231+
for rank in slow_rank_ids:
232+
slowdown = self.__slow_rank_slowdowns[rank]
233+
node = self.__rank_to_node_map[rank]
234+
print(f" {rank:>{n}}: {slowdown:.2%} ({node})")
235+
print()
236+
print(f" Slowest Rank: {rank_ids[np.argmax(total_times)]} ({np.max(total_times)}s)")
237+
print(f" Fastest Rank: {rank_ids[np.argmin(total_times)]} ({np.min(total_times)}s)")
238+
print(f" Avg Time Across All Ranks: {np.mean(total_times)} s")
239+
print(f" Std Dev Across All Ranks: {np.std(total_times)} s")
240+
print()
241+
242+
print(f"View generated plots in {self.__plots_dir}.")
243+
print("----------------------------------------------------------")
244+
print()
245+
246+
def createAlphafile(self):
247+
alphafile_path = os.path.join(self.__output_dir, "alphafile.dat")
248+
with open(alphafile_path, "w") as alphafile:
249+
for rank_id, rank_info in self.__rank_info.items():
250+
if rank_id in self.__slow_ranks:
251+
alpha = self.__slow_rank_slowdowns[rank_id] * self.__weight
252+
else:
253+
alpha = 1.0
254+
alphafile.write(f"{rank_info[0]} {rank_info[1]} {alpha}\n")
255+
print("Alpha map has been written to alphafile.dat")

detection/mitigate_slow_ranks.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import os
2+
import sys
3+
import argparse
4+
5+
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
6+
7+
from detection.core.SlowRankMitigator import SlowRankMitigator
8+
from detection.utils.Time import timeFtn
9+
10+
def main():
11+
"""
12+
See documentation of SlowRankMitigator class, as well as
13+
the detect() and createHostfile() methods, for more information.
14+
"""
15+
parser = argparse.ArgumentParser(description='Slow Rank Detector script.')
16+
parser.add_argument('-f', '--filepath', help='Absolute or relative path to the output file from running slow_node executable', required=True)
17+
parser.add_argument('-s', '--sensors', help='Absolute or relative path to the sensors file that will be analyzed', default=None)
18+
parser.add_argument('-N', '--num_nodes', help='The number of nodes required by the application', default=None)
19+
parser.add_argument('-t', '--threshold', help='Percentage above average time that indicates a "slow" rank', default=0.05)
20+
parser.add_argument('-w', '--weight', help='Weight for penalizing slow rank alphas', default=1.5)
21+
parser.add_argument('-b', '--benchmark', help='Benchmark to analyze: [level1, level2, level3, dpotrf]', default='level3')
22+
parser.add_argument('-d', '--datatype', help='Datatype of benchmark to analyze: [double, complex]', default='double')
23+
parser.add_argument('-spn', '--spn', help='Number of sockets per node', default=2)
24+
parser.add_argument('-rpn', '--rpn', help='Number of ranks per node', default=48)
25+
parser.add_argument('-p', '--plot_all_ranks', action='store_true', help='Plot the breakdowns for every rank')
26+
args = parser.parse_args()
27+
28+
filepath = os.path.abspath(args.filepath)
29+
sensors_filepath = os.path.abspath(args.sensors) if args.sensors is not None else None
30+
31+
slowRankMitigator = SlowRankMitigator(
32+
path=filepath,
33+
sensors=sensors_filepath,
34+
num_nodes=args.num_nodes,
35+
pct=args.threshold,
36+
weight=args.weight,
37+
benchmark=args.benchmark,
38+
type=args.datatype,
39+
spn=args.spn,
40+
rpn=args.rpn,
41+
plot_rank_breakdowns=args.plot_all_ranks)
42+
43+
timeFtn(slowRankMitigator.detect)
44+
timeFtn(slowRankMitigator.createAlphafile)
45+
46+
if __name__ == "__main__":
47+
timeFtn(main)

detection/utils/Parse.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@ def parseOutput(slownode_file, benchmark, datatype):
1212
rank_times = {}
1313
rank_breakdowns = {}
1414
rank_to_node_map = {}
15+
rank_info_map = {}
1516
is_parsing=False
1617
with open(slownode_file, "r") as output:
1718
for line in output:
18-
if line.startswith(f"{benchmark}_{datatype}"):
19+
if line.startswith("NodeInfo:"):
20+
# splits: ['NodeInfo:', hostname, world_rank, shared_rank]
21+
splits = line.split(" ")
22+
rank_info_map[int(splits[2])] = (splits[1], int(splits[3]))
23+
elif line.startswith(f"{benchmark}_{datatype}"):
1924
is_parsing = True
20-
21-
if is_parsing:
25+
elif is_parsing:
2226
if line.startswith("gather"):
2327
# splits: ['gather', rank_info, total_time, 'breakdown', [times]]
2428
splits = line.split(":")
@@ -48,7 +52,7 @@ def parseOutput(slownode_file, benchmark, datatype):
4852
elif line.strip() == "":
4953
is_parsing = False
5054

51-
return rank_times, rank_breakdowns, rank_to_node_map
55+
return rank_times, rank_breakdowns, rank_to_node_map, rank_info_map
5256

5357
def parseSensors(sensors_file):
5458
"""

src/benchmarks.cc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#include "benchmarks.h"
77

88
#include <iostream>
9+
#include <tuple>
10+
#include <string>
911

1012
namespace benchmarks {
1113

@@ -223,6 +225,27 @@ all_results_t runAllBenchmarks(std::vector<int> sizes, int iters) {
223225
return all_results;
224226
}
225227

228+
std::tuple<std::string, int, int> getNodeRank() {
229+
int world_rank = -1, world_size = -1;
230+
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
231+
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
232+
233+
char hostname[MPI_MAX_PROCESSOR_NAME];
234+
int hostname_len = -1;
235+
MPI_Get_processor_name(hostname, &hostname_len);
236+
237+
MPI_Comm shared_comm;
238+
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shared_comm);
239+
240+
int shared_rank = -1, shared_size = -1;
241+
MPI_Comm_rank(shared_comm, &shared_rank);
242+
MPI_Comm_size(shared_comm, &shared_size);
243+
244+
MPI_Comm_free(&shared_comm);
245+
246+
return std::make_tuple(std::string{hostname}, world_rank, shared_rank);
247+
}
248+
226249
void printBenchmarkOutput(all_results_t benchmark_results, int iters)
227250
{
228251
int rank = -1;
@@ -285,5 +308,11 @@ void printBenchmarkOutput(all_results_t benchmark_results, int iters)
285308
}
286309
}
287310
}
311+
312+
MPI_Barrier(MPI_COMM_WORLD);
313+
auto rank_info = getNodeRank();
314+
std::cout << "NodeInfo: " << std::get<0>(rank_info) << " "
315+
<< std::get<1>(rank_info) << " " << std::get<2>(rank_info) << std::endl;
316+
288317
}
289318
} // end namespace benchmarks

0 commit comments

Comments
 (0)