2 changes: 1 addition & 1 deletion causaltune/models/regression.py
@@ -17,7 +17,7 @@ class ElasticNetEstimator(SKLearnEstimator):
ITER_HP = "max_iter"

@classmethod
def search_space(cls, data_size, task="regresssion", **params):
def search_space(cls, data_size, task="regression", **params):
return {
"alpha": {
"domain": tune.loguniform(lower=0.0001, upper=1.0),
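A note on the one-line fix above: it corrects the misspelled default `task="regresssion"` in `ElasticNetEstimator.search_space`. For reference, a minimal sketch of calling the corrected classmethod directly; the `data_size` value here is hypothetical (in `optimiser.py` it mirrors `data.data.shape`):

```python
from causaltune.models.regression import ElasticNetEstimator

# task now defaults to the correctly spelled "regression"
space = ElasticNetEstimator.search_space(data_size=(1000, 10))
print(space["alpha"]["domain"])  # tune.loguniform(lower=0.0001, upper=1.0)
```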
126 changes: 47 additions & 79 deletions causaltune/optimiser.py
@@ -17,8 +17,6 @@

from econml.inference import BootstrapInference

from joblib import Parallel, delayed

from causaltune.search.params import SimpleParamService
from causaltune.score.scoring import Scorer, metrics_to_minimize
from causaltune.utils import treatment_is_multivalue
@@ -34,13 +32,9 @@
from causaltune.dataset_processor import CausalityDatasetProcessor
from causaltune.models.passthrough import feature_filter

# tune.run = run


# Patched from sklearn.linear_model._base to adjust rtol and atol values
def _check_precomputed_gram_matrix(
X, precompute, X_offset, X_scale, rtol=1e-4, atol=1e-2
):
def _check_precomputed_gram_matrix(X, precompute, X_offset, X_scale, rtol=1e-4, atol=1e-2):
n_features = X.shape[1]
f1 = n_features // 2
f2 = min(f1 + 1, n_features - 1)
@@ -177,24 +171,17 @@ def __init__(
self._settings["tuner"]["time_budget_s"] = time_budget
self._settings["tuner"]["num_samples"] = num_samples
self._settings["tuner"]["verbose"] = verbose
self._settings["tuner"][
"use_ray"
] = use_ray # requires ray to be installed via pip install flaml[ray]
self._settings["tuner"]["resources_per_trial"] = (
resources_per_trial if resources_per_trial is not None else {"cpu": 0.5}
)
self._settings["try_init_configs"] = try_init_configs
self._settings["include_experimental_estimators"] = (
include_experimental_estimators
)
self._settings["include_experimental_estimators"] = include_experimental_estimators

# params for FLAML on component models:
self._settings["component_models"] = {}
self._settings["component_models"]["task"] = components_task
self._settings["component_models"]["verbose"] = components_verbose
self._settings["component_models"][
"pred_time_limit"
] = components_pred_time_limit
self._settings["component_models"]["pred_time_limit"] = components_pred_time_limit
self._settings["component_models"]["n_jobs"] = components_njobs
self._settings["component_models"]["time_budget"] = components_time_budget
self._settings["component_models"]["eval_method"] = "holdout"
@@ -221,6 +208,7 @@ def __init__(
self.causal_model = None
self.identified_estimand = None
self.problem = None
self.use_ray = use_ray
# properties that are used to resume fits (warm start)
self.resume_scores = []
self.resume_cfg = []
@@ -239,9 +227,7 @@ def init_propensity_model(self, propensity_model: str):
self.propensity_model = AutoML(
**{**self._settings["component_models"], "task": "classification"}
)
elif hasattr(propensity_model, "fit") and hasattr(
propensity_model, "predict_proba"
):
elif hasattr(propensity_model, "fit") and hasattr(propensity_model, "predict_proba"):
self.propensity_model = propensity_model
else:
raise ValueError(
@@ -266,9 +252,7 @@ def init_outcome_model(self, outcome_model):
# The current default behavior
return self.auto_outcome_model()
else:
raise ValueError(
'outcome_model valid values are None, "auto", or an estimator object'
)
raise ValueError('outcome_model valid values are None, "auto", or an estimator object')

def auto_outcome_model(self):
data = self.data
@@ -303,6 +287,7 @@ def fit(
preprocess: bool = False,
encoder_type: Optional[str] = None,
encoder_outcome: Optional[str] = None,
use_ray: Optional[bool] = None,
):
"""Performs AutoML on list of causal inference estimators
- If estimator has a search space specified in its parameters, HPO is performed on the whole model.
@@ -326,6 +311,9 @@
Returns:
None
"""
if use_ray is not None:
self.use_ray = use_ray

if outcome is None and isinstance(data, CausalityDataset):
outcome = data.outcomes[0]

@@ -344,19 +332,15 @@ def fit(
if preprocess:
data = copy.deepcopy(data)
self.dataset_processor = CausalityDatasetProcessor()
self.dataset_processor.fit(
data, encoder_type=encoder_type, outcome=encoder_outcome
)
self.dataset_processor.fit(data, encoder_type=encoder_type, outcome=encoder_outcome)
data = self.dataset_processor.transform(data)
else:
self.dataset_processor = None

self.data = data
treatment_values = data.treatment_values

assert (
len(treatment_values) > 1
), "Treatment must take at least 2 values, eg 0 and 1!"
assert len(treatment_values) > 1, "Treatment must take at least 2 values, eg 0 and 1!"

self._control_value = treatment_values[0]
self._treatment_values = list(treatment_values[1:])
@@ -378,8 +362,8 @@ def fit(

self.init_propensity_model(self._settings["propensity_model"])

self.identified_estimand: IdentifiedEstimand = (
self.causal_model.identify_effect(proceed_when_unidentifiable=True)
self.identified_estimand: IdentifiedEstimand = self.causal_model.identify_effect(
proceed_when_unidentifiable=True
)

if bool(self.identified_estimand.estimands["iv"]) and bool(data.instruments):
@@ -450,9 +434,7 @@ def fit(
and self._settings["tuner"]["num_samples"] == -1
):
self._settings["tuner"]["time_budget_s"] = (
2.5
* len(self.estimator_list)
* self._settings["component_models"]["time_budget"]
2.5 * len(self.estimator_list) * self._settings["component_models"]["time_budget"]
)

cmtb = self._settings["component_models"]["time_budget"]
@@ -485,9 +467,7 @@ def fit(
# )
# )

search_space = self.cfg.search_space(
self.estimator_list, data_size=data.data.shape
)
search_space = self.cfg.search_space(self.estimator_list, data_size=data.data.shape)
init_cfg = (
self.cfg.default_configs(self.estimator_list, data_size=data.data.shape)
if self._settings["try_init_configs"]
@@ -507,14 +487,12 @@ def fit(
self._tune_with_config,
search_space,
metric=self.metric,
# use_ray=self.use_ray,
cost_attr="evaluation_cost",
points_to_evaluate=(
init_cfg if len(self.resume_cfg) == 0 else self.resume_cfg
),
evaluated_rewards=(
[] if len(self.resume_scores) == 0 else self.resume_scores
),
points_to_evaluate=(init_cfg if len(self.resume_cfg) == 0 else self.resume_cfg),
evaluated_rewards=([] if len(self.resume_scores) == 0 else self.resume_scores),
mode=("min" if self.metric in metrics_to_minimize() else "max"),
# resources_per_trial= {"cpu": 1} if self.use_ray else None,
low_cost_partial_config={},
**self._settings["tuner"],
)
@@ -529,12 +507,8 @@ def fit(
self._tune_with_config,
search_space,
metric=self.metric,
points_to_evaluate=(
init_cfg if len(self.resume_cfg) == 0 else self.resume_cfg
),
evaluated_rewards=(
[] if len(self.resume_scores) == 0 else self.resume_scores
),
points_to_evaluate=(init_cfg if len(self.resume_cfg) == 0 else self.resume_cfg),
evaluated_rewards=([] if len(self.resume_scores) == 0 else self.resume_scores),
mode=("min" if self.metric in metrics_to_minimize() else "max"),
low_cost_partial_config={},
**self._settings["tuner"],
@@ -568,18 +542,25 @@ def _tune_with_config(self, config: dict) -> dict:
Returns:
(dict): values of metrics after optimisation
"""
estimates = Parallel(n_jobs=2, backend="threading")(
delayed(self._estimate_effect)(config) for i in range(1)
)[0]
from causaltune.remote import remote_exec

if self.use_ray:
# flaml.tune handles the interaction with Ray itself
# estimates = self._estimate_effect(config)
estimates = remote_exec(CausalTune._estimate_effect, (self, config), self.use_ray)
else:
estimates = remote_exec(CausalTune._estimate_effect, (self, config), self.use_ray)

# Parallel(n_jobs=2, backend="threading")(
# delayed(self._estimate_effect)(config) for i in range(1)
# ))[0]

if "exception" not in estimates:
est_name = estimates["estimator_name"]
current_score = estimates[self.metric]

estimates["optimization_score"] = current_score
estimates["evaluation_cost"] = (
1e8 # will be overwritten for successful runs
)
estimates["evaluation_cost"] = 1e8 # will be overwritten for successful runs

# Initialize best_score if this is the first estimator for this name
if est_name not in self._best_estimators:
@@ -611,22 +592,19 @@ def _tune_with_config(self, config: dict) -> dict:
"codec",
"policy_risk",
]:
is_better = (
np.isfinite(current_score) and current_score < best_score
) or (np.isinf(best_score) and np.isfinite(current_score))
is_better = (np.isfinite(current_score) and current_score < best_score) or (
np.isinf(best_score) and np.isfinite(current_score)
)
else:
is_better = (
np.isfinite(current_score) and current_score > best_score
) or (np.isinf(best_score) and np.isfinite(current_score))
is_better = (np.isfinite(current_score) and current_score > best_score) or (
np.isinf(best_score) and np.isfinite(current_score)
)

# Store the estimator if we're storing all, if it's better, or if it's the first valid (non-inf) estimator
if (
self._settings["store_all"]
or is_better
or (
self._best_estimators[est_name][1] is None
and np.isfinite(current_score)
)
or (self._best_estimators[est_name][1] is None and np.isfinite(current_score))
):
self._best_estimators[est_name] = (
current_score,
@@ -658,9 +636,7 @@ def _estimate_effect(self, config):
# Do we need an object property for this, instead of a local var?
self.estimator_name = config["estimator"]["estimator_name"]
outcome_model = self.init_outcome_model(self._settings["outcome_model"])
method_params = self.cfg.method_params(
config, outcome_model, self.propensity_model
)
method_params = self.cfg.method_params(config, outcome_model, self.propensity_model)

try: #
# This calls the causal model's estimate_effect method
@@ -697,9 +673,7 @@ def _estimate_effect(self, config):
}

def _compute_metrics(self, estimator, df: pd.DataFrame) -> dict:
return self.scorer.make_scores(
estimator, df, self.metrics_to_report, r_scorer=None
)
return self.scorer.make_scores(estimator, df, self.metrics_to_report, r_scorer=None)

def score_dataset(self, df: pd.DataFrame, dataset_name: str):
"""
@@ -714,13 +688,9 @@ def score_dataset(self, df: pd.DataFrame, dataset_name: str):
"""
for scr in self.scores.values():
if scr["estimator"] is None:
warnings.warn(
"Skipping scoring for estimator %s", scr["estimator_name"]
)
warnings.warn("Skipping scoring for estimator %s", scr["estimator_name"])
else:
scr["scores"][dataset_name] = self._compute_metrics(
scr["estimator"], df
)
scr["scores"][dataset_name] = self._compute_metrics(scr["estimator"], df)

@property
def best_estimator(self) -> str:
@@ -793,9 +763,7 @@ def effect(self, df, *args, **kwargs):
"""
return self.model.effect(df, *args, **kwargs)

def predict(
self, cd: CausalityDataset, preprocess: Optional[bool] = False, *args, **kwargs
):
def predict(self, cd: CausalityDataset, preprocess: Optional[bool] = False, *args, **kwargs):
"""Heterogeneous Treatment Effects for data CausalityDataset

Args:
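Taken together, the optimiser.py changes replace the hard-coded joblib threading call in `_tune_with_config` with a `use_ray` switch: the constructor now stores `self.use_ray`, `fit()` accepts an optional `use_ray` override, and effect estimation is delegated to `causaltune.remote.remote_exec` (new file below). A hedged usage sketch; the import path and dataset setup are assumptions, not part of this PR:

```python
from causaltune import CausalTune  # assuming the package root re-exports CausalTune

ct = CausalTune(components_time_budget=10, use_ray=False)  # stored as ct.use_ray
# The per-fit override only takes effect when not None:
ct.fit(data=cd, outcome="outcome", use_ray=True)  # cd: a prepared CausalityDataset
```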
12 changes: 12 additions & 0 deletions causaltune/remote.py
@@ -0,0 +1,12 @@
def remote_exec(function, args, use_ray=False):
if use_ray:
import ray

remote_function = ray.remote(function)
return ray.get(remote_function.remote(*args))
else:
from joblib import Parallel, delayed

return Parallel(n_jobs=2, backend="threading")(delayed(function)(*args) for i in range(1))[
0
]
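Since the new helper is self-contained, a quick smoke test is straightforward. A minimal sketch, assuming `ray` is installed for the `use_ray=True` path (Ray initializes itself on the first remote call):

```python
from causaltune.remote import remote_exec

def square(x):
    return x * x

print(remote_exec(square, (4,)))                # joblib threading path -> 16
print(remote_exec(square, (4,), use_ray=True))  # ray.remote(function).remote(*args) path -> 16
```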