estimator function #1420

Open · wants to merge 4 commits into main
227 changes: 120 additions & 107 deletions flaml/automl/automl.py
@@ -2526,6 +2526,122 @@ def _search_sequential(self):
if time_left < time_ensemble < 2 * time_left:
break

def build_ensemble(self, automl_object=None):
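    """Build a stacking ensemble from the tuned estimators.

    If automl_object is given, its state (search results, training data,
    and settings) is copied into this instance before the ensemble is built.
    """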

import joblib  # used by the TerminatedWorkerError handler below

if automl_object is not None:
    # Adopt the full state of an already-fitted AutoML instance.
    self.__dict__.update(automl_object.__dict__)

if not self._ensemble or self._state.task not in ("binary", "multiclass", "regression"):
    logger.info("Ensemble is disabled or the task type is not supported.")
    return

search_states = [x for x in self._search_states.items() if x[1].best_config]
search_states.sort(key=lambda x: x[1].best_loss)
# Keep the two best learners unconditionally; add the rest only if
# their best loss is within 4x of the overall best.
estimators = [
    (
        x[0],
        x[1].learner_class(
            task=self._state.task,
            n_jobs=self._state.n_jobs,
            **AutoMLState.sanitize(x[1].best_config),
        ),
    )
    for x in search_states[:2]
]
estimators += [
    (
        x[0],
        x[1].learner_class(
            task=self._state.task,
            n_jobs=self._state.n_jobs,
            **AutoMLState.sanitize(x[1].best_config),
        ),
    )
    for x in search_states[2:]
    if x[1].best_loss < 4 * self._selected.best_loss
]
logger.info([(estimator[0], estimator[1].params) for estimator in estimators])

if len(estimators) <= 1:
logger.info("Not enough estimators to build an ensemble.")
return

if self._state.task.is_classification():
    from sklearn.ensemble import StackingClassifier as Stacker
else:
    from sklearn.ensemble import StackingRegressor as Stacker

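# Determine the CPU count for the active backend (Ray, Spark, or local).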
if self._use_ray is not False:
import ray
n_cpus = ray.is_initialized() and ray.available_resources()["CPU"] or os.cpu_count()
elif self._use_spark:
from flaml.tune.spark.utils import get_n_cpus
n_cpus = get_n_cpus()
else:
n_cpus = os.cpu_count()

ensemble_n_jobs = (
    -self._state.n_jobs  # maximize total parallelization degree
    if abs(self._state.n_jobs) == 1  # 1 and -1 correspond to min/max parallelization
    # total parallelization = parallelization per estimator * parallelization of the ensemble
    else max(1, int(n_cpus / 2 / self._state.n_jobs))
)

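# self._ensemble may be a dict such as
# {"final_estimator": ..., "passthrough": False, "n_jobs": ...},
# overriding the defaults chosen below.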
if isinstance(self._ensemble, dict):
final_estimator = self._ensemble.get("final_estimator", self._trained_estimator)
passthrough = self._ensemble.get("passthrough", True)
ensemble_n_jobs = self._ensemble.get("n_jobs", ensemble_n_jobs)
else:
final_estimator = self._trained_estimator
passthrough = True

stacker = Stacker(
    estimators,
    final_estimator=final_estimator,
    n_jobs=ensemble_n_jobs,
    passthrough=passthrough,
)

sample_weight_dict = (
{"sample_weight": self._sample_weight_full} if self._sample_weight_full is not None else {}
)

# Re-initialize each estimator class before fitting, preserving the
# pre-refactor behavior of _search.
for e in estimators:
    e[1].__class__.init()

try:
    logger.info("Building ensemble with tuned estimators")
stacker.fit(
self._X_train_all,
self._y_train_all,
**sample_weight_dict,
)
logger.info(f"Ensemble built successfully: {stacker}")
self._trained_estimator = stacker
self._trained_estimator.model = stacker
except ValueError as e:
if passthrough:
logger.warning(
"Using passthrough=False for ensemble because the data contain categorical features."
)
stacker = Stacker(
    estimators,
    final_estimator=final_estimator,
    n_jobs=ensemble_n_jobs,
    passthrough=False,
)
stacker.fit(
self._X_train_all,
self._y_train_all,
**sample_weight_dict,
)
logger.info(f"Ensemble built successfully with passthrough=False: {stacker}")
self._trained_estimator = stacker
self._trained_estimator.model = stacker
else:
    raise  # re-raise with the original traceback
except joblib.externals.loky.process_executor.TerminatedWorkerError:
logger.error(
"Not enough memory to build the ensemble. "
"Please try increasing available RAM, decreasing n_jobs for ensemble, or disabling ensemble."
)

def _search(self):
# initialize the search_states
self._eci = []
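For context, a minimal usage sketch of the new method (not part of the diff; the flaml.AutoML calls and the ensemble=True flag follow the public FLAML API, while the dataset and time budget are purely illustrative):

from flaml import AutoML
from sklearn.datasets import load_breast_cancer

X, y = load_breast_cancer(return_X_y=True)

# Usual path: fit with ensemble enabled; _search calls build_ensemble.
automl = AutoML()
automl.fit(X, y, task="classification", time_budget=60, ensemble=True)

# New path enabled by this PR: copy the state of an already-fitted
# AutoML object into a fresh instance and build the ensemble standalone.
fresh = AutoML()
fresh.build_ensemble(automl_object=automl)
print(fresh.model)  # expected to be the fitted stacking ensemble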
@@ -2587,115 +2703,12 @@ def _search(self):
self.modelcount = sum(search_state.total_iter for search_state in self._search_states.values())
if self._trained_estimator:
logger.info(f"selected model: {self._trained_estimator.model}")
estimators = []
if self._ensemble and self._state.task in (
"binary",
"multiclass",
"regression",
):
search_states = list(x for x in self._search_states.items() if x[1].best_config)
search_states.sort(key=lambda x: x[1].best_loss)
estimators = [
(
x[0],
x[1].learner_class(
task=self._state.task,
n_jobs=self._state.n_jobs,
**AutoMLState.sanitize(x[1].best_config),
),
)
for x in search_states[:2]
]
estimators += [
(
x[0],
x[1].learner_class(
task=self._state.task,
n_jobs=self._state.n_jobs,
**AutoMLState.sanitize(x[1].best_config),
),
)
for x in search_states[2:]
if x[1].best_loss < 4 * self._selected.best_loss
]
logger.info([(estimator[0], estimator[1].params) for estimator in estimators])
if len(estimators) > 1:
if self._state.task.is_classification():
from sklearn.ensemble import StackingClassifier as Stacker
else:
from sklearn.ensemble import StackingRegressor as Stacker
if self._use_ray is not False:
import ray

n_cpus = ray.is_initialized() and ray.available_resources()["CPU"] or os.cpu_count()
elif self._use_spark:
from flaml.tune.spark.utils import get_n_cpus

n_cpus = get_n_cpus()
else:
n_cpus = os.cpu_count()
ensemble_n_jobs = (
-self._state.n_jobs # maximize total parallelization degree
if abs(self._state.n_jobs) == 1 # 1 and -1 correspond to min/max parallelization
else max(1, int(n_cpus / 2 / self._state.n_jobs))
# the total degree of parallelization = parallelization degree per estimator * parallelization degree of ensemble
)
if isinstance(self._ensemble, dict):
final_estimator = self._ensemble.get("final_estimator", self._trained_estimator)
passthrough = self._ensemble.get("passthrough", True)
ensemble_n_jobs = self._ensemble.get("n_jobs", ensemble_n_jobs)
else:
final_estimator = self._trained_estimator
passthrough = True
stacker = Stacker(
estimators,
final_estimator,
n_jobs=ensemble_n_jobs,
passthrough=passthrough,
)
sample_weight_dict = (
(self._sample_weight_full is not None) and {"sample_weight": self._sample_weight_full} or {}
)
for e in estimators:
e[1].__class__.init()
import joblib
# build_ensemble re-derives the candidate estimators and validates the
# task type and estimator count itself, so no local check is needed.
if self._ensemble:
    self.build_ensemble()
else:
    logger.info("Skipping ensemble building because ensemble is disabled.")

try:
logger.info("Building ensemble with tuned estimators")
stacker.fit(
self._X_train_all,
self._y_train_all,
**sample_weight_dict, # NOTE: _search is after kwargs is updated to fit_kwargs_by_estimator
)
logger.info(f"ensemble: {stacker}")
self._trained_estimator = stacker
self._trained_estimator.model = stacker
except ValueError as e:
if passthrough:
logger.warning(
"Using passthrough=False for ensemble because the data contain categorical features."
)
stacker = Stacker(
estimators,
final_estimator,
n_jobs=self._state.n_jobs,
passthrough=False,
)
stacker.fit(
self._X_train_all,
self._y_train_all,
**sample_weight_dict, # NOTE: _search is after kwargs is updated to fit_kwargs_by_estimator
)
logger.info(f"ensemble: {stacker}")
self._trained_estimator = stacker
self._trained_estimator.model = stacker
else:
raise e
except joblib.externals.loky.process_executor.TerminatedWorkerError:
logger.error(
"No enough memory to build the ensemble."
" Please try increasing available RAM, decreasing n_jobs for ensemble, or disabling ensemble."
)
elif self._state.retrain_final:
# reset time budget for retraining
if self._max_iter > 1:
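As a sanity check on the ensemble_n_jobs heuristic in build_ensemble, a small worked example (the numbers are illustrative, not taken from the PR):

# With n_cpus = 16 and per-estimator n_jobs = 4, the stacker fits
# max(1, int(16 / 2 / 4)) = 2 base estimators in parallel, for a total
# parallelization degree of roughly 2 * 4 = 8 cores.
n_cpus = 16
n_jobs = 4
ensemble_n_jobs = -n_jobs if abs(n_jobs) == 1 else max(1, int(n_cpus / 2 / n_jobs))
assert ensemble_n_jobs == 2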