Skip to content

Commit 39f5abb

Browse files
committed
Terminate all LSF jobs in one bkill
1 parent 59ebfaf commit 39f5abb

File tree

2 files changed

+132
-61
lines changed

2 files changed

+132
-61
lines changed

src/ert/scheduler/lsf_driver.py

+129-58
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@
3333
]
3434

3535

36+
class KillCoordinator:
37+
def __init__(self) -> None:
38+
self.instances_ready = 0
39+
self.barrier = asyncio.Condition()
40+
self.kill_completed = asyncio.Event()
41+
self.kill_task: asyncio.Task[None] | None = None
42+
43+
3644
@dataclass(frozen=True)
3745
class IgnoredJobstates:
3846
job_state: Literal["UNKWN"]
@@ -286,6 +294,8 @@ def __init__(
286294

287295
self._submit_locks: MutableMapping[int, asyncio.Lock] = {}
288296

297+
self.kill_coordinator = KillCoordinator()
298+
289299
async def submit(
290300
self,
291301
iens: int,
@@ -347,39 +357,44 @@ async def submit(
347357
if iens not in self._submit_locks:
348358
self._submit_locks[iens] = asyncio.Lock()
349359

350-
async with self._submit_locks[iens]:
351-
logger.debug(f"Submitting to LSF with command {shlex.join(bsub_with_args)}")
352-
process_success, process_message = await self._execute_with_retry(
353-
bsub_with_args,
354-
retry_on_empty_stdout=True,
355-
retry_codes=(FLAKY_SSH_RETURNCODE,),
356-
total_attempts=self._max_bsub_attempts,
357-
retry_interval=self._sleep_time_between_cmd_retries,
358-
error_on_msgs=BSUB_FAILURE_MESSAGES,
359-
)
360-
if not process_success:
361-
self._job_error_message_by_iens[iens] = process_message
362-
raise FailedSubmit(process_message)
360+
async def protected_submit() -> None:
361+
async with self._submit_locks[iens]:
362+
logger.debug(
363+
f"Submitting to LSF with command {shlex.join(bsub_with_args)}"
364+
)
365+
process_success, process_message = await self._execute_with_retry(
366+
bsub_with_args,
367+
retry_on_empty_stdout=True,
368+
retry_codes=(FLAKY_SSH_RETURNCODE,),
369+
total_attempts=self._max_bsub_attempts,
370+
retry_interval=self._sleep_time_between_cmd_retries,
371+
error_on_msgs=BSUB_FAILURE_MESSAGES,
372+
)
373+
if not process_success:
374+
self._job_error_message_by_iens[iens] = process_message
375+
raise FailedSubmit(process_message)
363376

364-
match = re.search(
365-
r"Job <([0-9]+)> is submitted to .*queue", process_message
366-
)
367-
if match is None:
368-
raise FailedSubmit(
369-
f"Could not understand '{process_message}' from bsub"
377+
match = re.search(
378+
r"Job <([0-9]+)> is submitted to .*queue", process_message
379+
)
380+
if match is None:
381+
raise FailedSubmit(
382+
f"Could not understand '{process_message}' from bsub"
383+
)
384+
job_id = match[1]
385+
logger.info(f"Realization {iens} accepted by LSF, got id {job_id}")
386+
387+
(Path(runpath) / LSF_INFO_JSON_FILENAME).write_text(
388+
json.dumps({"job_id": job_id}), encoding="utf-8"
370389
)
371-
job_id = match[1]
372-
logger.info(f"Realization {iens} accepted by LSF, got id {job_id}")
390+
self._jobs[job_id] = JobData(
391+
iens=iens,
392+
job_state=QueuedJob(job_state="PEND"),
393+
submitted_timestamp=time.time(),
394+
)
395+
self._iens2jobid[iens] = job_id
373396

374-
(Path(runpath) / LSF_INFO_JSON_FILENAME).write_text(
375-
json.dumps({"job_id": job_id}), encoding="utf-8"
376-
)
377-
self._jobs[job_id] = JobData(
378-
iens=iens,
379-
job_state=QueuedJob(job_state="PEND"),
380-
submitted_timestamp=time.time(),
381-
)
382-
self._iens2jobid[iens] = job_id
397+
await asyncio.shield(protected_submit())
383398

384399
async def kill(self, iens: int) -> None:
385400
if iens not in self._submit_locks:
@@ -395,38 +410,94 @@ async def kill(self, iens: int) -> None:
395410
)
396411
return
397412

398-
job_id = self._iens2jobid[iens]
399-
400-
logger.debug(f"Killing realization {iens} with LSF-id {job_id}")
401-
bkill_with_args: list[str] = [
402-
str(self._bkill_cmd),
403-
"-s",
404-
"SIGTERM",
405-
job_id,
406-
]
407-
408-
_, process_message = await self._execute_with_retry(
409-
bkill_with_args,
410-
retry_codes=(FLAKY_SSH_RETURNCODE,),
411-
total_attempts=3,
412-
retry_interval=self._sleep_time_between_cmd_retries,
413-
return_on_msgs=(JOB_ALREADY_FINISHED_BKILL_MSG),
414-
)
415-
await asyncio.create_subprocess_shell(
416-
f"sleep {self._sleep_time_between_bkills}; "
417-
f"{self._bkill_cmd} -s SIGKILL {job_id}",
418-
start_new_session=True,
419-
stdout=asyncio.subprocess.DEVNULL,
420-
stderr=asyncio.subprocess.DEVNULL,
421-
)
413+
async with self.kill_coordinator.barrier:
414+
await asyncio.sleep(0.1)
415+
self.kill_coordinator.instances_ready += 1
416+
417+
if self.kill_coordinator.instances_ready == len(self._submit_locks):
418+
self.kill_coordinator.barrier.notify_all()
419+
420+
self.kill_coordinator.kill_task = asyncio.create_task(
421+
self.kill_all()
422+
)
423+
424+
self.kill_coordinator.kill_task.add_done_callback(
425+
lambda _: self.kill_coordinator.kill_completed.set()
426+
)
427+
else:
428+
try:
429+
await asyncio.wait_for(
430+
self.kill_coordinator.barrier.wait(), timeout=30
431+
)
432+
except TimeoutError:
433+
logger.warning(
434+
f"Timeout waiting for all realizations to"
435+
" coordinate termination. "
436+
f"Expected {len(self._iens2jobid)} but only got "
437+
f"{self.kill_coordinator.instances_ready}. "
438+
f"Proceeding with termination operation anyway."
439+
)
440+
if self.kill_coordinator.kill_task is None:
441+
self.kill_coordinator.kill_task = asyncio.create_task(
442+
self.kill_all()
443+
)
444+
self.kill_coordinator.kill_task.add_done_callback(
445+
lambda _: self.kill_coordinator.kill_completed.set()
446+
)
447+
448+
await self.kill_coordinator.kill_completed.wait()
449+
450+
async def kill_all(self) -> None:
451+
logger.debug(
452+
f"Killing realizations {' '.join(str(i) for i in self._iens2jobid)}"
453+
f"with LSF-id {' '.join(str(i) for i in self._iens2jobid.values())}"
454+
)
455+
bkill_with_args: list[str] = [
456+
str(self._bkill_cmd),
457+
"-s",
458+
"SIGTERM",
459+
*self._iens2jobid.values(),
460+
]
422461

462+
_, process_message = await self._execute_with_retry(
463+
bkill_with_args,
464+
retry_codes=(FLAKY_SSH_RETURNCODE,),
465+
total_attempts=3,
466+
retry_interval=self._sleep_time_between_cmd_retries,
467+
return_on_msgs=(JOB_ALREADY_FINISHED_BKILL_MSG),
468+
)
469+
logger.info("Sending SIGKILL in 15 seconds from now!!!")
470+
logger.info(
471+
f"sleep {self._sleep_time_between_bkills}; {self._bkill_cmd} -s SIGKILL \
472+
{' '.join(str(i) for i in self._iens2jobid.values())}"
473+
)
474+
print(
475+
f"sleep {self._sleep_time_between_bkills}; {self._bkill_cmd} -s SIGKILL \
476+
{' '.join(str(i) for i in self._iens2jobid.values())}"
477+
)
478+
await asyncio.create_subprocess_shell(
479+
f"sleep {self._sleep_time_between_bkills}; {self._bkill_cmd} -s SIGKILL \
480+
{' '.join(str(i) for i in self._iens2jobid.values())}",
481+
start_new_session=True,
482+
stdout=asyncio.subprocess.DEVNULL,
483+
stderr=asyncio.subprocess.DEVNULL,
484+
)
485+
for job_id in self._iens2jobid.values():
423486
if not re.search(
424487
f"Job <{job_id}> is being (terminated|signaled)", process_message
425488
):
426-
if JOB_ALREADY_FINISHED_BKILL_MSG in process_message:
427-
logger.debug(f"LSF kill failed with: {process_message}")
428-
return
429-
logger.error(f"LSF kill failed with: {process_message}")
489+
job_pattern = f"(.*{job_id}.*)"
490+
job_message_match = re.search(job_pattern, process_message)
491+
job_message = (
492+
job_message_match.group(1)
493+
if job_message_match
494+
else "No specific message found"
495+
)
496+
497+
logger.error(
498+
f"LSF kill seems to have failed for job {job_id} with message: "
499+
f"{job_message}!"
500+
)
430501

431502
async def poll(self) -> None:
432503
while True:

tests/ert/unit_tests/scheduler/test_lsf_driver.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -337,15 +337,15 @@ async def test_faulty_bsub_produces_error_log(monkeypatch, tmp_path):
337337
"1",
338338
255,
339339
"",
340-
"Job <22>: No matching job found",
340+
"Job <11>: No matching job found",
341341
"No matching job found",
342342
id="inconsistency_ert_vs_lsf",
343343
),
344344
pytest.param(
345345
{"1": "11"},
346346
"1",
347347
0,
348-
"wrong_stdout...",
348+
"Job <11>: wrong_stdout...",
349349
"",
350350
"wrong_stdout...",
351351
id="artificial_bkill_stdout_giving_logged_error",
@@ -355,7 +355,7 @@ async def test_faulty_bsub_produces_error_log(monkeypatch, tmp_path):
355355
"1",
356356
1,
357357
"",
358-
"wrong_on_stderr",
358+
"Job <11>: wrong_on_stderr",
359359
"wrong_on_stderr",
360360
id="artificial_bkill_stderr_and_returncode_giving_logged_error",
361361
),

0 commit comments

Comments
 (0)