Skip to content

Commit 87d29bb

Browse files
committed
Remove event_from_dict function
1 parent 36eb826 commit 87d29bb

File tree

6 files changed

+96
-98
lines changed

6 files changed

+96
-98
lines changed

src/_ert/events.py

-4
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,6 @@ def event_from_json(raw_msg: str | bytes) -> Event:
241241
return EventAdapter.validate_json(raw_msg)
242242

243243

244-
def event_from_dict(dict_msg: dict[str, Any]) -> Event:
245-
return EventAdapter.validate_python(dict_msg)
246-
247-
248244
def event_to_json(event: Event) -> str:
249245
return event.model_dump_json()
250246

src/ert/ensemble_evaluator/_ensemble.py

+15-19
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
from typing import Any
99

1010
from _ert.events import (
11+
EnsembleCancelled,
12+
EnsembleFailed,
13+
EnsembleStarted,
14+
EnsembleSucceeded,
1115
Event,
1216
FMEvent,
1317
ForwardModelStepFailure,
1418
ForwardModelStepSuccess,
15-
Id,
16-
event_from_dict,
1719
event_to_json,
1820
)
1921
from _ert.forward_model_runner.client import Client
@@ -192,16 +194,6 @@ async def send_event(
192194
async with Client(self._config.get_uri(), token=self._config.token) as client:
193195
await client.send(event_to_json(event), retries)
194196

195-
def generate_event_creator(self) -> Callable[[Id.ENSEMBLE_TYPES], Event]:
196-
def event_builder(status: str) -> Event:
197-
event = {
198-
"event_type": status,
199-
"ensemble": self.id_,
200-
}
201-
return event_from_dict(event)
202-
203-
return event_builder
204-
205197
async def evaluate(
206198
self,
207199
config: EvaluatorServerConfig,
@@ -217,7 +209,6 @@ async def evaluate(
217209
the final result of executing all its jobs through a scheduler and driver.
218210
"""
219211
self._config = config
220-
event_creator = self.generate_event_creator()
221212

222213
if not self.id_:
223214
raise ValueError("Ensemble id not set")
@@ -238,7 +229,7 @@ async def evaluate(
238229
ee_token=self._config.token,
239230
)
240231

241-
await self.send_event(event_creator(Id.ENSEMBLE_STARTED))
232+
await self.send_event(EnsembleStarted(ensemble=self.id_))
242233

243234
min_required_realizations = (
244235
self.min_required_realizations
@@ -247,10 +238,12 @@ async def evaluate(
247238
)
248239

249240
self._scheduler.add_dispatch_information_to_jobs_file()
250-
result = await self._scheduler.execute(min_required_realizations)
241+
scheduler_finished_successfully = await self._scheduler.execute(
242+
min_required_realizations
243+
)
251244
except PermissionError as error:
252245
logger.exception(f"Unexpected exception in ensemble: \n {error!s}")
253-
await self.send_event(event_creator(Id.ENSEMBLE_FAILED))
246+
await self.send_event(EnsembleFailed(ensemble=self.id_))
254247
return
255248
except Exception as exc:
256249
logger.exception(
@@ -260,13 +253,16 @@ async def evaluate(
260253
)
261254
),
262255
)
263-
await self.send_event(event_creator(Id.ENSEMBLE_FAILED))
256+
await self.send_event(EnsembleFailed(ensemble=self.id_))
264257
return
265258
except asyncio.CancelledError:
266259
print("Cancelling evaluator task!")
267260
return
268-
# Dispatch final result from evaluator - SUCCEEDED or CANCEL
269-
await self.send_event(event_creator(result))
261+
# Dispatch final result from evaluator - SUCCEEDED or CANCELLED
262+
if scheduler_finished_successfully:
263+
await self.send_event(EnsembleSucceeded(ensemble=self.id_))
264+
else:
265+
await self.send_event(EnsembleCancelled(ensemble=self.id_))
270266

271267
@property
272268
def cancellable(self) -> bool:

src/ert/scheduler/job.py

+40-35
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,21 @@
88
from contextlib import suppress
99
from enum import StrEnum
1010
from pathlib import Path
11-
from typing import TYPE_CHECKING, Any
11+
from typing import TYPE_CHECKING
1212

1313
from lxml import etree
1414
from opentelemetry.trace import Status, StatusCode
1515

16-
from _ert.events import Id, RealizationTimeout, event_from_dict
16+
from _ert.events import (
17+
RealizationEvent,
18+
RealizationFailed,
19+
RealizationPending,
20+
RealizationResubmit,
21+
RealizationRunning,
22+
RealizationSuccess,
23+
RealizationTimeout,
24+
RealizationWaiting,
25+
)
1726
from ert.callbacks import forward_model_ok
1827
from ert.config import ForwardModelStep
1928
from ert.constant_filenames import ERROR_file
@@ -43,19 +52,6 @@ class JobState(StrEnum):
4352
ABORTED = "ABORTED"
4453

4554

46-
_queue_jobstate_event_type = {
47-
JobState.WAITING: Id.REALIZATION_WAITING,
48-
JobState.SUBMITTING: Id.REALIZATION_WAITING,
49-
JobState.RESUBMITTING: Id.REALIZATION_RESUBMIT,
50-
JobState.PENDING: Id.REALIZATION_PENDING,
51-
JobState.RUNNING: Id.REALIZATION_RUNNING,
52-
JobState.ABORTING: Id.REALIZATION_FAILURE,
53-
JobState.COMPLETED: Id.REALIZATION_SUCCESS,
54-
JobState.FAILED: Id.REALIZATION_FAILURE,
55-
JobState.ABORTED: Id.REALIZATION_FAILURE,
56-
}
57-
58-
5955
class Job:
6056
"""Handle to a single job scheduler job.
6157
@@ -310,28 +306,37 @@ async def _handle_aborted(self) -> None:
310306
log_info_from_exit_file(Path(self.real.run_arg.runpath) / ERROR_file)
311307

312308
async def _send(self, state: JobState) -> None:
313-
event_dict: dict[str, Any] = {
314-
"ensemble": self._scheduler._ens_id,
315-
"event_type": _queue_jobstate_event_type[state],
316-
"queue_event_type": state,
317-
"real": str(self.iens),
318-
"exec_hosts": self.exec_hosts,
319-
}
320-
self.state = state
321-
if state == JobState.FAILED:
322-
event_dict["message"] = self._message
323-
await self._handle_failure()
309+
event: RealizationEvent | None = None
310+
match state:
311+
case JobState.WAITING | JobState.SUBMITTING:
312+
event = RealizationWaiting(real=str(self.iens))
313+
case JobState.RESUBMITTING:
314+
event = RealizationResubmit(real=str(self.iens))
315+
case JobState.PENDING:
316+
event = RealizationPending(real=str(self.iens))
317+
case JobState.RUNNING:
318+
event = RealizationRunning(real=str(self.iens))
319+
case JobState.FAILED:
320+
event = RealizationFailed(real=str(self.iens))
321+
event.message = self._message
322+
await self._handle_failure()
323+
case JobState.ABORTING:
324+
event = RealizationFailed(real=str(self.iens))
325+
case JobState.ABORTED:
326+
event = RealizationFailed(real=str(self.iens))
327+
await self._handle_aborted()
328+
case JobState.COMPLETED:
329+
event = RealizationSuccess(real=str(self.iens))
330+
self._end_time = time.time()
331+
await self._scheduler.completed_jobs.put(self.iens)
324332

325-
elif state == JobState.ABORTED:
326-
await self._handle_aborted()
327-
328-
elif state == JobState.COMPLETED:
329-
self._end_time = time.time()
330-
await self._scheduler.completed_jobs.put(self.iens)
331-
332-
msg = event_from_dict(event_dict)
333+
self.state = state
334+
if event is not None:
335+
event.ensemble = self._scheduler._ens_id
336+
event.queue_event_type = state
337+
event.exec_hosts = self.exec_hosts
333338

334-
await self._scheduler._events.put(msg)
339+
await self._scheduler._events.put(event)
335340

336341

337342
def log_info_from_exit_file(exit_file_path: Path) -> None:

src/ert/scheduler/scheduler.py

+20-13
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
from _ert.events import (
1818
Event,
1919
ForwardModelStepChecksum,
20-
Id,
20+
RealizationFailed,
2121
RealizationStoppedLongRunning,
22-
event_from_dict,
2322
)
2423

2524
from .driver import Driver
@@ -244,7 +243,18 @@ async def _monitor_and_handle_tasks(
244243
async def execute(
245244
self,
246245
min_required_realizations: int = 0,
247-
) -> Id.ENSEMBLE_SUCCEEDED_TYPE | Id.ENSEMBLE_CANCELLED_TYPE:
246+
) -> bool:
247+
"""Run all the jobs in the scheduler, and wait for them to finish.
248+
249+
Args:
250+
min_required_realizations (int, optional): The minimum amount of
251+
realizations that have to be completed before stopping
252+
long-running jobs. Defaults to 0.
253+
254+
Returns:
255+
bool: Returns True if the scheduler ran successfully, False if it
256+
was cancelled.
257+
"""
248258
scheduling_tasks = [
249259
asyncio.create_task(self._publisher(), name="publisher_task"),
250260
asyncio.create_task(
@@ -286,14 +296,11 @@ async def execute(
286296
else:
287297
failure = job.real.run_arg.ensemble_storage.get_failure(iens)
288298
await self._events.put(
289-
event_from_dict(
290-
{
291-
"ensemble": self._ens_id,
292-
"event_type": Id.REALIZATION_FAILURE,
293-
"queue_event_type": JobState.FAILED,
294-
"message": failure.message if failure else None,
295-
"real": str(iens),
296-
}
299+
RealizationFailed(
300+
ensemble=self._ens_id,
301+
real=str(iens),
302+
queue_event_type=JobState.FAILED,
303+
message=failure.message if failure else None,
297304
)
298305
)
299306
logger.info("All tasks started")
@@ -312,9 +319,9 @@ async def execute(
312319

313320
if self._cancelled:
314321
logger.debug("Scheduler has been cancelled, jobs are stopped.")
315-
return Id.ENSEMBLE_CANCELLED
322+
return False
316323

317-
return Id.ENSEMBLE_SUCCEEDED
324+
return True
318325

319326
async def _process_event_queue(self) -> None:
320327
while True:

tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py

+9-10
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@
1919
ForwardModelStepFailure,
2020
ForwardModelStepRunning,
2121
ForwardModelStepSuccess,
22-
Id,
22+
RealizationResubmit,
2323
RealizationSuccess,
24-
event_from_dict,
2524
event_to_json,
2625
)
2726
from _ert.forward_model_runner.client import (
@@ -664,14 +663,14 @@ async def test_snapshot_on_resubmit_is_cleared(evaluator_to_use):
664663
assert (
665664
snapshot.get_fm_step("0", "1")["status"] == FORWARD_MODEL_STATE_FAILURE
666665
)
667-
event_dict = {
668-
"ensemble": str(evaluator._ensemble.id_),
669-
"event_type": Id.REALIZATION_RESUBMIT,
670-
"queue_event_type": JobState.RESUBMITTING,
671-
"real": "0",
672-
"exec_hosts": "something",
673-
}
674-
await evaluator._events.put(event_from_dict(event_dict))
666+
await evaluator._events.put(
667+
RealizationResubmit(
668+
ensemble=evaluator.ensemble.id_,
669+
queue_event_type=JobState.RESUBMITTING,
670+
real="0",
671+
exec_hosts="something",
672+
)
673+
)
675674
event = await anext(events)
676675
snapshot = EnsembleSnapshot.from_nested_dict(event.snapshot)
677676
assert snapshot.get_fm_step("0", "0")["status"] == FORWARD_MODEL_STATE_INIT

0 commit comments

Comments
 (0)