7
7
from logging import Logger
8
8
from queue import Queue
9
9
from threading import Event , Lock , Thread
10
- from typing import Callable , Tuple
10
+ from typing import Any , Callable , Dict , Tuple
11
11
12
12
from ax .core .types import TEvaluationOutcome , TParameterization
13
13
21
21
IDLE_SLEEP_SEC = 0.1
22
22
23
23
24
- # TODO[mpolson64] Create `optimize` method that constructs its own ax_client
25
- def optimize_with_client (
26
- ax_client : AxClient ,
24
+ def interactive_optimize (
27
25
num_trials : int ,
28
26
candidate_queue_maxsize : int ,
27
+ # Callable[[Queue[Tuple[TParameterization, int]], Event, ...], None]
28
+ candidate_generator_function : Callable [..., None ],
29
+ candidate_generator_kwargs : Dict [str , Any ],
30
+ # Callable[[Queue[Tuple[int, TEvaluationOutcome]], Event, ...], None]
31
+ data_attacher_function : Callable [..., None ],
32
+ data_attacher_kwargs : Dict [str , Any ],
29
33
elicitation_function : Callable [[TParameterization ], TEvaluationOutcome ],
30
34
) -> None :
31
35
"""
@@ -40,27 +44,23 @@ def optimize_with_client(
40
44
only be used in contexts where it is necessary for the user to not experience any
41
45
"lag" while candidates are being generated.
42
46
43
- Extract results of the experiment from the AxClient passed in.
44
-
45
- The basic structure is as follows: One thread tries for a lock on the AxClient,
46
- generates a candidate, and enqueues it to a candidate queue. Another thread tries
47
- for the same lock, takes all the trial outcomes in the outcome queue, and attaches
48
- them to the AxClient. The main thread pops a candidate off the candidate queue,
49
- elicits response from the user, and puts the response onto the outcome queue.
50
-
51
47
Args:
52
- ax_client: An AxClient properly configured for the experiment intended to be
53
- run. Construct and configure this before passing in.
54
48
num_trials: The total number of trials to be run.
55
49
candidate_queue_maxsize: The maximum number of candidates to pregenerate.
50
+ candidate_generator_function: A function taking in a queue and event that
51
+ enqueues candidates (generated by any means). See
52
+ `ax_client_candidate_generator` for an example.
53
+ candidate_generator_kwargs: kwargs to be passed into
54
+ `candidate_generator_function` when it is spawned as a thread.
55
+ data_attacher_function: A function taking in a queue and event that attaches
56
+ observations to Ax. See `ax_client_data_attacher` for an example.
57
+ data_attacher_kwargs: kwargs to be passed into `data_attacher_function` when
58
+ it is spawned as a thread.
56
59
elicitation_function: Function from parameterization (as returned by
57
- `AxClient.get_next_trial`) to outcome (as expected by
58
- `AxClient.complete_trial`).
60
+ `AxClient.get_next_trial`) to outcome (as expected by
61
+ `AxClient.complete_trial`).
59
62
"""
60
63
61
- # Construct a lock to ensure only one thread my access the AxClient at any moment
62
- ax_client_lock = Lock ()
63
-
64
64
# Construct queues to buffer inputs and outputs of the AxClient
65
65
candidate_queue : "Queue[Tuple[TParameterization, int]]" = Queue (
66
66
maxsize = candidate_queue_maxsize
@@ -74,18 +74,21 @@ def optimize_with_client(
74
74
# Construct threads to run candidate thread-safe pregeneration and thread-safe
75
75
# data attaching respectively
76
76
candidate_generator_thread = Thread (
77
- target = _candidate_generator ,
77
+ target = candidate_generator_function ,
78
78
args = (
79
- ax_client ,
80
- ax_client_lock ,
81
- candidate_generator_stop_event ,
82
79
candidate_queue ,
80
+ candidate_generator_stop_event ,
83
81
num_trials ,
84
82
),
83
+ kwargs = candidate_generator_kwargs ,
85
84
)
86
85
data_attacher_thread = Thread (
87
- target = _data_attacher ,
88
- args = (ax_client , ax_client_lock , data_attacher_stop_event , data_queue ),
86
+ target = data_attacher_function ,
87
+ args = (
88
+ data_queue ,
89
+ data_attacher_stop_event ,
90
+ ),
91
+ kwargs = data_attacher_kwargs ,
89
92
)
90
93
91
94
candidate_generator_thread .start ()
@@ -105,12 +108,43 @@ def optimize_with_client(
105
108
data_attacher_stop_event .set ()
106
109
107
110
108
- def _candidate_generator (
111
+ def interactive_optimize_with_client (
109
112
ax_client : AxClient ,
110
- lock : Lock ,
111
- stop_event : Event ,
113
+ num_trials : int ,
114
+ candidate_queue_maxsize : int ,
115
+ elicitation_function : Callable [[TParameterization ], TEvaluationOutcome ],
116
+ ) -> None :
117
+ """
118
+ Implementation of `interactive_loop` using the AxClient. Extract results of the
119
+ experiment from the AxClient passed in.
120
+
121
+ The basic structure is as follows: One thread tries for a lock on the AxClient,
122
+ generates a candidate, and enqueues it to a candidate queue. Another thread tries
123
+ for the same lock, takes all the trial outcomes in the outcome queue, and attaches
124
+ them to the AxClient. The main thread pops a candidate off the candidate queue,
125
+ elicits response from the user, and puts the response onto the outcome queue.
126
+ """
127
+
128
+ # Construct a lock to ensure only one thread my access the AxClient at any moment
129
+ ax_client_lock = Lock ()
130
+
131
+ interactive_optimize (
132
+ num_trials = num_trials ,
133
+ candidate_queue_maxsize = candidate_queue_maxsize ,
134
+ candidate_generator_function = ax_client_candidate_generator ,
135
+ candidate_generator_kwargs = {"ax_client" : ax_client , "lock" : ax_client_lock },
136
+ data_attacher_function = ax_client_data_attacher ,
137
+ data_attacher_kwargs = {"ax_client" : ax_client , "lock" : ax_client_lock },
138
+ elicitation_function = elicitation_function ,
139
+ )
140
+
141
+
142
+ def ax_client_candidate_generator (
112
143
queue : "Queue[Tuple[TParameterization, int]]" ,
144
+ stop_event : Event ,
113
145
num_trials : int ,
146
+ ax_client : AxClient ,
147
+ lock : Lock ,
114
148
) -> None :
115
149
"""Thread-safe method for generating the next trial from the AxClient and
116
150
enqueueing it to the candidate queue. The number of candidates pre-generated is
@@ -138,11 +172,11 @@ def _candidate_generator(
138
172
time .sleep (IDLE_SLEEP_SEC )
139
173
140
174
141
- def _data_attacher (
175
+ def ax_client_data_attacher (
176
+ queue : "Queue[Tuple[int, TEvaluationOutcome]]" ,
177
+ stop_event : Event ,
142
178
ax_client : AxClient ,
143
179
lock : Lock ,
144
- stop_event : Event ,
145
- queue : "Queue[Tuple[int, TEvaluationOutcome]]" ,
146
180
) -> None :
147
181
"""Thread-safe method for attaching evaluation outcomes to the AxClient from the
148
182
outcome queue. If the AxClient's lock is acquired all data in the outcome queue
0 commit comments