feat: Add TraceLens integration for trace analysis with MLflow upload#439
Conversation
- Add TraceLens trace analysis report generation (XLSX, CSV formats)
- Add mlflow_upload_tracelens_report config option (default: false)
- Add mlflow_tracelens_ranks, mlflow_tracelens_max_reports options
- Add mlflow_tracelens_output_format option (all, xlsx, csv)
- Auto-install TraceLens from GitHub if not present
- Upload analysis reports to MLflow artifacts/trace_analysis/
Pull request overview
This PR adds TraceLens integration to automatically generate performance analysis reports from PyTorch profiler traces and upload them to MLflow. TraceLens is auto-installed from GitHub if not present, and users can configure rank filtering, report limits, and output formats (XLSX/CSV).
Key changes:
- New module for MLflow artifact management with TraceLens integration
- Automatic TraceLens installation from GitHub with fallback CSV generation
- Configuration options to control trace analysis (ranks, max reports, output formats)
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| primus/backends/megatron/training/mlflow_artifacts.py | New 725-line module implementing trace/log file uploads and TraceLens report generation with fallback CSV summary |
| primus/backends/megatron/training/global_vars.py | Adds import and wrapper function upload_mlflow_artifacts to expose artifact upload functionality |
| primus/modules/trainer/megatron/trainer.py | Calls upload_mlflow_artifacts before ending MLflow run with configuration parameters from args |
| primus/configs/modules/megatron/primus_megatron_module.yaml | Adds 6 new configuration options for controlling trace/log uploads and TraceLens analysis |
Comments suppressed due to low confidence (2)
primus/backends/megatron/training/mlflow_artifacts.py:382
- Variable dfs is not used.
dfs = generate_perf_report_pytorch(trace_file, output_xlsx_path=xlsx_path)
primus/backends/megatron/training/mlflow_artifacts.py:370
- This assignment to 'dfs' is unnecessary as it is redefined before this value is used.
dfs = generate_perf_report_pytorch(trace_file, output_csvs_dir=csv_subdir)
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 20 comments.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
Force-pushed from df2e40a to 2861bdf
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.
…config parser

Addresses Copilot review comment: if mlflow_tracelens_ranks is configured as a string in YAML (e.g., '[0,8]' instead of [0, 8]), the code would receive a string instead of a list, causing _filter_traces_by_rank to silently filter out all trace files.

Added ast.literal_eval() conversion in:
- generate_tracelens_reports()
- upload_tracelens_reports_to_mlflow()

Falls back to None (process all ranks) with a warning if parsing fails.
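A minimal, self-contained sketch of the fallback behavior this commit describes; the helper name is hypothetical and exists only to illustrate the ast.literal_eval() conversion and the None fallback.

```python
import ast
from typing import List, Optional


def _normalize_ranks(ranks) -> Optional[List[int]]:
    """Hypothetical helper mirroring the fix: accept a real list (e.g. [0, 8])
    or a string from YAML (e.g. '[0,8]') and return a list, falling back to
    None (process all ranks) if parsing fails."""
    if ranks is None or isinstance(ranks, list):
        return ranks
    try:
        parsed = ast.literal_eval(ranks)
        return parsed if isinstance(parsed, list) else None
    except (ValueError, SyntaxError):
        return None


print(_normalize_ranks("[0,8]"))  # [0, 8]
print(_normalize_ranks([0, 8]))   # [0, 8]
print(_normalize_ranks("oops"))   # None -> process all ranks
```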
When output_format='all', the trace file was previously parsed twice:
- Once for XLSX generation
- Once for CSV generation

Now when the format is 'all', we call generate_perf_report_pytorch once with both output_xlsx_path and output_csvs_dir parameters, parsing the trace file only once and generating both formats from the same data. This improves performance significantly for the common use case of generating both report formats.
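A sketch of the single-pass call this commit describes. The keyword names come from this PR and the paths are illustrative; note that later comments in this thread report TraceLens treating the two outputs as either/or, so whether one call can emit both depends on the TraceLens version.

```python
# Sketch only: single parse of the trace, two output targets, as described in
# this commit. Keyword names are taken from this PR; paths are illustrative.
from TraceLens.Reporting import generate_perf_report_pytorch

trace_file = "rank_0_step_2.pt.trace.json.gz"         # example trace
dfs = generate_perf_report_pytorch(
    trace_file,
    output_xlsx_path="reports/rank_0_analysis.xlsx",  # single multi-tab workbook
    output_csvs_dir="reports/rank_0",                 # one CSV per report section
)
```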
After TraceLens reports are successfully uploaded to MLflow, the local tracelens_reports directory is automatically cleaned up to save disk space. This addresses the issue of temporary directories not being cleaned up after artifact upload. The reports remain accessible in MLflow while freeing up local storage.

Other directories checked:
- tensorboard_dir: contains original trace files, NOT temporary
- exp_root_path/logs: contains original log files, NOT temporary
- tracelens_reports: processed reports uploaded to MLflow, safe to clean up
Added mlflow_tracelens_cleanup_after_upload parameter to control whether local TraceLens reports are removed after upload to MLflow.

Default: True (clean up to save disk space). Set to False to keep reports locally for inspection/debugging.

Changes:
- Added cleanup_after_upload parameter to upload_tracelens_reports_to_mlflow()
- Added tracelens_cleanup_after_upload to upload_artifacts_to_mlflow()
- Added mlflow_tracelens_cleanup_after_upload config in YAML (default: true)
- Updated trainer to pass through the parameter

Use cases:
- True (default): production runs, save disk space
- False: development/debugging, keep local copies for inspection
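A hedged usage sketch of the new parameter, based on the upload_tracelens_reports_to_mlflow signature shown later in this diff. The writer is the mlflow module (as the docstrings describe) and the paths are placeholders for the experiment layout.

```python
# Usage sketch; the writer and paths are placeholders for what
# get_mlflow_writer() and the experiment layout provide in a real run.
import mlflow

from primus.backends.megatron.training.mlflow_artifacts import (
    upload_tracelens_reports_to_mlflow,
)

uploaded = upload_tracelens_reports_to_mlflow(
    mlflow_writer=mlflow,                # module-level MLflow API acts as the writer
    tensorboard_dir="/exp/tensorboard",  # where the profiler traces live
    exp_root_path="/exp",                # reports land in /exp/tracelens_reports
    ranks=[0, 8],                        # limit analysis to two ranks
    output_format="xlsx",
    cleanup_after_upload=False,          # keep local copies for debugging
)
print(f"uploaded {uploaded} report items")
```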
Move upload_mlflow_artifacts() from global_vars.py to new mlflow_setup.py to reduce merge conflicts. Includes TraceLens report generation and upload parameters. global_vars.py now matches main, avoiding future conflicts when merging from main branch.
- Document performance tradeoff for "all" format (parses trace twice)
- Add explicit warning when rank filtering results in zero matching files
- Add defensive checks for dfs return value before using len()
- Improve fallback error messages to clarify limitations
| ############################################################################### | ||
| # Copyright (c) 2025, Advanced Micro Devices, Inc. All rights reserved. | ||
| # | ||
| # See LICENSE for license information. | ||
| ############################################################################### | ||
|
|
||
| """ | ||
| MLflow Artifact Logging Utilities with TraceLens Integration | ||
|
|
||
| This module provides functions to upload trace files, log files, and | ||
| TraceLens analysis reports to MLflow when MLflow tracking is enabled. | ||
|
|
||
| Features: | ||
| - Upload profiler trace files from all profiled ranks (including multi-node) | ||
| - Upload log files from all levels and all ranks | ||
| - Generate and upload TraceLens trace analysis reports | ||
| - Supports both local and distributed training scenarios | ||
|
|
||
| MLflow Artifact Structure: | ||
| artifacts/ | ||
| ├── traces/ # PyTorch profiler trace files | ||
| │ ├── rank_0_step_2.json.gz | ||
| │ └── ... | ||
| ├── logs/ # Training log files | ||
| │ └── log_mp_pretrain.txt | ||
| └── trace_analysis/ # TraceLens analysis reports | ||
| ├── rank_0_analysis.xlsx # Multi-tab Excel (default) | ||
| └── ... | ||
|
|
||
| TraceLens Report Formats: | ||
| - xlsx: Multi-tab Excel with sections for kernels, memory, communication, etc. | ||
| - csv: Multiple CSV files per rank (kernels, memory, communication, etc.) | ||
| - all: Both xlsx and csv files (default) | ||
| """ | ||
|
|
||
| import glob | ||
| import os | ||
| import subprocess | ||
| import sys | ||
| from typing import List, Optional | ||
|
|
||
| from primus.modules.module_utils import log_rank_0, warning_rank_0 | ||
|
|
||
|
|
||
| def _get_all_trace_files(tensorboard_dir: str) -> list: | ||
| """ | ||
| Find all profiler trace files in the tensorboard directory. | ||
|
|
||
| Trace files are typically named like: | ||
| - *.pt.trace.json | ||
| - *.pt.trace.json.gz | ||
|
|
||
| Args: | ||
| tensorboard_dir: Path to the tensorboard directory containing trace files | ||
|
|
||
| Returns: | ||
| List of paths to trace files | ||
| """ | ||
| if not tensorboard_dir or not os.path.exists(tensorboard_dir): | ||
| return [] | ||
|
|
||
| trace_files = [] | ||
| # Look for PyTorch profiler trace files (both compressed and uncompressed) | ||
| # Using specific patterns to avoid matching unrelated JSON files | ||
| patterns = ["*.pt.trace.json", "*.pt.trace.json.gz"] | ||
| # Escape directory path to handle special characters like [] in experiment names | ||
| escaped_dir = glob.escape(tensorboard_dir) | ||
| for pattern in patterns: | ||
| trace_files.extend(glob.glob(os.path.join(escaped_dir, pattern))) | ||
| trace_files.extend(glob.glob(os.path.join(escaped_dir, "**", pattern), recursive=True)) | ||
|
|
||
| # Remove duplicates while preserving order | ||
| seen = set() | ||
| unique_files = [] | ||
| for f in trace_files: | ||
| if f not in seen: | ||
| seen.add(f) | ||
| unique_files.append(f) | ||
|
|
||
| return unique_files | ||
|
|
||
|
|
||
| def _get_all_log_files(exp_root_path: str) -> list: | ||
| """ | ||
| Find all log files in the experiment logs directory. | ||
|
|
||
| Log files are organized as: | ||
| - {exp_root_path}/logs/master/master-*.log | ||
| - {exp_root_path}/logs/{module_name}/rank-{rank}/*.log | ||
|
|
||
| Args: | ||
| exp_root_path: Root path of the experiment | ||
|
|
||
| Returns: | ||
| List of paths to log files | ||
| """ | ||
| if not exp_root_path: | ||
| return [] | ||
|
|
||
| logs_dir = os.path.join(exp_root_path, "logs") | ||
| if not os.path.exists(logs_dir): | ||
| return [] | ||
|
|
||
| log_files = [] | ||
| # Find all .log files recursively (escape path to handle special characters) | ||
| log_files.extend(glob.glob(os.path.join(glob.escape(logs_dir), "**", "*.log"), recursive=True)) | ||
|
|
||
| return log_files | ||
|
|
||
|
|
||
| def upload_trace_files_to_mlflow( | ||
| mlflow_writer, | ||
| tensorboard_dir: str, | ||
| artifact_path: str = "traces", | ||
| ) -> int: | ||
| """ | ||
| Upload all profiler trace files to MLflow as artifacts. | ||
|
|
||
| This function collects trace files from the tensorboard directory and | ||
| uploads them to MLflow. In distributed settings, only rank 0 (or the | ||
| last rank where MLflow writer is initialized) should call this. | ||
|
|
||
| Args: | ||
| mlflow_writer: The MLflow module instance (from get_mlflow_writer()) | ||
| tensorboard_dir: Path to the tensorboard directory containing trace files | ||
| artifact_path: MLflow artifact subdirectory for trace files | ||
|
|
||
| Returns: | ||
| Number of trace files uploaded | ||
| """ | ||
| if mlflow_writer is None: | ||
| return 0 | ||
|
|
||
| log_rank_0(f"[MLflow] Searching for trace files in: {tensorboard_dir}") | ||
| trace_files = _get_all_trace_files(tensorboard_dir) | ||
| if len(trace_files) > 5: | ||
| log_rank_0(f"[MLflow] Found {len(trace_files)} trace files: {trace_files[:5]}...") | ||
| else: | ||
| log_rank_0(f"[MLflow] Found {len(trace_files)} trace files: {trace_files}") | ||
|
|
||
| if not trace_files: | ||
| log_rank_0("[MLflow] No trace files found to upload") | ||
| return 0 | ||
|
|
||
| uploaded_count = 0 | ||
| for trace_file in trace_files: | ||
| try: | ||
| # Get relative path from tensorboard_dir for artifact organization | ||
| rel_path = os.path.relpath(trace_file, tensorboard_dir) | ||
| # Determine artifact subdirectory based on file location | ||
| artifact_subpath = ( | ||
| os.path.join(artifact_path, os.path.dirname(rel_path)) | ||
| if os.path.dirname(rel_path) | ||
| else artifact_path | ||
| ) | ||
|
|
||
| mlflow_writer.log_artifact(trace_file, artifact_path=artifact_subpath) | ||
| uploaded_count += 1 | ||
| log_rank_0(f"[MLflow] Uploaded trace file: {os.path.basename(trace_file)}") | ||
| except Exception as e: | ||
| warning_rank_0(f"[MLflow] Failed to upload trace file {trace_file}: {e}") | ||
|
|
||
| log_rank_0(f"[MLflow] Uploaded {uploaded_count} trace files to '{artifact_path}'") | ||
| return uploaded_count | ||
|
|
||
|
|
||
| def upload_log_files_to_mlflow( | ||
| mlflow_writer, | ||
| exp_root_path: str, | ||
| artifact_path: str = "logs", | ||
| ) -> int: | ||
| """ | ||
| Upload all log files to MLflow as artifacts. | ||
|
|
||
| This function collects log files from all ranks and all log levels | ||
| and uploads them to MLflow. The directory structure is preserved | ||
| in the artifact path. | ||
|
|
||
| Args: | ||
| mlflow_writer: The MLflow module instance (from get_mlflow_writer()) | ||
| exp_root_path: Root path of the experiment | ||
| artifact_path: MLflow artifact subdirectory for log files | ||
|
|
||
| Returns: | ||
| Number of log files uploaded | ||
| """ | ||
| if mlflow_writer is None: | ||
| return 0 | ||
|
|
||
| log_files = _get_all_log_files(exp_root_path) | ||
|
|
||
| if not log_files: | ||
| log_rank_0("[MLflow] No log files found to upload") | ||
| return 0 | ||
|
|
||
| logs_base_dir = os.path.join(exp_root_path, "logs") | ||
| uploaded_count = 0 | ||
|
|
||
| for log_file in log_files: | ||
| try: | ||
| # Preserve directory structure relative to logs base directory | ||
| rel_path = os.path.relpath(log_file, logs_base_dir) | ||
| artifact_subpath = ( | ||
| os.path.join(artifact_path, os.path.dirname(rel_path)) | ||
| if os.path.dirname(rel_path) | ||
| else artifact_path | ||
| ) | ||
|
|
||
| mlflow_writer.log_artifact(log_file, artifact_path=artifact_subpath) | ||
| uploaded_count += 1 | ||
| except Exception as e: | ||
| warning_rank_0(f"[MLflow] Failed to upload log file {log_file}: {e}") | ||
|
|
||
| log_rank_0(f"[MLflow] Uploaded {uploaded_count} log files to '{artifact_path}'") | ||
| return uploaded_count | ||
|
|
||
|
|
||
| # ============================================================================= | ||
| # TraceLens Integration | ||
| # ============================================================================= | ||
|
|
||
|
|
||
| def _ensure_openpyxl_installed() -> bool: | ||
| """ | ||
| Ensure openpyxl is installed for XLSX generation. | ||
|
|
||
| Returns: | ||
| True if openpyxl is available, False otherwise | ||
| """ | ||
| try: | ||
| import openpyxl # noqa: F401 | ||
|
|
||
| return True | ||
| except ImportError: | ||
| log_rank_0("[TraceLens] openpyxl not found, installing for XLSX support...") | ||
| try: | ||
| subprocess.check_call( | ||
| [sys.executable, "-m", "pip", "install", "openpyxl", "-q"], | ||
| stdout=subprocess.DEVNULL, | ||
| stderr=subprocess.DEVNULL, | ||
| ) | ||
| log_rank_0("[TraceLens] Successfully installed openpyxl") | ||
| return True | ||
| except subprocess.CalledProcessError as e: | ||
| warning_rank_0(f"[TraceLens] Failed to install openpyxl: {e}") | ||
| return False | ||
|
|
||
|
|
||
| def _ensure_tracelens_installed() -> bool: | ||
| """ | ||
| Ensure TraceLens and its dependencies are installed. | ||
|
|
||
| TraceLens is available from GitHub: https://github.com/AMD-AGI/TraceLens | ||
| XLSX generation requires openpyxl which is installed separately. | ||
|
|
||
| Returns: | ||
| True if TraceLens is available, False otherwise | ||
| """ | ||
| try: | ||
| import TraceLens # noqa: F401 | ||
|
|
||
| log_rank_0("[TraceLens] TraceLens is already installed") | ||
| except ImportError: | ||
| log_rank_0("[TraceLens] TraceLens not found, attempting to install from GitHub...") | ||
| try: | ||
| # TraceLens is on GitHub, not PyPI | ||
| subprocess.check_call( | ||
| [ | ||
| sys.executable, | ||
| "-m", | ||
| "pip", | ||
| "install", | ||
| "git+https://github.com/AMD-AGI/TraceLens.git", | ||
| "-q", | ||
| ], | ||
| stdout=subprocess.DEVNULL, | ||
| stderr=subprocess.DEVNULL, | ||
| ) | ||
| log_rank_0("[TraceLens] Successfully installed TraceLens from GitHub") | ||
| except subprocess.CalledProcessError as e: | ||
| warning_rank_0(f"[TraceLens] Failed to install TraceLens: {e}") | ||
| return False | ||
|
|
||
| # Ensure openpyxl is installed for XLSX generation | ||
| _ensure_openpyxl_installed() | ||
|
|
||
| return True | ||
|
|
||
|
|
||
| def _extract_rank_from_filename(filename: str) -> Optional[int]: | ||
| """ | ||
| Extract rank number from trace filename. | ||
|
|
||
| Expected patterns: | ||
| - rank_0_step_2.json.gz | ||
| - primus-megatron-exp-rank[0].*.json | ||
|
|
||
| Args: | ||
| filename: The trace filename | ||
|
|
||
| Returns: | ||
| Rank number or None if not found | ||
| """ | ||
| import re | ||
|
|
||
| # Try pattern: rank_N_ or rank[N] | ||
| patterns = [ | ||
| r"rank_(\d+)_", | ||
| r"rank\[(\d+)\]", | ||
| r"-rank(\d+)\.", | ||
| r"_rank(\d+)\.", | ||
| ] | ||
|
|
||
| for pattern in patterns: | ||
| match = re.search(pattern, filename) | ||
| if match: | ||
| return int(match.group(1)) | ||
|
|
||
| return None | ||
|
|
||
|
|
||
| def _filter_traces_by_rank(trace_files: List[str], ranks: List[int]) -> List[str]: | ||
| """ | ||
| Filter trace files to only include specified ranks. | ||
|
|
||
| Args: | ||
| trace_files: List of trace file paths | ||
| ranks: List of rank numbers to include | ||
|
|
||
| Returns: | ||
| Filtered list of trace files | ||
| """ | ||
| if not ranks: | ||
| return trace_files | ||
|
|
||
| filtered = [] | ||
| for trace_file in trace_files: | ||
| rank = _extract_rank_from_filename(os.path.basename(trace_file)) | ||
| if rank is not None and rank in ranks: | ||
| filtered.append(trace_file) | ||
|
|
||
| return filtered | ||
|
|
||
|
|
||
| def generate_tracelens_report( | ||
| trace_file: str, | ||
| output_dir: str, | ||
| report_name: Optional[str] = None, | ||
| output_format: str = "all", | ||
| ) -> List[str]: | ||
| """ | ||
| Generate a TraceLens analysis report for a single trace file. | ||
|
|
||
| Args: | ||
| trace_file: Path to the PyTorch profiler trace file (JSON/JSON.GZ) | ||
| output_dir: Directory to save the report | ||
| report_name: Optional custom name for the report (base name for CSVs) | ||
| output_format: Output format: | ||
| - "all" (default): Both XLSX and CSV files | ||
| - "xlsx": Single multi-tab Excel file with detailed analysis | ||
| - "csv": Multiple CSV files (kernels, memory, communication, etc.) | ||
|
|
||
| Returns: | ||
| List of paths to generated report files | ||
| """ | ||
| if not os.path.exists(trace_file): | ||
| warning_rank_0(f"[TraceLens] Trace file not found: {trace_file}") | ||
| return [] | ||
|
|
||
| os.makedirs(output_dir, exist_ok=True) | ||
|
|
||
| # Generate base name from trace filename if not provided | ||
| if report_name is None: | ||
| base_name = os.path.basename(trace_file) | ||
| # Remove extensions like .json.gz | ||
| for trace_ext in [".json.gz", ".json", ".pt.trace.json.gz", ".pt.trace.json"]: | ||
| if base_name.endswith(trace_ext): | ||
| base_name = base_name[: -len(trace_ext)] | ||
| break | ||
| report_name = base_name | ||
|
|
||
| try: | ||
| # Try using TraceLens Python API directly | ||
| from TraceLens.Reporting import generate_perf_report_pytorch | ||
|
|
||
| generated_files = [] | ||
|
|
||
| # For "all" format: TraceLens uses either/or logic - if output_csvs_dir is set, | ||
| # it ONLY generates CSVs. So we need to call it twice for both formats. | ||
| # Note: This means the trace file is parsed twice, roughly doubling processing time | ||
| # compared to a single format. This is a TraceLens limitation, not a bug. | ||
| if output_format == "all": | ||
| xlsx_path = os.path.join(output_dir, f"{report_name}_analysis.xlsx") | ||
| csv_subdir = os.path.join(output_dir, report_name) | ||
| os.makedirs(csv_subdir, exist_ok=True) | ||
|
|
||
| # First call: Generate XLSX only | ||
| dfs = generate_perf_report_pytorch(trace_file, output_xlsx_path=xlsx_path) | ||
|
|
||
| # Check XLSX output | ||
| if os.path.exists(xlsx_path): | ||
| num_tabs = len(dfs) if dfs else 0 | ||
| log_rank_0( | ||
| f"[TraceLens] Generated XLSX report with {num_tabs} tabs: {os.path.basename(xlsx_path)}" | ||
| ) | ||
| generated_files.append(xlsx_path) | ||
|
|
||
| # Second call: Generate CSVs only | ||
| generate_perf_report_pytorch(trace_file, output_csvs_dir=csv_subdir) | ||
|
|
||
| # Check CSV outputs (escape path to handle [] characters in filenames) | ||
| csv_files = glob.glob(os.path.join(glob.escape(csv_subdir), "*.csv")) | ||
| if csv_files: | ||
| log_rank_0(f"[TraceLens] Generated {len(csv_files)} CSV files for {report_name}") | ||
| generated_files.append(csv_subdir) # Upload directory to preserve structure | ||
|
|
||
| elif output_format == "xlsx": | ||
| # XLSX only: Single file with multiple tabs | ||
| xlsx_path = os.path.join(output_dir, f"{report_name}_analysis.xlsx") | ||
| dfs = generate_perf_report_pytorch(trace_file, output_xlsx_path=xlsx_path) | ||
| if os.path.exists(xlsx_path): | ||
| num_tabs = len(dfs) if dfs else 0 | ||
| log_rank_0( | ||
| f"[TraceLens] Generated XLSX report with {num_tabs} tabs: {os.path.basename(xlsx_path)}" | ||
| ) | ||
| generated_files.append(xlsx_path) | ||
|
|
||
| elif output_format == "csv": | ||
| # CSV only: Multiple files in a subdirectory per rank | ||
| csv_subdir = os.path.join(output_dir, report_name) | ||
| os.makedirs(csv_subdir, exist_ok=True) | ||
| dfs = generate_perf_report_pytorch(trace_file, output_csvs_dir=csv_subdir) | ||
|
|
||
| # Collect all generated CSV files (escape path to handle [] characters in filenames) | ||
| csv_files = glob.glob(os.path.join(glob.escape(csv_subdir), "*.csv")) | ||
| if csv_files: | ||
| log_rank_0(f"[TraceLens] Generated {len(csv_files)} CSV files for {report_name}") | ||
| generated_files.append(csv_subdir) # Upload directory to preserve structure | ||
|
|
||
| if generated_files: | ||
| return generated_files | ||
|
|
||
| warning_rank_0(f"[TraceLens] No output files generated for: {trace_file}") | ||
| return [] | ||
|
|
||
| except ImportError: | ||
| warning_rank_0( | ||
| "[TraceLens] TraceLens not available. Using simplified fallback CSV summary. " | ||
| "Install TraceLens for comprehensive kernel, memory, and communication analysis." | ||
| ) | ||
| # Fallback to simple CSV summary (basic stats only, may not handle all trace formats) | ||
| csv_path = _generate_trace_summary_csv(trace_file, output_dir, f"{report_name}_summary.csv") | ||
| return [csv_path] if csv_path else [] | ||
|
|
||
| except Exception as e: | ||
| warning_rank_0( | ||
| f"[TraceLens] Error generating report: {e}. " | ||
| "Using simplified fallback CSV summary with basic statistics only." | ||
| ) | ||
| # Fallback to simple CSV summary (basic stats only, may not handle all trace formats) | ||
| csv_path = _generate_trace_summary_csv(trace_file, output_dir, f"{report_name}_summary.csv") | ||
| return [csv_path] if csv_path else [] | ||
|
|
||
|
|
||
| def _generate_trace_summary_csv( | ||
| trace_file: str, | ||
| output_dir: str, | ||
| report_name: str, | ||
| ) -> Optional[str]: | ||
| """ | ||
| Generate a CSV summary from a PyTorch profiler trace file. | ||
|
|
||
| This is a fallback when TraceLens is not available. | ||
| Extracts key metrics from the trace JSON and writes to CSV. | ||
|
|
||
| Args: | ||
| trace_file: Path to the trace file | ||
| output_dir: Output directory | ||
| report_name: Name for the CSV file | ||
|
|
||
| Returns: | ||
| Path to generated CSV or None if failed | ||
| """ | ||
| import csv | ||
| import gzip | ||
| import json | ||
|
|
||
| try: | ||
| # Load trace file | ||
| if trace_file.endswith(".gz"): | ||
| with gzip.open(trace_file, "rt", encoding="utf-8") as f: | ||
| trace_data = json.load(f) | ||
| else: | ||
| with open(trace_file, "r", encoding="utf-8") as f: | ||
| trace_data = json.load(f) | ||
|
|
||
| # Extract events from trace | ||
| events = trace_data.get("traceEvents", []) | ||
| if not events: | ||
| warning_rank_0(f"[TraceLens] No events found in trace: {trace_file}") | ||
| return None | ||
|
|
||
| # Aggregate kernel/operation statistics | ||
| op_stats = {} | ||
| for event in events: | ||
| if event.get("cat") in ["kernel", "gpu_memcpy", "cuda_runtime", "cpu_op"]: | ||
| name = event.get("name", "unknown") | ||
| dur = event.get("dur", 0) # duration in microseconds | ||
|
|
||
| if name not in op_stats: | ||
| op_stats[name] = {"count": 0, "total_us": 0, "min_us": float("inf"), "max_us": 0} | ||
|
|
||
| op_stats[name]["count"] += 1 | ||
| op_stats[name]["total_us"] += dur | ||
| op_stats[name]["min_us"] = min(op_stats[name]["min_us"], dur) | ||
| op_stats[name]["max_us"] = max(op_stats[name]["max_us"], dur) | ||
|
|
||
| if not op_stats: | ||
| warning_rank_0(f"[TraceLens] No kernel/op events found in trace: {trace_file}") | ||
| return None | ||
|
|
||
| # Sort by total time descending | ||
| sorted_ops = sorted(op_stats.items(), key=lambda x: x[1]["total_us"], reverse=True) | ||
|
|
||
| # Write CSV | ||
| output_path = os.path.join(output_dir, report_name) | ||
| with open(output_path, "w", newline="", encoding="utf-8") as csvfile: | ||
| writer = csv.writer(csvfile) | ||
| writer.writerow( | ||
| [ | ||
| "Operation", | ||
| "Count", | ||
| "Total Time (ms)", | ||
| "Avg Time (ms)", | ||
| "Min Time (ms)", | ||
| "Max Time (ms)", | ||
| "% of Total", | ||
| ] | ||
| ) | ||
|
|
||
| total_time = sum(stats["total_us"] for _, stats in sorted_ops) | ||
| for name, stats in sorted_ops: | ||
| avg_us = stats["total_us"] / stats["count"] if stats["count"] > 0 else 0 | ||
| pct = (stats["total_us"] / total_time * 100) if total_time > 0 else 0 | ||
| writer.writerow( | ||
| [ | ||
| name, | ||
| stats["count"], | ||
| f"{stats['total_us'] / 1000:.3f}", | ||
| f"{avg_us / 1000:.3f}", | ||
| f"{stats['min_us'] / 1000:.3f}", | ||
| f"{stats['max_us'] / 1000:.3f}", | ||
| f"{pct:.2f}", | ||
| ] | ||
| ) | ||
|
|
||
| log_rank_0(f"[TraceLens] Generated CSV summary: {report_name} ({len(sorted_ops)} operations)") | ||
| return output_path | ||
|
|
||
| except json.JSONDecodeError as e: | ||
| warning_rank_0(f"[TraceLens] Failed to parse trace JSON: {e}") | ||
| return None | ||
| except Exception as e: | ||
| warning_rank_0(f"[TraceLens] Error generating CSV summary: {e}") | ||
| return None | ||
|
|
||
|
|
||
| def generate_tracelens_reports( | ||
| tensorboard_dir: str, | ||
| output_dir: str, | ||
| ranks: Optional[List[int]] = None, | ||
| output_format: str = "all", | ||
| ) -> List[str]: | ||
| """ | ||
| Generate TraceLens analysis reports for trace files. | ||
|
|
||
| Args: | ||
| tensorboard_dir: Directory containing PyTorch profiler trace files | ||
| output_dir: Directory to save the generated reports | ||
| ranks: List of ranks to generate reports for (None = all ranks) | ||
| To limit number of reports, specify fewer ranks in the list | ||
| output_format: Output format: | ||
| - "all" (default): Both XLSX and CSV files | ||
| - "xlsx": Multi-tab Excel with detailed analysis | ||
| - "csv": Multiple CSV files per rank (kernels, memory, comm, etc.) | ||
|
|
||
| Returns: | ||
| List of paths to all generated report files | ||
| """ | ||
| # Try to install tracelens, but continue with fallback if not available | ||
| _ensure_tracelens_installed() | ||
|
|
||
| # Normalize ranks parameter: handle string input from config parser | ||
| if ranks is not None and isinstance(ranks, str): | ||
| import ast | ||
|
|
||
| try: | ||
| ranks = ast.literal_eval(ranks) | ||
| if not isinstance(ranks, list): | ||
| log_rank_0( | ||
| f"[TraceLens] Warning: ranks evaluated to {type(ranks).__name__}, expected list. Using None." | ||
| ) | ||
| ranks = None | ||
| except (ValueError, SyntaxError) as e: | ||
| log_rank_0(f"[TraceLens] Warning: Failed to parse ranks '{ranks}': {e}. Using None.") | ||
| ranks = None | ||
|
|
||
| trace_files = _get_all_trace_files(tensorboard_dir) | ||
| if not trace_files: | ||
| log_rank_0("[TraceLens] No trace files found for analysis") | ||
| return [] | ||
|
|
||
| # Filter by ranks if specified | ||
| if ranks is not None: | ||
| original_count = len(trace_files) | ||
| trace_files = _filter_traces_by_rank(trace_files, ranks) | ||
| log_rank_0(f"[TraceLens] Filtered to {len(trace_files)} trace files for ranks: {ranks}") | ||
| if not trace_files and original_count > 0: | ||
| warning_rank_0( | ||
| f"[TraceLens] Warning: No trace files match the specified ranks {ranks}. " | ||
| f"Found {original_count} trace files but none matched. " | ||
| "Check that the rank numbers are correct." | ||
| ) | ||
|
|
||
| log_rank_0( | ||
| f"[TraceLens] Generating {output_format.upper()} reports for {len(trace_files)} trace files..." | ||
| ) | ||
|
|
||
| generated_reports = [] | ||
| for trace_file in trace_files: | ||
| # generate_tracelens_report now returns a list of files | ||
| report_paths = generate_tracelens_report(trace_file, output_dir, output_format=output_format) | ||
| generated_reports.extend(report_paths) | ||
|
|
||
| log_rank_0(f"[TraceLens] Generated {len(generated_reports)} report files from {len(trace_files)} traces") | ||
| return generated_reports | ||
|
|
||
|
|
||
| def generate_tracelens_reports_locally( | ||
| tensorboard_dir: str, | ||
| exp_root_path: str, | ||
| ranks: Optional[List[int]] = None, | ||
| output_format: str = "all", | ||
| ) -> int: | ||
| """ | ||
| Generate TraceLens analysis reports locally (without MLflow upload). | ||
|
|
||
| This function generates TraceLens reports and saves them to | ||
| exp_root_path/tracelens_reports/ for local inspection. | ||
|
|
||
| Args: | ||
| tensorboard_dir: Directory containing PyTorch profiler trace files | ||
| exp_root_path: Root path of the experiment (for saving reports) | ||
| ranks: List of ranks to analyze (None = all ranks, [0] = rank 0 only) | ||
| Specify fewer ranks to limit number of reports | ||
| output_format: Report format - "all" (default, xlsx+csv), "xlsx", or "csv" | ||
|
|
||
| Returns: | ||
| Number of reports generated | ||
|
|
||
| Example: | ||
| >>> generate_tracelens_reports_locally( | ||
| ... tensorboard_dir="/path/to/tensorboard", | ||
| ... exp_root_path="/path/to/experiment", | ||
| ... ranks=[0, 8], # Only 2 ranks = 2 reports | ||
| ... output_format="all" | ||
| ... ) | ||
| 26 # Generated 26 report files (XLSX + CSVs for 2 ranks) | ||
| """ | ||
| # Create output directory for reports | ||
| reports_dir = os.path.join(exp_root_path, "tracelens_reports") | ||
| os.makedirs(reports_dir, exist_ok=True) | ||
|
|
||
| log_rank_0(f"[TraceLens] Generating reports from traces in: {tensorboard_dir}") | ||
| log_rank_0(f"[TraceLens] Reports will be saved to: {reports_dir}") | ||
| if ranks: | ||
| log_rank_0(f"[TraceLens] Analyzing ranks: {ranks}") | ||
|
|
||
| # Generate reports | ||
| reports = generate_tracelens_reports( | ||
| tensorboard_dir=tensorboard_dir, | ||
| output_dir=reports_dir, | ||
| ranks=ranks, | ||
| output_format=output_format, | ||
| ) | ||
|
|
||
| if not reports: | ||
| log_rank_0("[TraceLens] No reports generated") | ||
| return 0 | ||
|
|
||
| log_rank_0(f"[TraceLens] Generated {len(reports)} report files locally") | ||
| return len(reports) | ||
|
|
||
|
|
||
| def upload_tracelens_reports_to_mlflow( | ||
| mlflow_writer, | ||
| tensorboard_dir: str, | ||
| exp_root_path: str, | ||
| ranks: Optional[List[int]] = None, | ||
| output_format: str = "all", | ||
| artifact_path: str = "trace_analysis", | ||
| cleanup_after_upload: bool = False, | ||
| ) -> int: | ||
| """ | ||
| Generate TraceLens reports and upload them to MLflow. | ||
|
|
||
| This function: | ||
| 1. Finds PyTorch profiler trace files | ||
| 2. Generates TraceLens analysis reports for specified ranks | ||
| 3. Uploads the reports to MLflow under the trace_analysis artifact path | ||
| 4. Optionally cleans up local report files after successful upload | ||
|
|
||
| Args: | ||
| mlflow_writer: The MLflow module instance (from get_mlflow_writer()) | ||
| tensorboard_dir: Directory containing PyTorch profiler trace files | ||
| exp_root_path: Root path of the experiment (for saving reports) | ||
| ranks: List of ranks to analyze (None = all ranks, [0] = rank 0 only) | ||
| Specify fewer ranks to limit number of reports | ||
| output_format: Report format - "all" (default, xlsx+csv), "xlsx", or "csv" | ||
| artifact_path: MLflow artifact subdirectory for reports | ||
| cleanup_after_upload: If True, removes local reports after upload to save disk space. | ||
| If False, keeps reports locally for inspection. Default: False. | ||
|
|
||
| Returns: | ||
| Number of reports uploaded to MLflow | ||
|
|
||
| Note: | ||
| Reports are saved to exp_root_path/tracelens_reports/ and kept locally by default. | ||
| Set cleanup_after_upload=True to remove them after upload and save disk space. | ||
| """ | ||
| if mlflow_writer is None: | ||
| log_rank_0("[TraceLens] MLflow writer not available, skipping report upload") | ||
| return 0 | ||
|
|
||
| # Normalize ranks parameter: handle string input from config parser | ||
| if ranks is not None and isinstance(ranks, str): | ||
| import ast | ||
|
|
||
| try: | ||
| ranks = ast.literal_eval(ranks) | ||
| if not isinstance(ranks, list): | ||
| log_rank_0( | ||
| f"[TraceLens] Warning: ranks evaluated to {type(ranks).__name__}, expected list. Using None." | ||
| ) | ||
| ranks = None | ||
| except (ValueError, SyntaxError) as e: | ||
| log_rank_0(f"[TraceLens] Warning: Failed to parse ranks '{ranks}': {e}. Using None.") | ||
| ranks = None | ||
|
|
||
| # Create output directory for reports | ||
| reports_dir = os.path.join(exp_root_path, "tracelens_reports") | ||
| os.makedirs(reports_dir, exist_ok=True) | ||
|
|
||
| log_rank_0(f"[TraceLens] Generating reports from traces in: {tensorboard_dir}") | ||
| log_rank_0(f"[TraceLens] Reports will be saved to: {reports_dir}") | ||
| if ranks: | ||
| log_rank_0(f"[TraceLens] Analyzing ranks: {ranks}") | ||
|
|
||
| # Generate reports | ||
| reports = generate_tracelens_reports( | ||
| tensorboard_dir=tensorboard_dir, | ||
| output_dir=reports_dir, | ||
| ranks=ranks, | ||
| output_format=output_format, | ||
| ) | ||
|
|
||
| if not reports: | ||
| log_rank_0("[TraceLens] No reports generated, nothing to upload") | ||
| return 0 | ||
|
|
||
| # Upload reports to MLflow | ||
| uploaded_count = 0 | ||
| for report_path in reports: | ||
| try: | ||
| mlflow_writer.log_artifact(report_path, artifact_path=artifact_path) | ||
| uploaded_count += 1 | ||
| log_rank_0(f"[MLflow] Uploaded TraceLens report: {os.path.basename(report_path)}") | ||
| except Exception as e: | ||
| warning_rank_0(f"[MLflow] Failed to upload report {report_path}: {e}") | ||
|
|
||
| log_rank_0(f"[TraceLens] Uploaded {uploaded_count} reports to '{artifact_path}'") | ||
|
|
||
| # Optionally clean up local reports after successful upload to save disk space | ||
| if cleanup_after_upload: | ||
| try: | ||
| import shutil | ||
|
|
||
| shutil.rmtree(reports_dir) | ||
| log_rank_0(f"[TraceLens] Cleaned up local reports directory: {reports_dir}") | ||
| except Exception as e: | ||
| warning_rank_0(f"[TraceLens] Failed to cleanup reports directory: {e}") | ||
| else: | ||
| log_rank_0(f"[TraceLens] Keeping local reports at: {reports_dir}") | ||
|
|
||
| return uploaded_count | ||
|
|
||
|
|
||
| # ============================================================================= | ||
| # Main Entry Point | ||
| # ============================================================================= | ||
|
|
||
|
|
||
| def upload_artifacts_to_mlflow( | ||
| mlflow_writer, | ||
| tensorboard_dir: Optional[str] = None, | ||
| exp_root_path: Optional[str] = None, | ||
| upload_traces: bool = True, | ||
| upload_logs: bool = True, | ||
| generate_tracelens_report: bool = False, | ||
| upload_tracelens_report: bool = False, | ||
| tracelens_ranks: Optional[List[int]] = None, | ||
| tracelens_output_format: str = "all", | ||
| tracelens_cleanup_after_upload: bool = False, | ||
| ) -> dict: | ||
| """ | ||
| Upload all artifacts (trace files, log files, TraceLens reports) to MLflow. | ||
|
|
||
| This is the main entry point for uploading artifacts to MLflow. | ||
| It handles: | ||
| - Trace files from PyTorch profiler | ||
| - Log files from training | ||
| - TraceLens analysis reports (optional - generate locally and/or upload to MLflow) | ||
|
|
||
| MLflow Artifact Structure: | ||
| artifacts/ | ||
| ├── traces/ # PyTorch profiler trace files | ||
| ├── logs/ # Training log files | ||
| └── trace_analysis/ # TraceLens analysis reports (if uploaded) | ||
|
|
||
| TraceLens Report Generation Logic: | ||
| - If upload_tracelens_report=True: Generate AND upload (auto-enables generation) | ||
| - If generate_tracelens_report=True and upload_tracelens_report=False: Generate locally only | ||
| - If both False: No report generation | ||
|
|
||
| Examples: | ||
| generate=False, upload=False → No reports | ||
| generate=True, upload=False → Generate locally only | ||
| generate=False, upload=True → Generate AND upload (auto-enabled) | ||
| generate=True, upload=True → Generate AND upload (explicit) | ||
|
|
||
| Args: | ||
| mlflow_writer: The MLflow module instance (from get_mlflow_writer()) | ||
| tensorboard_dir: Path to the tensorboard directory containing trace files | ||
| exp_root_path: Root path of the experiment for log files | ||
| upload_traces: Whether to upload trace files | ||
| upload_logs: Whether to upload log files | ||
| generate_tracelens_report: Whether to generate TraceLens reports locally | ||
| upload_tracelens_report: Whether to upload TraceLens reports to MLflow (implies generation) | ||
| tracelens_ranks: List of ranks to generate TraceLens reports for | ||
| (None = all ranks, [0, 8] = ranks 0 and 8 only) | ||
| Specify fewer ranks to limit number of reports | ||
| tracelens_output_format: Report format - "all" (default, xlsx+csv), "xlsx", or "csv" | ||
| tracelens_cleanup_after_upload: If True, removes local reports after upload to save disk space. | ||
| If False, keeps reports locally for inspection (default). | ||
|
|
||
| Returns: | ||
| Dictionary with counts of uploaded files: | ||
| { | ||
| "traces": <number of trace files uploaded>, | ||
| "logs": <number of log files uploaded>, | ||
| "tracelens_reports": <number of TraceLens reports uploaded> | ||
| } | ||
| """ | ||
| if mlflow_writer is None: | ||
| log_rank_0("[MLflow] MLflow writer not available, skipping artifact upload") | ||
| return {"traces": 0, "logs": 0, "tracelens_reports": 0} | ||
|
|
||
| log_rank_0("[MLflow] Starting artifact upload to MLflow...") | ||
| log_rank_0(f"[MLflow] tensorboard_dir: {tensorboard_dir}") | ||
| log_rank_0(f"[MLflow] exp_root_path: {exp_root_path}") | ||
| log_rank_0(f"[MLflow] upload_traces: {upload_traces}, upload_logs: {upload_logs}") | ||
| log_rank_0( | ||
| f"[MLflow] generate_tracelens_report: {generate_tracelens_report}, " | ||
| f"upload_tracelens_report: {upload_tracelens_report}" | ||
| ) | ||
|
|
||
| result = {"traces": 0, "logs": 0, "tracelens_reports": 0} | ||
|
|
||
| # Upload trace files | ||
| if upload_traces and tensorboard_dir: | ||
| result["traces"] = upload_trace_files_to_mlflow( | ||
| mlflow_writer, tensorboard_dir, artifact_path="traces" | ||
| ) | ||
|
|
||
| # Upload log files | ||
| if upload_logs and exp_root_path: | ||
| result["logs"] = upload_log_files_to_mlflow(mlflow_writer, exp_root_path, artifact_path="logs") | ||
|
|
||
| # TraceLens report generation and upload logic | ||
| # If upload=True, auto-enable generation (even if generate=False) | ||
| should_generate = generate_tracelens_report or upload_tracelens_report | ||
|
|
||
| if should_generate and tensorboard_dir and exp_root_path: | ||
| if upload_tracelens_report: | ||
| # Generate AND upload to MLflow | ||
| log_rank_0("[TraceLens] Mode: Generate and upload to MLflow") | ||
| result["tracelens_reports"] = upload_tracelens_reports_to_mlflow( | ||
| mlflow_writer=mlflow_writer, | ||
| tensorboard_dir=tensorboard_dir, | ||
| exp_root_path=exp_root_path, | ||
| ranks=tracelens_ranks, | ||
| output_format=tracelens_output_format, | ||
| artifact_path="trace_analysis", | ||
| cleanup_after_upload=tracelens_cleanup_after_upload, | ||
| ) | ||
| else: | ||
| # Generate locally only (no MLflow upload) | ||
| log_rank_0("[TraceLens] Mode: Generate locally only (no MLflow upload)") | ||
| num_generated = generate_tracelens_reports_locally( | ||
| tensorboard_dir=tensorboard_dir, | ||
| exp_root_path=exp_root_path, | ||
| ranks=tracelens_ranks, | ||
| output_format=tracelens_output_format, | ||
| ) | ||
| # Don't count as "uploaded" since they're local-only | ||
| log_rank_0(f"[TraceLens] Generated {num_generated} report files (not uploaded to MLflow)") | ||
|
|
||
| log_rank_0( | ||
| f"[MLflow] Artifact upload complete: " | ||
| f"{result['traces']} traces, {result['logs']} logs, " | ||
| f"{result['tracelens_reports']} TraceLens reports" | ||
| ) | ||
|
|
||
| return result |
The new mlflow_artifacts.py module adds significant functionality (TraceLens integration, artifact uploads, report generation) but no corresponding unit tests have been added. The codebase shows comprehensive test coverage for other megatron modules, but this 924-line module with complex logic (file operations, subprocess calls, external API interactions, error handling) is completely untested.
Consider adding unit tests for at least the critical functionality (a starting-point sketch follows this list):
- Trace file discovery and filtering
- Rank extraction from filenames
- Report generation with mocked TraceLens API
- Upload logic with mocked mlflow_writer
- Error handling and fallback behavior
- Cleanup logic validation
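A starting-point sketch for the kind of tests suggested above, exercising the pure helpers and the missing-directory path from the module added in this PR. The import path follows the file listed in this review; it assumes the logging helpers in primus.modules.module_utils work outside a distributed launch, and it mocks the MLflow writer so no tracking server or TraceLens install is needed.

```python
from unittest.mock import MagicMock

from primus.backends.megatron.training.mlflow_artifacts import (
    _extract_rank_from_filename,
    _filter_traces_by_rank,
    upload_trace_files_to_mlflow,
)


def test_extract_rank_from_filename():
    assert _extract_rank_from_filename("rank_0_step_2.json.gz") == 0
    assert _extract_rank_from_filename("primus-megatron-exp-rank[3].pt.trace.json") == 3
    assert _extract_rank_from_filename("no_rank_here.json") is None


def test_filter_traces_by_rank():
    files = ["/tb/rank_0_step_2.json.gz", "/tb/rank_8_step_2.json.gz"]
    assert _filter_traces_by_rank(files, [8]) == ["/tb/rank_8_step_2.json.gz"]
    assert _filter_traces_by_rank(files, []) == files  # empty ranks -> no filtering


def test_upload_trace_files_handles_missing_dir():
    writer = MagicMock()
    # Nonexistent tensorboard dir: nothing is found, nothing is uploaded.
    assert upload_trace_files_to_mlflow(writer, "/does/not/exist") == 0
    writer.log_artifact.assert_not_called()
```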
| # Aggregate kernel/operation statistics | ||
| op_stats = {} | ||
| for event in events: | ||
| if event.get("cat") in ["kernel", "gpu_memcpy", "cuda_runtime", "cpu_op"]: |
The fallback CSV generation function filters events by category using hardcoded categories including "cuda_runtime" (line 506). However, this code is supposed to work with ROCm/AMD GPUs based on the copyright header mentioning AMD. ROCm traces might use different category names (e.g., "hip_runtime" instead of "cuda_runtime"), which could result in incomplete or empty reports when TraceLens is not available and the fallback is used on AMD systems.
Consider either:
- Adding ROCm-equivalent categories (e.g., "hip_runtime", "hip_memcpy")
- Using a more generic filtering approach (a sketch follows the suggestion below)
- Documenting that the fallback is CUDA-specific and may not work properly on ROCm systems
| if event.get("cat") in ["kernel", "gpu_memcpy", "cuda_runtime", "cpu_op"]: | |
| if event.get("cat") in [ | |
| "kernel", | |
| "gpu_memcpy", | |
| "cuda_runtime", | |
| "cpu_op", | |
| "hip_runtime", | |
| "hip_memcpy", | |
| ]: |
| if ranks is not None and isinstance(ranks, str): | ||
| import ast | ||
|
|
||
| try: | ||
| ranks = ast.literal_eval(ranks) | ||
| if not isinstance(ranks, list): | ||
| log_rank_0( | ||
| f"[TraceLens] Warning: ranks evaluated to {type(ranks).__name__}, expected list. Using None." | ||
| ) | ||
| ranks = None | ||
| except (ValueError, SyntaxError) as e: | ||
| log_rank_0(f"[TraceLens] Warning: Failed to parse ranks '{ranks}': {e}. Using None.") | ||
| ranks = None |
The string parsing logic uses ast.literal_eval to convert string representations of lists to actual lists (lines 597-606 and 739-748). While this handles the case where config values might be strings, it's unclear why the ranks parameter would ever be a string when it's declared as Optional[List[int]] in the function signature.
If this is handling a specific edge case with config parsing, consider adding a comment explaining when and why ranks would be a string. Otherwise, this type conversion might be unnecessary defensive coding that adds complexity.
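For context on how ranks can arrive as a string: quoting the value in YAML yields a str, while a bare flow sequence yields a list. A small demonstration with PyYAML (whether Primus's config parser uses PyYAML is an assumption here):

```python
import yaml  # PyYAML; used only to illustrate the two spellings

print(type(yaml.safe_load("ranks: [0, 8]")["ranks"]))   # <class 'list'>
print(type(yaml.safe_load("ranks: '[0,8]'")["ranks"]))  # <class 'str'>
```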
| mlflow_writer.log_artifact(report_path, artifact_path=artifact_path) | ||
| uploaded_count += 1 | ||
| log_rank_0(f"[MLflow] Uploaded TraceLens report: {os.path.basename(report_path)}") |
The mlflow_writer.log_artifact() is being called with both files (XLSX) and directories (CSV subdirectories) based on lines 406, 415, 426, and 438. However, the behavior of log_artifact when given a directory path depends on the MLflow implementation. If it only expects file paths, uploading directories could fail or behave unexpectedly.
The code assumes that log_artifact can handle both files and directories (see line 415 and 438 where csv_subdir is added to generated_files), but this should be verified or documented. If MLflow's log_artifact doesn't support directories, you'll need to iterate through files in the directory or use a different API method.
| mlflow_writer.log_artifact(report_path, artifact_path=artifact_path) | |
| uploaded_count += 1 | |
| log_rank_0(f"[MLflow] Uploaded TraceLens report: {os.path.basename(report_path)}") | |
| if os.path.isdir(report_path): | |
| # If the report is a directory (e.g., CSV reports), upload its files individually | |
| base_dir_name = os.path.basename(report_path.rstrip(os.sep)) | |
| for root, _, files in os.walk(report_path): | |
| for fname in files: | |
| file_path = os.path.join(root, fname) | |
| # Preserve the directory structure relative to the report directory | |
| rel_path = os.path.relpath(file_path, report_path) | |
| rel_dir = os.path.dirname(rel_path) | |
| if rel_dir and rel_dir != os.curdir: | |
| dest_artifact_path = os.path.join(artifact_path, base_dir_name, rel_dir) | |
| else: | |
| dest_artifact_path = os.path.join(artifact_path, base_dir_name) | |
| mlflow_writer.log_artifact(file_path, artifact_path=dest_artifact_path) | |
| uploaded_count += 1 | |
| log_rank_0( | |
| f"[MLflow] Uploaded TraceLens report file: {os.path.join(base_dir_name, rel_path)}" | |
| ) | |
| else: | |
| # Regular file report (e.g., XLSX) | |
| mlflow_writer.log_artifact(report_path, artifact_path=artifact_path) | |
| uploaded_count += 1 | |
| log_rank_0(f"[MLflow] Uploaded TraceLens report: {os.path.basename(report_path)}") |
| if output_format == "all": | ||
| xlsx_path = os.path.join(output_dir, f"{report_name}_analysis.xlsx") | ||
| csv_subdir = os.path.join(output_dir, report_name) | ||
| os.makedirs(csv_subdir, exist_ok=True) | ||
|
|
||
| # First call: Generate XLSX only | ||
| dfs = generate_perf_report_pytorch(trace_file, output_xlsx_path=xlsx_path) | ||
|
|
||
| # Check XLSX output | ||
| if os.path.exists(xlsx_path): | ||
| num_tabs = len(dfs) if dfs else 0 | ||
| log_rank_0( | ||
| f"[TraceLens] Generated XLSX report with {num_tabs} tabs: {os.path.basename(xlsx_path)}" | ||
| ) | ||
| generated_files.append(xlsx_path) | ||
|
|
||
| # Second call: Generate CSVs only | ||
| generate_perf_report_pytorch(trace_file, output_csvs_dir=csv_subdir) |
The function generates reports by parsing trace files twice when output_format is "all" (lines 398 and 409). This is explicitly documented as intentional due to TraceLens API limitations, but it significantly impacts performance - doubling processing time for traces which can be large files (potentially hundreds of MB).
Consider whether this performance tradeoff is acceptable for the default configuration, or if the default should be changed to a single format (xlsx or csv) to avoid the double-parsing overhead. Document this performance implication in the configuration comments so users can make an informed choice.
| try: | ||
| import shutil | ||
|
|
||
| shutil.rmtree(reports_dir) | ||
| log_rank_0(f"[TraceLens] Cleaned up local reports directory: {reports_dir}") | ||
| except Exception as e: | ||
| warning_rank_0(f"[TraceLens] Failed to cleanup reports directory: {e}") |
The cleanup logic uses shutil.rmtree to remove the entire reports directory after upload (line 788). However, this cleanup happens after uploading reports, not after verifying that all uploads were successful. If some uploads failed (as caught in the exception handler on line 779), the local reports would still be deleted, potentially losing data.
Consider only performing cleanup if all uploads were successful (e.g., check if uploaded_count == len(reports) before cleanup), or at least document this behavior so users understand that failed uploads won't prevent cleanup.
| try: | |
| import shutil | |
| shutil.rmtree(reports_dir) | |
| log_rank_0(f"[TraceLens] Cleaned up local reports directory: {reports_dir}") | |
| except Exception as e: | |
| warning_rank_0(f"[TraceLens] Failed to cleanup reports directory: {e}") | |
| if uploaded_count == len(reports): | |
| try: | |
| import shutil | |
| shutil.rmtree(reports_dir) | |
| log_rank_0(f"[TraceLens] Cleaned up local reports directory: {reports_dir}") | |
| except Exception as e: | |
| warning_rank_0(f"[TraceLens] Failed to cleanup reports directory: {e}") | |
| else: | |
| warning_rank_0( | |
| f"[TraceLens] Skipping cleanup because some reports failed to upload; " | |
| f"keeping local reports at: {reports_dir}" | |
| ) |
| # CSV only: Multiple files in a subdirectory per rank | ||
| csv_subdir = os.path.join(output_dir, report_name) | ||
| os.makedirs(csv_subdir, exist_ok=True) | ||
| dfs = generate_perf_report_pytorch(trace_file, output_csvs_dir=csv_subdir) |
Variable dfs is not used.
| dfs = generate_perf_report_pytorch(trace_file, output_csvs_dir=csv_subdir) | |
| generate_perf_report_pytorch(trace_file, output_csvs_dir=csv_subdir) |
When mlflow_upload_traces, mlflow_upload_logs, or mlflow_upload_tracelens_report is True:
- Auto-enable mlflow (set disable_mlflow=False)
- Auto-enable profiling if trace or tracelens upload is requested

This removes the need to explicitly set:
- --disable_mlflow=False
- --profile=True
- --use_pytorch_profiler=True
The profiler saves traces to tensorboard_dir, which is None when tensorboard is disabled. This caused a TypeError during trace save. Moved auto-enable logic before tensorboard section and added tensorboard auto-enable when mlflow_upload_traces or mlflow_upload_tracelens_report is True.
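A sketch of the auto-enable rules these two commits describe, applied during argument post-processing. Attribute names follow the config options in this PR; the exact hook where Primus applies this, and the default tensorboard path, are not shown in this diff and are passed in as a placeholder.

```python
# Sketch of the auto-enable rules described above; `args` is the parsed
# argument namespace and `default_tensorboard_dir` is whatever path the
# experiment layout provides (not shown in this diff).
def _auto_enable_mlflow_features(args, default_tensorboard_dir: str):
    wants_traces = getattr(args, "mlflow_upload_traces", False)
    wants_logs = getattr(args, "mlflow_upload_logs", False)
    wants_tracelens = getattr(args, "mlflow_upload_tracelens_report", False)

    if wants_traces or wants_logs or wants_tracelens:
        args.disable_mlflow = False          # any upload option implies MLflow is on

    if wants_traces or wants_tracelens:
        args.profile = True                  # traces come from the PyTorch profiler
        args.use_pytorch_profiler = True
        if getattr(args, "tensorboard_dir", None) is None:
            # The profiler saves traces under tensorboard_dir; leaving it None
            # caused the TypeError described above.
            args.tensorboard_dir = default_tensorboard_dir
    return args
```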
| # For "all" format: TraceLens uses either/or logic - if output_csvs_dir is set, | ||
| # it ONLY generates CSVs. So we need to call it twice for both formats. | ||
| # Note: This means the trace file is parsed twice, roughly doubling processing time | ||
| # compared to a single format. This is a TraceLens limitation, not a bug. | ||
| if output_format == "all": | ||
| xlsx_path = os.path.join(output_dir, f"{report_name}_analysis.xlsx") | ||
| csv_subdir = os.path.join(output_dir, report_name) | ||
| os.makedirs(csv_subdir, exist_ok=True) | ||
|
|
||
| # First call: Generate XLSX only | ||
| dfs = generate_perf_report_pytorch(trace_file, output_xlsx_path=xlsx_path) | ||
|
|
||
| # Check XLSX output | ||
| if os.path.exists(xlsx_path): | ||
| num_tabs = len(dfs) if dfs else 0 | ||
| log_rank_0( | ||
| f"[TraceLens] Generated XLSX report with {num_tabs} tabs: {os.path.basename(xlsx_path)}" | ||
| ) | ||
| generated_files.append(xlsx_path) | ||
|
|
||
| # Second call: Generate CSVs only | ||
| generate_perf_report_pytorch(trace_file, output_csvs_dir=csv_subdir) | ||
|
|
||
| # Check CSV outputs (escape path to handle [] characters in filenames) | ||
| csv_files = glob.glob(os.path.join(glob.escape(csv_subdir), "*.csv")) | ||
| if csv_files: | ||
| log_rank_0(f"[TraceLens] Generated {len(csv_files)} CSV files for {report_name}") | ||
| generated_files.append(csv_subdir) # Upload directory to preserve structure | ||
|
|
Inefficient double-parsing of trace files when output_format is "all". The code explicitly notes in comments that when both XLSX and CSV formats are requested, the trace file must be parsed twice because TraceLens uses either/or logic. This effectively doubles the processing time for trace analysis. While the comment acknowledges this is a TraceLens limitation, consider: (1) warning users about the performance impact in the documentation, (2) suggesting "xlsx" or "csv" as format options to avoid the overhead, or (3) implementing a workaround by generating CSV files from the DataFrame objects returned by the XLSX generation call, if TraceLens returns these objects in a suitable format.
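A sketch of option (3): write the CSVs from the DataFrames returned by the XLSX pass instead of parsing the trace a second time. It assumes `dfs` is a mapping of sheet names to pandas DataFrames; only the existence of a return value is visible in this diff, so that shape is an assumption.

```python
import os


def _write_csvs_from_dfs(dfs, csv_subdir: str) -> int:
    """Reuse the DataFrames from the XLSX pass to emit per-section CSVs.
    Assumes dfs maps sheet name -> pandas DataFrame (unconfirmed by this diff)."""
    os.makedirs(csv_subdir, exist_ok=True)
    written = 0
    for sheet_name, df in (dfs or {}).items():
        safe_name = str(sheet_name).replace(os.sep, "_")
        df.to_csv(os.path.join(csv_subdir, f"{safe_name}.csv"), index=False)
        written += 1
    return written
```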
| except Exception as e: | ||
| warning_rank_0(f"[MLflow] Failed to upload report {report_path}: {e}") | ||
|
|
||
| log_rank_0(f"[TraceLens] Uploaded {uploaded_count} reports to '{artifact_path}'") |
Misleading upload count for CSV directory uploads. When CSV format is used, the function adds the CSV subdirectory to the generated_files list (line 415, 438), but when these directories are uploaded, uploaded_count is incremented by 1 per directory even though each directory contains multiple CSV files. This means the reported count in the log message "Uploaded X reports" (line 781) is misleading - it counts directories as single items rather than counting the actual number of files uploaded. For example, if 2 ranks generate CSV reports with 10 files each, the count would be 2 instead of 20. Consider either counting individual files within directories or clarifying in the log message that the count represents report sets/directories rather than individual files.
| log_rank_0(f"[TraceLens] Uploaded {uploaded_count} reports to '{artifact_path}'") | |
| log_rank_0(f"[TraceLens] Uploaded {uploaded_count} report item(s) to '{artifact_path}'") |
| # generate=false, upload=false → No reports generated | ||
| # generate=true, upload=false → Generate reports locally only | ||
| # generate=false, upload=true → Generate AND upload (auto-enabled) | ||
| # generate=true, upload=true → Generate AND upload (explicit) |
Confusing comment uses "→" arrow character instead of standard ASCII. The usage pattern comments on lines 19-22 in the configuration file use the Unicode arrow character "→" which may not render correctly in all text editors or terminals, especially in environments with limited Unicode support. Consider using standard ASCII alternatives like "=>" or "->" for better compatibility, or use plain text like "results in" or "produces".
| # generate=false, upload=false → No reports generated | |
| # generate=true, upload=false → Generate reports locally only | |
| # generate=false, upload=true → Generate AND upload (auto-enabled) | |
| # generate=true, upload=true → Generate AND upload (explicit) | |
| # generate=false, upload=false -> No reports generated | |
| # generate=true, upload=false -> Generate reports locally only | |
| # generate=false, upload=true -> Generate AND upload (auto-enabled) | |
| # generate=true, upload=true -> Generate AND upload (explicit) |
| # Upload artifacts to MLflow before ending the run | ||
| upload_mlflow_artifacts( | ||
| tensorboard_dir=args.tensorboard_dir, | ||
| exp_root_path=self.exp_root_path, | ||
| upload_traces=getattr(args, "mlflow_upload_traces", True), | ||
| upload_logs=getattr(args, "mlflow_upload_logs", True), |
Inconsistent default values between configuration and code. The configuration file sets mlflow_upload_traces: true and mlflow_upload_logs: true as defaults, but the getattr calls here use True as the fallback. This means when MLflow is enabled but these attributes are not explicitly set, the behavior will differ from what's documented in the configuration. The defaults should match: either both should default to true or both should default to false. Given the comment in the config file "When disable_mlflow=false, traces and logs are uploaded by default", these defaults appear correct, but this creates coupling between the config and the code that could lead to confusion.
| # Upload artifacts to MLflow before ending the run | |
| upload_mlflow_artifacts( | |
| tensorboard_dir=args.tensorboard_dir, | |
| exp_root_path=self.exp_root_path, | |
| upload_traces=getattr(args, "mlflow_upload_traces", True), | |
| upload_logs=getattr(args, "mlflow_upload_logs", True), | |
| # Determine default upload behavior based on disable_mlflow setting: | |
| # when disable_mlflow=False, traces and logs are uploaded by default. | |
| default_mlflow_upload = not getattr(args, "disable_mlflow", False) | |
| # Upload artifacts to MLflow before ending the run | |
| upload_mlflow_artifacts( | |
| tensorboard_dir=args.tensorboard_dir, | |
| exp_root_path=self.exp_root_path, | |
| upload_traces=getattr(args, "mlflow_upload_traces", default_mlflow_upload), | |
| upload_logs=getattr(args, "mlflow_upload_logs", default_mlflow_upload), |
TraceLens report generation and upload are now disabled by default. They are auto-enabled when mlflow_upload_tracelens_report=true is set. This prevents TraceLens from running when only testing other features like mlflow_upload_performance_metrics.
feat: Add TraceLens integration for trace analysis with MLflow upload
Adds TraceLens trace analysis capability to automatically generate performance
reports from PyTorch profiler traces and upload them to MLflow.
Features
- Analysis reports are uploaded to MLflow under artifacts/trace_analysis/

Config Options
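The config options above map onto the upload_artifacts_to_mlflow entry point added in this PR. A hedged end-to-end usage sketch; the writer is the mlflow module (as the docstrings describe) and the paths are placeholders.

```python
import mlflow

from primus.backends.megatron.training.mlflow_artifacts import upload_artifacts_to_mlflow

result = upload_artifacts_to_mlflow(
    mlflow_writer=mlflow,
    tensorboard_dir="/exp/tensorboard",   # placeholder paths
    exp_root_path="/exp",
    upload_traces=True,                   # mlflow_upload_traces
    upload_logs=True,                     # mlflow_upload_logs
    upload_tracelens_report=True,         # mlflow_upload_tracelens_report (implies generation)
    tracelens_ranks=[0],                  # mlflow_tracelens_ranks
    tracelens_output_format="xlsx",       # mlflow_tracelens_output_format
    tracelens_cleanup_after_upload=True,  # mlflow_tracelens_cleanup_after_upload
)
print(result)  # {"traces": ..., "logs": ..., "tracelens_reports": ...}
```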