Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions gprofiler/hw_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import base64
import csv
import gzip
import os
import platform
import shutil
import subprocess
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from threading import Event, RLock, Thread
from typing import Optional

# Default for HWMetricsMonitor's ``polling_rate_seconds`` constructor argument.
DEFAULT_POLLING_INTERVAL_SECONDS = 5
# Presumably the time to wait for the PerfSpect process to exit on stop —
# TODO confirm; nothing in this module reads it yet.
STOP_TIMEOUT_SECONDS = 2
# Directory handed to PerfSpect via ``--output`` and read back for the summary files.
# NOTE(review): fixed, predictable path under /tmp — confirm this is acceptable
# on multi-user hosts.
PERFSPECT_DATA_DIRECTORY = "/tmp/perfspect_data"


@dataclass
class HWMetrics:
    """One sample of hardware metrics, in two alternative representations.

    Either field may be ``None`` when that representation is not available.
    """

    # Hardware metrics as a dictionary (JSON-style data).
    metrics_data: Optional[dict]
    # Hardware metrics HTML report, carried as a base64-encoded string.
    metrics_html: Optional[str]


class HWMetricsMonitorBase(metaclass=ABCMeta):
    """Abstract interface shared by all hardware-metrics monitors.

    Subclasses implement the lifecycle hooks (``start`` / ``stop``) and the two
    private getters; ``get_hw_metrics`` bundles the getters' results.
    """

    @abstractmethod
    def start(self) -> None:
        """Begin collecting hardware metrics."""

    @abstractmethod
    def stop(self) -> None:
        """Stop collecting hardware metrics."""

    @abstractmethod
    def _get_hw_metrics_dict(self) -> Optional[dict]:
        """Return the hardware metrics as a dictionary, or ``None``."""
        raise NotImplementedError

    @abstractmethod
    def _get_hw_metrics_html(self) -> Optional[str]:
        """Return the hardware metrics HTML as a base64-encoded string, or ``None``."""
        raise NotImplementedError

    def get_hw_metrics(self) -> HWMetrics:
        """Return both representations wrapped in a single ``HWMetrics`` object."""
        # The dictionary is fetched before the HTML, same as a positional call would.
        as_dict = self._get_hw_metrics_dict()
        as_html = self._get_hw_metrics_html()
        return HWMetrics(as_dict, as_html)


class HWMetricsMonitor(HWMetricsMonitorBase):
    """Collects hardware metrics by running the PerfSpect tool as a child process.

    ``start`` launches ``<perfspect_path> metrics --duration <n> --output <dir>``
    with ``PERFSPECT_DATA_DIRECTORY`` as the output directory. The getters then
    read the summary CSV / HTML files from that directory; the file names are
    prefixed with ``platform.node()``.
    """

    def __init__(
        self,
        stop_event: Event,
        perfspect_path: Optional[Path] = None,
        perfspect_duration: int = 60,
        polling_rate_seconds: int = DEFAULT_POLLING_INTERVAL_SECONDS,
    ):
        """
        :param stop_event: stored on the instance; not otherwise used by this class.
        :param perfspect_path: path to the PerfSpect executable. ``start`` does
            nothing when it is ``None``, not a file, or not executable.
        :param perfspect_duration: value passed to PerfSpect's ``--duration`` option.
        :param polling_rate_seconds: stored on the instance; not otherwise used
            by this class.
        """
        self._polling_rate_seconds = polling_rate_seconds
        self._stop_event = stop_event
        # _thread and _lock are kept for interface parity; no thread is started here.
        self._thread: Optional[Thread] = None
        self._lock = RLock()
        self._ps_process: Optional[subprocess.Popen[bytes]] = None
        self._perfspect_path: Optional[Path] = perfspect_path
        self._perfspect_duration = perfspect_duration

        # All data files share the "<output dir>/<hostname>" prefix.
        prefix = f"{PERFSPECT_DATA_DIRECTORY}/{platform.node()}"
        self._ps_raw_csv_filename = f"{prefix}_metrics.csv"
        self._ps_summary_csv_filename = f"{prefix}_metrics_summary.csv"
        self._ps_summary_html_filename = f"{prefix}_metrics_summary.html"
        # Private snapshot copies that the getters read instead of the live files.
        self._ps_latest_csv_filename = f"{prefix}_metrics_summary_latest.csv"
        self._ps_latest_html_filename = f"{prefix}_metrics_summary_latest.html"

        self._cleanup()

    def start(self) -> None:
        """Launch PerfSpect in the background.

        Silently does nothing when the configured path is unusable, or when a
        previously started PerfSpect process is still running.
        """
        tool = self._perfspect_path
        if tool is None or not os.path.isfile(tool) or not os.access(tool, os.X_OK):
            return

        # A second concurrent PerfSpect would write to the same output files
        # and the handle to the first one would be lost.
        if self._ps_process is not None and self._ps_process.poll() is None:
            return

        ps_cmd = [
            str(tool),
            "metrics",
            "--duration",
            str(self._perfspect_duration),
            "--output",
            PERFSPECT_DATA_DIRECTORY,
        ]

        # stdout is never consumed by this class; an unread PIPE would block the
        # child once the OS pipe buffer fills up, so discard the output instead.
        self._ps_process = subprocess.Popen(ps_cmd, stdout=subprocess.DEVNULL)

    def stop(self) -> None:
        """Terminate PerfSpect (if running), reap it, and delete its output files."""
        process = self._ps_process
        if process is not None:
            process.terminate()
            try:
                # Wait so the child is reaped and can no longer write files
                # after the cleanup below.
                process.wait(timeout=STOP_TIMEOUT_SECONDS)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
            self._ps_process = None

        self._cleanup()
        self._thread = None

    @staticmethod
    def _remove_if_exists(path: str) -> None:
        """Delete ``path``; a file that is already gone is not an error."""
        try:
            os.remove(path)
        except (FileNotFoundError, NotADirectoryError):
            pass

    @staticmethod
    def _copy_snapshot(source: str, destination: str) -> bool:
        """Copy ``source`` to ``destination``; return ``False`` if there is nothing to copy."""
        if not os.path.isfile(source):
            return False
        try:
            shutil.copy(source, destination)
        except FileNotFoundError:
            # The file vanished between the check and the copy.
            return False
        return True

    def _cleanup(self) -> None:
        """Ensure the output directory exists and holds no data from an earlier run.

        The directory itself is kept; only this host's raw CSV, summary CSV,
        summary HTML and leftover snapshot copies are removed.
        """
        if not os.path.exists(PERFSPECT_DATA_DIRECTORY):
            # exist_ok covers another process creating it after the check.
            os.makedirs(PERFSPECT_DATA_DIRECTORY, exist_ok=True)

        for stale_file in (
            self._ps_raw_csv_filename,
            self._ps_summary_csv_filename,
            self._ps_summary_html_filename,
            self._ps_latest_csv_filename,
            self._ps_latest_html_filename,
        ):
            self._remove_if_exists(stale_file)

    def _get_hw_metrics_dict(self) -> Optional[dict]:
        """Return the summary CSV as ``{first column: second column}``.

        The header row is skipped and rows with fewer than two columns are
        ignored. Returns ``None`` when no summary CSV file is available.
        """
        if not self._copy_snapshot(self._ps_summary_csv_filename, self._ps_latest_csv_filename):
            return None

        summary_dict = {}
        try:
            with open(self._ps_latest_csv_filename, "r", newline="") as snapshot:
                rows = csv.reader(snapshot)
                next(rows, None)  # Skip the header row; tolerate an empty file
                for row in rows:
                    if len(row) >= 2:
                        summary_dict[row[0]] = row[1]
        finally:
            # Never leave the snapshot copy behind, even if reading failed.
            self._remove_if_exists(self._ps_latest_csv_filename)
        return summary_dict

    def _get_hw_metrics_html(self) -> Optional[str]:
        """Return the summary HTML, gzip-compressed and then base64-encoded.

        Returns ``None`` when no summary HTML file is available.
        """
        if not self._copy_snapshot(self._ps_summary_html_filename, self._ps_latest_html_filename):
            return None

        try:
            with open(self._ps_latest_html_filename, "rb") as snapshot:
                html_data = snapshot.read()
        finally:
            # Never leave the snapshot copy behind, even if reading failed.
            self._remove_if_exists(self._ps_latest_html_filename)

        compressed_html_data = gzip.compress(html_data)
        return base64.b64encode(compressed_html_data).decode("utf-8")


class NoopHWMetricsMonitor(HWMetricsMonitorBase):
    """Monitor implementation that collects nothing and always reports ``None``."""

    def start(self) -> None:
        """Do nothing."""
        return None

    def stop(self) -> None:
        """Do nothing."""
        return None

    def _get_hw_metrics_dict(self) -> Optional[dict]:
        """Always return ``None``."""
        return None

    def _get_hw_metrics_html(self) -> Optional[str]:
        """Always return ``None``."""
        return None
73 changes: 72 additions & 1 deletion gprofiler/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from gprofiler.diagnostics import log_diagnostics, set_diagnostics
from gprofiler.exceptions import APIError, NoProfilersEnabledError
from gprofiler.gprofiler_types import ProcessToProfileData, UserArgs, integers_list, positive_integer
from gprofiler.hw_metrics import HWMetricsMonitor, HWMetricsMonitorBase, NoopHWMetricsMonitor
from gprofiler.log import RemoteLogsHandler, initial_root_logger_setup
from gprofiler.merge import concatenate_from_external_file, concatenate_profiles, merge_profiles
from gprofiler.metadata import ProfileMetadata
Expand All @@ -56,7 +57,7 @@
from gprofiler.metadata.external_metadata import ExternalMetadataStaleError, read_external_metadata
from gprofiler.metadata.metadata_collector import get_current_metadata, get_static_metadata
from gprofiler.metadata.system_metadata import get_hostname, get_run_mode, get_static_system_info
from gprofiler.platform import is_linux, is_windows
from gprofiler.platform import is_aarch64, is_linux, is_windows
from gprofiler.profiler_state import ProfilerState
from gprofiler.profilers.factory import get_profilers
from gprofiler.profilers.profiler_base import NoopProfiler, ProcessProfilerBase, ProfilerInterface
Expand Down Expand Up @@ -116,12 +117,15 @@ def __init__(
duration: int,
profile_api_version: str,
profiling_mode: str,
collect_hw_metrics: bool,
processes_to_profile: Optional[List[Process]],
profile_spawned_processes: bool = True,
remote_logs_handler: Optional[RemoteLogsHandler] = None,
controller_process: Optional[Process] = None,
external_metadata_path: Optional[Path] = None,
heartbeat_file_path: Optional[Path] = None,
perfspect_path: Optional[Path] = None,
perfspect_duration: int = 60,
):
self._output_dir = output_dir
self._flamegraph = flamegraph
Expand All @@ -142,6 +146,9 @@ def __init__(
self._duration = duration
self._external_metadata_path = external_metadata_path
self._heartbeat_file_path = heartbeat_file_path
self._collect_hw_metrics = collect_hw_metrics
self._perfspect_path = perfspect_path
self._perfspect_duration = perfspect_duration
if self._collect_metadata:
self._static_metadata = get_static_metadata(self._spawn_time, user_args, self._external_metadata_path)
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
Expand Down Expand Up @@ -169,6 +176,16 @@ def __init__(
else:
self._system_metrics_monitor = NoopSystemMetricsMonitor()

if self._collect_hw_metrics and self._perfspect_path is not None:
logger.info(f"Using perfspect path: {self._perfspect_path}")
self._hw_metrics_monitor: HWMetricsMonitorBase = HWMetricsMonitor(
self._profiler_state.stop_event,
perfspect_path=self._perfspect_path,
perfspect_duration=self._perfspect_duration,
)
else:
self._hw_metrics_monitor = NoopHWMetricsMonitor()

if isinstance(self.system_profiler, NoopProfiler) and not self.process_profilers:
raise NoProfilersEnabledError()

Expand Down Expand Up @@ -255,8 +272,10 @@ def _strip_extra_data(self, collapsed_data: str) -> str:
return "\n".join(lines)

def start(self) -> None:
logger.info("Starting ...")
self._profiler_state.stop_event.clear()
self._system_metrics_monitor.start()
self._hw_metrics_monitor.start()

for prof in list(self.all_profilers):
try:
Expand All @@ -275,6 +294,7 @@ def stop(self) -> None:
logger.info("Stopping ...")
self._profiler_state.stop_event.set()
self._system_metrics_monitor.stop()
self._hw_metrics_monitor.stop()
for prof in self.all_profilers:
prof.stop()

Expand Down Expand Up @@ -314,6 +334,19 @@ def _snapshot(self) -> None:
)
metadata.update({"profiling_mode": self._profiler_state.profiling_mode})
metrics = self._system_metrics_monitor.get_metrics()
hwmetrics = self._hw_metrics_monitor.get_hw_metrics()
if hwmetrics is None:
logger.info("No hw metrics were collected")
else:
if hwmetrics.metrics_data is None:
logger.info("No hw metrics_data were collected")
else:
logger.info("Collected hw metrics_data")

if hwmetrics.metrics_html is None:
logger.info("No hw metrics_html were collected")
else:
logger.info("Collected hw metrics_html")

try:
external_app_metadata = read_external_metadata(self._external_metadata_path).application
Expand All @@ -329,6 +362,7 @@ def _snapshot(self) -> None:
enrichment_options=self._enrichment_options,
metadata=metadata,
metrics=metrics,
hwmetrics=hwmetrics,
external_app_metadata=external_app_metadata,
)

Expand All @@ -340,6 +374,7 @@ def _snapshot(self) -> None:
enrichment_options=self._enrichment_options,
metadata=metadata,
metrics=metrics,
hwmetrics=hwmetrics,
external_app_metadata=external_app_metadata,
)

Expand Down Expand Up @@ -815,6 +850,32 @@ def parse_cmd_args() -> configargparse.Namespace:
"The file modification indicates the last snapshot time.",
)

if is_linux() and not is_aarch64():
hw_metrics_options = parser.add_argument_group("hardware metrics")
hw_metrics_options.add_argument(
"--enable-hw-metrics-collection",
action="store_true",
default=False,
dest="collect_hw_metrics",
help="Enable to collect HW metrics through Perfspect tool which need to be installed separately.",
)
hw_metrics_options.add_argument(
"--perfspect-path",
type=str,
dest="tool_perfspect_path",
default=None,
help="Enable HW metrics collection with PerfSpect tool."
" Provide path to PerfSpect binary to enable collection.",
)
hw_metrics_options.add_argument(
"--perfspect-duration",
action="store",
type=positive_integer,
dest="tool_perfspect_duration",
default=60,
help="The default perfspect tool collection time is 60 second.",
)

args = parser.parse_args()

args.perf_inject = args.nodejs_mode == "perf"
Expand Down Expand Up @@ -1073,6 +1134,13 @@ def main() -> None:
if args.heartbeat_file is not None:
heartbeat_file_path = Path(args.heartbeat_file)

perfspect_path: Optional[Path] = None
if args.tool_perfspect_path is not None:
perfspect_path = Path(args.tool_perfspect_path)
if not perfspect_path.is_file():
logger.error(f"PerfSpect tool {args.tool_perfspect_path} does not exist!")
sys.exit(1)

try:
log_system_info()
except Exception:
Expand Down Expand Up @@ -1151,12 +1219,15 @@ def main() -> None:
duration=args.duration,
profile_api_version=args.profile_api_version,
profiling_mode=args.profiling_mode,
collect_hw_metrics=args.collect_hw_metrics,
profile_spawned_processes=args.profile_spawned_processes,
remote_logs_handler=remote_logs_handler,
controller_process=controller_process,
processes_to_profile=processes_to_profile,
external_metadata_path=external_metadata_path,
heartbeat_file_path=heartbeat_file_path,
perfspect_path=args.tool_perfspect_path,
perfspect_duration=args.tool_perfspect_duration,
)
logger.info("gProfiler initialized and ready to start profiling")
if args.continuous:
Expand Down
Loading