forked from facebook/Ax
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
773 lines (646 loc) · 29.2 KB
/
utils.py
File metadata and controls
773 lines (646 loc) · 29.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# pyre-strict
from collections.abc import Callable, Iterable
from copy import deepcopy
from datetime import datetime, timedelta
from enum import Enum
from functools import wraps
from logging import Logger
from typing import Any
import pandas as pd
from ax.core.arm import Arm
from ax.core.base_trial import BaseTrial, TrialStatus
from ax.core.batch_trial import BatchTrial
from ax.core.experiment import Experiment
from ax.core.generator_run import GeneratorRun
from ax.core.map_metric import MapMetric
from ax.core.observation import ObservationFeatures
from ax.core.optimization_config import OptimizationConfig
from ax.core.trial import Trial
from ax.exceptions.core import AxError
from ax.utils.common.constants import Keys
from ax.utils.common.logger import get_logger
from pyre_extensions import none_throws
# Module-level logger, obtained from Ax's ``get_logger`` helper for this module.
logger: Logger = get_logger(__name__)
# Threshold for switching to pending points extraction based on trial status.
# ``extract_pending_observations`` compares ``len(experiment.trials)`` to it.
MANY_TRIALS_IN_EXPERIMENT = 100
# Age, in days, past which a completed trial counts as "old" when
# ``get_target_trial_index`` picks a target among completed trials.
OLD_TRIAL_THRESHOLD_DAYS = 10
# ------------------- Utils shared by Client and BatchClient--------------------
def _maybe_update_trial_status_to_complete(
    experiment: Experiment,
    trial_index: int,
) -> None:
    """Complete a trial and warn if optimization config metrics lack data.

    The trial is marked ``COMPLETED`` unconditionally; metric presence never
    blocks completion. When the experiment has an optimization config, the
    trial's data is looked up and a warning is logged listing every
    optimization config metric that has no data for the trial.

    Args:
        experiment: The experiment that owns the trial.
        trial_index: Index of the trial to complete.
    """
    experiment.trials[trial_index].mark_completed()

    opt_config = experiment.optimization_config
    if opt_config is None:
        # Nothing to compare the trial's data against.
        return

    observed_metrics = set(
        experiment.lookup_data(trial_indices=[trial_index]).metric_names
    )
    missing_metrics = set(opt_config.metrics.keys()) - observed_metrics
    if missing_metrics:
        logger.warning(
            f"Trial {trial_index} marked COMPLETED but missing optimization "
            f"config metrics: {missing_metrics}. "
            "Partial data will still be used for modeling."
        )
# -------------------- Experiment result extraction utils. ---------------------
def _extract_generator_runs(trial: BaseTrial) -> list[GeneratorRun]:
    """Return the generator runs attached to ``trial`` as a list.

    Args:
        trial: A ``BatchTrial`` or a ``Trial``.

    Returns:
        ``trial.generator_runs`` for a ``BatchTrial``; a one-element list
        holding ``trial.generator_run`` for a ``Trial`` (``none_throws`` is
        applied to it, so a missing generator run raises).

    Raises:
        ValueError: If ``trial`` is neither a ``BatchTrial`` nor a ``Trial``.
    """
    if isinstance(trial, BatchTrial):
        return trial.generator_runs
    if not isinstance(trial, Trial):
        raise ValueError("Unexpected trial type")
    return [none_throws(trial.generator_run)]
def get_model_trace_of_times(
    experiment: Experiment,
) -> tuple[list[float | None], list[float | None]]:
    """Collect per-generator-run model fit and candidate generation times.

    The values are not cumulative: there is one entry per generator run,
    visited trial by trial in the order of ``experiment.trials``.

    Args:
        experiment: The experiment whose generator runs are inspected.

    Returns:
        A pair ``(fit_times, gen_times)``; entries are the ``fit_time`` and
        ``gen_time`` attributes of each generator run and may be ``None``.
    """
    fit_times: list[float | None] = []
    gen_times: list[float | None] = []
    for trial in experiment.trials.values():
        for generator_run in _extract_generator_runs(trial=trial):
            fit_times.append(generator_run.fit_time)
            gen_times.append(generator_run.gen_time)
    return fit_times, gen_times
def get_model_times(experiment: Experiment) -> tuple[float, float]:
    """Sum the model fit and candidate generation times over the experiment.

    Entries that are ``None`` in the per-generator-run traces are skipped.

    Args:
        experiment: The experiment whose generator runs are inspected.

    Returns:
        A pair ``(total_fit_time, total_gen_time)``.
    """
    fit_trace, gen_trace = get_model_trace_of_times(experiment)
    total_fit_time = sum([t for t in fit_trace if t is not None])
    total_gen_time = sum([t for t in gen_trace if t is not None])
    return total_fit_time, total_gen_time
def is_bandit_experiment(generation_strategy_name: str) -> bool:
    """Tell whether a generation strategy name marks a bandit experiment.

    An experiment counts as a bandit experiment exactly when its generation
    strategy name equals
    ``Keys.FACTORIAL_PLUS_EMPIRICAL_BAYES_THOMPSON_SAMPLING``.

    Args:
        generation_strategy_name: Name of the generation strategy of the
            experiment being checked.

    Returns:
        True if the name matches the bandit generation strategy key, False
        otherwise.
    """
    bandit_strategy_name = Keys.FACTORIAL_PLUS_EMPIRICAL_BAYES_THOMPSON_SAMPLING
    return generation_strategy_name == bandit_strategy_name
# -------------------- Metric availability. ------------------------------------
class MetricAvailability(int, Enum):
    """How much of a required metric set a trial has data for.

    The required set is typically the metrics of an ``OptimizationConfig``.
    Members are integers ordered from least to most complete, so they can be
    compared numerically.
    """

    # No data for any required metric. Covers trials with no data at all as
    # well as trials whose only data is for metrics outside the required set.
    NOT_OBSERVED = 0
    # Data for at least one, but not every, required metric.
    INCOMPLETE = 1
    # Data for every required metric.
    COMPLETE = 2
def compute_metric_availability(
    experiment: Experiment,
    trial_indices: Iterable[int] | None = None,
    optimization_config: OptimizationConfig | None = None,
    metric_names: set[str] | None = None,
) -> dict[int, MetricAvailability]:
    """Classify trials by how many of the required metrics they have data for.

    A metric counts as observed for a trial if its name appears in any row of
    that trial's data (any arm, any step). No check is made for data at a
    particular step or fidelity.

    The data is obtained through ``experiment.lookup_data``.

    Args:
        experiment: The experiment whose data is inspected.
        trial_indices: Trials to classify. ``None`` means every trial on the
            experiment.
        optimization_config: Config whose metrics form the required set.
            Falls back to ``experiment.optimization_config`` when ``None``.
            Ignored if ``metric_names`` is given.
        metric_names: Explicit required metric names; takes precedence over
            ``optimization_config``.

    Returns:
        A dict mapping each requested trial index to its
        ``MetricAvailability``. Empty if there are no trial indices; all
        ``COMPLETE`` if the required set is empty.

    Raises:
        ValueError: If ``metric_names`` is ``None`` and no optimization config
            is passed or set on the experiment.
    """
    # Work out which metrics are required.
    if metric_names is None:
        opt_config = optimization_config
        if opt_config is None:
            opt_config = experiment.optimization_config
        if opt_config is None:
            raise ValueError(
                "An optimization config is required to compute metric availability. "
                "Either pass one explicitly, set one on the experiment, or provide "
                "metric_names."
            )
        required = set(opt_config.metrics.keys())
    else:
        required = metric_names

    # Work out which trials to classify.
    indices = list(
        experiment.trials.keys() if trial_indices is None else trial_indices
    )
    if len(indices) == 0:
        return {}

    # With nothing required, every trial trivially has everything.
    if len(required) == 0:
        return dict.fromkeys(indices, MetricAvailability.COMPLETE)

    # Distinct metric names present in the data, keyed by trial index.
    data = experiment.lookup_data(trial_indices=indices)
    observed_by_trial: dict[int, set[str]] = {}
    if len(data.metric_names) > 0:
        names_by_trial = data.full_df.groupby("trial_index")["metric_name"]
        observed_by_trial = {
            int(trial_idx): set(names.unique())
            for trial_idx, names in names_by_trial
        }

    def _classify(trial_idx: int) -> MetricAvailability:
        # Only required metrics count; other metrics in the data are ignored.
        observed_required = observed_by_trial.get(trial_idx, set()) & required
        if len(observed_required) == 0:
            return MetricAvailability.NOT_OBSERVED
        if len(observed_required) == len(required):
            return MetricAvailability.COMPLETE
        return MetricAvailability.INCOMPLETE

    return {trial_idx: _classify(trial_idx) for trial_idx in indices}
# -------------------- Pending observations extraction utils. ---------------------
def extract_pending_observations(
    experiment: Experiment,
    include_out_of_design_points: bool = False,
) -> dict[str, list[ObservationFeatures]] | None:
    """Compute pending observation features, choosing the extraction strategy.

    Pending observation features correspond to arms that have been generated
    but do not yet have data, or that belong to abandoned trials. They are
    passed to the model to keep it from generating the same points again.

    Dispatch:
        * ``get_pending_observation_features_based_on_trial_status`` when the
          experiment has at least ``MANY_TRIALS_IN_EXPERIMENT`` trials and
          every one of them is a ``Trial``.
        * ``get_pending_observation_features`` in every other case (fewer
          trials, or any non-``Trial`` trial such as a ``BatchTrial``).

    The status-based variant is faster but relies on assumptions about trial
    statuses; see its docstring for details.

    Args:
        experiment: Experiment to compute pending features for.
        include_out_of_design_points: Forwarded unchanged to the chosen
            extraction function.

    Returns:
        Whatever the chosen extraction function returns: a mapping from
        metric name to pending observation features, or ``None``.
    """
    only_single_arm_trials_at_scale = len(
        experiment.trials
    ) >= MANY_TRIALS_IN_EXPERIMENT and all(
        isinstance(t, Trial) for t in experiment.trials.values()
    )
    if not only_single_arm_trials_at_scale:
        return get_pending_observation_features(
            experiment=experiment,
            include_out_of_design_points=include_out_of_design_points,
        )
    return get_pending_observation_features_based_on_trial_status(
        experiment=experiment,
        include_out_of_design_points=include_out_of_design_points,
    )
def get_pending_observation_features(
    experiment: Experiment,
    *,
    include_out_of_design_points: bool = False,
) -> dict[str, list[ObservationFeatures]] | None:
    """Compute, per metric, the observation features that are still pending.

    For a given metric, the arms of a trial are pending when the trial is a
    candidate, or when the trial is deployed, completed or abandoned and the
    experiment's data has no rows for that metric on that trial. Only arms
    returned by ``trial.arms`` are considered.

    NOTE: Pending observation features are passed to the model to instruct it
    to not generate the same points again.

    Args:
        experiment: Experiment to compute pending features for.
        include_out_of_design_points: If False (default), arms whose
            parameters fail ``experiment.search_space.check_membership`` are
            left out. Such points are outside the search space, so the model
            would not re-suggest them anyway. Set to True to keep them.

    Returns:
        A mapping from metric name to the observation features pending for
        that metric, or ``None`` if no metric has any pending features.
    """

    def _feature_or_none(
        arm: Arm,
        trial_index: int,
        trial: BaseTrial,
    ) -> ObservationFeatures | None:
        # The membership check only runs when out-of-design arms are excluded.
        if not (
            include_out_of_design_points
            or experiment.search_space.check_membership(
                parameterization=arm.parameters
            )
        ):
            return None
        return ObservationFeatures.from_arm(
            arm=arm,
            trial_index=trial_index,
            metadata=trial._get_candidate_metadata(arm_name=arm.name),
        )

    pending: dict[str, list[ObservationFeatures]] = {
        metric_name: [] for metric_name in experiment.metrics
    }

    # One data lookup for the whole experiment, reduced to the set of metric
    # names that have data for each trial.
    data_df = experiment.lookup_data().df
    observed_by_trial: dict[int, set[str]] = {}
    if len(data_df) > 0:
        observed_by_trial = (
            data_df.groupby("trial_index")["metric_name"]
            .apply(lambda names: set(names))
            .to_dict()
        )

    for trial_index, trial in experiment.trials.items():
        observed = observed_by_trial.get(trial_index, set())
        for metric_name in experiment.metrics:
            bucket = pending.setdefault(metric_name, [])
            if trial.status.is_candidate:
                is_pending = True
            else:
                is_pending = (
                    (
                        trial.status.is_deployed
                        or trial.status.is_completed
                        or trial.status.is_abandoned
                    )
                    and metric_name not in observed
                    and trial.arms is not None
                )
            if not is_pending:
                continue
            for arm in trial.arms:
                feature = _feature_or_none(
                    arm=arm,
                    trial_index=trial_index,
                    trial=trial,
                )
                if feature:
                    bucket.append(feature)

    return pending if any(pending.values()) else None
# TODO: allow user to pass search space which overrides that on the experiment
# (to use for the `include_out_of_design_points` check)
def get_pending_observation_features_based_on_trial_status(
    experiment: Experiment,
    include_out_of_design_points: bool = False,
) -> dict[str, list[ObservationFeatures]] | None:
    """A faster analogue of ``get_pending_observation_features`` that decides
    pendingness mostly from trial status.

    Assumptions:
        * All arms in all trials in ``CANDIDATE``, ``STAGED``, ``RUNNING`` and
          ``ABANDONED`` statuses are considered pending for all outcomes.
        * ``COMPLETED`` trials whose metric availability (computed by
          ``compute_metric_availability`` against the experiment's
          optimization config) is ``INCOMPLETE`` or ``NOT_OBSERVED`` are also
          considered pending for all outcomes, so that arms without full data
          are not re-suggested. This check is skipped when the experiment has
          no optimization config.
        * All arms in all other trials are considered not pending for all
          outcomes.

    This entails:
        * No data lookup for trials that are not ``COMPLETED``.
        * Even if data is present for some outcomes in ``RUNNING`` trials,
          their arms are still considered pending for those outcomes.

    NOTE: This function should not be used to extract pending features in
    field experiments, where arms in running trials should not be considered
    pending if there is data for those arms.

    Args:
        experiment: Experiment to compute pending features for.
        include_out_of_design_points: If False (default), arms whose
            parameters fail ``experiment.search_space.check_membership`` are
            left out of the pending features. Set to True to keep them.

    Returns:
        A mapping from every metric name on the experiment to its own deep
        copy of the same list of pending observation features, or ``None`` if
        there are no pending features.
    """

    def _features_for(trial: BaseTrial) -> list[ObservationFeatures]:
        # Observation features for the arms of ``trial`` that pass the
        # (optional) search space membership filter.
        return [
            ObservationFeatures.from_arm(
                arm=arm,
                trial_index=trial.index,
                metadata=trial._get_candidate_metadata(arm_name=arm.name),
            )
            for arm in trial.arms
            if include_out_of_design_points
            or experiment.search_space.check_membership(arm.parameters)
        ]

    pending_features_list: list[ObservationFeatures] = []

    # Trials in these statuses are pending regardless of any data they have.
    for status in (
        TrialStatus.CANDIDATE,
        TrialStatus.STAGED,
        TrialStatus.RUNNING,
        TrialStatus.ABANDONED,
    ):
        for trial in experiment.trials_by_status[status]:
            pending_features_list.extend(_features_for(trial))

    # COMPLETED trials that lack data for some or all optimization config
    # metrics are pending too, so their arms are not re-suggested.
    completed_trials = list(experiment.trials_by_status[TrialStatus.COMPLETED])
    if completed_trials and experiment.optimization_config is not None:
        metric_availabilities = compute_metric_availability(
            experiment=experiment,
            trial_indices=[t.index for t in completed_trials],
        )
        for trial in completed_trials:
            if metric_availabilities.get(trial.index) in (
                MetricAvailability.INCOMPLETE,
                MetricAvailability.NOT_OBSERVED,
            ):
                pending_features_list.extend(_features_for(trial))

    if not pending_features_list:
        return None

    return {
        # Using deepcopy here to avoid issues due to in-place transforms.
        metric_name: deepcopy(pending_features_list)
        for metric_name in experiment.metrics
    }
def extend_pending_observations(
    experiment: Experiment,
    pending_observations: dict[str, list[ObservationFeatures]],
    generator_run: GeneratorRun,
) -> None:
    """Add the arms of a generator run to a pending observations dict, in place.

    For every metric on the experiment, one observation feature per arm of
    ``generator_run`` is appended to that metric's list; metrics missing from
    the dict get a new list first.

    Note: The dict is mutated in place for performance reasons. This function
    is only used within the ``GenerationStrategy`` class and is not intended
    for wide re-use; take care when calling it elsewhere.

    Args:
        experiment: Experiment for which the generation strategy is producing
            ``GeneratorRun``s.
        pending_observations: Dict from metric name to pending observations
            for that metric, used to avoid resuggesting arms that will be
            explored soon.
        generator_run: ``GeneratorRun`` currently produced by the
            ``GenerationStrategy``, whose arms are added to the pending points.
    """
    for metric_name in experiment.metrics:
        bucket = pending_observations.setdefault(metric_name, [])
        # Fresh feature objects are built for each metric.
        bucket.extend(
            [ObservationFeatures.from_arm(arm) for arm in generator_run.arms]
        )
# -------------------- Get target trial utils. ---------------------
def get_target_trial_index(
    experiment: Experiment,
    require_data_for_all_metrics: bool = False,
) -> int | None:
    """Pick the index of the target trial on the experiment.

    Only trials that have data for the status quo arm and for the required
    metrics are eligible. Among those, priority is:

        1. A running long-run trial. If no running long-run trial is
           eligible, fall through to the next rule.
        2. The longest-running eligible trial that is still running.
        3. The longest-running eligible completed trial, leaving out trials
           completed more than ``OLD_TRIAL_THRESHOLD_DAYS`` days ago unless
           every eligible completed trial is that old.

    Ties are broken by, in order: longer duration, more arms, then
    arbitrarily.

    Args:
        experiment: The experiment to search.
        require_data_for_all_metrics: If True, a trial needs data for ALL
            metrics on the experiment (including tracking metrics). If False,
            it needs data for the optimization config metrics only.
            Typically True for plotting and False otherwise.

    Returns:
        The index of the target trial, or ``None`` if the experiment has no
        data, no status quo, or no eligible trial.
    """
    data_df = experiment.lookup_data().df
    sq_arm = experiment.status_quo
    if data_df.empty or sq_arm is None:
        return None

    # Trials with data for the required metrics.
    with_required_metrics = get_trial_indices_with_required_metrics(
        experiment=experiment,
        df=data_df,
        require_data_for_all_metrics=require_data_for_all_metrics,
    )
    # Trials with data for the status quo arm.
    sq_rows = data_df[data_df["arm_name"] == sq_arm.name]
    with_sq_data = set(sq_rows.trial_index.unique())
    # Eligible trials satisfy both requirements.
    eligible_indices = with_sq_data & with_required_metrics

    currently_running = experiment.trials_by_status[TrialStatus.RUNNING]
    ranked_running = _sort_trials(
        trials=[t for t in currently_running if t.index in eligible_indices],
        trials_are_running=True,
    )

    # Priority 1: a running long-run trial. The ranking already puts the
    # longest-running one first.
    if any(t.trial_type == Keys.LONG_RUN for t in currently_running):
        long_run_trial = next(
            (t for t in ranked_running if t.trial_type == Keys.LONG_RUN), None
        )
        if long_run_trial is not None:
            return long_run_trial.index

    # Priority 2: the longest-running eligible trial that is still running.
    if ranked_running:
        return ranked_running[0].index

    # Priority 3: completed trials only. Failed, abandoned and early-stopped
    # trials likely indicate a problem and are never selected.
    eligible_completed = [
        experiment.trials[idx]
        for idx in eligible_indices
        if experiment.trials[idx].status == TrialStatus.COMPLETED
    ]
    staleness_cutoff = datetime.now() - timedelta(days=OLD_TRIAL_THRESHOLD_DAYS)
    recent_completed = [
        t
        for t in eligible_completed
        if t.time_completed is None or t.time_completed >= staleness_cutoff
    ]
    # Prefer recent trials; use the old ones only if nothing else is left.
    candidates = recent_completed or eligible_completed
    if not candidates:
        return None
    return _sort_trials(trials=candidates, trials_are_running=False)[0].index
def get_trial_indices_with_required_metrics(
    experiment: Experiment,
    df: "pd.DataFrame",
    require_data_for_all_metrics: bool,
) -> set[int]:
    """Find the trials that have data for every required metric.

    Args:
        experiment: An Ax experiment.
        df: Experiment data. Used only to collect the candidate trial
            indices; availability itself comes from
            ``compute_metric_availability``.
        require_data_for_all_metrics: If True, every metric on the experiment
            is required. If False, only the optimization config metrics are.

    Returns:
        The trial indices whose metric availability is ``COMPLETE``. If the
        optimization config metrics are the requirement but the experiment
        has no optimization config, every trial index found in ``df``.
    """
    indices_in_data = {int(idx) for idx in df.trial_index.unique()}
    if len(indices_in_data) == 0:
        return set()

    # ``None`` makes ``compute_metric_availability`` use the optimization
    # config metrics.
    required_names: set[str] | None = None
    if require_data_for_all_metrics:
        required_names = set(experiment.metrics.keys())
    elif experiment.optimization_config is None:
        # With no optimization config, any trial with data is valid.
        return indices_in_data

    availability = compute_metric_availability(
        experiment=experiment,
        trial_indices=indices_in_data,
        metric_names=required_names,
    )
    return {
        trial_idx
        for trial_idx, level in availability.items()
        if level == MetricAvailability.COMPLETE
    }
def _sort_trials(
    trials: list[BaseTrial],
    trials_are_running: bool,
) -> list[BaseTrial]:
    """Order trials by duration first and by number of arms second.

    Durations are bucketed into 12-hour blocks, so trials whose durations
    fall into the same block tie on the first criterion. The result is in
    descending order: longest-running, then most arms.

    Args:
        trials: The trials to sort.
        trials_are_running: Whether the trials are running. For running
            trials the end time is "now"; otherwise it is the trial's
            completion time.

    Returns:
        A new, sorted list of the trials.
    """
    # A single "now" is shared by all trials so that their keys are comparable.
    now = datetime.now()
    bucket_secs = 12 * 60 * 60

    def _sort_key(trial: BaseTrial) -> tuple[int, int]:
        if trials_are_running:
            end_ts = now.timestamp()
        else:
            end_ts = _time_trial_completed_safe(trial=trial).timestamp()
        start_ts = _time_trial_started_safe(
            trial=trial, default_time_run_started=now
        ).timestamp()
        duration_bucket = int((end_ts - start_ts) // bucket_secs)
        # The arm count breaks ties between trials in the same bucket.
        return duration_bucket, len(trial.arms_by_name)

    return sorted(trials, key=_sort_key, reverse=True)
def _time_trial_started_safe(
    trial: BaseTrial, default_time_run_started: datetime
) -> datetime:
    """Return the trial's start time, or a default when it is not set.

    Not all RUNNING trials have ``time_run_started`` defined. Such trials are
    accepted but penalized: with the default start time they appear to have
    been running for a very short time, which moves them to the end of the
    sort.

    The parameter is typed as ``BaseTrial`` because ``_sort_trials`` passes
    elements of a ``list[BaseTrial]`` and only ``time_run_started`` is read.

    Args:
        trial: The trial to check.
        default_time_run_started: The time to use if ``time_run_started`` is
            not defined. Do not pass ``datetime.now()`` directly, as it would
            be slightly different for each trial. Instead set
            ``val = datetime.now()`` once and pass ``val`` for every trial.

    Returns:
        ``trial.time_run_started`` if it is not ``None``, otherwise
        ``default_time_run_started``.
    """
    started = trial.time_run_started
    return default_time_run_started if started is None else started
def _time_trial_completed_safe(trial: BaseTrial) -> datetime:
    """Return the trial's completion time, or epoch 0 when it is not set.

    Not all COMPLETED trials have ``time_completed`` defined. Such trials are
    accepted but penalized: with ``datetime.fromtimestamp(0)`` as the
    completion time their computed duration is very small, which moves them
    to the end of the sort.

    The parameter is typed as ``BaseTrial`` because ``_sort_trials`` passes
    elements of a ``list[BaseTrial]`` and only ``time_completed`` is read.

    Args:
        trial: The trial to check.

    Returns:
        ``trial.time_completed`` if it is not ``None``, otherwise
        ``datetime.fromtimestamp(0)``.
    """
    completed = trial.time_completed
    return datetime.fromtimestamp(0) if completed is None else completed
# -------------------- MapMetric related utils. ---------------------
def has_map_metrics(optimization_config: OptimizationConfig) -> bool:
    """Report whether any metric of the optimization config is a ``MapMetric``.

    Args:
        optimization_config: Optimization config.

    Returns:
        True if at least one metric is a ``MapMetric``; False otherwise,
        including when the config has no metrics at all.
    """
    # An OptimizationConfig can technically have zero metrics (a
    # MultiObjective could have 0 objectives); ``any`` of nothing is False.
    return any(
        isinstance(metric, MapMetric)
        for metric in optimization_config.metrics.values()
    )
# -------------------- Context manager and decorator utils. ---------------------
def batch_trial_only(msg: str | None = None) -> Callable[..., Any]:
"""A decorator to verify that the value passed to the `trial`
argument to `func` is a `BatchTrial`.
"""
def batch_trial_only_decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
def _batch_trial_only(*args: Any, **kwargs: Any) -> Any:
if "trial" not in kwargs:
raise AxError(
f"Expected a keyword argument `trial` to `{func.__name__}`."
)
if not isinstance(kwargs["trial"], BatchTrial):
message = msg or (
f"Expected the argument `trial` to `{func.__name__}` "
f"to be a `BatchTrial`, but got {type(kwargs['trial'])}."
)
raise AxError(message)
return func(*args, **kwargs)
return _batch_trial_only
return batch_trial_only_decorator