Skip to content

Remove event_from_dict function and ZMQ client from LegacyEnsemble #10634

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/_ert/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,6 @@ def event_from_json(raw_msg: str | bytes) -> Event:
return EventAdapter.validate_json(raw_msg)


def event_from_dict(dict_msg: dict[str, Any]) -> Event:
return EventAdapter.validate_python(dict_msg)


def event_to_json(event: Event) -> str:
return event.model_dump_json()

Expand Down
48 changes: 19 additions & 29 deletions src/ert/ensemble_evaluator/_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
from typing import Any

from _ert.events import (
EnsembleCancelled,
EnsembleFailed,
EnsembleStarted,
EnsembleSucceeded,
Event,
FMEvent,
ForwardModelStepFailure,
ForwardModelStepSuccess,
Id,
event_from_dict,
event_to_json,
)
from _ert.forward_model_runner.client import Client
from ert.config import ForwardModelStep, QueueConfig
from ert.run_arg import RunArg
from ert.scheduler import Scheduler, create_driver
Expand Down Expand Up @@ -183,24 +183,9 @@ def update_snapshot(self, events: Sequence[Event]) -> EnsembleSnapshot:

return snapshot_mutate_event

async def send_event(
self,
event: Event,
retries: int = 10,
) -> None:
assert self._config is not None
async with Client(self._config.get_uri(), token=self._config.token) as client:
await client.send(event_to_json(event), retries)

def generate_event_creator(self) -> Callable[[Id.ENSEMBLE_TYPES], Event]:
def event_builder(status: str) -> Event:
event = {
"event_type": status,
"ensemble": self.id_,
}
return event_from_dict(event)

return event_builder
async def send_event(self, event: Event) -> None:
if self.outbound_event_queue is not None:
await self.outbound_event_queue.put(event)

async def evaluate(
self,
Expand All @@ -217,7 +202,7 @@ async def evaluate(
the final result of executing all its jobs through a scheduler and driver.
"""
self._config = config
event_creator = self.generate_event_creator()
self.outbound_event_queue = scheduler_queue

if not self.id_:
raise ValueError("Ensemble id not set")
Expand All @@ -238,7 +223,7 @@ async def evaluate(
ee_token=self._config.token,
)

await self.send_event(event_creator(Id.ENSEMBLE_STARTED))
await self.send_event(EnsembleStarted(ensemble=self.id_))

min_required_realizations = (
self.min_required_realizations
Expand All @@ -247,10 +232,12 @@ async def evaluate(
)

self._scheduler.add_dispatch_information_to_jobs_file()
result = await self._scheduler.execute(min_required_realizations)
scheduler_finished_successfully = await self._scheduler.execute(
min_required_realizations
)
except PermissionError as error:
logger.exception(f"Unexpected exception in ensemble: \n {error!s}")
await self.send_event(event_creator(Id.ENSEMBLE_FAILED))
await self.send_event(EnsembleFailed(ensemble=self.id_))
return
except Exception as exc:
logger.exception(
Expand All @@ -260,13 +247,16 @@ async def evaluate(
)
),
)
await self.send_event(event_creator(Id.ENSEMBLE_FAILED))
await self.send_event(EnsembleFailed(ensemble=self.id_))
return
except asyncio.CancelledError:
print("Cancelling evaluator task!")
return
# Dispatch final result from evaluator - SUCCEEDED or CANCEL
await self.send_event(event_creator(result))
# Dispatch final result from evaluator - SUCCEEDED or CANCELLED
if scheduler_finished_successfully:
await self.send_event(EnsembleSucceeded(ensemble=self.id_))
else:
await self.send_event(EnsembleCancelled(ensemble=self.id_))

@property
def cancellable(self) -> bool:
Expand Down
75 changes: 40 additions & 35 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@
from contextlib import suppress
from enum import StrEnum
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING

from lxml import etree
from opentelemetry.trace import Status, StatusCode

from _ert.events import Id, RealizationTimeout, event_from_dict
from _ert.events import (
RealizationEvent,
RealizationFailed,
RealizationPending,
RealizationResubmit,
RealizationRunning,
RealizationSuccess,
RealizationTimeout,
RealizationWaiting,
)
from ert.callbacks import forward_model_ok
from ert.config import ForwardModelStep
from ert.constant_filenames import ERROR_file
Expand Down Expand Up @@ -43,19 +52,6 @@ class JobState(StrEnum):
ABORTED = "ABORTED"


_queue_jobstate_event_type = {
JobState.WAITING: Id.REALIZATION_WAITING,
JobState.SUBMITTING: Id.REALIZATION_WAITING,
JobState.RESUBMITTING: Id.REALIZATION_RESUBMIT,
JobState.PENDING: Id.REALIZATION_PENDING,
JobState.RUNNING: Id.REALIZATION_RUNNING,
JobState.ABORTING: Id.REALIZATION_FAILURE,
JobState.COMPLETED: Id.REALIZATION_SUCCESS,
JobState.FAILED: Id.REALIZATION_FAILURE,
JobState.ABORTED: Id.REALIZATION_FAILURE,
}


class Job:
"""Handle to a single job scheduler job.

Expand Down Expand Up @@ -310,28 +306,37 @@ async def _handle_aborted(self) -> None:
log_info_from_exit_file(Path(self.real.run_arg.runpath) / ERROR_file)

async def _send(self, state: JobState) -> None:
event_dict: dict[str, Any] = {
"ensemble": self._scheduler._ens_id,
"event_type": _queue_jobstate_event_type[state],
"queue_event_type": state,
"real": str(self.iens),
"exec_hosts": self.exec_hosts,
}
self.state = state
if state == JobState.FAILED:
event_dict["message"] = self._message
await self._handle_failure()
event: RealizationEvent | None = None
match state:
case JobState.WAITING | JobState.SUBMITTING:
event = RealizationWaiting(real=str(self.iens))
case JobState.RESUBMITTING:
event = RealizationResubmit(real=str(self.iens))
case JobState.PENDING:
event = RealizationPending(real=str(self.iens))
case JobState.RUNNING:
event = RealizationRunning(real=str(self.iens))
case JobState.FAILED:
event = RealizationFailed(real=str(self.iens))
event.message = self._message
await self._handle_failure()
case JobState.ABORTING:
event = RealizationFailed(real=str(self.iens))
case JobState.ABORTED:
event = RealizationFailed(real=str(self.iens))
await self._handle_aborted()
case JobState.COMPLETED:
event = RealizationSuccess(real=str(self.iens))
self._end_time = time.time()
await self._scheduler.completed_jobs.put(self.iens)

elif state == JobState.ABORTED:
await self._handle_aborted()

elif state == JobState.COMPLETED:
self._end_time = time.time()
await self._scheduler.completed_jobs.put(self.iens)

msg = event_from_dict(event_dict)
self.state = state
if event is not None:
event.ensemble = self._scheduler._ens_id
event.queue_event_type = state
event.exec_hosts = self.exec_hosts

await self._scheduler._events.put(msg)
await self._scheduler._events.put(event)


def log_info_from_exit_file(exit_file_path: Path) -> None:
Expand Down
33 changes: 20 additions & 13 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
from _ert.events import (
Event,
ForwardModelStepChecksum,
Id,
RealizationFailed,
RealizationStoppedLongRunning,
event_from_dict,
)

from .driver import Driver
Expand Down Expand Up @@ -244,7 +243,18 @@ async def _monitor_and_handle_tasks(
async def execute(
self,
min_required_realizations: int = 0,
) -> Id.ENSEMBLE_SUCCEEDED_TYPE | Id.ENSEMBLE_CANCELLED_TYPE:
) -> bool:
"""Run all the jobs in the scheduler, and wait for them to finish.

Args:
min_required_realizations (int, optional): The minimum amount of
realizations that have to be completed before stopping
long-running jobs. Defaults to 0.

Returns:
bool: Returns True if the scheduler ran successfully, False if it
was cancelled.
"""
scheduling_tasks = [
asyncio.create_task(self._publisher(), name="publisher_task"),
asyncio.create_task(
Expand Down Expand Up @@ -286,14 +296,11 @@ async def execute(
else:
failure = job.real.run_arg.ensemble_storage.get_failure(iens)
await self._events.put(
event_from_dict(
{
"ensemble": self._ens_id,
"event_type": Id.REALIZATION_FAILURE,
"queue_event_type": JobState.FAILED,
"message": failure.message if failure else None,
"real": str(iens),
}
RealizationFailed(
ensemble=self._ens_id,
real=str(iens),
queue_event_type=JobState.FAILED,
message=failure.message if failure else None,
)
)
logger.info("All tasks started")
Expand All @@ -312,9 +319,9 @@ async def execute(

if self._cancelled:
logger.debug("Scheduler has been cancelled, jobs are stopped.")
return Id.ENSEMBLE_CANCELLED
return False

return Id.ENSEMBLE_SUCCEEDED
return True

async def _process_event_queue(self) -> None:
while True:
Expand Down
19 changes: 9 additions & 10 deletions tests/ert/unit_tests/ensemble_evaluator/test_ensemble_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
ForwardModelStepFailure,
ForwardModelStepRunning,
ForwardModelStepSuccess,
Id,
RealizationResubmit,
RealizationSuccess,
event_from_dict,
event_to_json,
)
from _ert.forward_model_runner.client import (
Expand Down Expand Up @@ -664,14 +663,14 @@ async def test_snapshot_on_resubmit_is_cleared(evaluator_to_use):
assert (
snapshot.get_fm_step("0", "1")["status"] == FORWARD_MODEL_STATE_FAILURE
)
event_dict = {
"ensemble": str(evaluator._ensemble.id_),
"event_type": Id.REALIZATION_RESUBMIT,
"queue_event_type": JobState.RESUBMITTING,
"real": "0",
"exec_hosts": "something",
}
await evaluator._events.put(event_from_dict(event_dict))
await evaluator._events.put(
RealizationResubmit(
ensemble=evaluator.ensemble.id_,
queue_event_type=JobState.RESUBMITTING,
real="0",
exec_hosts="something",
)
)
event = await anext(events)
snapshot = EnsembleSnapshot.from_nested_dict(event.snapshot)
assert snapshot.get_fm_step("0", "0")["status"] == FORWARD_MODEL_STATE_INIT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,6 @@ async def test_queue_config_properties_propagated_to_scheduler(
monkeypatch.setattr(QueueConfig, "max_running", 44)
ensemble._queue_config.max_submit = 55

async def mock_send_event_method(*args, **kwargs):
return

monkeypatch.setattr(
"ert.ensemble_evaluator._ensemble.LegacyEnsemble.send_event",
mock_send_event_method,
)

# The function under test:
await ensemble.evaluate(config=MagicMock())

Expand Down
Loading