Skip to content

Commit 0b33cce

Browse files
committed
Remove ZMQ client from LegacyEnsemble
This commit changes LegacyEnsemble to put events directly in scheduler queue rather than sending it over ZMQ.
1 parent 3ec0f9e commit 0b33cce

File tree

2 files changed

+6
-24
lines changed

2 files changed

+6
-24
lines changed

src/ert/ensemble_evaluator/_ensemble.py

+6-16
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
FMEvent,
1717
ForwardModelStepFailure,
1818
ForwardModelStepSuccess,
19-
event_to_json,
2019
)
21-
from _ert.forward_model_runner.client import Client
2220
from ert.config import ForwardModelStep, QueueConfig
2321
from ert.run_arg import RunArg
2422
from ert.scheduler import Scheduler, create_driver
@@ -185,15 +183,6 @@ def update_snapshot(self, events: Sequence[Event]) -> EnsembleSnapshot:
185183

186184
return snapshot_mutate_event
187185

188-
async def send_event(
189-
self,
190-
event: Event,
191-
retries: int = 10,
192-
) -> None:
193-
assert self._config is not None
194-
async with Client(self._config.get_uri(), token=self._config.token) as client:
195-
await client.send(event_to_json(event), retries)
196-
197186
async def evaluate(
198187
self,
199188
config: EvaluatorServerConfig,
@@ -209,6 +198,7 @@ async def evaluate(
209198
the final result of executing all its jobs through a scheduler and driver.
210199
"""
211200
self._config = config
201+
outbound_event_queue = scheduler_queue or asyncio.Queue()
212202

213203
if not self.id_:
214204
raise ValueError("Ensemble id not set")
@@ -229,7 +219,7 @@ async def evaluate(
229219
ee_token=self._config.token,
230220
)
231221

232-
await self.send_event(EnsembleStarted(ensemble=self.id_))
222+
await outbound_event_queue.put(EnsembleStarted(ensemble=self.id_))
233223

234224
min_required_realizations = (
235225
self.min_required_realizations
@@ -243,7 +233,7 @@ async def evaluate(
243233
)
244234
except PermissionError as error:
245235
logger.exception(f"Unexpected exception in ensemble: \n {error!s}")
246-
await self.send_event(EnsembleFailed(ensemble=self.id_))
236+
await outbound_event_queue.put(EnsembleFailed(ensemble=self.id_))
247237
return
248238
except Exception as exc:
249239
logger.exception(
@@ -253,16 +243,16 @@ async def evaluate(
253243
)
254244
),
255245
)
256-
await self.send_event(EnsembleFailed(ensemble=self.id_))
246+
await outbound_event_queue.put(EnsembleFailed(ensemble=self.id_))
257247
return
258248
except asyncio.CancelledError:
259249
print("Cancelling evaluator task!")
260250
return
261251
# Dispatch final result from evaluator - SUCCEEDED or CANCELLED
262252
if scheduler_finished_successfully:
263-
await self.send_event(EnsembleSucceeded(ensemble=self.id_))
253+
await outbound_event_queue.put(EnsembleSucceeded(ensemble=self.id_))
264254
else:
265-
await self.send_event(EnsembleCancelled(ensemble=self.id_))
255+
await outbound_event_queue.put(EnsembleCancelled(ensemble=self.id_))
266256

267257
@property
268258
def cancellable(self) -> bool:

tests/ert/unit_tests/ensemble_evaluator/test_ensemble_legacy.py

-8
Original file line numberDiff line numberDiff line change
@@ -114,14 +114,6 @@ async def test_queue_config_properties_propagated_to_scheduler(
114114
monkeypatch.setattr(QueueConfig, "max_running", 44)
115115
ensemble._queue_config.max_submit = 55
116116

117-
async def mock_send_event_method(*args, **kwargs):
118-
return
119-
120-
monkeypatch.setattr(
121-
"ert.ensemble_evaluator._ensemble.LegacyEnsemble.send_event",
122-
mock_send_event_method,
123-
)
124-
125117
# The function under test:
126118
await ensemble.evaluate(config=MagicMock())
127119

0 commit comments

Comments
 (0)