-
Notifications
You must be signed in to change notification settings - Fork 83
Expand file tree
/
Copy pathutils.py
More file actions
186 lines (147 loc) · 5.46 KB
/
utils.py
File metadata and controls
186 lines (147 loc) · 5.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# Copyright (c) Meta Platforms, Inc. and affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import multiprocessing as mp
from enum import Enum
from pathlib import Path
from typing import List, Tuple
import pandas as pd
import psutil
class KernelType(Enum):
    """Coarse categories a GPU kernel name can be classified into."""

    COMMUNICATION = 0  # NCCL communication kernels (see is_comm_kernel)
    MEMORY = 1  # Memcpy/Memset kernels (see is_memory_kernel)
    COMPUTATION = 2  # everything else
class IdleTimeType(Enum):
    """Buckets for classifying idle-time intervals."""

    HOST_WAIT = 0  # presumably idle while waiting on the host — confirm at call sites
    KERNEL_WAIT = 1  # presumably idle while waiting on another kernel — confirm at call sites
    OTHER = 2  # unattributed idle time
def normalize_path(path: str) -> str:
    """
    Convert a Linux path to a Python path.

    Paths beginning with "./" are anchored at the current working directory
    and paths beginning with "~/" at the user's home directory; anything
    else is returned unchanged.

    Args:
        path (str) : a path acceptable by the OS.

    Returns:
        A path supported by Python.
    """
    # Map each recognized prefix to the callable producing its anchor directory.
    for prefix, anchor in (("./", Path.cwd), ("~/", Path.home)):
        if path.startswith(prefix):
            remainder = path[len(prefix):]
            base = anchor()
            # A bare "./" or "~/" resolves to the anchor directory itself.
            return str(base.joinpath(remainder)) if remainder else str(base)
    return path
def is_comm_kernel(name: str) -> bool:
    """
    Check if a given GPU kernel is a communication kernel.

    Args:
        name (str): name of the GPU kernel.

    Returns:
        A boolean indicating if the kernel is a communication kernel.
    """
    # NCCL kernels carry one of these substrings in their name.
    return any(marker in name for marker in ("ncclKernel", "ncclDev"))
def is_memory_kernel(name: str) -> bool:
    """
    Check if a given GPU kernel is a memory kernel.

    Args:
        name (str): name of the GPU kernel.

    Returns:
        A boolean indicating if the kernel is an IO kernel.
    """
    # Memory operations show up as Memcpy or Memset events.
    return any(tag in name for tag in ("Memcpy", "Memset"))
def get_kernel_type(name: str) -> str:
    """Classify a GPU kernel name into one of the KernelType categories.

    Communication takes precedence over memory; anything matching neither
    is treated as computation. Returns the enum member's name string.
    """
    if is_comm_kernel(name):
        kind = KernelType.COMMUNICATION
    elif is_memory_kernel(name):
        kind = KernelType.MEMORY
    else:
        kind = KernelType.COMPUTATION
    return kind.name
def get_memory_kernel_type(name: str) -> str:
    """Memcpy Type is basically a prefix of the kernel name ~ Memcpy DtoH"""
    if name.startswith("Memset"):
        return "Memset"
    if not name.startswith("Memcpy"):
        return "Memcpy Unknown"
    # The direction tag is fixed-width, e.g. "Memcpy DtoH" (11 characters).
    return name[: len("Memcpy DtoH")]
def merge_kernel_intervals(kernel_df: pd.DataFrame) -> pd.DataFrame:
    """
    Merge all kernel intervals in the given dataframe such that there are no overlapping.

    Args:
        kernel_df (pd.DataFrame): must contain integer-like "ts" (start) and
            "dur" (duration) columns.

    Returns:
        A new DataFrame with non-overlapping ["ts", "end"] intervals sorted by "ts".
    """
    # Work on a copy of the columns we need: the original code sorted the
    # caller's DataFrame in place and injected "end"/"group" columns into it
    # as a side effect.
    merged = kernel_df[["ts", "dur"]].copy()
    merged.sort_values(by="ts", inplace=True)
    merged["end"] = merged["ts"] + merged["dur"]
    # Intervals that start before the running maximum end time overlap the
    # previous group; each time a gap appears, cumsum starts a new group id.
    merged["group"] = (merged["ts"] > merged["end"].shift().cummax()).cumsum()
    merged = (
        merged.groupby("group", as_index=False)
        .agg({"ts": "min", "end": "max"})
        .drop(["group"], axis=1)
        .sort_values(by="ts")
    )
    return merged
def shorten_name(name: str) -> str:
    """Shorten a long operator/kernel name.

    The CPU operator and CUDA kernel name in the trace can be too long to follow.
    This utility removes the functional arguments, template arguments, and return values
    to make the name easy to understand.
    """
    cleaned: str = name.replace("->", "")
    # Map each closing delimiter to the opener it cancels back to.
    openers = {">": "<", ")": "("}
    buf: List[str] = []
    for ch in cleaned:
        opener = openers.get(ch)
        if opener is None:
            buf.append(ch)
            continue
        # Discard everything back to (and including) the matching opener;
        # the closer itself is never kept.
        while buf and buf[-1] != opener:
            buf.pop()
        if buf:
            buf.pop()
    # Drop return types / leading qualifiers: keep the last space-separated token.
    return "".join(buf).split(" ")[-1]
def flatten_column_names(df: pd.DataFrame) -> None:
    """Flatten a DataFrame's MultiIndex column names into single strings.

    Level names are joined with "_" and trailing underscores from empty
    levels are stripped. Mutates ``df`` in place; no-op for flat columns.
    """
    if not isinstance(df.columns, pd.MultiIndex):
        return
    df.columns = ["_".join(levels).rstrip("_") for levels in df.columns]
def get_mp_pool_size(obj_size: int, num_objs: int) -> int:
    """
    Estimate the maximum pool size for multiprocessing.

    Args:
        obj_size (int): the size in bytes of one object to be processed; must be positive
        num_objs (int): the total number of objects to be processed

    Returns:
        int
            the recommended pool size, capped by available memory, the number
            of objects, and the CPU count

    Raises:
        ValueError: if obj_size is not positive (previously obj_size == 0
            surfaced as an opaque ZeroDivisionError and negative sizes
            produced a nonsense negative cap).
    """
    if obj_size <= 0:
        raise ValueError(f"obj_size must be positive, got {obj_size}")
    free_mem = psutil.virtual_memory().available
    # Leave 20% buffer for system and other processes
    max_np = int(0.8 * free_mem / obj_size)
    return min(max_np, num_objs, mp.cpu_count())
def get_symbol_column_names(df: pd.DataFrame) -> Tuple[str, str]:
    """Get the proper column names for the `name` and `cat` attributes of string type in the DataFrame.

    Due to the encoding/decoding operations, it is impossible for a generic HTA routine to known which columns
    in a trace DataFrame have the symbol values for the events' `name` and `cat` attributes.

    Args:
        df (pd.DataFrame): A trace DataFrame.

    Returns:
        (column_name_for_name, column_name_for_cat)
    """

    def _first_string_column(candidates: List[str]) -> str:
        # Return the first candidate present in df with string (object) dtype,
        # or "" when none qualifies.
        for candidate in candidates:
            if candidate in df.columns and df.dtypes[candidate] == "object":
                return candidate
        return ""

    return _first_string_column(["name", "s_name"]), _first_string_column(["cat", "s_cat"])