
Commit 2764e2a

committed
#5: Add uniformity based second step node filtering
1 parent 6b06acf commit 2764e2a

File tree: 2 files changed (+37, −9 lines)

detection/detect_slow_nodes.py

Lines changed: 31 additions & 6 deletions
@@ -44,7 +44,7 @@ class SlowNodeDetector:
     """

     def __init__(
-            self, path, sensors, num_nodes, pct, spn, rpn, plot_rank_breakdowns, use_clstr, output_dir=None):
+            self, path, sensors, num_nodes, pct, spn, rpn, plot_rank_breakdowns, use_clstr, use_unfrm, output_dir=None):
         # Create empty dicts for storing data
         self.__rank_times = {}
         self.__rank_breakdowns = {}
@@ -64,12 +64,14 @@ def __init__(
         self.__plot_rank_breakdowns = plot_rank_breakdowns
         self.__num_ranks = 0
         self.__use_clustering = use_clstr
+        self.__use_uniformity = use_unfrm

         # Initialize outliers
         self.__slow_ranks = {}
         self.__slow_rank_slowdowns = {}
         self.__slow_node_names = []
         self.__slow_iterations = {}
+        self.__node_variances = {}

         # Initialize (and create) directories
         if output_dir:
@@ -378,7 +380,7 @@ def __clusterTimes(self, data):

         data = np.array(data)

-        ms = MeanShift().fit(data.reshape(-1, 1))
+        ms = MeanShift(n_jobs=-1).fit(data.reshape(-1, 1))
         clusters = ms.predict(data.reshape(-1, 1))


@@ -561,6 +563,18 @@ def __analyzeAcrossRanks(self):
             if self.__isSlowNode(node_name) and node_name not in self.__slow_node_names:
                 self.__slow_node_names.append(node_name)

+        if self.__use_uniformity:
+            node_variances = {}
+            for r_id, time in self.__rank_times.items():
+                node_name = self.__rank_to_node_map[r_id]
+                if node_name not in node_variances:
+                    node_variances[node_name] = []
+                node_variances[node_name].append(time)
+
+            for node_name, times in node_variances.items():
+                variance = np.var(times)
+                self.__node_variances[node_name] = variance
+
     def __analyzeWithinRanks(self):
         """
         Compares the execution of each iteration on a single rank to
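This hunk is the uniformity pass itself: group each rank's time by its host node, then record the variance of those times per node so that non-uniform nodes can be filtered in the second step. A sketch of the same grouping on hypothetical data, where rank_times and rank_to_node stand in for the detector's __rank_times and __rank_to_node_map:

    import numpy as np

    # Hypothetical inputs: rank id -> execution time, rank id -> host node.
    rank_times = {0: 1.0, 1: 1.1, 2: 1.0, 3: 0.6, 4: 1.9, 5: 1.2}
    rank_to_node = {0: "n0", 1: "n0", 2: "n0", 3: "n1", 4: "n1", 5: "n1"}

    # Group times by node, as the loop in the hunk above does.
    node_times = {}
    for r_id, t in rank_times.items():
        node_times.setdefault(rank_to_node[r_id], []).append(t)

    # np.var is the population variance: uniform n0 scores low, ragged n1 high.
    node_variances = {node: np.var(ts) for node, ts in node_times.items()}
    print(node_variances)  # {'n0': 0.0022..., 'n1': 0.2822...}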
@@ -752,14 +766,23 @@ def createHostfile(self):
         elif num_good_nodes > self.__num_nodes:
             n_nodes_to_drop = num_good_nodes - self.__num_nodes
             assert n_nodes_to_drop > 0, f"Cannot drop {n_nodes_to_drop}"
-            sorted_nodes = self.__sortNodesByExecutionTime(good_node_names)
             print(
                 f"Since the SlowNodeDetector originally found {num_good_nodes} good node{s}, "
                 f"but only {self.__num_nodes} are needed, the following nodes will also be "
                 f"omitted from the hostfile:")
-            for node in sorted_nodes[-n_nodes_to_drop:]:
-                print(f"  {node} ({self.__getNumberOfSlowRanksOnNode(node)} slow ranks)")
-            good_node_names = sorted_nodes[:-n_nodes_to_drop]
+
+            if self.__use_uniformity:
+                node_variances = {node: self.__node_variances[node] for node in good_node_names}
+                sorted_nodes_by_variance = sorted(node_variances.items(), key=lambda item: item[1], reverse=True)
+                nodes_to_drop = [node for node, _ in sorted_nodes_by_variance[:n_nodes_to_drop]]
+                for node in nodes_to_drop:
+                    print(f"  {node} ({self.__getNumberOfSlowRanksOnNode(node)} slow ranks)")
+                good_node_names -= set(nodes_to_drop)
+            else:
+                sorted_nodes = self.__sortNodesByExecutionTime(good_node_names)
+                for node in sorted_nodes[-n_nodes_to_drop:]:
+                    print(f"  {node} ({self.__getNumberOfSlowRanksOnNode(node)} slow ranks)")
+                good_node_names = sorted_nodes[:-n_nodes_to_drop]

         hostfile_path = os.path.join(self.__output_dir, "hostfile.txt")
         with open(hostfile_path, "w") as hostfile:
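When more good nodes survive than the run needs, the uniformity branch now drops the n_nodes_to_drop highest-variance nodes rather than the slowest ones, while the fallback branch keeps the old execution-time ordering. Isolated, the selection is just a descending sort by variance, sketched here with made-up numbers:

    # Hedged sketch of the drop-selection logic with hypothetical variances.
    node_variances = {"n0": 0.002, "n1": 0.282, "n2": 0.050}
    n_nodes_to_drop = 1

    ranked = sorted(node_variances.items(), key=lambda kv: kv[1], reverse=True)
    nodes_to_drop = [node for node, _ in ranked[:n_nodes_to_drop]]
    print(nodes_to_drop)  # ['n1'] -- the least uniform node is dropped first

Note that the two branches leave good_node_names in different shapes: the uniformity path subtracts a set, while the fallback path assigns a sliced list.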
@@ -785,6 +808,7 @@ def main():
     parser.add_argument('-rpn', '--rpn', help='Number of ranks per node', default=48)
     parser.add_argument('-p', '--plot_all_ranks', action='store_true', help='Plot the breakdowns for every rank')
     parser.add_argument('-c', '--use_clustering', action='store_true', help='Use clustering outlier detection')
+    parser.add_argument('-u', '--use_uniformity', action='store_true', help='Use rank execution time uniformity to identify slow nodes')
    args = parser.parse_args()

     filepath = os.path.abspath(args.filepath)
@@ -799,6 +823,7 @@ def main():
         rpn=args.rpn,
         plot_rank_breakdowns=args.plot_all_ranks,
         use_clstr=args.use_clustering,
+        use_unfrm=args.use_uniformity,
         output_dir=args.output_dir)

     slowNodeDetector.detect()
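With use_unfrm threaded through main(), the uniformity filter is opt-in from the command line. A hypothetical invocation (the data path is a placeholder, filepath is assumed to be the script's existing positional argument, and only the flags visible in this diff are assumed to exist):

    $ python detection/detect_slow_nodes.py /path/to/run_data -rpn 48 -c -u

Here -c keeps the clustering-based first step and -u enables the new uniformity-based second step when trimming the hostfile.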

tests/unit/detection/test_slow_node_detector.py

Lines changed: 6 additions & 3 deletions
@@ -31,7 +31,8 @@ def setUp(self):
             spn=self.spn,
             rpn=self.rpn,
             plot_rank_breakdowns=False,
-            use_clstr=False
+            use_clstr=False,
+            use_unfrm=False
         )

         # Run detection
@@ -85,7 +86,8 @@ def setUp(self):
             spn=self.spn,
             rpn=self.rpn,
             plot_rank_breakdowns=False,
-            use_clstr=True
+            use_clstr=True,
+            use_unfrm=False
         )

         # Run detection
@@ -129,7 +131,8 @@ def setUp(self):
             spn=self.spn,
             rpn=self.rpn,
             plot_rank_breakdowns=False,
-            use_clstr=True
+            use_clstr=True,
+            use_unfrm=False
         )

         # Run detection
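All three updated fixtures pass use_unfrm=False, so the new path is threaded through but not yet exercised. A test that actually enables it might look like the following sketch; fixture names such as self.filepath and self.sensors are assumptions mirroring the surrounding setUp code, which this diff does not show:

    # Hypothetical test fixture exercising the uniformity path.
    detector = SlowNodeDetector(
        path=self.filepath,        # assumed fixture
        sensors=self.sensors,      # assumed fixture
        num_nodes=self.num_nodes,  # assumed fixture
        pct=self.pct,              # assumed fixture
        spn=self.spn,
        rpn=self.rpn,
        plot_rank_breakdowns=False,
        use_clstr=False,
        use_unfrm=True             # enable uniformity-based filtering
    )
    detector.detect()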
