Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 48 additions & 52 deletions carps/analysis/calc_hypervolume.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from carps.analysis.utils import get_ids_mo
from carps.utils.loggingutils import get_logger, setup_logging

# from carps.analysis.gather_data import convert_mixed_types_to_str
setup_logging()
logger = get_logger(__file__)

Expand Down Expand Up @@ -52,63 +53,40 @@ def gather_trajectory(x: pd.DataFrame) -> pd.DataFrame:
data.append(D)
return pd.DataFrame(data)

def get_pareto_front(costs: np.ndarray) -> np.ndarray:
    """Return all Pareto-optimal rows from the given array. Assumes minimization.

    A row is discarded when some other row is at least as good in every objective
    and is not identical to it. Exact duplicates of a Pareto-optimal row are all kept.

    Args:
        costs (np.ndarray): Array-like of shape (n_points, n_objectives). Nested
            lists or tuples are accepted and converted with ``np.asarray``.

    Returns:
        np.ndarray: The non-dominated rows of ``costs`` in their original order.
    """
    # Boolean-mask indexing below only works on arrays, so convert list input up front.
    costs = np.asarray(costs)
    is_efficient = np.ones(len(costs), dtype=bool)
    for i, c in enumerate(costs):
        if is_efficient[i]:
            # Among the rows still alive, keep those that beat `c` in at least one
            # objective or are identical to `c`; all remaining rows are dominated by `c`.
            survivors = costs[is_efficient]
            is_efficient[is_efficient] = np.any(survivors < c, axis=1) | np.all(survivors == c, axis=1)
            is_efficient[i] = True
    return costs[is_efficient]

def get_reference_point(x: pd.DataFrame, on_key: str = "trial_value__cost") -> np.ndarray:
"""Get reference point from the dataframe.

Dataframe should only contain data from one task. The reference point is the maximum
of the costs over all trials. This is the worst case scenario for the hypervolume
calculation. The reference point is needed to define the bound of the hypervolume.
def add_running_pareto_front(group):
"""Adds the pareto front of all costs up until the current trial to the group.

Args:
x (pd.DataFrame): Dataframe with the trajectory.
on_key (str, optional): Column to use for the reference point. Defaults to "trial_value__cost".
Can also be "trial_value__cost_inc".
group (_type_): _description_

Returns:
np.ndarray: Reference point.
_type_: _description_
"""
if "task_id" in x.columns:
assert x["task_id"].nunique() == 1, "Cannot get reference point for multiple tasks" # noqa: PD101
costs = get_costs(x, on_key)
return np.max(costs, axis=0)

group = group.sort_values("n_trials").reset_index(drop=True)
costs = np.stack(group["trial_value__cost_normalized"].to_numpy())
pareto_fronts = []

def get_cost_min(x: pd.DataFrame, on_key: str = "trial_value__cost") -> np.ndarray:
"""Get the minimum objective values from the dataframe.

Dataframe should only contain data from one task. The point is the minimum
of the costs over all trials. This is the best case scenario for the hypervolume
calculation. The minimum point is needed for normalization.

Args:
x (pd.DataFrame): Dataframe with the trajectory.
on_key (str, optional): Column to use for the reference point. Defaults to "trial_value__cost".
Can also be "trial_value__cost_inc".

Returns:
np.ndarray: Minimum cost.
"""
if "task_id" in x.columns:
assert x["task_id"].nunique() == 1, "Cannot get reference point for multiple tasks" # noqa: PD101
costs = get_costs(x, on_key)
return np.min(costs, axis=0)
for i in range(len(group)):
current_costs = costs[:i+1]
front = get_pareto_front(current_costs)
pareto_fronts.append(tuple(map(tuple, front)))

group["pareto_front"] = pareto_fronts
return group

def get_costs(x: pd.DataFrame, on_key: str = "trial_value__cost") -> np.ndarray:
    """Collect the cost entries of one column into a single numpy array.

    The entries are expected to be cost vectors (multi-objective optimization),
    one vector per row of the dataframe.

    Args:
        x (pd.DataFrame): Dataframe with the trajectory.
        on_key (str, optional): Name of the column holding the costs.
            Defaults to "trial_value__cost". Can also be "trial_value__cost_raw".

    Returns:
        np.ndarray: Array built from the list of all entries in ``x[on_key]``.
    """
    cost_entries = x[on_key].tolist()
    return np.array(cost_entries)


def add_reference_point(x: pd.DataFrame, on_key: str = "trial_value__cost") -> pd.DataFrame:
def add_reference_point(x: pd.DataFrame) -> pd.DataFrame:
"""Add reference point to the dataframe.

The reference point is needed to define the bound of the hypervolume.
Expand All @@ -121,14 +99,31 @@ def add_reference_point(x: pd.DataFrame, on_key: str = "trial_value__cost") -> p
Returns:
pd.DataFrame: Dataframe with the reference point.
"""
reference_point = get_reference_point(x, on_key)
# Flatten and stack all cost vectors
costs = np.vstack([np.array(c) for c in x["trial_value__cost_raw"]])

# Sanity check for consistent dimensionality
if len(set(cost.shape[0] for cost in costs)) != 1:
raise ValueError("Inconsistent number of objectives in cost vectors.")

# Reference point is max across all objectives
reference_point = np.max(costs, axis=0) + 1e-4

# Set reference point per row
x["reference_point"] = [reference_point] * len(x)
minimum_cost = get_cost_min(x, on_key)
x["minimum_cost"] = [minimum_cost] * len(x)
return x

def normalize_objectives(x: pd.DataFrame) -> pd.DataFrame:
    """Min-max normalize the raw cost vectors per objective.

    Every objective is rescaled with the minimum and maximum observed over all rows
    of ``x``. An objective that is constant over ``x`` is mapped to 0 instead of
    dividing by zero. The result is written in place to the column
    ``trial_value__cost_normalized``.

    Args:
        x (pd.DataFrame): Dataframe with a column ``trial_value__cost_raw`` holding
            one cost vector per row. All vectors must have the same length.

    Returns:
        pd.DataFrame: The same dataframe with the added column
            ``trial_value__cost_normalized`` (one 1-D array per row).
    """
    if x.empty:
        # np.vstack raises on an empty sequence; there is nothing to normalize.
        x["trial_value__cost_normalized"] = pd.Series(index=x.index, dtype=object)
        return x
    costs = np.vstack(x["trial_value__cost_raw"])
    min_vals, max_vals = costs.min(0), costs.max(0)
    span = max_vals - min_vals
    # Replace a zero range by 1 so constant objectives become 0 rather than NaN.
    denom = np.where(span == 0, 1, span)
    normalized = (costs - min_vals) / denom
    x["trial_value__cost_normalized"] = list(normalized)
    return x



def calc_hv(x: pd.DataFrame, on_key: str = "trial_value__cost") -> pd.DataFrame:
def calc_hv(x: pd.DataFrame) -> pd.DataFrame:
"""Calculate hypervolume per trajectory step.

Args:
Expand All @@ -139,8 +134,9 @@ def calc_hv(x: pd.DataFrame, on_key: str = "trial_value__cost") -> pd.DataFrame:
Returns:
pd.DataFrame: Dataframe with the hypervolume.
"""
F = get_costs(x, on_key)
ind = HV(ref_point=x["reference_point"].iloc[0], pf=None, nds=False)
F = np.vstack([np.array(p) for p in x["pareto_front"]])

ind = HV(ref_point=[1.000001]*F.shape[1], pf=None, nds=False)
x["hypervolume"] = ind(F)
return x

Expand Down Expand Up @@ -219,7 +215,7 @@ def add_hypervolume_to_df(logs: pd.DataFrame, on_key: str = "trial_value__cost")
"""
tqdm.pandas(desc="Calc hypervolume...")
ids_mo = get_ids_mo(logs)
add_reference_point_partial = partial(add_reference_point, on_key=on_key)
add_reference_point_partial = partial(add_reference_point)
mo_cols = ["hypervolume", "reference_point"]
for mo_col in mo_cols:
if mo_col not in logs.columns:
Expand Down
31 changes: 17 additions & 14 deletions carps/analysis/gather_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from carps.utils.loggingutils import get_logger, setup_logging
from carps.utils.task import Task
from carps.utils.trials import TrialInfo
from carps.analysis.calc_hypervolume import calc_hv, add_reference_point, run_id, add_running_pareto_front, normalize_objectives

if TYPE_CHECKING:
from carps.objective_functions.objective_function import ObjectiveFunction
Expand Down Expand Up @@ -393,11 +394,11 @@ def maybe_postadd_task(logs: pd.DataFrame, overwrite: bool = False) -> pd.DataFr
task_cfg = load_task_cfg(task_id=gid[0], task_index=task_index)

task_cfg_yaml = OmegaConf.to_yaml(task_cfg)
if "${seed}" in task_cfg_yaml:
# Add seed to config to make it resolvable
assert gdf["seed"].nunique() == 1 # noqa: PD101
seed = gdf["seed"].iloc[0]
task_cfg.seed = int(seed)
# if "${seed}" in task_cfg_yaml:
# # Add seed to config to make it resolvable
# assert gdf["seed"].nunique() == 1 # noqa: PD101
# seed = gdf["seed"].iloc[0]
# task_cfg.seed = int(seed)
task_cfg = OmegaConf.to_container(task_cfg, resolve=False)
task_columns = [c for c in gdf.columns if c.startswith("task.")]
if overwrite:
Expand Down Expand Up @@ -469,7 +470,7 @@ def maybe_convert_cost_to_so(x: float | list | np.ndarray) -> float:
float: Single-objective cost or aggregated cost.
"""
if isinstance(x, list | np.ndarray):
return np.sum(x)
return np.sum(x) # TODO change to HV here
if isinstance(x, dict):
assert len(x.values()) == 1
# Most likely comes from database
Expand All @@ -478,7 +479,7 @@ def maybe_convert_cost_to_so(x: float | list | np.ndarray) -> float:
if isinstance(value, str):
value = ast.literal_eval(value)
if isinstance(value, list):
return np.sum(value)
return np.sum(value) # TODO Change to HV here
if isinstance(value, float | int):
return value
if isinstance(x, float):
Expand Down Expand Up @@ -547,7 +548,12 @@ def process_logs(logs: pd.DataFrame, keep_task_columns: list[str] | None = None)

logger.debug("Handle MO costs...")
logs["trial_value__cost_raw"] = logs["trial_value__cost"].apply(maybe_convert_cost_dtype)
logs["trial_value__cost"] = logs["trial_value__cost_raw"].apply(maybe_convert_cost_to_so)
# trial_value__cost_raw for add_reference_point and to calc_hv
logs = logs.groupby(by=["task_type", "task_id"]).apply(normalize_objectives).reset_index(drop=True)
logs = logs.groupby(by=[*run_id]).apply(add_running_pareto_front).reset_index(drop=True)
logs = logs.groupby(by=[*run_id, "n_trials"]).apply(calc_hv).reset_index(drop=True)
logs["trial_value__cost"] = logs["hypervolume"] #logs["trial_value__cost_raw"].apply(maybe_convert_cost_to_so)
print(logs.head())
logger.debug("Determine incumbent cost...")
logs["trial_value__cost_inc"] = logs.groupby(by=grouper_keys)["trial_value__cost"].transform("cummin")

Expand Down Expand Up @@ -606,9 +612,9 @@ def normalize_logs(logs: pd.DataFrame) -> pd.DataFrame:
logs["trial_value__cost_raw"] = logs["trial_value__cost"].apply(maybe_convert_cost_dtype)
else:
logs["trial_value__cost_raw"] = logs["trial_value__cost_raw"].apply(maybe_convert_cost_dtype)
logs = add_hypervolume_to_df(logs, on_key="trial_value__cost_raw")
# logs = add_hypervolume_to_df(logs, on_key="trial_value__cost_raw")
# IDs have changed, so we need to recalculate
ids_mo = get_ids_mo(logs)
# ids_mo = get_ids_mo(logs)
hv = logs.loc[ids_mo, "hypervolume"]
logs.loc[ids_mo, "trial_value__cost"] = -hv # higher is better
logs["trial_value__cost"] = logs["trial_value__cost"].astype("float64")
Expand Down Expand Up @@ -785,10 +791,7 @@ def rename_legacy(logs: pd.DataFrame) -> pd.DataFrame:

# NOTE(eddiebergman): Use `n_processes=None` as default, which uses `os.cpu_count()` in `Pool`
def filelogs_to_df(
rundir: str | list[str],
log_fn: str = "trial_logs.jsonl",
n_processes: int | None = None,
outdir: str | Path | None = None,
rundir: str | list[str] = "results/", log_fn: str = "trial_logs.jsonl", n_processes: int | None = None
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Load logs from file and preprocess.

Expand Down
4 changes: 2 additions & 2 deletions carps/analysis/run_autorank.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,14 +437,14 @@ def cd_evaluation(
alpha=alpha,
alpha_normality=alpha_normality,
num_samples=len(rank_data),
sample_matrix=None,
posterior_matrix=None,
decision_matrix=None,
rope=None,
rope_mode=None,
effect_size=res.effect_size,
force_mode=None,
sample_matrix=None,
plot_order=None,
# plot_order=None,
)
is_significant = True
if result.pvalue >= result.alpha:
Expand Down
44 changes: 27 additions & 17 deletions carps/experimenter/create_cluster_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from py_experimenter.experimenter import PyExperimenter

from carps.utils.loggingutils import CustomEncoder
import pickle as pckl

logger = logging.getLogger("create experiments")

Expand Down Expand Up @@ -79,7 +80,7 @@ def get_experiment_definition(cfg: OmegaConf) -> dict:
cfg_dict = OmegaConf.to_container(cfg=cfg, resolve=True)

cfg_str = json.dumps(cfg_dict, cls=CustomEncoder)
cfg_hash = create_config_hash(cfg)
cfg_hash = create_config_hash_from_full_cfg(cfg)

return {
"config": cfg_str,
Expand Down Expand Up @@ -107,6 +108,7 @@ def fill_database(cfg: DictConfig, experimenter: PyExperimenter) -> None:
DatabaseConnectionError: If there is an error with the database connection.
"""
experiment_definition = get_experiment_definition(cfg)


column_names = list(experimenter.db_connector.database_configuration.keyfields.keys())
exists = False
Expand All @@ -131,7 +133,7 @@ def fill_database(cfg: DictConfig, experimenter: PyExperimenter) -> None:
# experimenter.close_ssh()


@hydra.main(config_path="../configs", config_name="base.yaml", version_base=None) # type: ignore[misc]
@hydra.main(config_path="../configs", config_name="base.yaml", version_base=None, save_as_pckl=True, folder_path="configs_pckl") # type: ignore[misc]
def main(cfg: DictConfig) -> None:
"""Store experiment config in database.

Expand All @@ -141,23 +143,31 @@ def main(cfg: DictConfig) -> None:
Global configuration.

"""
fill_database(cfg, experimenter)
if save_as_pckl:
experiment_definition = get_experiment_definition(cfg)
files = list(Path(folder_path).glob("*.pkl"))

if experiment_definition['config_hash'] not in files:
with open(f"{folder_path}{experiment_definition['config_hash']}.pkl", "wb") as f:
pckl.dump(experiment_definition, f)
else:
experiment_configuration_file_path = Path(__file__).parent / "py_experimenter.yaml"

database_credential_file_path = Path(__file__).parent / "credentials.yaml"
if database_credential_file_path is not None and not database_credential_file_path.exists():
database_credential_file_path = None # type: ignore[assignment]

experimenter = PyExperimenter(
experiment_configuration_file_path=experiment_configuration_file_path,
name="carps",
database_credential_file_path=database_credential_file_path,
log_level=logging.INFO,
use_ssh_tunnel=OmegaConf.load(experiment_configuration_file_path).PY_EXPERIMENTER.Database.use_ssh_tunnel,
use_codecarbon=False
)
fill_database(cfg, experimenter)


if __name__ == "__main__":
# TODO make experiment_configuration_file_path and database_credential_file_path a commandline arg
experiment_configuration_file_path = Path(__file__).parent / "py_experimenter.yaml"

database_credential_file_path = Path(__file__).parent / "credentials.yaml"
if database_credential_file_path is not None and not database_credential_file_path.exists():
database_credential_file_path = None # type: ignore[assignment]

experimenter = PyExperimenter(
experiment_configuration_file_path=experiment_configuration_file_path,
name="carps",
database_credential_file_path=database_credential_file_path,
log_level=logging.INFO,
use_ssh_tunnel=OmegaConf.load(experiment_configuration_file_path).PY_EXPERIMENTER.Database.use_ssh_tunnel,
)

main()
8 changes: 6 additions & 2 deletions carps/experimenter/database/download_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def main(
pyexperimenter_configuration_file_path: str | None = None,
database_credential_file_path: str | Path | None = None,
outdir: str | Path | None = None,
codecarbon: bool = False
) -> None:
"""Download results from the database and save them to outdir.

Expand Down Expand Up @@ -49,6 +50,7 @@ def main(
database_credential_file_path=database_credential_file_path,
log_file="logs/reset_experiments.log",
use_ssh_tunnel=OmegaConf.load(experiment_configuration_file_path).PY_EXPERIMENTER.Database.use_ssh_tunnel,
use_codecarbon=codecarbon
)

experiment_config_table = experimenter.get_table()
Expand All @@ -64,12 +66,14 @@ def main(
logger.info(f"\tFrom them, found {n_errored} errored runs of type {task_type}. ❌")
trajectory_table = experimenter.get_logtable("trajectory")
trials_table = experimenter.get_logtable("trials")
codecarbon_table = experimenter.get_codecarbon_table()
if codecarbon:
codecarbon_table = experimenter.get_codecarbon_table()

experiment_config_table.to_parquet(outdir / "experiment_config.parquet", index=False)
trajectory_table.to_parquet(outdir / "trajectory.parquet", index=False)
trials_table.to_parquet(outdir / "trials.parquet", index=False)
codecarbon_table.to_parquet(outdir / "codecarbon.parquet", index=False)
if codecarbon:
codecarbon_table.to_parquet(outdir / "codecarbon.parquet", index=False)
logger.info(
"Downloaded results from the database. "
f"Saved to '{outdir}'. "
Expand Down
11 changes: 6 additions & 5 deletions carps/experimenter/database/process_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def add_metadata(
"config",
"config_hash",
"name",
"n_trials"
]
metadata_columns = [c for c in experiment_config_table.columns if c not in ignore_columns]

Expand Down Expand Up @@ -100,9 +101,9 @@ def process_single_run_from_database(
if logs_from_one_run["experiment_id"].nunique() != 1: # noqa: PD101
raise ValueError("Multiple values for `experiment_id` found in the logs. Something is suspicious.")
experiment_id = logs_from_one_run["experiment_id"].iloc[0]
logs_from_one_run = process_logs(logs_from_one_run)
if only_incumbents:
logs_from_one_run = filter_non_incumbent_entries(logs=logs_from_one_run)
# logs_from_one_run = process_logs(logs_from_one_run)
# if only_incumbents:
# logs_from_one_run = filter_non_incumbent_entries(logs=logs_from_one_run)
return add_metadata(
logs_from_one_run=logs_from_one_run,
experiment_id=experiment_id,
Expand Down Expand Up @@ -197,9 +198,9 @@ def process_logs_from_database(

# Combine the results into a single DataFrame
processed_logs = pd.concat(result, ignore_index=True).reset_index(drop=True)
processed_logs.to_parquet(output_filename, index=False)
processed_logs = process_logs(processed_logs)
processed_logs.to_parquet(output_filename, index=False, engine="fastparquet")
logger.info(f"Processed logs saved to {output_filename} 💌.")
return processed_logs


if __name__ == "__main__":
Expand Down
Loading
Loading