Skip to content

Commit 6bc0bdf

Browse files
committed
checkpoint
1 parent 9b39d25 commit 6bc0bdf

File tree

5 files changed

+213
-540
lines changed

5 files changed

+213
-540
lines changed

src/_ert/events.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ class EEUserDone(BaseEvent):
217217
EEEvent = EESnapshot | EESnapshotUpdate | EETerminated | EEUserCancel | EEUserDone
218218

219219
Event = FMEvent | ForwardModelStepChecksum | RealizationEvent | EEEvent | EnsembleEvent
220-
220+
EventForBrm = EESnapshot | EESnapshotUpdate | ForwardModelStepChecksum
221221
DispatchEvent = FMEvent | ForwardModelStepChecksum | RealizationEvent | EnsembleEvent
222222

223223
_DISPATCH_EVENTS_ANNOTATION = Annotated[

src/ert/ensemble_evaluator/evaluator.py

+13-11
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class UserCancelled(Exception):
2828
EnsembleStarted,
2929
EnsembleSucceeded,
3030
Event,
31+
EventForBrm,
3132
FMEvent,
3233
ForwardModelStepChecksum,
3334
RealizationEvent,
@@ -60,7 +61,7 @@ def __init__(
6061
self,
6162
ensemble: Ensemble,
6263
config: EvaluatorServerConfig,
63-
send_to_brm: Callable[[Event], None],
64+
event_handler: Callable[[EventForBrm], None] = lambda _: _,
6465
) -> None:
6566
self._config: EvaluatorServerConfig = config
6667
self._ensemble: Ensemble = ensemble
@@ -83,19 +84,19 @@ def __init__(
8384
self._dispatchers_empty: asyncio.Event = asyncio.Event()
8485
self._dispatchers_empty.set()
8586
current_snapshot_dict = self._ensemble.snapshot.to_dict()
86-
self._send_to_brm = send_to_brm
87-
event: Event = EESnapshot(
87+
self._handle_event = event_handler
88+
event = EESnapshot(
8889
snapshot=current_snapshot_dict,
8990
ensemble=self.ensemble.id_,
9091
)
91-
self._send_to_brm(event)
92+
self._handle_event(event)
9293
self._monitoring_result: asyncio.Future[bool] = asyncio.Future()
9394

9495
async def _append_message(self, snapshot_update_event: EnsembleSnapshot) -> None:
9596
event = EESnapshotUpdate(
9697
snapshot=snapshot_update_event.to_dict(), ensemble=self._ensemble.id_
9798
)
98-
self._send_to_brm(event)
99+
self._handle_event(event)
99100
if event.snapshot.get(ids.STATUS) in {
100101
ENSEMBLE_STATE_STOPPED,
101102
ENSEMBLE_STATE_FAILED,
@@ -215,11 +216,12 @@ async def _failed_handler(self, events: Sequence[EnsembleFailed]) -> None:
215216
def ensemble(self) -> Ensemble:
216217
return self._ensemble
217218

218-
def cancel_gracefully(self) -> None:
219+
def cancel_gracefully_synced(self) -> None:
220+
self._ee_tasks.append(self._running_loop.create_task(self.cancel_gracefully()))
221+
222+
async def cancel_gracefully(self):
219223
cancel_event = EEUserCancel()
220-
self._ee_tasks.append(
221-
self._running_loop.create_task(self.handle_client_event(cancel_event))
222-
)
224+
await self.handle_client_event(cancel_event)
223225

224226
async def handle_client_event(self, event: EEEvent) -> None:
225227
if type(event) is EEUserCancel:
@@ -277,9 +279,9 @@ async def listen_for_messages(self) -> None:
277279
except asyncio.CancelledError:
278280
return
279281

280-
async def forward_checksum(self, event: Event) -> None:
282+
async def forward_checksum(self, event: ForwardModelStepChecksum) -> None:
281283
# clients still need to receive events via ws
282-
self._send_to_brm(event)
284+
self._handle_event(event)
283285
await self._manifest_queue.put(event)
284286

285287
async def _server(self) -> None:

src/ert/run_models/base_run_model.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ def ensemble_size(self) -> int:
315315

316316
def cancel(self) -> None:
317317
self._cancelled = True
318-
self._ensemble_evaluator.cancel_gracefully()
318+
self._ensemble_evaluator.cancel_gracefully_synced()
319319

320320
def has_failed_realizations(self) -> bool:
321321
return any(self._create_mask_from_failed_realizations())

0 commit comments

Comments
 (0)