Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 0 additions & 114 deletions ax/core/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from datetime import datetime, timedelta
from unittest.mock import patch

import numpy as np
import pandas as pd
from ax.core.arm import Arm
from ax.core.batch_trial import BatchTrial
Expand All @@ -28,19 +27,15 @@
from ax.core.utils import (
_maybe_update_trial_status_to_complete,
batch_trial_only,
best_feasible_objective,
compute_metric_availability,
extract_pending_observations,
get_missing_metrics,
get_missing_metrics_by_name,
get_model_times,
get_model_trace_of_times,
get_pending_observation_features,
get_pending_observation_features_based_on_trial_status as get_pending_status,
get_target_trial_index,
is_bandit_experiment,
MetricAvailability,
MissingMetrics,
)
from ax.exceptions.core import AxError
from ax.utils.common.constants import Keys
Expand Down Expand Up @@ -95,120 +90,11 @@ def setUp(self) -> None:
trial_index=self.hss_trial.index,
metadata=self.hss_cand_metadata,
)
self.df = pd.DataFrame(
[
{
"arm_name": "0_0",
"mean": 2.0,
"sem": 0.2,
"trial_index": 1,
"metric_name": "a",
"start_time": "2018-01-01",
"end_time": "2018-01-02",
"metric_signature": "a",
},
{
"arm_name": "0_0",
"mean": 1.8,
"sem": 0.3,
"trial_index": 1,
"metric_name": "b",
"start_time": "2018-01-01",
"end_time": "2018-01-02",
"metric_signature": "b",
},
{
"arm_name": "0_1",
"mean": float("nan"),
"sem": float("nan"),
"trial_index": 1,
"metric_name": "a",
"start_time": "2018-01-01",
"end_time": "2018-01-02",
"metric_signature": "a",
},
{
"arm_name": "0_1",
"mean": 3.7,
"sem": 0.5,
"trial_index": 1,
"metric_name": "b",
"start_time": "2018-01-01",
"end_time": "2018-01-02",
"metric_signature": "b",
},
{
"arm_name": "0_2",
"mean": 0.5,
"sem": None,
"trial_index": 1,
"metric_name": "a",
"start_time": "2018-01-01",
"end_time": "2018-01-02",
"metric_signature": "a",
},
{
"arm_name": "0_2",
"mean": float("nan"),
"sem": float("nan"),
"trial_index": 1,
"metric_name": "b",
"start_time": "2018-01-01",
"end_time": "2018-01-02",
"metric_signature": "b",
},
{
"arm_name": "0_2",
"mean": float("nan"),
"sem": float("nan"),
"trial_index": 1,
"metric_name": "c",
"start_time": "2018-01-01",
"end_time": "2018-01-02",
"metric_signature": "c",
},
]
)

self.data = Data(df=self.df)

self.optimization_config = OptimizationConfig(
objective=Objective(metric=Metric(name="a"), minimize=False),
outcome_constraints=[
OutcomeConstraint(
metric=Metric(name="b"),
op=ComparisonOp.GEQ,
bound=0,
relative=False,
)
],
)
self.batch_experiment = get_branin_experiment(with_completed_trial=False)
self.batch_experiment.status_quo = Arm(
name="status_quo", parameters={"x1": 0.0, "x2": 0.0}
)

def test_get_missing_metrics_by_name(self) -> None:
    """Check the per-metric missing (arm_name, trial_index) pairs.

    In the fixture data, arm "0_1" has a NaN mean for metric "a" and arm
    "0_2" has a NaN mean for metric "b"; those are the pairs expected back.
    """
    self.assertEqual(
        get_missing_metrics_by_name(self.data, ["a", "b"]),
        {"a": {("0_1", 1)}, "b": {("0_2", 1)}},
    )

def test_get_missing_metrics(self) -> None:
    """Check missing pairs grouped into objective/constraint/tracking buckets.

    With the fixture optimization config, "a" is the objective, "b" is the
    outcome constraint, and "c" (present in the data only) is a tracking
    metric whose single row has a NaN mean, so every arm is missing it.
    """
    every_arm = {("0_0", 1), ("0_1", 1), ("0_2", 1)}
    self.assertEqual(
        get_missing_metrics(self.data, self.optimization_config),
        MissingMetrics(
            objective={"a": {("0_1", 1)}},
            outcome_constraints={"b": {("0_2", 1)}},
            tracking_metrics={"c": every_arm},
        ),
    )

def test_best_feasible_objective(self) -> None:
    """Check the running best feasible objective for a maximized metric.

    The fixture constraint is ``b >= 0``; the second iteration has
    ``b = -1`` and is therefore infeasible, so its objective value of 3.0
    must not enter the running maximum.
    """
    metric_values = {
        "a": np.array([1.0, 3.0, 2.0]),
        "b": np.array([0.0, -1.0, 0.0]),
    }
    running_best = best_feasible_objective(
        self.optimization_config, values=metric_values
    )
    self.assertEqual(list(running_best), [1.0, 1.0, 2.0])

def test_get_model_times(self) -> None:
exp = get_branin_experiment(num_trial=2)
fit_times, gen_times = get_model_trace_of_times(exp)
Expand Down
133 changes: 1 addition & 132 deletions ax/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,126 +12,29 @@
from enum import Enum
from functools import wraps
from logging import Logger
from typing import Any, NamedTuple
from typing import Any

import numpy as np
import numpy.typing as npt
import pandas as pd
from ax.core.arm import Arm
from ax.core.base_trial import BaseTrial, TrialStatus
from ax.core.batch_trial import BatchTrial
from ax.core.data import Data
from ax.core.experiment import Experiment
from ax.core.generator_run import GeneratorRun
from ax.core.map_metric import MapMetric
from ax.core.objective import MultiObjective
from ax.core.observation import ObservationFeatures
from ax.core.optimization_config import OptimizationConfig
from ax.core.trial import Trial
from ax.core.types import ComparisonOp
from ax.exceptions.core import AxError
from ax.utils.common.constants import Keys
from ax.utils.common.logger import get_logger
from pyre_extensions import none_throws

# Logger obtained from `get_logger` for this module's `__name__`.
logger: Logger = get_logger(__name__)
# An (arm_name, trial_index) pair, as built by zipping the "arm_name" and
# "trial_index" columns of a data frame.
TArmTrial = tuple[str, int]

# Threshold for switching to pending points extraction based on trial status.
MANY_TRIALS_IN_EXPERIMENT = 100
# NOTE(review): presumably the age, in days, beyond which a trial is treated
# as "old"; its use is not visible in this part of the file -- confirm.
OLD_TRIAL_THRESHOLD_DAYS = 10

# --------------------------- Data integrity utils. ---------------------------


class MissingMetrics(NamedTuple):
    """Missing observations grouped by the role the metric plays.

    Each field maps a metric name to the set of (arm_name, trial_index)
    pairs that have no observation of that metric.
    """

    # Metrics used in the optimization objective.
    objective: dict[str, set[TArmTrial]]
    # Metrics used in outcome constraints.
    outcome_constraints: dict[str, set[TArmTrial]]
    # Metrics present in the data but not in the optimization config.
    tracking_metrics: dict[str, set[TArmTrial]]


def get_missing_metrics(
    data: Data, optimization_config: OptimizationConfig
) -> MissingMetrics:
    """Return the (arm_name, trial_index) pairs that lack metric observations.

    Metrics are split into three groups: objective metrics and outcome
    constraint metrics (both taken from ``optimization_config``), and
    tracking metrics (every other metric name that appears in ``data``).

    Args:
        data: Data to search.
        optimization_config: Provides the objective and outcome constraint
            metric names to search for.

    Returns:
        A ``MissingMetrics`` tuple with fields ``objective``,
        ``outcome_constraints`` and ``tracking_metrics``. Each field maps a
        metric name to the set of (arm_name, trial_index) pairs missing that
        metric; metrics with nothing missing are omitted.
    """

    def _drop_empty(
        pairs_by_metric: dict[str, set[TArmTrial]],
    ) -> dict[str, set[TArmTrial]]:
        # Keep only the metrics that actually have missing pairs.
        return {name: pairs for name, pairs in pairs_by_metric.items() if pairs}

    objective = optimization_config.objective
    if isinstance(objective, MultiObjective):
        objective_metric_names = [m.name for m in objective.metrics]
    else:
        objective_metric_names = [objective.metric.name]
    outcome_constraint_metric_names = [
        oc.metric.name for oc in optimization_config.outcome_constraints
    ]

    # Anything in the data that the optimization config does not mention is
    # treated as a tracking metric.
    optimization_config_metric_names = set(objective_metric_names).union(
        outcome_constraint_metric_names
    )
    tracking_metric_names = set(data.df["metric_name"]).difference(
        optimization_config_metric_names
    )

    # All three groups go through the same by-name lookup.
    return MissingMetrics(
        objective=_drop_empty(
            get_missing_metrics_by_name(data, objective_metric_names)
        ),
        outcome_constraints=_drop_empty(
            get_missing_metrics_by_name(data, outcome_constraint_metric_names)
        ),
        tracking_metrics=_drop_empty(
            get_missing_metrics_by_name(
                data=data, metric_names=tracking_metric_names
            )
        ),
    )


def get_missing_metrics_by_name(
    data: Data, metric_names: Iterable[str]
) -> dict[str, set[TArmTrial]]:
    """Look up, per metric, which (arm_name, trial_index) pairs lack it.

    Args:
        data: Data to search.
        metric_names: Names of the metrics to search for.

    Returns:
        A dict with one entry per requested metric name, mapping it to the
        set of (arm_name, trial_index) pairs missing that metric. Metrics
        with nothing missing map to an empty set.
    """
    pairs_by_metric: dict[str, set[TArmTrial]] = {}
    for name in metric_names:
        pairs_by_metric[name] = _get_missing_arm_trial_pairs(
            data=data, metric_name=name
        )
    return pairs_by_metric


def _get_missing_arm_trial_pairs(data: Data, metric_name: str) -> set[TArmTrial]:
    """Return the (arm_name, trial_index) pairs with no value for a metric.

    A pair counts as having the metric only if some row for that metric has
    a non-null ``mean``. Every pair that appears anywhere in ``data.df``
    (for any metric) is a candidate.
    """
    df = data.df
    # Rows that carry an actual (non-null) observation of the metric.
    has_value = (df["metric_name"] == metric_name) & df["mean"].notnull()
    observed = df[has_value]
    every_pair = set(zip(df["arm_name"], df["trial_index"]))
    observed_pairs = set(zip(observed["arm_name"], observed["trial_index"]))
    return every_pair - observed_pairs


# ------------------- Utils shared by Client and BatchClient--------------------
def _maybe_update_trial_status_to_complete(
Expand Down Expand Up @@ -169,40 +72,6 @@ def _maybe_update_trial_status_to_complete(
# -------------------- Experiment result extraction utils. ---------------------


def best_feasible_objective(
    optimization_config: OptimizationConfig,
    values: dict[str, npt.NDArray],
) -> npt.NDArray:
    """Compute the best feasible objective value found by each iteration.

    The arrays in ``values`` are never modified; the computation runs on a
    private floating-point copy of the objective values.

    Args:
        optimization_config: Optimization config.
        values: Dictionary from metric signature to array of value at each
            iteration. If optimization config contains outcome constraints,
            values for them must be present in `values`.

    Returns: Array of cumulative best feasible value. Iterations before the
        first feasible point hold ``inf`` (minimization) or ``-inf``
        (maximization).

    Raises:
        ValueError: If any outcome constraint is relative.
    """
    objective = optimization_config.objective
    minimize = objective.minimize

    # Copy so that masking infeasible points below does not write into the
    # caller's array, and promote non-float dtypes so +/-inf can be stored.
    f = np.array(values[objective.metric.signature], copy=True)
    if not np.issubdtype(f.dtype, np.floating):
        f = f.astype(float)

    # Set infeasible points to have infinitely bad values
    infeas_val = np.inf if minimize else -np.inf
    for oc in optimization_config.outcome_constraints:
        if oc.relative:
            raise ValueError(
                "Benchmark aggregation does not support relative constraints"
            )
        g = values[oc.metric.signature]
        feas = g <= oc.bound if oc.op == ComparisonOp.LEQ else g >= oc.bound
        f[~feas] = infeas_val

    # Get cumulative best
    accumulate = np.minimum.accumulate if minimize else np.maximum.accumulate
    return accumulate(f)


def _extract_generator_runs(trial: BaseTrial) -> list[GeneratorRun]:
if isinstance(trial, BatchTrial):
return trial.generator_runs
Expand Down
Loading