16
16
FMEvent ,
17
17
ForwardModelStepFailure ,
18
18
ForwardModelStepSuccess ,
19
- event_to_json ,
20
19
)
21
- from _ert .forward_model_runner .client import Client
22
20
from ert .config import ForwardModelStep , QueueConfig
23
21
from ert .run_arg import RunArg
24
22
from ert .scheduler import Scheduler , create_driver
@@ -185,15 +183,6 @@ def update_snapshot(self, events: Sequence[Event]) -> EnsembleSnapshot:
185
183
186
184
return snapshot_mutate_event
187
185
188
- async def send_event (
189
- self ,
190
- event : Event ,
191
- retries : int = 10 ,
192
- ) -> None :
193
- assert self ._config is not None
194
- async with Client (self ._config .get_uri (), token = self ._config .token ) as client :
195
- await client .send (event_to_json (event ), retries )
196
-
197
186
async def evaluate (
198
187
self ,
199
188
config : EvaluatorServerConfig ,
@@ -209,6 +198,7 @@ async def evaluate(
209
198
the final result of executing all its jobs through a scheduler and driver.
210
199
"""
211
200
self ._config = config
201
+ outbound_event_queue = scheduler_queue or asyncio .Queue ()
212
202
213
203
if not self .id_ :
214
204
raise ValueError ("Ensemble id not set" )
@@ -229,7 +219,7 @@ async def evaluate(
229
219
ee_token = self ._config .token ,
230
220
)
231
221
232
- await self . send_event (EnsembleStarted (ensemble = self .id_ ))
222
+ await outbound_event_queue . put (EnsembleStarted (ensemble = self .id_ ))
233
223
234
224
min_required_realizations = (
235
225
self .min_required_realizations
@@ -253,16 +243,16 @@ async def evaluate(
253
243
)
254
244
),
255
245
)
256
- await self . send_event (EnsembleFailed (ensemble = self .id_ ))
246
+ await outbound_event_queue . put (EnsembleFailed (ensemble = self .id_ ))
257
247
return
258
248
except asyncio .CancelledError :
259
249
print ("Cancelling evaluator task!" )
260
250
return
261
251
# Dispatch final result from evaluator - SUCCEEDED or CANCELLED
262
252
if scheduler_finished_successfully :
263
- await self . send_event (EnsembleSucceeded (ensemble = self .id_ ))
253
+ await outbound_event_queue . put (EnsembleSucceeded (ensemble = self .id_ ))
264
254
else :
265
- await self . send_event (EnsembleCancelled (ensemble = self .id_ ))
255
+ await outbound_event_queue . put (EnsembleCancelled (ensemble = self .id_ ))
266
256
267
257
@property
268
258
def cancellable (self ) -> bool :
0 commit comments