Terminate all jobs in one bkill #10297

Draft · wants to merge 3 commits into main
4 changes: 4 additions & 0 deletions src/ert/scheduler/driver.py
@@ -117,6 +117,10 @@ async def _execute_with_retry(
                )
            except FileNotFoundError as e:
                return (False, str(e))
            except OSError as e:
                # Transient OS-level errors (e.g. a flaky ssh transport) are
                # logged and retried instead of aborting the command.
                logger.error(str(e))
                await asyncio.sleep(retry_interval)
                continue

            stdout, stderr = await process.communicate(stdin)

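The added `except OSError` branch widens `_execute_with_retry` so that OS-level failures (for example a dropped ssh connection to the LSF login node) are logged and retried rather than crashing the scheduler; only `FileNotFoundError` (the command itself is missing) still fails immediately. A minimal, self-contained sketch of this retry pattern, with hypothetical names (`run_with_retry`, `args`) that are not part of the PR:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def run_with_retry(
    args: list[str], total_attempts: int = 3, retry_interval: float = 1.0
) -> tuple[bool, str]:
    """Run a command, retrying transient OS errors and non-zero exits."""
    message = ""
    for _ in range(total_attempts):
        try:
            process = await asyncio.create_subprocess_exec(
                *args,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except FileNotFoundError as e:
            # The binary does not exist; retrying cannot help.
            return (False, str(e))
        except OSError as e:
            # Transient OS failure (e.g. flaky transport): log, wait, retry.
            message = str(e)
            logger.error(message)
            await asyncio.sleep(retry_interval)
            continue
        stdout, stderr = await process.communicate()
        if process.returncode == 0:
            return (True, stdout.decode(errors="ignore"))
        message = stderr.decode(errors="ignore")
        await asyncio.sleep(retry_interval)
    return (False, message)


if __name__ == "__main__":
    print(asyncio.run(run_with_retry(["uname", "-a"])))
```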
187 changes: 129 additions & 58 deletions src/ert/scheduler/lsf_driver.py
@@ -33,6 +33,14 @@
]


class KillCoordinator:
    """Coordinates concurrent kill() calls into a single batched kill_all()."""

    def __init__(self) -> None:
        self.instances_ready: set[int] = set()
        self.barrier = asyncio.Condition()
        self.kill_completed = asyncio.Event()
        self.kill_task: asyncio.Task[None] | None = None

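`KillCoordinator` is a small rendezvous object: each concurrent `kill(iens)` call registers itself in `instances_ready`, the last arrival notifies the `barrier` and starts a single shared `kill_all()` task, and every caller blocks on `kill_completed` until that task is done. A toy sketch of the same Condition/Event pattern, detached from the driver (all names here are illustrative):

```python
import asyncio


async def main() -> None:
    expected = {0, 1, 2}  # stand-ins for realization indices
    ready: set[int] = set()
    barrier = asyncio.Condition()
    done = asyncio.Event()

    async def shared_action() -> None:
        print("one batched action for all participants")

    async def arrive(iens: int) -> None:
        async with barrier:
            ready.add(iens)
            if ready == expected:
                # Last arrival: wake the waiters, start the single shared task.
                barrier.notify_all()
                task = asyncio.create_task(shared_action())
                task.add_done_callback(lambda _: done.set())
            else:
                # Early arrival: wait for the last one to notify.
                await barrier.wait()
        # Nobody returns until the shared action has actually finished.
        await done.wait()

    await asyncio.gather(*(arrive(i) for i in expected))


asyncio.run(main())
```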

@dataclass(frozen=True)
class IgnoredJobstates:
job_state: Literal["UNKWN"]
@@ -286,6 +294,8 @@

self._submit_locks: MutableMapping[int, asyncio.Lock] = {}

self.kill_coordinator = KillCoordinator()

async def submit(
self,
iens: int,
@@ -347,39 +357,44 @@
        if iens not in self._submit_locks:
            self._submit_locks[iens] = asyncio.Lock()

        async def protected_submit() -> None:
            async with self._submit_locks[iens]:
                logger.debug(
                    f"Submitting to LSF with command {shlex.join(bsub_with_args)}"
                )
                process_success, process_message = await self._execute_with_retry(
                    bsub_with_args,
                    retry_on_empty_stdout=True,
                    retry_codes=(FLAKY_SSH_RETURNCODE,),
                    total_attempts=self._max_bsub_attempts,
                    retry_interval=self._sleep_time_between_cmd_retries,
                    error_on_msgs=BSUB_FAILURE_MESSAGES,
                )
                if not process_success:
                    self._job_error_message_by_iens[iens] = process_message
                    raise FailedSubmit(process_message)

                match = re.search(
                    r"Job <([0-9]+)> is submitted to .*queue", process_message
                )
                if match is None:
                    raise FailedSubmit(
                        f"Could not understand '{process_message}' from bsub"
                    )
                job_id = match[1]
                logger.info(f"Realization {iens} accepted by LSF, got id {job_id}")

                (Path(runpath) / LSF_INFO_JSON_FILENAME).write_text(
                    json.dumps({"job_id": job_id}), encoding="utf-8"
                )
                self._jobs[job_id] = JobData(
                    iens=iens,
                    job_state=QueuedJob(job_state="PEND"),
                    submitted_timestamp=time.time(),
                )
                self._iens2jobid[iens] = job_id

        await asyncio.shield(protected_submit())

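Wrapping the inner coroutine in `asyncio.shield` means that cancelling `submit()` can no longer interrupt the window between `bsub` succeeding and the driver recording the job in `self._jobs`/`self._iens2jobid`; without it, a cancellation at that point would leak a running LSF job that ERT no longer tracks. A minimal demonstration of the shield semantics (toy coroutines, not the driver code):

```python
import asyncio


async def critical_section() -> None:
    print("critical section started")
    await asyncio.sleep(0.2)  # stands in for bsub + bookkeeping
    print("critical section finished despite the cancellation")


async def submit() -> None:
    # Cancelling submit() raises CancelledError at this await,
    # but the shielded inner task runs to completion anyway.
    await asyncio.shield(critical_section())


async def main() -> None:
    task = asyncio.create_task(submit())
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("submit() was cancelled")
    await asyncio.sleep(0.3)  # give the shielded work time to finish


asyncio.run(main())
```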
async def kill(self, iens: int) -> None:
if iens not in self._submit_locks:
@@ -395,38 +410,94 @@
)
return

        async with self.kill_coordinator.barrier:
            await asyncio.sleep(0.1)
            self.kill_coordinator.instances_ready.add(iens)

            if self.kill_coordinator.instances_ready == set(self._submit_locks):
                self.kill_coordinator.barrier.notify_all()

                self.kill_coordinator.kill_task = asyncio.create_task(
                    self.kill_all()
                )
                self.kill_coordinator.kill_task.add_done_callback(
                    lambda _: self.kill_coordinator.kill_completed.set()
                )
            else:
                try:
                    await asyncio.wait_for(
                        self.kill_coordinator.barrier.wait(), timeout=30
                    )
                except TimeoutError:
                    logger.warning(
                        "Timeout waiting for all realizations to "
                        "coordinate termination. "
                        f"Expected {len(self._submit_locks)} but only got "
                        f"{len(self.kill_coordinator.instances_ready)}. "
                        "Proceeding with termination operation anyway."
                    )
                    if self.kill_coordinator.kill_task is None:
                        self.kill_coordinator.kill_task = asyncio.create_task(
                            self.kill_all()
                        )
                        self.kill_coordinator.kill_task.add_done_callback(
                            lambda _: self.kill_coordinator.kill_completed.set()
                        )

        await self.kill_coordinator.kill_completed.wait()

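The 30-second `asyncio.wait_for` around `barrier.wait()` is a liveness guard: if some realization never reaches `kill()` (for instance because it already finished), the remaining waiters stop waiting and trigger `kill_all()` themselves, with the `kill_task is None` check ensuring the batched kill still starts only once. A reduced sketch of that timeout-and-fallback shape (short timeout so the demo runs instantly; assumes Python 3.11+, where `asyncio.wait_for` raises the builtin `TimeoutError`):

```python
import asyncio


async def main() -> None:
    barrier = asyncio.Condition()
    kill_completed = asyncio.Event()
    kill_task: asyncio.Task[None] | None = None

    async def kill_all() -> None:
        print("terminating everything in one batch")

    async with barrier:
        try:
            # Nobody notifies in this demo, so the timeout always fires.
            await asyncio.wait_for(barrier.wait(), timeout=0.1)
        except TimeoutError:
            # Liveness guard: proceed anyway, but start kill_all() only once.
            if kill_task is None:
                kill_task = asyncio.create_task(kill_all())
                kill_task.add_done_callback(lambda _: kill_completed.set())
    await kill_completed.wait()
    print("kill completed")


asyncio.run(main())
```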
    async def kill_all(self) -> None:
        logger.debug(
            f"Killing realizations {' '.join(str(i) for i in self._iens2jobid)} "
            f"with LSF-ids {' '.join(str(i) for i in self._iens2jobid.values())}"
        )
        bkill_with_args: list[str] = [
            str(self._bkill_cmd),
            "-s",
            "SIGTERM",
            *self._iens2jobid.values(),
        ]

        _, process_message = await self._execute_with_retry(
            bkill_with_args,
            retry_codes=(FLAKY_SSH_RETURNCODE,),
            total_attempts=3,
            retry_interval=self._sleep_time_between_cmd_retries,
            return_on_msgs=(JOB_ALREADY_FINISHED_BKILL_MSG),
        )
        job_ids = " ".join(str(i) for i in self._iens2jobid.values())
        logger.info(
            f"Sending SIGKILL in {self._sleep_time_between_bkills} seconds"
        )
        await asyncio.create_subprocess_shell(
            f"sleep {self._sleep_time_between_bkills}; "
            f"{self._bkill_cmd} -s SIGKILL {job_ids}",
            start_new_session=True,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        for job_id in self._iens2jobid.values():
            if not re.search(
                f"Job <{job_id}> is being (terminated|signaled)", process_message
            ):
                if JOB_ALREADY_FINISHED_BKILL_MSG in process_message:
                    logger.debug(f"LSF kill failed with: {process_message}")
                    return
                job_message_match = re.search(f"(.*{job_id}.*)", process_message)
                job_message = (
                    job_message_match.group(1)
                    if job_message_match
                    else "No specific message found"
                )
                logger.error(
                    f"LSF kill seems to have failed for job {job_id} with message: "
                    f"{job_message}!"
                )
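Since a single bkill now covers every job, its output has to be checked per job: each id should appear in a `Job <id> is being terminated/signaled` line, and when it does not, the job's own line is extracted from the batched message for the error log. A toy run of that scanning logic against fabricated bkill-style output (not real LSF output):

```python
import re

process_message = (
    "Job <11> is being terminated\n"
    "Job <12>: Job has already finished\n"
    "Job <13> is being signaled"
)

for job_id in ["11", "12", "13"]:
    if re.search(f"Job <{job_id}> is being (terminated|signaled)", process_message):
        continue
    match = re.search(f"(.*{job_id}.*)", process_message)
    print(f"job {job_id}: {match.group(1) if match else 'no specific message found'}")
```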

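The SIGTERM-to-SIGKILL escalation is handed off to a detached shell: `start_new_session=True` runs `sleep N; bkill -s SIGKILL …` in its own session, so the delayed SIGKILL fires even if the ERT process itself exits in the meantime. A reduced sketch of that detach-and-delay pattern (using `echo` in place of `bkill`):

```python
import asyncio


async def schedule_delayed_kill(delay: float, job_ids: list[str]) -> None:
    # The shell survives our exit because it runs in its own session.
    ids = " ".join(job_ids)
    await asyncio.create_subprocess_shell(
        f"sleep {delay}; echo bkill -s SIGKILL {ids}",  # echo stands in for bkill
        start_new_session=True,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )


asyncio.run(schedule_delayed_kill(2.0, ["101", "102", "103"]))
```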
async def poll(self) -> None:
while True:
6 changes: 3 additions & 3 deletions tests/ert/unit_tests/scheduler/test_lsf_driver.py
@@ -337,15 +337,15 @@ async def test_faulty_bsub_produces_error_log(monkeypatch, tmp_path):
"1",
255,
"",
"Job <22>: No matching job found",
"Job <11>: No matching job found",
"No matching job found",
id="inconsistency_ert_vs_lsf",
),
pytest.param(
{"1": "11"},
"1",
0,
"wrong_stdout...",
"Job <11>: wrong_stdout...",
"",
"wrong_stdout...",
id="artificial_bkill_stdout_giving_logged_error",
@@ -355,7 +355,7 @@
"1",
1,
"",
"wrong_on_stderr",
"Job <11>: wrong_on_stderr",
"wrong_on_stderr",
id="artificial_bkill_stderr_and_returncode_giving_logged_error",
),