diff --git a/config_explorer/README.md b/config_explorer/README.md index 116a9860..b2eca237 100644 --- a/config_explorer/README.md +++ b/config_explorer/README.md @@ -84,16 +84,6 @@ The Streamlit frontend includes the following pages: 1. **Capacity Planner** - Analyze GPU memory requirements and capacity planning for LLM models 2. **GPU Recommender** - Get optimal GPU recommendations based on model and workload requirements -3. **Sweep Visualizer** - Visualize benchmark results and configuration sweeps - -### Using the Sweep Visualizer - -The Sweep Visualizer page supports visualizing a collection of `llm-d-benchmark` report files. To get started easily, you may download the data from the [public llm-d-benchmark community Google Drive](https://drive.google.com/drive/u/0/folders/1r2Z2Xp1L0KonUlvQHvEzed8AO9Xj8IPm). Preset options have been selected for each scenario. For example, we recommend viewing - -- `qwen-qwen-3-0-6b` using the Chatbot application highlight Inference Scheduling -- `meta-llama/Llama-3.1-70B-Instruct` using the Document Summarization application highlight PD Disaggregation - -Default values will be populated once those options are selected. Advanced users may further conduct their own configuration. ### Using the GPU Recommender @@ -131,6 +121,4 @@ The GPU Recommender displays cost information to help you find cost-effective GP ## Library -Configuration exploration and benchmark sweep performance comparison is best demonstrated in the Jupyter notebook [analysis.ipynb](../analysis/analysis.ipynb). This notebook can be used for interactive analysis of benchmarking data results, and it utilizes the same core functions as the "Sweep Visualizer" page of the web app. For instructions on using the notebook see [../analysis/README.md](../analysis/README.md). - For GPU recommender API usage see [./examples/gpu_recommender_example.py](./examples/gpu_recommender_example.py). 
diff --git a/config_explorer/pages/3_Sweep_Visualizer.py b/config_explorer/pages/3_Sweep_Visualizer.py deleted file mode 100644 index c1fdfb80..00000000 --- a/config_explorer/pages/3_Sweep_Visualizer.py +++ /dev/null @@ -1,671 +0,0 @@ -from typing import Any, Dict, List -from numpy import float64 -from pandas import DataFrame -import streamlit as st -from streamlit.delta_generator import DeltaGenerator -import util - -import src.config_explorer.explorer as xp -import src.config_explorer.plotting as xplotting - -BENCHMARK_PATH_KEY = "benchmark_path" -BENCHMARK_DATA_KEY = "benchmark_data" -SELECTED_SCENARIO_KEY = "selected_scenario" -SELECTED_SLO_METRICS_KEY = "selected_slo_metrics" - -# ------- Scenario presets ------- - -DEFAULT_SLOS = [ - 'Total_Token_Throughput', - 'P90_TTFT_ms', - ] -PD_DISAGG = "PD Disaggregation" -INFERENCE_SCHEDULING = "Inference Scheduling" - -scenarios_config_keys_mapping = { - PD_DISAGG: { - "description": "Compares inference performance of aggregate vs. prefill/decode disaggregate set up.", - "columns": ['Model', 'GPU', 'ISL', 'OSL'], - "config_keys": [ - ['Replicas', 'TP'], - ['P_Replicas', 'P_TP', 'D_Replicas', 'D_TP'], - ], - "col_seg_by": 'Directory_Base', - "col_x": 'Max_Concurrency', - "col_y": 'Thpt_per_GPU', - "pareto": { - "col_x": 'Thpt_per_User', - "col_y": 'Thpt_per_GPU', - "col_z": 'Max_Concurrency', - } - }, - - INFERENCE_SCHEDULING: { - "description": "Examines effects of inference scheduler scorer plugin weights.", - "columns": ['Model', 'GPU', 'System_Prompt_Length', 'Question_Length', 'OSL_500', 'Groups', 'Prompts_Per_Group'], - "config_keys": ['KV_Cache_Scorer_Weight', 'Queue_Scorer_Weight', 'Prefix_Cache_Scorer_Weight', 'Prefix_Cache_Scorer_Mode'], - "col_seg_by": 'Directory', - "col_x": 'Max_QPS', - "col_y": 'P90_TTFT_ms', - "pareto": { - "col_x": 'Total_Token_Throughput', - "col_y": 'P90_TTFT_ms', - "col_z": 'Max_QPS', - } - }, - - "Custom": { - "description": "Carve your own scenario", - "columns": ['Model'], - 
"config_keys": ['GPU'] - } -} - -preset_scenarios = { - "Chatbot": { - "description": "This application typically has high QPS, concurrency, and prefix hit rate, and favors low latency.", - - # Default inputs - - # Default SLOs - "P90_E2EL_ms": 100.0, - "Total_Token_Throughput": 100.0, - "P90_TTFT_ms": 2000.0, - "P90_ITL_ms": 50.0, - }, - "Document summarization": { - "description": "This application maps to workload requests with high input length and short output length.", - - # Default inputs - - # Default SLOs - "P90_E2EL_ms": 100000.0, - "Total_Token_Throughput": 100.0, - "P90_TTFT_ms": 10000.0, - "P90_ITL_ms": 100.0, - }, - "Custom": { - "description": "Design the workload patterns for your own custom application type.", - "ISL": 300, - "OSL": 1000, - "P90_E2EL_ms": 200.0, - "Total_Token_Throughput": 200.0, - "P90_TTFT_ms": 1000.0, - "P90_ITL_ms": 50.0, - } -} - -def filter_greater_or_equal(input_list: List[int], threshold: int) -> List[int]: - """ - Returns a list of values greater than or equal to threshold - """ - return [item for item in input_list if item >= threshold] - -def init_session_state(): - """ - Inits session state for data persistence - """ - if BENCHMARK_DATA_KEY not in st.session_state: - st.session_state[BENCHMARK_DATA_KEY] = xp.make_benchmark_runs_df() - - # Default SLOs - if SELECTED_SLO_METRICS_KEY not in st.session_state: - st.session_state[SELECTED_SLO_METRICS_KEY] = DEFAULT_SLOS - -@st.cache_data -def read_benchmark_path(benchmark_path: str) -> DataFrame: - """ - Reads the data at the path - """ - - runs = xp.make_benchmark_runs_df() - - report_files = xp.get_benchmark_report_files( - benchmark_path, - recurse_symlinks=True) - for br_file in report_files: - - # Update session state data - xp.add_benchmark_report_to_df(runs, br_file) - - return runs - -def user_benchmark_path(): - """ - Obtains path to user data - """ - - benchmark_path = st.text_input("Enter absolute path to `llm-d` benchmark data", - value="", - # 
key=BENCHMARK_PATH_KEY, - help="Navigate to the [llm-d community Google Drive](https://drive.google.com/drive/u/0/folders/1r2Z2Xp1L0KonUlvQHvEzed8AO9Xj8IPm) to download data.", - ) - - if st.button("Import data", type='primary'): - # Populate the runs DataFrame with new path - # benchmark_path = st.session_state[BENCHMARK_PATH_KEY] - if benchmark_path != "": - st.toast(f'Searching for benchmark report files within `{benchmark_path}`') - - try: - st.session_state[BENCHMARK_DATA_KEY] = read_benchmark_path(benchmark_path) - - st.toast(f"Successfully imported {len(st.session_state[BENCHMARK_DATA_KEY])} report files. You may view the raw data below.", icon="🎉") - except Exception: - st.toast("File not found, please double check path.", icon='⚠️') - - -@st.dialog("Add SLO metric") -def add_metric_dialog(): - """ - Dialogue to add a SLO metric - """ - - st.write(":blue[Add custom metrics to further filter for performance.] \ - For example, chatbot user may care about TTFT, while a summarization tool may care more about mean throughput. \ - For repeated metrics, the value that is defined later on in the list will be used for analysis.") - - curr_metrics = st.session_state[SELECTED_SLO_METRICS_KEY] - - # Remove curr metrics from all performance metrics - - all_metrics = dict(xp.METRICS_COLUMNS) - for metric in curr_metrics: - all_metrics.pop(metric, None) # None avoids KeyError if key is missing - - to_add = st.selectbox("Select a metric to add", - options=all_metrics.keys(), - format_func=lambda p: xp.METRICS_COLUMNS[p].label_with_units(), - ) - if st.button("Add", use_container_width=True, type='primary'): - st.session_state[SELECTED_SLO_METRICS_KEY].append(to_add) - st.rerun() - -@st.dialog("Delete SLO metric") -def delete_metric_dialog(): - """ - Dialogue to delete a SLO metric - """ - - st.write(f"Deleting a metric means that the optimal configuration does not take this metric into account. 
Any of the non-default (`{', '.join(DEFAULT_SLOS)}`) metrics can be deleted.\n\nIf you'd like to disable the default metrics, set them to an extremely high or low value to disable their effect.") - - curr_metrics = st.session_state[SELECTED_SLO_METRICS_KEY] - - to_delete = st.selectbox("Select a metric to delete", - options=curr_metrics, - format_func=lambda p: xp.METRICS_COLUMNS[p].label_with_units(), - ) - - if st.button("Delete", use_container_width=True, type='primary'): - st.session_state[SELECTED_SLO_METRICS_KEY].remove(to_delete) - st.rerun() - -def filter_data_on_inputs(data: DataFrame, user_inputs: dict) -> DataFrame: - """ - Filters data on inputs and SLOs - """ - - return data[ - (data['Model'] == user_inputs['model']) & - (data['GPU'] == user_inputs['gpu_type']) & - (data['Num_GPUs'] <= user_inputs['num_gpus']) & - (data['ISL'] >= user_inputs['isl']) & - (data['OSL'] >= user_inputs['osl']) - ] - -@st.dialog("Histogram overview of bounds") -def histogram_dialog(runs: DataFrame, scenario): - """ - Dialog to show histogram - """ - plot = xplotting.plot_scenario_histogram( - runs, scenario - ) - - selected_metric = st.selectbox( - "Select a workload metric", - options=plot.keys() - ) - - if selected_metric: - st.pyplot(plot[selected_metric]) - -def inputs(tab: DeltaGenerator): - """ - Inputs to the Visualizer - """ - - tab.subheader("Define Scenario") - tab.caption("Select initial filters on benchmarking data such as model and workload characteristics. 
This is your **:blue[scenario]**.") - - benchmark_data = st.session_state[BENCHMARK_DATA_KEY] - data_to_return = {} - selected_slos = {} - scenario_to_return = {} - scenario_bounds = {} - - if len(benchmark_data) == 0: - tab.info("Import data above.") - return None - - with tab.container(border=True): - scenario_to_return['Model'] = st.selectbox( - "Select a model", - options=benchmark_data['Model'].unique() - ) - - scenario_to_return['GPU'] = st.selectbox( - "Select an accelerator type", - options=benchmark_data['GPU'].unique() - ) - - with tab.container(border=True): - st.write("**Workload Profiles**") - st.caption("Define the type of workload for the LLM. Based on the model and environment inputs, the available options are shown below.") - - # Show available combinations - runs = benchmark_data[ - (benchmark_data["Model"] == scenario_to_return['Model']) & - (benchmark_data["GPU"] == scenario_to_return['GPU']) - ] - - selected_workload = st.radio("Select workload", options=preset_scenarios.keys()) - - info = preset_scenarios[selected_workload] - - st.caption(info['description']) - - if selected_workload == "Chatbot": - # Show scenario options for Chatbot application - scenario_to_return['System_Prompt_Length'] = st.selectbox( - "System prompt length", - options=runs['System_Prompt_Length'].unique(), - help="The number of tokens (words or characters) in the initial instructions given to a large language model" - ) - - scenario_to_return['Question_Length'] = st.selectbox( - "Question length", - options=runs['Question_Length'].unique(), - help="The user input part of the prompt as they interact with the chatbot. This is different from system prompt, which is the shared prefix of the prompt which is likely to be the same for different users and sessions." 
- ) - - scenario_to_return['Groups'] = st.selectbox( - "Number of groups", - options=runs['Groups'].unique(), - help="The number of shared prefix groups in the workload traffic" - ) - - scenario_to_return['Prompts_Per_Group'] = st.selectbox( - "Number of prompts per group", - options=runs['Prompts_Per_Group'].unique(), - help="The number of unique questions per group." - ) - - bounds_col_min, bounds_col_max = st.columns(2) - min_osl_options = runs['OSL'].unique() - min_osl_options.sort() - min_osl = bounds_col_min.selectbox("Min output sequence length", - options=min_osl_options, - ) - - max_osl_options = filter_greater_or_equal(min_osl_options, min_osl) - max_osl = bounds_col_max.selectbox("Max output sequence length", - options=max_osl_options, - # Default select the greatest number - index=len(max_osl_options) - 1 - ) - scenario_to_return['__ge__OSL'] = min_osl - scenario_to_return['__le__OSL'] = max_osl - - if selected_workload == "Document summarization": - # Show scenario options for Document summary application - - st.caption("Exact matching is required for now. 
Click below to see the available combinations of ISL and OSL.") - - bounds_col_min, bounds_col_max = st.columns(2) - # Enable bounds for I/O length in doc summary - min_isl_options = runs['ISL'].unique() - min_isl_options.sort() - min_isl = bounds_col_min.selectbox("Min input sequence length", - options=min_isl_options, - ) - - max_isl_options = filter_greater_or_equal(min_isl_options, min_isl) - max_isl = bounds_col_max.selectbox("Max input sequence length", - options=max_isl_options, - - # Default select the greatest number - index=len(max_isl_options) - 1 - ) - - - min_osl_options = runs['OSL'].unique() - min_osl_options.sort() - min_osl = bounds_col_min.selectbox("Min output sequence length", - options=min_osl_options, - ) - - max_osl_options = filter_greater_or_equal(min_osl_options, min_osl) - max_osl = bounds_col_max.selectbox("Max output sequence length", - options=max_osl_options, - - # Default select the greatest number - index=len(max_osl_options) - 1 - ) - - scenario_to_return.update({ - '__ge__ISL': float(min_isl), - '__le__ISL': float(max_isl), - '__ge__OSL': float(min_osl), - '__le__OSL': float(max_osl), - }) - - if selected_workload == "Custom": - st.warning("This feature is not yet available. To perform you own data exploration, see this [example Jupyter notebook](https://github.com/llm-d/llm-d-benchmark/blob/main/analysis/analysis.ipynb) for analysis using the `config_explorer` library.") - - # Show summary stats (histogram) - if st.button("Summary statistics", use_container_width=True): - histogram_dialog( - runs, scenario_to_return - ) - - # SLOs - with tab.container(border=True): - st.write("**Goals / SLOs**") - st.caption("Define the desire constraints to reach for your application. 
Default values for a selective set of SLO metrics are suggested for the given application type.") - - if selected_workload: - scenario = preset_scenarios[selected_workload] - - # Display SLO metrics - for metric in st.session_state[SELECTED_SLO_METRICS_KEY]: - metric_prop = xp.METRICS_COLUMNS[metric] - metric_value = 0.0 - - # If there is a default, show the default value - if metric in scenario: - metric_value = scenario[metric] - - selected_slos[metric] = st.number_input( - metric_prop.label_with_units(), - value=metric_value, - key=metric, - min_value=0.0, - step=0.01, - ) - - if st.button("Add a metric", use_container_width=True): - add_metric_dialog() - - if st.button("Delete a metric", use_container_width=True): - delete_metric_dialog() - - data_to_return["scenario"] = scenario_to_return - data_to_return["slo"] = selected_slos - return data_to_return - -def display_optimal_config_overview(container: DeltaGenerator, - config_columns: List[str], - slo_columns: List[str], - original_benchmark_data: DataFrame, - user_inputs: dict, - user_selected_scenario: Dict[str, Any] - ): - """ - Displays the optimal configuration overview (Pareto charts) - """ - - container.subheader("Examine optimal configuration") - - # Define SLOs - slos = [] - for metric, value in user_inputs["slo"].items(): - slos.append( - xp.SLO(metric, value) - ) - - # Columns for metrics of interest to optimize - col_x = 'Mean_TTFT_ms' - col_y = 'Thpt_per_GPU' - - # Select linear or log scales - log_x = True - log_y = False - - metric_col1, metric_col2 = container.columns(2) - - col_y = metric_col1.selectbox("Select y-axis performance metric for Pareto front", - options=xp.METRICS_COLUMNS.keys(), - index=list(xp.METRICS_COLUMNS.keys()).index(col_y), - format_func=lambda p: xp.METRICS_COLUMNS[p].label_with_units(), - ) - - col_x = metric_col2.selectbox("Select x-axis input metric for Pareto front", - options=xp.METRICS_COLUMNS.keys(), - index=list(xp.METRICS_COLUMNS.keys()).index(col_x), - 
format_func=lambda p: f"{xp.METRICS_COLUMNS[p].label}", - ) - - # Configuration columns of interest - tradeoff_plot = xplotting.plot_pareto_tradeoff( - runs_df=original_benchmark_data, - scenario=user_selected_scenario, - col_x=col_x, - col_y=col_y, - slos=slos, - log_x=log_x, - log_y=log_y - ) - container.pyplot(tradeoff_plot) - - # Print tab1le of optimal configurations - # Get scenario rows from all runs in dataset - runs_scenario = xp.get_scenario_df(original_benchmark_data, user_selected_scenario) - - # Get just the rows that meet SLOs - runs_meet_slo = xp.get_meet_slo_df(runs_scenario, slos) - - # Get rows on Pareto front - runs_pareto_front = xp.get_pareto_front_df(runs_meet_slo, col_x, col_y, True) - - # Print the rows on Pareto front, showing just the columns of interest - columns_of_interest = config_columns + slo_columns - - # Display info - container.info(f"Out of the {len(runs_meet_slo)} configurations that meet SLO requirements, {len(runs_pareto_front)} are optimal, meaning no metric can be improved without degrading another. Their configuration and performance metrics are shown below.") - - container.dataframe(runs_pareto_front[columns_of_interest]) - -def outputs(tab: DeltaGenerator, user_inputs: dict): - """ - Outputs to the Visualizer - """ - - tab.subheader("Configuration Performance for Scenario") - tab.caption("Understand the **:blue[configurations]** for your scenario by examining and comparing performance between configurations.") - original_benchmark_data = st.session_state[BENCHMARK_DATA_KEY] - - with tab.expander("Review raw data"): - st.dataframe(original_benchmark_data) - - if len(original_benchmark_data) == 0: - tab.info("Import data above.") - return None - - selected_display_preset = tab.radio( - "Select display presets", - options=list(scenarios_config_keys_mapping.keys()), - help="Scenario presents define a set of parameters to filter that showcase a certain feature or capability. For example, comparing throughput per user vs. 
throughput per GPU tradeoff for PD disaggregation scenarios." - ) - - slos_cols = [] - if selected_display_preset: - - tab1, tab2 = tab.tabs(["📈 Performance overview", "🌟 Optimal configuration overview"]) - - # Describe each tab - tab1.info("View a summary of the data based on the selected preset. Each preset groups configurations define a specific scenario, helping to highlight its performance characteristics.") - tab2.info("Given SLO requirements, filter for the best configurations of parallelism and replicas in aggregate and disaggregated setup.") - - # Get the scenario - scenario_preset = scenarios_config_keys_mapping[selected_display_preset] - user_selected_scenario = user_inputs['scenario'] - - if selected_display_preset == PD_DISAGG: - - tab1.write("""The prefill/decode disaggregation scenario compares the effects of :blue[aggregate] inference vs. :blue[disaggregated] inference.""") - - tab1.markdown("#### Configuration performance") - - metric_col1, metric_col2 = tab1.columns(2) - - col_y = metric_col1.selectbox("Select y-axis performance metric", - options=xp.METRICS_COLUMNS.keys(), - index=list(xp.METRICS_COLUMNS.keys()).index(scenario_preset['col_y']), - format_func=lambda p: xp.METRICS_COLUMNS[p].label_with_units(), - ) - - col_x = metric_col2.selectbox("Select x-axis input metric", - options=xp.INPUT_COLUMNS.keys(), - index=list(xp.INPUT_COLUMNS.keys()).index(scenario_preset['col_x']), - format_func=lambda p: f"{xp.INPUT_COLUMNS[p].label}", - ) - - plot = xplotting.plot_scenario( - runs_df=original_benchmark_data, - scenario=user_selected_scenario, - config_keys=scenario_preset['config_keys'], - col_x=col_x, - col_y=col_y, - col_seg_by=scenario_preset['col_seg_by'], - ) - tab1.pyplot(plot) - - tab1.divider() - - tab1.markdown("#### Performance tradeoff comparison") - metric_col1, metric_col2, metric_col3 = tab1.columns(3) - tradeoff_y = metric_col1.selectbox("Select y-axis performance tradeoff metric", - options=xp.METRICS_COLUMNS.keys(), - 
index=list(xp.METRICS_COLUMNS.keys()).index(scenario_preset['pareto']['col_y']), - format_func=lambda p: xp.METRICS_COLUMNS[p].label_with_units(), - ) - - tradeoff_x = metric_col2.selectbox("Select x-axis performance tradeoff metric", - options=xp.METRICS_COLUMNS.keys(), - index=list(xp.METRICS_COLUMNS.keys()).index(scenario_preset['pareto']['col_x']), - format_func=lambda p: xp.METRICS_COLUMNS[p].label_with_units(), - ) - - tradeoff_z = metric_col3.selectbox("Select z-axis input metric (point label)", - options=xp.INPUT_COLUMNS.keys(), - index=list(xp.INPUT_COLUMNS.keys()).index(scenario_preset['pareto']['col_z']), - format_func=lambda p: xp.INPUT_COLUMNS[p].label, - ) - - tradeoff_plot = xplotting.plot_scenario_tradeoff( - runs_df=original_benchmark_data, - scenario=user_selected_scenario, - config_keys=scenario_preset['config_keys'], - col_x=tradeoff_x, - col_y=tradeoff_y, - col_z=tradeoff_z, - col_seg_by=scenario_preset['col_seg_by'], - ) - tab1.pyplot(tradeoff_plot) - - # Add slos - config_cols = ['Replicas', 'TP', 'P_Replicas', 'P_TP', 'D_Replicas', 'D_TP'] - slos_cols = ['Mean_TTFT_ms', 'Thpt_per_GPU', 'Num_GPUs'] - - if selected_display_preset == INFERENCE_SCHEDULING: - tab1.markdown("#### Configuration performance") - - metric_col1, metric_col2 = tab1.columns(2) - - col_y = metric_col1.selectbox("Select y-axis performance metric", - options=xp.METRICS_COLUMNS.keys(), - index=list(xp.METRICS_COLUMNS.keys()).index(scenario_preset['col_y']), - format_func=lambda p: xp.METRICS_COLUMNS[p].label_with_units(), - ) - - col_x = metric_col2.selectbox("Select x-axis input metric", - options=xp.INPUT_COLUMNS.keys(), - index=list(xp.INPUT_COLUMNS.keys()).index(scenario_preset['col_x']), - format_func=lambda p: f"{xp.INPUT_COLUMNS[p].label}", - ) - plot = xplotting.plot_scenario( - runs_df=original_benchmark_data, - scenario=user_selected_scenario, - config_keys=scenario_preset['config_keys'], - col_x=col_x, - col_y=col_y, - col_seg_by=scenario_preset['col_seg_by'], - ) 
- tab1.pyplot(plot) - - # Plot the tradeoff - tab1.divider() - - tab1.markdown("#### Performance tradeoff comparison") - metric_col1, metric_col2, metric_col3 = tab1.columns(3) - tradeoff_y = metric_col1.selectbox("Select y-axis performance tradeoff metric", - options=xp.METRICS_COLUMNS.keys(), - index=list(xp.METRICS_COLUMNS.keys()).index(scenario_preset['pareto']['col_y']), - format_func=lambda p: xp.METRICS_COLUMNS[p].label_with_units(), - ) - - tradeoff_x = metric_col2.selectbox("Select x-axis performance tradeoff metric", - options=xp.METRICS_COLUMNS.keys(), - index=list(xp.METRICS_COLUMNS.keys()).index(scenario_preset['pareto']['col_x']), - format_func=lambda p: xp.METRICS_COLUMNS[p].label_with_units(), - ) - - tradeoff_z = metric_col3.selectbox("Select z-axis input metric (point label)", - options=xp.INPUT_COLUMNS.keys(), - index=list(xp.INPUT_COLUMNS.keys()).index(scenario_preset['pareto']['col_z']), - format_func=lambda p: xp.INPUT_COLUMNS[p].label, - ) - - tradeoff_plot = xplotting.plot_scenario_tradeoff( - runs_df=original_benchmark_data, - scenario=user_selected_scenario, - config_keys=scenario_preset['config_keys'], - col_x=tradeoff_x, - col_y=tradeoff_y, - col_z=tradeoff_z, - col_seg_by=scenario_preset['col_seg_by'], - ) - tab1.pyplot(tradeoff_plot) - - config_cols = scenario_preset['config_keys'] - slos_cols = ["P90_TTFT_ms", "P90_TPOT_ms", "Total_Token_Throughput", "Num_GPUs"] - - if selected_display_preset == "Custom": - tab1.warning("This feature is not yet available. 
To perform you own data exploration, see this [example Jupyter notebook](https://github.com/llm-d/llm-d-benchmark/blob/main/analysis/analysis.ipynb) for analysis using the `config_explorer` library.") - config_cols = scenario_preset['config_keys'] - - display_optimal_config_overview(tab2, config_cols, slos_cols, original_benchmark_data, user_inputs, user_selected_scenario) - -if __name__ == "__main__": - # Set up streamlit config - st.set_page_config(page_title="Configuration Explorer", - page_icon=None, - layout="wide", - initial_sidebar_state="expanded", - menu_items=None) - st.title("Configuration Explorer") - st.caption("This tool helps you find the most cost-effective, optimal configuration for serving models on llm-d based on hardware specification, workload characteristics, and SLO requirements.") - - init_session_state() - - # Display Sweep Explorer headings - st.header("Configuration Sweep Explorer") - st.caption("Explore, examine, and visualize existing benchmarking data for optimal `llm-d` configurations.") - - user_benchmark_path() - col1, col2 = st.columns([0.3, 0.7], gap="large") - col1_container = col1.container(height=1000, border=False) - col2_container = col2.container(height=1000, border=False) - user_inputs = inputs(col1_container) - outputs(col2_container, user_inputs) diff --git a/config_explorer/src/config_explorer/benchmark_report b/config_explorer/src/config_explorer/benchmark_report deleted file mode 120000 index dc744c4a..00000000 --- a/config_explorer/src/config_explorer/benchmark_report +++ /dev/null @@ -1 +0,0 @@ -../../../benchmark_report/ \ No newline at end of file diff --git a/config_explorer/src/config_explorer/constants.py b/config_explorer/src/config_explorer/constants.py deleted file mode 100644 index 3bc93a31..00000000 --- a/config_explorer/src/config_explorer/constants.py +++ /dev/null @@ -1,37 +0,0 @@ -""" -Library constants -""" - -# Length of column bound prefix -BOUND_PREFIX_LEN = 6 - -# Column bound prefixes and 
printable string representations. -# Order is from lower bounds to upper bounds. -COLUMN_BOUND_STR = { - '__ge__': '≥', - '__gt__': '>', - '__lt__': '<', - '__le__': '≤', -} - -# Reverse mapping of possible string descriptors of bounds to internal column -# prefix representation. -STR_TO_COLUMN_BOUND = { - 'ge': '__ge__', - '>=': '__ge__', - '≥': '__ge__', - '__ge__': '__ge__', - - 'gt': '__gt__', - '>': '__gt__', - '__gt__': '__gt__', - - 'lt': '__lt__', - '<': '__lt__', - '__lt__': '__lt__', - - 'le': '__le__', - '<=': '__le__', - '≤': '__le__', - '__le__': '__le__', -} diff --git a/config_explorer/src/config_explorer/explorer.py b/config_explorer/src/config_explorer/explorer.py deleted file mode 100644 index 410b41a1..00000000 --- a/config_explorer/src/config_explorer/explorer.py +++ /dev/null @@ -1,1653 +0,0 @@ -""" -This file contains function for configuration exploration using benchmarking -data from llm-d-benchmark. - -The entrypoint is make_benchmark_runs_df() to initialize an empty Pandas -DataFrame which will store benchmark results, and add_benchmark_report_to_df() -to populate the DataFrame with data from a benchmark report file. The columns -in the DataFrame are described in the COLUMNS dictionary. - -To assist with loading benchmark report files, get_benchmark_report_files() can -be used to find all benchmark report files within a search directory. - -Once a DataFrame has been populated, analysis can proceed by selecting a set of -columns to be held constant during analysis. These columns should describe a -particular use case, such as the AI model, workload, and accelerator hardware. -For example, of we want to analyze throughput and latency performance of -prefill/decode disaggregated and aggregated setups, we may key together -['Model', 'GPU', 'ISL', 'OSL']. A unique set of values for these columns is -referred to as a "scenario". We can find what unique combinations of these -parameters exist within the dataset with get_scenarios(). 
If we are using a -text interface, such as a CLI or Jupyter notebook, we can use print_scenarios() -to view a table of scenarios available. - -Upon selection of a particular scenario, we now need to choose another grouping -of columns which we will use to uniquely define a configuration. We can -describe disaggregated configurations with the columns -['P_Replicas', 'P_TP', 'D_Replicas', 'D_TP'], which will be our configuration -key. - -Using our selected scenario and configuration key, we can begin plotting -metrics of interest using functions in plotting.py. We can also define -service-level objectives (SLOs) with the SLO class, creating a list of SLOs and -using get_meet_slo_df() to make a DataFrame of only rows that meet our SLOs. -We can use get_pareto_front_df() to find optimal configurations against pairs -of metrics, showing, for example, the tradeoff between throughput and latency. -""" - -import builtins -from dataclasses import dataclass -import os -from pathlib import Path -import sys -from typing import Any - -import pandas as pd -import yaml - -from .benchmark_report import get_nested, import_benchmark_report -from .benchmark_report.schema_v0_1 import ( - BenchmarkReport, - HostType, - Units, - WorkloadGenerator, -) -from .constants import ( - BOUND_PREFIX_LEN, - COLUMN_BOUND_STR, - STR_TO_COLUMN_BOUND, -) - - -class Text: - """ANSI SGR control codes for text formatting""" - DEFAULT = "\x1b[0m" - BOLD = "\x1b[1m" - BOLD_OFF = "\x1b[22m" - UNDERLINE = "\x1b[4m" - UNDERLINE_OFF = "\x1b[24m" - DEFAULT_COLOR = "\x1b[39m" - DEFAULT_BG_COLOR = "\x1b[49m" - RED = "\x1b[31m" - YELLOW = "\x1b[33m" - GREEN = "\x1b[32m" - CYAN = "\x1b[36m" - BLUE = "\x1b[34m" - MAGENTA = "\x1b[35m" - BLACK = "\x1b[30m" - WHITE = "\x1b[37m" - BG_RED = "\x1b[41m" - BG_YELLOW = "\x1b[43m" - BG_GREEN = "\x1b[42m" - BG_CYAN = "\x1b[46m" - BG_BLUE = "\x1b[44m" - BG_MAGENTA = "\x1b[45m" - BG_BLACK = "\x1b[40m" - BG_WHITE = "\x1b[47m" - - -class Pref: - """Preferred direction for a 
metric.""" - - # Lower is better - LOW = -1 - # No preference - NEUTRAL = 0 - # Higher is better - HIGH = 1 - - -@dataclass -class ColumnProperties: - """Dataset column properties.""" - - # String description of data type - dtype: str - # Label for plots and tables - label: str - # Preferred direction - pref: Pref = Pref.NEUTRAL - # Units - units: str = '' - - def label_with_units(self) -> str: - """ - Pretty print the label of the column with the units - """ - - if self.units == "": - return self.label - - return f"{self.label} ({self.units})" - - -# Dataset columns about benchmark run -RUN_COLUMNS = { - # Details about particular run - 'Directory': ColumnProperties( - dtype='str', - label='Directory', - ), - 'Directory_Base': ColumnProperties( - dtype='str', - label='Base Directory' - ), - 'Start': ColumnProperties( - dtype='float', - label='Start Time' - ), - 'Duration': ColumnProperties( - dtype='float', - units='s', - label='Duration', - ), -} - -# Dataset columns about configuration -CONFIGURATION_COLUMNS = { - 'Platform': ColumnProperties( - dtype='str', - label='Platform', - ), - # AI model name - 'Model': ColumnProperties( - dtype='str', - label='Model', - ), - # Accelerator and parallelism - 'GPU': ColumnProperties( - dtype='str', - label='Accelerator', - ), - 'Num_GPUs': ColumnProperties( - dtype='int', - label='Number of GPUs', - pref=Pref.LOW, - ), - 'DP': ColumnProperties( - dtype='int', - label='DP', - ), - 'TP': ColumnProperties( - dtype='int', - label='TP', - ), - 'PP': ColumnProperties( - dtype='int', - label='PP', - ), - 'EP': ColumnProperties( - dtype='int', - label='EP', - ), - 'Replicas': ColumnProperties( - dtype='int', - label='Replicas', - ), - 'P_DP': ColumnProperties( - dtype='int', - label='P DP', - ), - 'P_TP': ColumnProperties( - dtype='int', - label='P TP', - ), - 'P_PP': ColumnProperties( - dtype='int', - label='P PP', - ), - 'P_EP': ColumnProperties( - dtype='int', - label='P EP', - ), - 'P_Replicas': ColumnProperties( - 
dtype='int', - label='P Replicas', - ), - 'D_DP': ColumnProperties( - dtype='int', - label='D DP', - ), - 'D_TP': ColumnProperties( - dtype='int', - label='D TP', - ), - 'D_PP': ColumnProperties( - dtype='int', - label='D PP', - ), - 'D_EP': ColumnProperties( - dtype='int', - label='D EP', - ), - 'D_Replicas': ColumnProperties( - dtype='int', - label='D Replicas', - ), - 'Is_PD': ColumnProperties( - dtype='bool', - label='Is P/D', - ), - # Inference scheduler settings - 'KV_Cache_Scorer_Weight': ColumnProperties( - dtype='float', - label='KV Cache', - ), - 'Queue_Scorer_Weight': ColumnProperties( - dtype='float', - label='Queue', - ), - 'Prefix_Cache_Scorer_Weight': ColumnProperties( - dtype='float', - label='Prefix Cache', - ), - 'Prefix_Cache_Scorer_Block_Size': ColumnProperties( - dtype='int', - label='Block Size', - ), - 'Prefix_Cache_Scorer_LRU_Capacity_Per_Server': ColumnProperties( - dtype='int', - label='LRU/Server', - ), - 'Prefix_Cache_Scorer_Max_Blocks_To_Match': ColumnProperties( - dtype='int', - label='Max Blocks', - ), - 'Prefix_Cache_Scorer_Mode': ColumnProperties( - dtype='bool', - label='Prefix Mode', - ), -} - -# Dataset columns about workload -WORKLOAD_COLUMNS = { - # Workload - 'Workload_Generator': ColumnProperties( - dtype='str', - label='Workload Generator', - ), - 'ISL': ColumnProperties( - dtype='int', - label='Input Sequence Length', - ), - 'OSL': ColumnProperties( - dtype='int', - label='Output Sequence Length', - ), - 'Target_OSL': ColumnProperties( - dtype='int', - label='Target OSL', - ), - 'Max_Concurrency': ColumnProperties( - dtype='int', - label='Concurrency', - ), - 'Max_QPS': ColumnProperties( - dtype='float', - label='Request Rate', - units='queries/s', - ), - # Common prefix length - 'System_Prompt_Length': ColumnProperties( - dtype='int', - label='System Prompt Length', - ), - # Length after common prefix - 'Question_Length': ColumnProperties( - dtype='int', - label='Question Length', - ), - # Number of user groups with 
distinct prompts - 'Groups': ColumnProperties( - dtype='int', - label='Groups', - ), - # Common prefixes within a group - 'Prompts_Per_Group': ColumnProperties( - dtype='int', - label='Prompts per Group', - ), -} - -# Dataset metrics columns -METRICS_COLUMNS = { - # Requests - 'Total_Requests': ColumnProperties( - dtype='int', - label='Total Requests', - ), - 'Failures': ColumnProperties( - dtype='int', - label='Failures', - ), - # Performance metrics - # Throughput - 'Request_Throughput': ColumnProperties( - dtype='float', - label='Request Throughput', - pref=Pref.HIGH, - units='req/s', - ), - 'Output_Token_Throughput': ColumnProperties( - dtype='float', - label='Output Token Throughput', - pref=Pref.HIGH, - units='tok/s', - ), - 'Total_Token_Throughput': ColumnProperties( - dtype='float', - label='Total Token Throughput', - pref=Pref.HIGH, - units='tok/s', - ), - 'Thpt_per_GPU': ColumnProperties( - dtype='float', - label='Throughput per GPU', - pref=Pref.HIGH, - units='tok/s/GPU', - ), - 'Thpt_per_User': ColumnProperties( - dtype='float', - label='Throughput per User', - pref=Pref.HIGH, - units='tok/s/user', - ), - # Latency - # TTFT - 'Mean_TTFT_ms': ColumnProperties( - dtype='float', - label='Mean Time to First Token', - pref=Pref.LOW, - units='ms', - ), - 'StdDev_TTFT_ms': ColumnProperties( - dtype='float', - label='Time to First Token StDev', - pref=Pref.LOW, - units='ms', - ), - 'Min_TTFT_ms': ColumnProperties( - dtype='float', - label='Min Time to First Token', - pref=Pref.LOW, - units='ms', - ), - 'P0.1_TTFT_ms': ColumnProperties( - dtype='float', - label='Time to First Token P0.1', - pref=Pref.LOW, - units='ms', - ), - 'P1_TTFT_ms': ColumnProperties( - dtype='float', - label='Time to First Token P1', - pref=Pref.LOW, - units='ms', - ), - 'P5_TTFT_ms': ColumnProperties( - dtype='float', - label='Time to First Token P5', - pref=Pref.LOW, - units='ms', - ), - 'P10_TTFT_ms': ColumnProperties( - dtype='float', - label='Time to First Token P10', - 
pref=Pref.LOW, - units='ms', - ), - 'P25_TTFT_ms': ColumnProperties( - dtype='float', - label='Time to First Token P25', - pref=Pref.LOW, - units='ms', - ), - 'P50_TTFT_ms': ColumnProperties( - dtype='float', - label='Time to First Token P50', - pref=Pref.LOW, - units='ms', - ), - 'P75_TTFT_ms': ColumnProperties( - dtype='float', - label='Time to First Token P75', - pref=Pref.LOW, - units='ms', - ), - 'P90_TTFT_ms': ColumnProperties( - dtype='float', - label='Time to First Token P90', - pref=Pref.LOW, - units='ms', - ), - 'P95_TTFT_ms': ColumnProperties( - dtype='float', - label='Time to First Token P95', - pref=Pref.LOW, - units='ms', - ), - 'P99_TTFT_ms': ColumnProperties( - dtype='float', - label='Time to First Token P99', - pref=Pref.LOW, - units='ms', - ), - 'P99.9_TTFT_ms': ColumnProperties( - dtype='float', - label='Time to First Token P99.9', - pref=Pref.LOW, - units='ms', - ), - 'Max_TTFT_ms': ColumnProperties( - dtype='float', - label='Max Time to First Token', - pref=Pref.LOW, - units='ms', - ), - # TPOT - 'Mean_TPOT_ms': ColumnProperties( - dtype='float', - label='Mean Time per Output Token', - pref=Pref.LOW, - units='ms', - ), - 'StdDev_TPOT_ms': ColumnProperties( - dtype='float', - label='Time per Output Token StdDev', - pref=Pref.LOW, - units='ms', - ), - 'Min_TPOT_ms': ColumnProperties( - dtype='float', - label='Min Time per Output Token', - pref=Pref.LOW, - units='ms', - ), - 'P0.1_TPOT_ms': ColumnProperties( - dtype='float', - label='Time per Output Token P0.1', - pref=Pref.LOW, - units='ms', - ), - 'P1_TPOT_ms': ColumnProperties( - dtype='float', - label='Time per Output Token P1', - pref=Pref.LOW, - units='ms', - ), - 'P5_TPOT_ms': ColumnProperties( - dtype='float', - label='Time per Output Token P5', - pref=Pref.LOW, - units='ms', - ), - 'P10_TPOT_ms': ColumnProperties( - dtype='float', - label='Time per Output Token P10', - pref=Pref.LOW, - units='ms', - ), - 'P25_TPOT_ms': ColumnProperties( - dtype='float', - label='Time per Output Token 
P25', - pref=Pref.LOW, - units='ms', - ), - 'P50_TPOT_ms': ColumnProperties( - dtype='float', - label='Time per Output Token P50', - pref=Pref.LOW, - units='ms', - ), - 'P75_TPOT_ms': ColumnProperties( - dtype='float', - label='Time per Output Token P75', - pref=Pref.LOW, - units='ms', - ), - 'P90_TPOT_ms': ColumnProperties( - dtype='float', - label='Time per Output Token P90', - pref=Pref.LOW, - units='ms', - ), - 'P95_TPOT_ms': ColumnProperties( - dtype='float', - label='Time per Output Token P95', - pref=Pref.LOW, - units='ms', - ), - 'P99_TPOT_ms': ColumnProperties( - dtype='float', - label='Time per Output Token P99', - pref=Pref.LOW, - units='ms', - ), - 'P99.9_TPOT_ms': ColumnProperties( - dtype='float', - label='Time per Output Token P99.9', - pref=Pref.LOW, - units='ms', - ), - 'Max_TPOT_ms': ColumnProperties( - dtype='float', - label='Max Time per Output Token', - pref=Pref.LOW, - units='ms', - ), - # ITL - 'Mean_ITL_ms': ColumnProperties( - dtype='float', - label='Mean Inter-Token Latency', - pref=Pref.LOW, - units='ms', - ), - 'StdDev_ITL_ms': ColumnProperties( - dtype='float', - label='Inter-Token Latency StdDev', - pref=Pref.LOW, - units='ms', - ), - 'Min_ITL_ms': ColumnProperties( - dtype='float', - label='Min Inter-Token Latency', - pref=Pref.LOW, - units='ms', - ), - 'P0.1_ITL_ms': ColumnProperties( - dtype='float', - label='Inter-Token Latency P0.1', - pref=Pref.LOW, - units='ms', - ), - 'P1_ITL_ms': ColumnProperties( - dtype='float', - label='Inter-Token Latency P1', - pref=Pref.LOW, - units='ms', - ), - 'P5_ITL_ms': ColumnProperties( - dtype='float', - label='Inter-Token Latency P5', - pref=Pref.LOW, - units='ms', - ), - 'P10_ITL_ms': ColumnProperties( - dtype='float', - label='Inter-Token Latency P10', - pref=Pref.LOW, - units='ms', - ), - 'P25_ITL_ms': ColumnProperties( - dtype='float', - label='Inter-Token Latency P25', - pref=Pref.LOW, - units='ms', - ), - 'P50_ITL_ms': ColumnProperties( - dtype='float', - label='Inter-Token Latency P50', - 
pref=Pref.LOW, - units='ms', - ), - 'P75_ITL_ms': ColumnProperties( - dtype='float', - label='Inter-Token Latency P75', - pref=Pref.LOW, - units='ms', - ), - 'P90_ITL_ms': ColumnProperties( - dtype='float', - label='Inter-Token Latency P90', - pref=Pref.LOW, - units='ms', - ), - 'P95_ITL_ms': ColumnProperties( - dtype='float', - label='Inter-Token Latency P95', - pref=Pref.LOW, - units='ms', - ), - 'P99_ITL_ms': ColumnProperties( - dtype='float', - label='Inter-Token Latency P99', - pref=Pref.LOW, - units='ms', - ), - 'P99.9_ITL_ms': ColumnProperties( - dtype='float', - label='Inter-Token Latency P99.9', - pref=Pref.LOW, - units='ms', - ), - 'Max_ITL_ms': ColumnProperties( - dtype='float', - label='Max Inter-Token Latency', - pref=Pref.LOW, - units='ms', - ), - # E2EL - 'Mean_E2EL_ms': ColumnProperties( - dtype='float', - label='Mean End-to-End Latency', - pref=Pref.LOW, - units='ms', - ), - 'StdDev_E2EL_ms': ColumnProperties( - dtype='float', - label='End-to-End Latency StdDev', - pref=Pref.LOW, - units='ms', - ), - 'Min_E2EL_ms': ColumnProperties( - dtype='float', - label='Min End-to-End Latency', - pref=Pref.LOW, - units='ms', - ), - 'P0.1_E2EL_ms': ColumnProperties( - dtype='float', - label='End-to-End Latency P0.1', - pref=Pref.LOW, - units='ms', - ), - 'P1_E2EL_ms': ColumnProperties( - dtype='float', - label='End-to-End Latency P1', - pref=Pref.LOW, - units='ms', - ), - 'P5_E2EL_ms': ColumnProperties( - dtype='float', - label='End-to-End Latency P5', - pref=Pref.LOW, - units='ms', - ), - 'P10_E2EL_ms': ColumnProperties( - dtype='float', - label='End-to-End Latency P10', - pref=Pref.LOW, - units='ms', - ), - 'P25_E2EL_ms': ColumnProperties( - dtype='float', - label='End-to-End Latency P25', - pref=Pref.LOW, - units='ms', - ), - 'P50_E2EL_ms': ColumnProperties( - dtype='float', - label='End-to-End Latency P50', - pref=Pref.LOW, - units='ms', - ), - 'P75_E2EL_ms': ColumnProperties( - dtype='float', - label='End-to-End Latency P75', - pref=Pref.LOW, - units='ms', 
- ), - 'P90_E2EL_ms': ColumnProperties( - dtype='float', - label='End-to-End Latency P90', - pref=Pref.LOW, - units='ms', - ), - 'P95_E2EL_ms': ColumnProperties( - dtype='float', - label='End-to-End Latency P95', - pref=Pref.LOW, - units='ms', - ), - 'P99_E2EL_ms': ColumnProperties( - dtype='float', - label='End-to-End Latency P99', - pref=Pref.LOW, - units='ms', - ), - 'P99.9_E2EL_ms': ColumnProperties( - dtype='float', - label='End-to-End Latency P99.9', - pref=Pref.LOW, - units='ms', - ), - 'Max_E2EL_ms': ColumnProperties( - dtype='float', - label='Max End-to-End Latency', - pref=Pref.LOW, - units='ms', - ), -} - -# Non-metrics columns, which may be used as an independent input variable -INPUT_COLUMNS = {} -INPUT_COLUMNS.update(RUN_COLUMNS) -INPUT_COLUMNS.update(CONFIGURATION_COLUMNS) -INPUT_COLUMNS.update(WORKLOAD_COLUMNS) - -# All dataset columns -COLUMNS = {} -COLUMNS.update(RUN_COLUMNS) -COLUMNS.update(CONFIGURATION_COLUMNS) -COLUMNS.update(WORKLOAD_COLUMNS) -COLUMNS.update(METRICS_COLUMNS) - - -@dataclass -class SLO: - """Service level objective.""" - - # Column of metric associated with the SLO - col: str - # Value the metric must be no worse than - value: float - - def __post_init__(self): - if self.col not in COLUMNS: - raise ValueError(f'Column does not exist: {self.col}') - if COLUMNS[self.col].dtype != 'float': - raise TypeError(f'Column must have float datatype: {self.col}') - if COLUMNS[self.col].pref == Pref.NEUTRAL: - raise Exception(f'Column must have a preferred direction: {self.col}') - - -def col_base(col: str) -> str: - """Get original column name, removing bound prefixes if present. - - Args: - col (str): Column name, which may include a bound prefix. - - Returns: - str: Column name, without any bound prefixes. - """ - if col[:BOUND_PREFIX_LEN] in COLUMN_BOUND_STR: - return col[BOUND_PREFIX_LEN:] - return col - - -def check_dir(dir: str) -> None: - """Print an error if directory does not exist. 
- - Args: - dir (str): Directory to check existence of. - """ - if not os.path.isdir(dir): - raise Exception(f'Invalid path: {dir}') - - -def check_file(file: str) -> None: - """Print an error if file does not exist. - - Args: - file (str): File to check existence of. - """ - if not os.path.isfile(file): - raise Exception(f'Invalid file: {file}') - - -def mul(a: int | float | None, b: int | float | None) -> int | float | None: - """Multiply two values, returning None if either value is None. - - Args: - a (int | float | None): First multiplicand. - b (int | float | None): Second multiplicand. - - Returns: - int | float | None: Multiplied result if multiplicands exist, otherwise None. - """ - if a is not None and b is not None: - return a * b - return None - - -def div(a: int | float | None, b: int | float | None) -> float | None: - """Divide two values, returning None if either value is None or divisor is 0. - - Args: - a (int | float | None): Dividend. - b (int | float | None): Divisor. - - Returns: - float | None: Result if inputs exist, otherwise None. - """ - if a is not None and b: - return a / b - return None - - -def get_benchmark_report_files( - source_dir: str, - recurse_symlinks: bool = False -) -> list[str]: - """Get a list of benchmark report files within provided path (recursive). - - Args: - source_dir (str): Directory to recursively search for results files. - recurse_symlinks (bool): Recurse through symbolic links. - - Returns: - list: List of paths to benchmark report files. 
- """ - rb_files = [] - check_dir(source_dir) - path = Path(source_dir) - - symlinks_supported = False - if recurse_symlinks: - if sys.version_info.major >= 3 and sys.version_info.minor >= 13: - symlinks_supported = True - else: - sys.stderr.write( - 'Symbolic link recursion not supported below Python 3.13\n') - - if recurse_symlinks and symlinks_supported: - for file in path.rglob( - 'benchmark_report,_*.yaml', - recurse_symlinks=True): - rb_files.append(str(file)) - else: - for file in path.rglob('benchmark_report,_*.yaml'): - rb_files.append(str(file)) - - return rb_files - - -def make_benchmark_runs_df() -> pd.DataFrame: - """Create DataFrame for benchmark run results. - - Returns: - DataFrame: Empty DataFrame for benchmark runs. - """ - schema = {} - for col, props in COLUMNS.items(): - schema[col] = pd.Series(dtype=props.dtype) - return pd.DataFrame(schema) - - -def _get_replicas_and_parallelism( - report: BenchmarkReport) -> dict[str, int | None]: - """Get the number of replicas and parallelisms. - - Args: - report (BenchmarkReport): Benchmark run to evaluate. - - Returns: - dict[str, int | None]: Replicas and parallelisms for standalone or - prefill/decode configuration. Irrelevant fields will have a value - of None. 
- """ - rp = { - 'replicas': None, - 'tp': None, - 'dp': None, - 'pp': None, - 'ep': None, - 'p_replicas': None, - 'p_tp': None, - 'p_dp': None, - 'p_pp': None, - 'p_ep': None, - 'd_replicas': None, - 'd_tp': None, - 'd_dp': None, - 'd_pp': None, - 'd_ep': None, - 'is_pd': None, - } - - if not report.scenario.host: - # Host details are not available - return rp - - rp['replicas'] = report.scenario.host.type.count(HostType.REPLICA) - rp['p_replicas'] = report.scenario.host.type.count(HostType.PREFILL) - rp['d_replicas'] = report.scenario.host.type.count(HostType.DECODE) - if rp['replicas'] == 0: - rp['replicas'] = None - if rp['p_replicas'] == 0: - rp['p_replicas'] = None - if rp['d_replicas'] == 0: - rp['d_replicas'] = None - - if rp['replicas']: - # We have an aggregate setup - rp['is_pd'] = False - rp['tp'] = report.scenario.host.accelerator[0].parallelism.tp - rp['dp'] = report.scenario.host.accelerator[0].parallelism.dp - rp['pp'] = report.scenario.host.accelerator[0].parallelism.pp - rp['ep'] = report.scenario.host.accelerator[0].parallelism.ep - return rp - # We have a P/D setup - rp['is_pd'] = True - for ii, accel in enumerate(report.scenario.host.accelerator): - if report.scenario.host.type[ii] is HostType.PREFILL and rp['p_tp'] is None: - rp['p_tp'] = accel.parallelism.tp - rp['p_dp'] = accel.parallelism.dp - rp['p_pp'] = accel.parallelism.pp - rp['p_ep'] = accel.parallelism.ep - if report.scenario.host.type[ii] is HostType.DECODE and rp['d_tp'] is None: - rp['d_tp'] = accel.parallelism.tp - rp['d_dp'] = accel.parallelism.dp - rp['d_pp'] = accel.parallelism.pp - rp['d_ep'] = accel.parallelism.ep - if rp['p_tp'] and rp['d_tp']: - break - return rp - - -def add_benchmark_report_to_df( - runs_df: pd.DataFrame, - br_file: str) -> None: - """Load a results file and add it to the DataFrame of benchmark runs. - - Args: - runs_df (DataFrame): DataFrame to add a row to for the provided run. - br_file (str): Benchmark report file to import. 
- """ - # Import benchmark report. - # We will parse through this to populate a row in the DataFrame - report = import_benchmark_report(br_file) - - # Get parallelism and replica details - rp = _get_replicas_and_parallelism(report) - if rp['is_pd']: - num_gpus = 0 - # We assume that EP = TP, where EP is used on expert layers, so no - # need to add EP into the GPU count. - if rp['p_replicas']: - num_gpus += rp['p_tp'] * rp['p_dp'] * rp['p_pp'] * rp['p_replicas'] - if rp['d_replicas']: - num_gpus += rp['d_tp'] * rp['d_dp'] * rp['d_pp'] * rp['d_replicas'] - elif rp['is_pd'] == False: - num_gpus = rp['tp'] * rp['replicas'] - else: - # Cannot determine number of GPUs - num_gpus = None - - # Get inference scheduler plugin parameters - prefix_cache_scorer_block_size = None - prefix_cache_scorer_lur_capacity_per_server = None - prefix_cache_scorer_max_blocks_to_match = None - prefix_cache_scorer_mode = '' - if report.scenario.platform and report.scenario.platform.metadata and isinstance( - report.scenario.platform.metadata, dict): - for plugin in get_nested( - report.scenario.platform.metadata, [ - 'inferenceScheduler', 'plugins'], []): - if plugin.get('type') == 'prefix-cache-scorer': - if 'parameters' not in plugin: - continue - prefix_cache_scorer_block_size = plugin['parameters'].get( - 'blockSize', 16) - prefix_cache_scorer_lur_capacity_per_server = plugin['parameters'].get( - 'lruCapacityPerServer', 31250) - prefix_cache_scorer_max_blocks_to_match = plugin['parameters'].get( - 'maxPrefixBlocksToMatch', 256) - # If mode is 'cache_tracking', then precise prefix scoring is - # used - prefix_cache_scorer_mode = plugin['parameters'].get( - 'mode', 'default') - # Set default plugin weights to zero (disabled) - # TODO: capture other settings for prefix cache scorer - # https://gateway-api-inference-extension.sigs.k8s.io/guides/epp-configuration/prefix-aware/ - prefix_cache_scorer_weight = 0 - kv_cache_scorer_weight = 0 - queue_scorer_weight = 0 - # TODO: this analysis 
assumes only a single scheduling profile. - # In addition we assume the plugins have not been renamed, and the pluginRef - # is the same as the plugin type. - # https://gateway-api-inference-extension.sigs.k8s.io/guides/epp-configuration/config-text/ - if report.scenario.platform and report.scenario.platform.metadata and isinstance( - report.scenario.platform.metadata, dict): - plugins = get_nested( - report.scenario.platform.metadata, [ - 'inferenceScheduler', 'schedulingProfiles'], [ - {}])[0].get( - 'plugins', []) - for plugin in plugins: - if plugin.get('pluginRef') == 'prefix-cache-scorer': - prefix_cache_scorer_weight = plugin.get('weight', 1) - if plugin.get('pluginRef') == 'kv-cache-scorer': - kv_cache_scorer_weight = plugin.get('weight', 1) - if plugin.get('pluginRef') == 'queue-scorer': - queue_scorer_weight = plugin.get('weight', 1) - - # Get workload details - max_qps = None - concurrency = None - system_prompt_length = None # Common prefix length - question_length = None # Length after common prefix - groups = None # Number of user groups with distinct prompts - prompts_per_group = None # Common prefixes within a group - target_osl = None - args = report.scenario.load.args - if report.scenario.load.name == WorkloadGenerator.INFERENCE_PERF: - # Workload generator stage - # If stage metadata is not present in benchmark report, we cannot know - # which Inference Perf result this data came from. 
- stage = report.scenario.load.metadata.get('stage') - # Get rate - if stage is not None: - stage_list = get_nested(args, ['load', 'stages']) - max_qps = stage_list[stage].get('rate') - # Request characteristics - system_prompt_length = get_nested( - args, ['data', 'shared_prefix', 'system_prompt_len']) - question_length = get_nested( - args, ['data', 'shared_prefix', 'question_len']) - groups = get_nested(args, ['data', 'shared_prefix', 'num_groups']) - prompts_per_group = get_nested( - args, ['data', 'shared_prefix', 'num_prompts_per_group']) - - target_osl = int( - get_nested( - args, [ - 'data', 'shared_prefix', 'output_len'], -1)) - elif report.scenario.load.name == WorkloadGenerator.VLLM_BENCHMARK: - concurrency = args.get('max_concurrency') - elif report.scenario.load.name == WorkloadGenerator.GUIDELLM: - # Workload generator stage - # If stage metadata is missing, this benchmark report is from an older - # version of convert.py that only took stage 0 results. - stage = report.scenario.load.metadata.get('stage', 0) - - if 'rate' in args: - max_qps = args['rate'][stage] - concurrencies = get_nested(args, ['profile', 'measured_concurrencies']) - if concurrencies: - concurrency = concurrencies[stage] - data_list = args.get('data') - if data_list: - data = yaml.safe_load(data_list[0]) - system_prompt_length = data.get('prefix_tokens') - question_length = data.get('prompt_tokens') - groups = 1 - prompts_per_group = data.get('prefix_count') - target_osl = data.get('output_tokens') - - # Multipliers to ensure values are in ms - ttft_mult = 1000 if report.metrics.latency.time_to_first_token.units == Units.S else 1 - tpot_mult = 1000 if report.metrics.latency.time_per_output_token.units == Units.S_PER_TOKEN else 1 - itl_mult = 1000 if report.metrics.latency.inter_token_latency.units == Units.S_PER_TOKEN else 1 - e2el_mult = 1000 if report.metrics.latency.request_latency.units == Units.S else 1 - - # Calculated metrics - thpt_per_gpu = 
div(report.metrics.throughput.output_tokens_per_sec, num_gpus) - thpt_per_user = div(1, (div(mul(report.metrics.latency.time_per_output_token.mean, tpot_mult), 1000))) - - # Scenario details - engine = None - gpu_model = None - if report.scenario.platform: - engine = report.scenario.platform.engine[0].name - if report.scenario.host: - gpu_model = report.scenario.host.accelerator[0].model - - # Add row to DataFrame - runs_df.loc[len(runs_df)] = { - # Details about particular run - 'Directory': os.path.abspath(br_file).rsplit(os.sep, 1)[0], - 'Directory_Base': os.path.abspath(br_file).rsplit(os.sep, 2)[0], - 'Start': report.metrics.time.start, - 'Duration': report.metrics.time.duration, - 'Platform': engine, - # AI model name - 'Model': report.scenario.model.name, - # Accelerator and parallelism - # Assume only a single GPU type - 'GPU': gpu_model, - 'Num_GPUs': num_gpus, - 'DP': rp['dp'], - 'TP': rp['tp'], - 'PP': rp['pp'], - 'EP': rp['ep'], - 'Replicas': rp['replicas'], - 'P_DP': rp['p_dp'], - 'P_TP': rp['p_tp'], - 'P_PP': rp['p_pp'], - 'P_EP': rp['p_ep'], - 'P_Replicas': rp['p_replicas'], - 'D_DP': rp['d_dp'], - 'D_TP': rp['d_tp'], - 'D_PP': rp['d_pp'], - 'D_EP': rp['d_ep'], - 'D_Replicas': rp['d_replicas'], - 'Is_PD': rp['is_pd'], - # Inference scheduler settings - 'KV_Cache_Scorer_Weight': kv_cache_scorer_weight, - 'Queue_Scorer_Weight': queue_scorer_weight, - 'Prefix_Cache_Scorer_Weight': prefix_cache_scorer_weight, - 'Prefix_Cache_Scorer_Block_Size': prefix_cache_scorer_block_size, - 'Prefix_Cache_Scorer_LRU_Capacity_Per_Server': prefix_cache_scorer_lur_capacity_per_server, - 'Prefix_Cache_Scorer_Max_Blocks_To_Match': prefix_cache_scorer_max_blocks_to_match, - 'Prefix_Cache_Scorer_Mode': prefix_cache_scorer_mode, - # Workload - 'Workload_Generator': report.scenario.load.name, - 'ISL': int(round(report.metrics.requests.input_length.mean)), - 'OSL': int(round(report.metrics.requests.output_length.mean)), - 'Target_OSL': target_osl, - 'Max_Concurrency': 
concurrency, - 'Max_QPS': max_qps, - 'System_Prompt_Length': system_prompt_length, - 'Question_Length': question_length, - 'Groups': groups, - 'Prompts_Per_Group': prompts_per_group, - # Requests - 'Total_Requests': report.metrics.requests.total, - 'Failures': report.metrics.requests.failures, - # Performance metrics - # Throughput - 'Request_Throughput': report.metrics.throughput.requests_per_sec, - 'Output_Token_Throughput': report.metrics.throughput.output_tokens_per_sec, - 'Total_Token_Throughput': report.metrics.throughput.total_tokens_per_sec, - 'Thpt_per_GPU': thpt_per_gpu, - 'Thpt_per_User': thpt_per_user, - # Latency - # TTFT - 'Mean_TTFT_ms': mul(report.metrics.latency.time_to_first_token.mean, ttft_mult), - 'StdDev_TTFT_ms': mul(report.metrics.latency.time_to_first_token.stddev, ttft_mult), - 'Min_TTFT_ms': mul(report.metrics.latency.time_to_first_token.min, ttft_mult), - 'P0.1_TTFT_ms': mul(report.metrics.latency.time_to_first_token.p0p1, ttft_mult), - 'P1_TTFT_ms': mul(report.metrics.latency.time_to_first_token.p1, ttft_mult), - 'P5_TTFT_ms': mul(report.metrics.latency.time_to_first_token.p5, ttft_mult), - 'P10_TTFT_ms': mul(report.metrics.latency.time_to_first_token.p10, ttft_mult), - 'P25_TTFT_ms': mul(report.metrics.latency.time_to_first_token.p25, ttft_mult), - 'P50_TTFT_ms': mul(report.metrics.latency.time_to_first_token.p50, ttft_mult), - 'P75_TTFT_ms': mul(report.metrics.latency.time_to_first_token.p75, ttft_mult), - 'P90_TTFT_ms': mul(report.metrics.latency.time_to_first_token.p90, ttft_mult), - 'P95_TTFT_ms': mul(report.metrics.latency.time_to_first_token.p95, ttft_mult), - 'P99_TTFT_ms': mul(report.metrics.latency.time_to_first_token.p99, ttft_mult), - 'P99.9_TTFT_ms': mul(report.metrics.latency.time_to_first_token.p99p9, ttft_mult), - 'Max_TTFT_ms': mul(report.metrics.latency.time_to_first_token.max, ttft_mult), - # TPOT - 'Mean_TPOT_ms': mul(report.metrics.latency.time_per_output_token.mean, tpot_mult), - 'StdDev_TPOT_ms': 
mul(report.metrics.latency.time_per_output_token.stddev, tpot_mult), - 'Min_TPOT_ms': mul(report.metrics.latency.time_per_output_token.min, tpot_mult), - 'P0.1_TPOT_ms': mul(report.metrics.latency.time_per_output_token.p0p1, tpot_mult), - 'P1_TPOT_ms': mul(report.metrics.latency.time_per_output_token.p1, tpot_mult), - 'P5_TPOT_ms': mul(report.metrics.latency.time_per_output_token.p5, tpot_mult), - 'P10_TPOT_ms': mul(report.metrics.latency.time_per_output_token.p10, tpot_mult), - 'P25_TPOT_ms': mul(report.metrics.latency.time_per_output_token.p25, tpot_mult), - 'P50_TPOT_ms': mul(report.metrics.latency.time_per_output_token.p50, tpot_mult), - 'P75_TPOT_ms': mul(report.metrics.latency.time_per_output_token.p75, tpot_mult), - 'P90_TPOT_ms': mul(report.metrics.latency.time_per_output_token.p90, tpot_mult), - 'P95_TPOT_ms': mul(report.metrics.latency.time_per_output_token.p95, tpot_mult), - 'P99_TPOT_ms': mul(report.metrics.latency.time_per_output_token.p99, tpot_mult), - 'P99.9_TPOT_ms': mul(report.metrics.latency.time_per_output_token.p99p9, tpot_mult), - 'Max_TPOT_ms': mul(report.metrics.latency.time_per_output_token.max, tpot_mult), - # ITL - 'Mean_ITL_ms': mul(report.metrics.latency.inter_token_latency.mean, itl_mult), - 'StdDev_ITL_ms': mul(report.metrics.latency.inter_token_latency.stddev, itl_mult), - 'Min_ITL_ms': mul(report.metrics.latency.inter_token_latency.min, itl_mult), - 'P0.1_ITL_ms': mul(report.metrics.latency.inter_token_latency.p0p1, itl_mult), - 'P1_ITL_ms': mul(report.metrics.latency.inter_token_latency.p1, itl_mult), - 'P5_ITL_ms': mul(report.metrics.latency.inter_token_latency.p5, itl_mult), - 'P10_ITL_ms': mul(report.metrics.latency.inter_token_latency.p10, itl_mult), - 'P25_ITL_ms': mul(report.metrics.latency.inter_token_latency.p25, itl_mult), - 'P50_ITL_ms': mul(report.metrics.latency.inter_token_latency.p50, itl_mult), - 'P75_ITL_ms': mul(report.metrics.latency.inter_token_latency.p75, itl_mult), - 'P90_ITL_ms': 
mul(report.metrics.latency.inter_token_latency.p90, itl_mult), - 'P95_ITL_ms': mul(report.metrics.latency.inter_token_latency.p95, itl_mult), - 'P99_ITL_ms': mul(report.metrics.latency.inter_token_latency.p99, itl_mult), - 'P99.9_ITL_ms': mul(report.metrics.latency.inter_token_latency.p99p9, itl_mult), - 'Max_ITL_ms': mul(report.metrics.latency.inter_token_latency.max, itl_mult), - # E2EL - 'Mean_E2EL_ms': mul(report.metrics.latency.request_latency.mean, e2el_mult), - 'StdDev_E2EL_ms': mul(report.metrics.latency.request_latency.stddev, e2el_mult), - 'Min_E2EL_ms': mul(report.metrics.latency.request_latency.min, e2el_mult), - 'P0.1_E2EL_ms': mul(report.metrics.latency.request_latency.p0p1, e2el_mult), - 'P1_E2EL_ms': mul(report.metrics.latency.request_latency.p1, e2el_mult), - 'P5_E2EL_ms': mul(report.metrics.latency.request_latency.p5, e2el_mult), - 'P10_E2EL_ms': mul(report.metrics.latency.request_latency.p10, e2el_mult), - 'P25_E2EL_ms': mul(report.metrics.latency.request_latency.p25, e2el_mult), - 'P50_E2EL_ms': mul(report.metrics.latency.request_latency.p50, e2el_mult), - 'P75_E2EL_ms': mul(report.metrics.latency.request_latency.p75, e2el_mult), - 'P90_E2EL_ms': mul(report.metrics.latency.request_latency.p90, e2el_mult), - 'P95_E2EL_ms': mul(report.metrics.latency.request_latency.p95, e2el_mult), - 'P99_E2EL_ms': mul(report.metrics.latency.request_latency.p99, e2el_mult), - 'P99.9_E2EL_ms': mul(report.metrics.latency.request_latency.p99p9, e2el_mult), - 'Max_E2EL_ms': mul(report.metrics.latency.request_latency.max, e2el_mult), - } - - -def get_scenarios( - runs_df: pd.DataFrame, - scenario_columns: list[str], - bounded: bool = False) -> list[dict[str, Any]]: - """Get a list of available scenarios and numeric bounds from runs DataFrame. - - Args: - runs_df (DataFrame): Benchmark runs to find the scenarios for. - scenario_columns (list[str]): Columns to group into common sets. - bounded (bool): For numeric columns, return min/max bounds. 
- - Returns: - list[dict[str, Any]]: List of scenarios, consisting of unique groups of - values from scenario_columns. When bounded scenarios are returned, - any numeric columns are given as the min/max available with - __ge__ and __le__ prefixes, respectively. - """ - # Non-numeric columns - cols_nn = [] - # Numeric columns - cols_num = [] - for col in scenario_columns: - if col not in runs_df.columns: - raise KeyError(f'Invalid column: {col}') - if COLUMNS[col].dtype in ['int', 'float']: - cols_num.append(col) - else: - cols_nn.append(col) - - # Get unique combinations of values for non-numeric scenario columns, - # as tuples. - if bounded: - if not cols_nn: - raise Exception( - 'Scenario must include at least one non-numeric column') - scenario_tuples = list(set(runs_df.set_index(cols_nn).index.dropna())) - else: - scenario_tuples = list( - set(runs_df.set_index(scenario_columns).index.dropna())) - - # Create list of scenario dicts - scenarios = [] - # If there is a column that is all NA in a scenario, we will drop that - # scenario - all_na = False - for s_tuple in scenario_tuples: - s_dict = {} - if bounded: - for ii, col_nn in enumerate(cols_nn): - if isinstance(s_tuple, str): - # If only a single non-numeric column exists, - # runs_df.set_index(cols_nn) will be type - # pandas.core.indexes.base.Index rather than - # pandas.core.indexes.multi.MultiIndex and s_tuple will be - # the column name (string rather than tuple of strings) - s_dict[col_nn] = s_tuple - else: - s_dict[col_nn] = s_tuple[ii] - # Get rows matching this scenario's non-numeric columns - df = get_scenario_df(runs_df, s_dict) - # Get min/max for numeric columns of this scenario - for col_num in cols_num: - if df[col_num].isna().all(): - # This scenario has a column that is all NA, drop it - all_na = True - break - # Format as appropriate data type - fmt = getattr(builtins, COLUMNS[col_num].dtype) - # Get min/max - val_min = fmt(df[col_num].min()) - val_max = fmt(df[col_num].max()) - if val_min 
== val_max: - # Column only has a single value, no need to specify bounds - s_dict[col_num] = val_min - else: - s_dict['__ge__' + col_num] = val_min - s_dict['__le__' + col_num] = val_max - else: - for ii, col in enumerate(scenario_columns): - if isinstance(s_tuple, str): - # If only a single scenario column is defined, - # runs_df.set_index(scenario_columns) will be type - # pandas.core.indexes.base.Index rather than - # pandas.core.indexes.multi.MultiIndex and s_tuple will be - # the column name (string rather than tuple of strings) - s_dict[col] = s_tuple - else: - s_dict[col] = s_tuple[ii] - if not all_na: - # Add scenario only if there are rows were all columns have data - scenarios.append(s_dict) - all_na = False - - return scenarios - - -def get_scenario_df( - runs_df: pd.DataFrame, - scenario: dict[str, Any]) -> pd.DataFrame: - """Get rows from a dataframe matching a scenario. - - Args: - runs_df (pandas.DataFrame): Benchmark runs to retrieve the - scenario data from. - scenario (dict[str, Any]): Columns and values to match. - - Returns: - pandas.DataFrame: Rows matching the scenario. - """ - for col, val in scenario.items(): - if col[:BOUND_PREFIX_LEN] == '__ge__': - runs_df = runs_df[(runs_df[col[BOUND_PREFIX_LEN:]] >= val)] - elif col[:BOUND_PREFIX_LEN] == '__gt__': - runs_df = runs_df[(runs_df[col[BOUND_PREFIX_LEN:]] > val)] - elif col[:BOUND_PREFIX_LEN] == '__lt__': - runs_df = runs_df[(runs_df[col[BOUND_PREFIX_LEN:]] < val)] - elif col[:BOUND_PREFIX_LEN] == '__le__': - runs_df = runs_df[(runs_df[col[BOUND_PREFIX_LEN:]] <= val)] - else: - runs_df = runs_df[(runs_df[col] == val)] - return runs_df - - -def set_scenario_bounds( - scenario: dict[str, Any], - bounds: dict[str, dict[str, int | float]]) -> dict[str, Any]: - """Create a new scenario with bounds applied. - - Args: - scenario (dict[str, Any]): Scenario to apply new bounds to. - bounds (dict[str, dict[str, int | float]]): Bounds to apply to - scenario. 
def rebound_scenario(
    runs_df: pd.DataFrame,
    scenario: dict[str, Any]) -> dict[str, Any]:
    """Update scenario bounds to match available data.

    Tighten any bounds that loosely describe available data.

    For bounds on a column which result in a single value, remove bounds and
    set this to an equality.

    If there is no data matching the scenario, return scenario as-is.

    Args:
        runs_df (pandas.DataFrame): Benchmark runs the scenario applies to.
        scenario (dict[str, Any]): Columns and values to match.

    Returns:
        dict[str, Any]: Scenario with updated column bounds.
    """

    # Rows currently matching the scenario; bounds are tightened to the
    # min/max actually present in this subset.
    df = get_scenario_df(runs_df, scenario)
    if len(df) == 0:
        # No matching data: nothing to tighten against
        return scenario

    # Columns that are given as a bound
    cols_bounded = []
    # Get columns that are bounded along with their min/max values available
    scenario_tight = {}
    for col, val in scenario.items():
        if col[:BOUND_PREFIX_LEN] in COLUMN_BOUND_STR:
            bcol = col_base(col)
            if bcol not in cols_bounded:
                # Keep record of bounded columns we already covered
                # (one column may carry several bound prefixes; handle once)
                cols_bounded.append(bcol)
                # Format as appropriate data type.
                # NOTE(review): assumes COLUMNS[bcol].dtype names a builtin
                # type such as 'int' or 'float' — confirm against COLUMNS.
                fmt = getattr(builtins, COLUMNS[bcol].dtype)
                # Get min/max present in the matching rows
                val_min = fmt(df[bcol].min())
                val_max = fmt(df[bcol].max())
                if val_min == val_max:
                    # Column only has a single value, no need to specify
                    # bounds — pin it with an equality instead
                    scenario_tight[bcol] = val_min
                else:
                    # Apply lower and upper bounds matching available data
                    scenario_tight['__ge__' + bcol] = val_min
                    scenario_tight['__le__' + bcol] = val_max
        else:
            # Fixed column: copied through unchanged
            scenario_tight[col] = val

    return scenario_tight


def get_scenario_counts(
    runs_df: pd.DataFrame,
    scenarios: list[dict[str, Any]],
) -> list[int]:
    """Get a count of rows in DataFrame matching each scenario.

    Args:
        runs_df (pandas.DataFrame): Benchmark runs to count scenario rows from.
        scenarios (list[dict[str, Any]]): Scenario groups to count.

    Returns:
        list[int]: Counts for each scenario, index-aligned with `scenarios`.
    """
    counts = []
    for sc in scenarios:
        # One filtered lookup per scenario
        count = len(get_scenario_df(runs_df, sc))
        counts.append(count)
    return counts
- """ - - if not scenarios: - print(f'{Text.BOLD}{Text.RED}No scenarios available!{Text.DEFAULT}') - return - - col_names = [] - # Length of column headers in printable characters - col_names_len = [] - for col in scenarios[0].keys(): - - if col[:BOUND_PREFIX_LEN] in COLUMN_BOUND_STR: - col_bound = col[:BOUND_PREFIX_LEN] - col_base = col[BOUND_PREFIX_LEN:] - col_names.append( - col_base + - Text.MAGENTA + - COLUMN_BOUND_STR[col_bound] + - Text.DEFAULT + - Text.BOLD) - col_names_len.append(len(col[BOUND_PREFIX_LEN:]) + 1) - else: - col_names.append(col) - col_names_len.append(len(col)) - - # Get maximum text length for each column, including header - spans = col_names_len[:] - for sc in scenarios: - for ii, value in enumerate(sc.values()): - if spans[ii] < len(str(value)): - spans[ii] = len(str(value)) - - # Create header, starting with scenario index - if runs_df is None: - header = f'{Text.BOLD}{Text.BLUE}IDX {Text.DEFAULT}{Text.BOLD}' - else: - counts = get_scenario_counts(runs_df, scenarios) - header = f"""{ - Text.BOLD}{ - Text.BLUE}IDX { - Text.RED}Count { - Text.DEFAULT}{ - Text.BOLD}""" - - # Add each column name to header - for ii, col in enumerate(col_names): - header += col + " " * (spans[ii] - col_names_len[ii] + 2) - header += f'{Text.DEFAULT}' - print(header) - - # Print details of each scenario - for ii, sc in enumerate(scenarios): - row = f'{Text.BLUE}{ii}{Text.DEFAULT}' + " " * (5 - len(str(ii))) - if counts: - if counts[ii] < min_count: - continue - row += f'{Text.RED}{counts[ii]}{Text.DEFAULT}' + \ - " " * (7 - len(str(counts[ii]))) - for jj, val in enumerate(sc.values()): - row += f'{str(val)}' + " " * (spans[jj] - len(str(val)) + 2) - print(row) - - -def make_scenarios_summary_df( - scenarios: list[dict[str, Any]], - runs_df: pd.DataFrame, - min_count: int = 0 -) -> pd.DataFrame: - """ - Make a DataFrame of schenarios details, analagous to the printout from - print_scenarios(). 
- - Args: - scenarios (list[dict[str, Any]]): Scenario groups to show. - runs_df (pandas.DataFrame): Benchmark runs to retrieve the scenario - data from. - min_count (int): Only show scenarios with at least this many rows. - - Returns: - pandas.DataFrame: Details about available scenarios - """ - - # Make a column name utilizing bound prefixes - def col_name(col: str) -> str: - if col[:BOUND_PREFIX_LEN] in COLUMN_BOUND_STR: - return col[BOUND_PREFIX_LEN:] + \ - COLUMN_BOUND_STR[col[:BOUND_PREFIX_LEN]] - return col - - # Make DataFrame with matching row counts, and columns values from scenario - schema = { - 'Count': pd.Series(dtype='int'), - } - - if scenarios: - # If scenarios is empty, we will end up with a DataFrame having only - # a 'Count' column and no rows - for col in scenarios[0].keys(): - schema[col_name(col)] = pd.Series( - dtype=COLUMNS[col_base(col)].dtype) - df = pd.DataFrame(schema) - - # Populate DataFrame - counts = get_scenario_counts(runs_df, scenarios) - for ii, sc in enumerate(scenarios): - if counts[ii] < min_count: - continue - row = {'Count': counts[ii]} - for col, val in sc.items(): - row[col_name(col)] = val - # Index of DataFrame will have 1:1 correspondance with scenario index - df.loc[ii] = row - - return df - - -def get_meet_slo_df( - runs_df: pd.DataFrame, - slos: list[SLO]) -> pd.DataFrame: - """Get rows from dataset meeting provided SLOs. - - Args: - runs_df (pandas.DataFrame): Dataset to search. - slos (list[SLO]): SLOs to meet. 
def get_meet_slo_df(
    runs_df: pd.DataFrame,
    slos: list[SLO]) -> pd.DataFrame:
    """Get rows from dataset meeting provided SLOs.

    Args:
        runs_df (pandas.DataFrame): Dataset to search.
        slos (list[SLO]): SLOs to meet.

    Returns:
        pandas.DataFrame: Rows matching SLOs

    Raises:
        Exception: If an SLO column has no preferred direction.
    """
    runs_meet_slo_df = runs_df
    for slo in slos:
        # Idiom fix: use comparison operators rather than calling the
        # .__le__()/.__ge__() dunders directly.
        if COLUMNS[slo.col].pref == Pref.LOW:
            # Must be less than or equal to SLO value to meet SLO
            runs_meet_slo_df = runs_meet_slo_df[
                runs_meet_slo_df[slo.col] <= slo.value]
        elif COLUMNS[slo.col].pref == Pref.HIGH:
            # Must be greater than or equal to SLO value to meet SLO
            runs_meet_slo_df = runs_meet_slo_df[
                runs_meet_slo_df[slo.col] >= slo.value]
        else:
            raise Exception(f'Invalid SLO: {slo.col}')
    return runs_meet_slo_df


def get_pareto_front_df(
    runs_df: pd.DataFrame,
    col_a: str,
    col_b: str,
    sort: bool = False) -> pd.DataFrame:
    """Get rows from dataset on Pareto front for the provided metrics.

    Args:
        runs_df (pandas.DataFrame): Dataset to search.
        col_a (str): First metric column to optimize.
        col_b (str): Second metric column to optimize.
        sort (bool): Sort results by col_a; otherwise preserve row order.

    Returns:
        pandas.DataFrame: Rows on the Pareto front.

    Raises:
        Exception: If either column has no preferred direction.
    """
    # Make sure columns have a preferred direction
    if COLUMNS[col_a].pref == Pref.NEUTRAL:
        raise Exception(f'Column does not have a preferred direction: {col_a}')
    if COLUMNS[col_b].pref == Pref.NEUTRAL:
        raise Exception(f'Column does not have a preferred direction: {col_b}')

    def better(a: Any, b: Any, col: str) -> bool:
        """Return true if column in 'a' is better than 'b'."""
        if COLUMNS[col].pref == Pref.LOW:
            return a[col] < b[col]
        if COLUMNS[col].pref == Pref.HIGH:
            return a[col] > b[col]
        raise Exception(f'Invalid preference for column: {col}')

    pareto_set = set(runs_df.index.tolist())
    for ii, rowa in runs_df.iterrows():
        if ii not in pareto_set:
            # PERF FIX: row ii is already strictly dominated. Strict
            # dominance on both columns is transitive, so anything ii would
            # eliminate is also eliminated by ii's (never-removed)
            # dominator — safe to skip the inner scan entirely.
            continue
        is_pareto_front = runs_df.index.isin(pareto_set)
        for jj, rowb in runs_df[is_pareto_front].iterrows():
            if ii == jj:
                continue
            if better(rowa, rowb, col_a) and better(rowa, rowb, col_b):
                # Index jj worse in all ways to index ii
                pareto_set.remove(jj)
    if sort:
        return runs_df[runs_df.index.isin(pareto_set)].sort_values(by=col_a)
    # Preserve order
    return runs_df[runs_df.index.isin(pareto_set)]


# ---------------------------------------------------------------------------
# NOTE(review): the source continues in a second module,
# config_explorer/src/config_explorer/plotting.py; its preamble follows.
# ---------------------------------------------------------------------------

"""
Plotting functions for configuration explorer.
"""

from math import log10
from typing import Any

import matplotlib.pyplot as plt
import pandas as pd

from .explorer import (
    COLUMNS,
    SLO,
    col_base,
    get_scenario_df,
    get_meet_slo_df,
    get_pareto_front_df,
    rebound_scenario,
)
from .constants import (
    BOUND_PREFIX_LEN,
    COLUMN_BOUND_STR,
)


# Module-global figure counter: each plot gets a fresh figure id
fignum = 0

# Plot trace colors
COLORS = [
    '#FF0000', '#FFAA00', '#DDDD00', '#00DD00', '#00FFFF', '#0000FF',
    '#FF00FF', '#666666', '#000000', '#990000', '#777700', '#007700',
    '#009999', '#000099'
]


# Plot line styles
LINE_STYLES = [
    'solid', 'dashed', 'dashdot', 'dotted'
]


# Plot marker styles
MARKERS = [
    'o', 'v', 's', '*', 'd', 'X', 'p'
]


def _column_axis_label(col: str) -> str:
    """Get plot axis label for a column.

    Args:
        col (str): Column to make a label for.

    Returns:
        str: Axis label, with units appended in parentheses when defined.
    """
    label = COLUMNS[col].label
    if COLUMNS[col].units:
        label += ' (' + COLUMNS[col].units + ')'
    return label
def _make_title(scenario: dict[str, Any]) -> str:
    """Make a plot title that details the scenario.

    Args:
        scenario (dict[str, Any]): Scenario to describe as a multi-line title.

    Returns:
        str: Plot title
    """

    # Base names of columns that appear with a bound prefix; rendered after
    # all fixed columns, one title entry per base column.
    cols_bounded = []

    title = ''
    for col, value in scenario.items():
        if col[:BOUND_PREFIX_LEN] in COLUMN_BOUND_STR:
            if col_base(col) not in cols_bounded:
                cols_bounded.append(col_base(col))
            # Handle bounded columns later
            continue
        # Wrap once the current title line grows past 30 characters
        if len(title.rsplit('\n')[-1]) > 30:
            title += '\n'
        title += f'{COLUMNS[col].label}: {value} '

    # Add bounded columns to title
    for col in cols_bounded:
        if len(title.rsplit('\n')[-1]) > 30:
            title += '\n'
        # Strings describing value bound, e.g. '≥1 ≤8'
        val_bounds = []
        for bound_type in COLUMN_BOUND_STR:
            # Bounded column name, as it will appear in a scenario
            col_bound = bound_type + col
            if col_bound not in scenario:
                # This bound type is not in the scenario
                continue
            value = scenario[col_bound]
            val_bounds.append(f'{COLUMN_BOUND_STR[bound_type]}{value}')
        title += f'{COLUMNS[col].label}: {" ".join(val_bounds)} '

    return title.strip()


def plot_col_histogram(
    runs_df: pd.DataFrame,
    col: str,
    num_bins: int = 50) -> plt.Figure | None:
    """Plot a histogram of values for a column.

    Args:
        runs_df (pandas.DataFrame): Benchmark run data.
        col (str): Column to histogram.
        num_bins (int): Number of bins to use in histogram.

    Returns:
        matplotlib.pyplot.Figure | None: Plot figure, or None when the
        column has no non-NA values to plot.
    """
    if len(runs_df[col].dropna()) < 1:
        # Nothing to plot. FIX: return annotated as Optional — the bare
        # 'return' silently violated the declared Figure return type.
        return None

    global fignum
    fignum += 1
    fig = plt.figure(fignum)

    plt.hist(list(runs_df[col].dropna()), bins=num_bins, color='#0000FF')
    plt.xlabel(_column_axis_label(col), fontsize='16')
    plt.ylabel('Counts', fontsize='16')
    return fig


def plot_scenario_histogram(
    runs_df: pd.DataFrame,
    scenario: dict[str, Any],
    num_bins: int = 50) -> dict[str, plt.Figure]:
    """
    Plot value histograms for numeric columns in a scenario having a bound.
    Any columns having a single value will be skipped.

    Args:
        runs_df (pandas.DataFrame): Benchmark run data.
        scenario (dict[str, Any]): Scenario from benchmark data to plot.
        num_bins (int): Number of bins to use in histogram.

    Returns:
        dict[str, matplotlib.pyplot.Figure]: Mapping of base column name to
        its histogram figure. (FIX: docstring previously said "List".)
    """
    figs = {}

    plot_cols = []
    for col in scenario:
        if col[:BOUND_PREFIX_LEN] not in COLUMN_BOUND_STR:
            # This is not a bounded column
            continue
        if col[BOUND_PREFIX_LEN:] in plot_cols:
            # This column was already plotted
            continue
        if len(runs_df[col[BOUND_PREFIX_LEN:]].dropna().unique()) < 2:
            # There is only a single value, no need to plot a histogram
            continue
        # Keep record of plotted columns
        plot_cols.append(col[BOUND_PREFIX_LEN:])
        # Create histogram figure (>=2 distinct values guaranteed above, so
        # plot_col_histogram never returns None here)
        figs[col[BOUND_PREFIX_LEN:]] = plot_col_histogram(
            runs_df, col[BOUND_PREFIX_LEN:], num_bins)

    return figs
def plot_scenario(
    runs_df: pd.DataFrame,
    scenario: dict[str, Any],
    config_keys: list[str] | list[list[str]],
    col_x: str,
    col_y: str,
    col_seg_by: str = '',
    log_x: bool = False,
    log_y: bool = False) -> plt.Figure:
    """Plot the metrics of a scenario from a column (Y) versus another
    column (X).

    An example would be viewing throughput (Y) vs queries per second (X).

    Args:
        runs_df (pandas.DataFrame): Benchmark run data.
        scenario (dict[str, Any]): Scenario from benchmark data to plot.
        config_keys (list[str] | list[list[str]]): a list of columns to be
            grouped together as a set of configuration parameters to be
            compared within the plot. Each unique grouping of these columns
            will be a trace on the plot. A list of configuration keys may
            also be provided (list of lists of column names).
        col_x (str): Column from benchmark data for X axis.
        col_y (str): Column from benchmark data for Y axis.
        col_seg_by (str): Group points with matching config_keys only
            if they come from rows where this column also matches. This is
            effectively another configuration key, but its value is not
            displayed on the plot. This is helpful when repeated runs of the
            same experiment are viewed, and this is set to the source
            directory that is common only to points within a run.
        log_x (bool): Plot X axis on log scale.
        log_y (bool): Plot Y axis on log scale.

    Returns:
        matplotlib.pyplot.Figure: Plot figure.

    Raises:
        KeyError: If a scenario column is not present in runs_df.
    """
    # Validate scenario columns before touching plot state
    for col in scenario:
        if col_base(col) not in runs_df.columns:
            raise KeyError(f'Invalid column: {col_base(col)}')

    # Tighten loose bounds to the data actually present
    scenario = rebound_scenario(runs_df, scenario)

    # Filter runs to specific scenario
    runs_df = get_scenario_df(runs_df, scenario)

    # Choose the pyplot entry point matching the requested axis scales
    if log_x and log_y:
        plot_func = plt.loglog
    elif log_x:
        plot_func = plt.semilogx
    elif log_y:
        plot_func = plt.semilogy
    else:
        plot_func = plt.plot

    # Ensure we always have a list of configuration keys (list of lists)
    if isinstance(config_keys[0], str):
        config_keys = [config_keys]

    # Module-global counter gives every call a fresh figure id
    global fignum
    fignum += 1
    fig = plt.figure(fignum)

    # One line style/marker per key group (kk); one color per unique
    # configuration within a group (ii)
    for kk, ck_ in enumerate(config_keys):
        # Make a copy of config keys so we can modify it without side effects.
        ck = ck_[:]
        if col_seg_by and col_seg_by not in ck:
            ck.append(col_seg_by)

        # Given configuration keys, find the set of unique combinations of
        # these columns within the dataset.
        config_sets = list(set(runs_df.set_index(ck).index.dropna()))
        config_sets.sort()

        for ii, conf in enumerate(config_sets):
            # Make a DataFrame for specific configuration
            # NOTE(review): if ck has a single column, conf is a scalar and
            # enumerate() below would fail (or iterate characters of a
            # string); the preset scenarios always pass >=2 keys — confirm
            # before reusing with one key.
            conf_df = runs_df
            labels = []
            for jj, val in enumerate(conf):
                conf_df = conf_df[(conf_df[ck[jj]] == val)
                                  ].sort_values(by=col_x)
                if ck[jj] == col_seg_by:
                    # Segment-by column participates in grouping but is
                    # deliberately left out of the legend label
                    continue
                labels.append(f'{COLUMNS[ck[jj]].label}={val}')
            label = ', '.join(labels)

            # Make plot
            plot_func(
                conf_df[col_x], conf_df[col_y],
                label=label,
                marker=MARKERS[kk % len(MARKERS)], markersize=4,
                color=COLORS[ii % len(COLORS)],
                linestyle=LINE_STYLES[kk % len(LINE_STYLES)]
            )

    # Anchor linear axes at zero; leave log axes to autoscale
    if log_x and log_y:
        plt.axis([None, None, None, None])
    elif log_x:
        plt.axis([None, None, 0, None])
    elif log_y:
        plt.axis([0, None, None, None])
    else:
        plt.axis([0, None, 0, None])

    plt.title(_make_title(scenario))
    plt.xlabel(_column_axis_label(col_x), fontsize='16')
    plt.ylabel(_column_axis_label(col_y), fontsize='16')
    # Legend outside the axes, to the right
    plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
    plt.grid(True, linewidth=1, ls='--', color='gray')
    return fig
def plot_scenario_tradeoff(
    runs_df: pd.DataFrame,
    scenario: dict[str, Any],
    config_keys: list[str] | list[list[str]],
    col_x: str,
    col_y: str,
    col_z: str,
    col_seg_by: str = '',
    log_x: bool = False,
    log_y: bool = False) -> plt.Figure:
    """Make a plot displaying the tradeoff between two columns (X and Y)
    while a third column (Z) is changed.

    An example would be viewing throughput vs latency as concurrency is
    adjusted.

    Args:
        runs_df (pandas.DataFrame): Benchmark run data.
        scenario (dict[str, Any]): Scenario from benchmark data to plot.
        config_keys (list[str] | list[list[str]]): a list of columns to be
            grouped together as a set of configuration parameters to be
            compared within the plot. Each unique grouping of these columns
            will be a trace on the plot. A list of configuration keys may
            also be provided (list of lists of column names).
        col_x (str): Column from benchmark data to plot on X axis.
        col_y (str): Column from benchmark data to plot on Y axis.
        col_z (str): Column from benchmark data to label points with.
        col_seg_by (str): Group points with matching config_keys only
            if they come from rows where this column also matches. This is
            effectively another configuration key, but its value is not
            displayed on the plot. This is helpful when repeated runs of the
            same experiment are viewed, and this is set to the source
            directory that is common only to points within a run.
        log_x (bool): Plot X axis on log scale.
        log_y (bool): Plot Y axis on log scale.

    Returns:
        matplotlib.pyplot.Figure: Plot figure.

    Raises:
        KeyError: If a scenario column is not present in runs_df.
    """
    # Validate scenario columns before touching plot state
    for col in scenario:
        if col_base(col) not in runs_df.columns:
            raise KeyError(f'Invalid column: {col_base(col)}')

    # Tighten loose bounds to the data actually present
    scenario = rebound_scenario(runs_df, scenario)

    # Filter runs to specific scenario
    runs_df = get_scenario_df(runs_df, scenario)

    # Choose the pyplot entry point matching the requested axis scales
    if log_x and log_y:
        plot_func = plt.loglog
    elif log_x:
        plot_func = plt.semilogx
    elif log_y:
        plot_func = plt.semilogy
    else:
        plot_func = plt.plot

    # Ensure we always have a list of configuration keys (list of lists)
    if isinstance(config_keys[0], str):
        config_keys = [config_keys]

    # Module-global counter gives every call a fresh figure id
    global fignum
    fignum += 1
    fig = plt.figure(fignum)

    for kk, ck_ in enumerate(config_keys):
        # Make a copy of config keys so we can modify it without side effects.
        ck = ck_[:]
        if col_seg_by and col_seg_by not in ck:
            ck.append(col_seg_by)

        # Given configuration keys, find the set of unique combinations of
        # these columns within the dataset.
        config_sets = list(set(runs_df.set_index(ck).index.dropna()))
        config_sets.sort()

        for ii, conf in enumerate(config_sets):
            # Make a DataFrame for specific configuration; points along a
            # trace are ordered by the varied column Z (not X, unlike
            # plot_scenario)
            conf_df = runs_df
            labels = []
            for jj, val in enumerate(conf):
                conf_df = conf_df[(conf_df[ck[jj]] == val)
                                  ].sort_values(by=col_z)
                if ck[jj] == col_seg_by:
                    # Segment-by column is grouped on but not labeled
                    continue
                labels.append(f'{COLUMNS[ck[jj]].label}={val}')
            label = ', '.join(labels)

            # Make plot
            plot_func(
                conf_df[col_x], conf_df[col_y],
                label=label,
                marker=MARKERS[kk % len(MARKERS)], markersize=4,
                color=COLORS[ii % len(COLORS)],
                linestyle=LINE_STYLES[kk % len(LINE_STYLES)]
            )
            # Add Z labels to plot, nudged above each point
            for jj, val in enumerate(conf_df[col_z]):
                if log_y:
                    # NOTE(review): log10 of the Y value range raises
                    # ValueError when the range is <= 0 (single Y value)
                    # and goes negative for ranges < 1 — confirm intended.
                    y_offset = list(conf_df[col_y])[jj] * \
                        log10(runs_df[col_y].max() -
                              runs_df[col_y].min()) * 0.01
                else:
                    y_offset = runs_df[col_y].max() * 0.02
                plt.text(list(conf_df[col_x])[jj],
                         list(conf_df[col_y])[jj] + y_offset,
                         str(val), ha='center', color=COLORS[ii % len(COLORS)])

    # Anchor linear axes at zero; leave log axes to autoscale
    if log_x and log_y:
        plt.axis([None, None, None, None])
    elif log_x:
        plt.axis([None, None, 0, None])
    elif log_y:
        plt.axis([0, None, None, None])
    else:
        plt.axis([0, None, 0, None])

    title = _make_title(scenario)
    title += f'\n\nPoint labels: {_column_axis_label(col_z)}'
    plt.title(title)
    plt.xlabel(_column_axis_label(col_x), fontsize='16')
    plt.ylabel(_column_axis_label(col_y), fontsize='16')
    plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
    plt.grid(True, linewidth=1, ls='--', color='gray')
    return fig
def plot_pareto_tradeoff(
    runs_df: pd.DataFrame,
    scenario: dict[str, Any],
    col_x: str,
    col_y: str,
    slos: list[SLO] | None = None,
    log_x: bool = False,
    log_y: bool = False) -> plt.Figure:
    """Make a plot displaying the tradeoff between two columns (X and Y),
    highlighting the Pareto front and graying out points failing SLOs.

    Args:
        runs_df (pandas.DataFrame): Benchmark run data.
        scenario (dict[str, Any]): Scenario from benchmark data to select.
        col_x (str): Column from benchmark data to plot on X axis.
        col_y (str): Column from benchmark data to plot on Y axis.
        slos (list[SLO] | None): Service level objectives. None is
            equivalent to no SLO filtering.
        log_x (bool): Plot X axis on log scale.
        log_y (bool): Plot Y axis on log scale.

    Returns:
        matplotlib.pyplot.Figure: Plot figure.

    Raises:
        KeyError: If a scenario column is not present in runs_df.
    """
    # FIX: mutable default argument (slos=[]) replaced with a None
    # sentinel; behavior for callers is unchanged.
    if slos is None:
        slos = []

    for col in scenario:
        if col_base(col) not in runs_df.columns:
            raise KeyError(f'Invalid column: {col_base(col)}')

    scenario = rebound_scenario(runs_df, scenario)

    # Filter runs to specific scenario
    scenario_df = get_scenario_df(runs_df, scenario)
    # Get just the rows that meet SLOs
    meet_slo_df = get_meet_slo_df(scenario_df, slos)
    # From rows matching SLOs, get rows on Pareto front
    pareto_df = get_pareto_front_df(meet_slo_df, col_x, col_y)
    # Rows that fail SLOs
    fail_slo_df = scenario_df[~scenario_df.index.isin(
        meet_slo_df.index.tolist())]
    # Rows that meet SLOs, but are not on the Pareto front
    meet_slo_not_pareto_df = meet_slo_df[~meet_slo_df.index.isin(
        pareto_df.index.tolist())]

    # Choose the pyplot entry point matching the requested axis scales
    if log_x and log_y:
        plot_func = plt.loglog
    elif log_x:
        plot_func = plt.semilogx
    elif log_y:
        plot_func = plt.semilogy
    else:
        plot_func = plt.plot

    global fignum
    fignum += 1
    fig = plt.figure(fignum)

    # Three point layers: Pareto-optimal (magenta), feasible-but-dominated
    # (black), infeasible (gray)
    plot_func(
        pareto_df[col_x], pareto_df[col_y],
        marker='o', markersize=4,
        color='#FF00FF',
        linestyle='',
        label='Pareto front'
    )
    plot_func(
        meet_slo_not_pareto_df[col_x], meet_slo_not_pareto_df[col_y],
        marker='o', markersize=4,
        color='#000000',
        linestyle='',
        label='Meets SLOs, non-optimal'
    )
    plot_func(
        fail_slo_df[col_x], fail_slo_df[col_y],
        marker='o', markersize=4,
        color='#CCCCCC',
        linestyle='',
        label='Fails SLOs'
    )

    # Anchor linear axes at zero; leave log axes to autoscale
    if log_x and log_y:
        plt.axis([None, None, None, None])
    elif log_x:
        plt.axis([None, None, 0, None])
    elif log_y:
        plt.axis([0, None, None, None])
    else:
        plt.axis([0, None, 0, None])

    plt.title(_make_title(scenario))
    plt.xlabel(_column_axis_label(col_x), fontsize='16')
    plt.ylabel(_column_axis_label(col_y), fontsize='16')
    plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
    plt.grid(True, linewidth=1, ls='--', color='gray')
    return fig