Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
type: local
output_dir: ???
extra_docker_args: ""
mode: sequential
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,26 @@ def kill_job(job_id: str) -> None:
NotImplementedError: If not implemented by a subclass.
"""
raise NotImplementedError("Subclasses must implement this method")

@staticmethod
def get_kill_failure_message(
    job_id: str, container_or_id: str, status: Optional[ExecutionState] = None
) -> str:
    """Build the error text reported when a kill request could not be carried out.

    When the job is already in a terminal state (succeeded, failed or killed),
    that fact is appended to the message so the caller can tell why there was
    nothing left to kill. For any other state, or when no state is given, only
    the generic message is returned.

    Args:
        job_id: The job ID that failed to kill.
        container_or_id: Container name, SLURM job ID, or other identifier.
        status: Optional execution state of the job.

    Returns:
        str: The generic failure message, optionally followed by
        ``" - <reason>"`` for a terminal state.
    """
    # Generic part shared by every variant of the message.
    message = f"Could not find or kill job {job_id} ({container_or_id})"

    # Terminal states paired with the explanation appended for each; compared
    # with ``==`` in this order.
    terminal_reasons = (
        (ExecutionState.SUCCESS, "job already completed successfully"),
        (ExecutionState.FAILED, "job already failed"),
        (ExecutionState.KILLED, "job was already killed"),
    )
    for terminal_state, reason in terminal_reasons:
        if status == terminal_state:
            return f"{message} - {reason}"

    return message
Original file line number Diff line number Diff line change
Expand Up @@ -622,76 +622,14 @@ def get_status(id: str) -> List[ExecutionStatus]:
def kill_job(job_id: str) -> None:
"""Kill Lepton evaluation jobs and clean up endpoints.

For invocation IDs, this will kill all jobs and clean up all
dedicated endpoints created for the invocation.

Args:
job_id: The job ID or invocation ID to kill.
job_id: The job ID to kill.

Raises:
ValueError: If job is not found or invalid.
RuntimeError: If job cannot be killed.
"""
db = ExecutionDB()

# If it looks like an invocation_id, kill all jobs for that invocation
if len(job_id) == 8 and "." not in job_id:
jobs = db.get_jobs(job_id)
if not jobs:
raise ValueError(f"No jobs found for invocation {job_id}")

endpoint_names = (
set()
) # Use set to avoid duplicates (though each should be unique)
lepton_job_names = []

# Collect all Lepton jobs and endpoint info
for curr_job_data in jobs.values():
if curr_job_data.executor != "lepton":
continue

# Collect endpoint name for this job (each task may have its own)
endpoint_name = curr_job_data.data.get("endpoint_name")
if endpoint_name:
endpoint_names.add(endpoint_name)

lepton_job_name = curr_job_data.data.get("lepton_job_name")
if lepton_job_name:
lepton_job_names.append(lepton_job_name)

# Mark job as killed in database
curr_job_data.data["status"] = "killed"
curr_job_data.data["killed_time"] = time.time()
db.write_job(curr_job_data)

print(
f"🛑 Killing {len(lepton_job_names)} Lepton jobs for invocation {job_id}"
)

# Cancel all Lepton jobs
for lepton_job_name in lepton_job_names:
success = delete_lepton_job(lepton_job_name)
if success:
print(f"✅ Cancelled Lepton job: {lepton_job_name}")
else:
print(f"⚠️ Failed to cancel Lepton job: {lepton_job_name}")

# Clean up all dedicated endpoints
if endpoint_names:
print(f"🧹 Cleaning up {len(endpoint_names)} dedicated endpoints")
for endpoint_name in endpoint_names:
success = delete_lepton_endpoint(endpoint_name)
if success:
print(f"✅ Cleaned up endpoint: {endpoint_name}")
else:
print(f"⚠️ Failed to cleanup endpoint: {endpoint_name}")
else:
print("📌 No dedicated endpoints to clean up (using shared endpoint)")

print(f"🛑 Killed all resources for invocation {job_id}")
return

# Otherwise, treat as individual job_id
job_data = db.get_job(job_id)
if job_data is None:
raise ValueError(f"Job {job_id} not found")
Expand All @@ -703,17 +641,25 @@ def kill_job(job_id: str) -> None:

# Cancel the specific Lepton job
lepton_job_name = job_data.data.get("lepton_job_name")

if lepton_job_name:
success = delete_lepton_job(lepton_job_name)
if success:
cancel_success = delete_lepton_job(lepton_job_name)
if cancel_success:
print(f"✅ Cancelled Lepton job: {lepton_job_name}")
Comment thread
agronskiy marked this conversation as resolved.
# Mark job as killed in database
job_data.data["status"] = "killed"
job_data.data["killed_time"] = time.time()
db.write_job(job_data)
else:
print(f"⚠️ Failed to cancel Lepton job: {lepton_job_name}")

# Mark job as killed in database
job_data.data["status"] = "killed"
job_data.data["killed_time"] = time.time()
db.write_job(job_data)
# Use common helper to get informative error message based on job status
status_list = LeptonExecutor.get_status(job_id)
current_status = status_list[0].state if status_list else None
error_msg = BaseExecutor.get_kill_failure_message(
Comment thread
agronskiy marked this conversation as resolved.
Outdated
job_id, f"lepton_job: {lepton_job_name}", current_status
)
raise RuntimeError(error_msg)
else:
raise ValueError(f"No Lepton job name found for job {job_id}")

print(f"🛑 Killed Lepton job {job_id}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,10 @@ def get_status(id: str) -> List[ExecutionStatus]:

@staticmethod
def kill_job(job_id: str) -> None:
"""Kill a local job by stopping its Docker container and related processes.
"""Kill a local job.

Args:
job_id: The job ID to kill.
job_id: The job ID (e.g., abc123.0) to kill.

Raises:
ValueError: If job is not found or invalid.
Expand Down Expand Up @@ -463,14 +463,55 @@ def kill_job(job_id: str) -> None:
if result.returncode == 0:
killed_something = True

# Mark job as killed in database if we killed something
# If we successfully killed something, mark as killed
if killed_something:
job_data.data["killed"] = True
db.write_job(job_data)
else:
raise RuntimeError(
f"Could not find or kill job {job_id} (container: {container_name})"
)
LocalExecutor._add_to_killed_jobs(job_data.invocation_id, job_id)
return

# If nothing was killed, check if this is a pending job
status_list = LocalExecutor.get_status(job_id)
if status_list and status_list[0].state == ExecutionState.PENDING:
# For pending jobs, mark as killed even though there's nothing to kill yet
job_data.data["killed"] = True
db.write_job(job_data)
LocalExecutor._add_to_killed_jobs(job_data.invocation_id, job_id)
return

# Use common helper to get informative error message based on job status
current_status = status_list[0].state if status_list else None
error_msg = BaseExecutor.get_kill_failure_message(
job_id, f"container: {container_name}", current_status
)
raise RuntimeError(error_msg)

@staticmethod
def _add_to_killed_jobs(invocation_id: str, job_id: str) -> None:
    """Add a job ID to the killed jobs file for this invocation.

    The job ID is appended as its own line to ``killed_jobs.txt`` in the
    parent directory of a job's ``output_dir``. Nothing is written when the
    invocation has no jobs in the execution DB, when the first job has no
    ``output_dir`` recorded, or when that directory does not exist.

    Args:
        invocation_id: The invocation ID.
        job_id: The job ID to mark as killed.
    """
    db = ExecutionDB()
    jobs = db.get_jobs(invocation_id)
    if not jobs:
        return

    # Get invocation output directory from any job's output_dir
    first_job_data = next(iter(jobs.values()))
    output_dir = first_job_data.data.get("output_dir")
    # An unset output_dir must not be turned into pathlib.Path(""): that is
    # the current working directory, which always exists and is its own
    # parent, so the file would be created wherever the process was started.
    if not output_dir:
        return

    job_output_dir = pathlib.Path(output_dir)
    if not job_output_dir.exists():
        return

    # Invocation dir is parent of job output dir
    killed_jobs_file = job_output_dir.parent / "killed_jobs.txt"

    # Append job_id to file, one ID per line
    with open(killed_jobs_file, "a") as f:
        f.write(f"{job_id}\n")


def _get_progress(artifacts_dir: pathlib.Path) -> Optional[float]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
# check if docker exists
command -v docker >/dev/null 2>&1 || { echo 'docker not found'; exit 1; }

# Initialize: remove killed jobs file from previous runs
script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
killed_jobs_file="$script_dir/killed_jobs.txt"
rm -f "$killed_jobs_file"

{% for task in evaluation_tasks %}
# {{ task.job_id }} {{ task.name }}

Expand All @@ -28,13 +33,17 @@ mkdir -m 777 -p "$task_dir"
mkdir -m 777 -p "$artifacts_dir"
mkdir -m 777 -p "$logs_dir"

# Create pre-start stage file
echo "$(date -u +%Y-%m-%dT%H:%M:%SZ)" > "$logs_dir/stage.pre-start"
# Check if this job was killed
if [ -f "$killed_jobs_file" ] && grep -q "^{{ task.job_id }}$" "$killed_jobs_file"; then
echo "$(date -u +%Y-%m-%dT%H:%M:%SZ) Job {{ task.job_id }} ({{ task.name }}) was killed, skipping execution" | tee -a "$logs_dir/stdout.log"
else
# Create pre-start stage file
echo "$(date -u +%Y-%m-%dT%H:%M:%SZ)" > "$logs_dir/stage.pre-start"

# Docker run with eval factory command
(
echo "$(date -u +%Y-%m-%dT%H:%M:%SZ)" > "$logs_dir/stage.running"
docker run --rm --shm-size=100g {{ extra_docker_args }} \
# Docker run with eval factory command
(
echo "$(date -u +%Y-%m-%dT%H:%M:%SZ)" > "$logs_dir/stage.running"
docker run --rm --shm-size=100g {{ extra_docker_args }} \
--name {{ task.container_name }} \
--volume "$artifacts_dir":/results \
{% for env_var in task.env_vars -%}
Expand Down Expand Up @@ -85,4 +94,7 @@ echo "$(date -u +%Y-%m-%dT%H:%M:%SZ)" > "$logs_dir/stage.pre-start"
)

{% endif %}
fi


{% endfor %}
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ def kill_job(job_id: str) -> None:
"""Kill a SLURM job.

Args:
job_id: The job ID to kill.
job_id: The job ID (e.g., abc123.0) to kill.
"""
db = ExecutionDB()
job_data = db.get_job(job_id)
Expand All @@ -402,26 +402,31 @@ def kill_job(job_id: str) -> None:
f"Job {job_id} is not a slurm job (executor: {job_data.executor})"
)

killed_something = False

result = _kill_slurm_job(
# OPTIMIZATION: Query status AND kill in ONE SSH call
Comment thread
agronskiy marked this conversation as resolved.
slurm_status, result = _kill_slurm_job(
slurm_job_ids=[job_data.data.get("slurm_job_id")],
username=job_data.data.get("username"),
hostname=job_data.data.get("hostname"),
socket=job_data.data.get("socket"),
)

# Mark job as killed in database if kill succeeded
if result.returncode == 0:
killed_something = True

# Mark job as killed in database if we killed something
if killed_something:
job_data.data["killed"] = True
db.write_job(job_data)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not sure. Do you think it may affect anything?

I see:

        record = asdict(job)
        try:
            with open(EXEC_DB_FILE, "a") as f:
                f.write(json.dumps(record) + "\n")

I think it does not append

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"a" stands for append. I'm thinking that we get duplicate entries in this JSONL file and, by happy coincidence, the latest one "wins" when it is read back on another run?

else:
raise RuntimeError(
f"Could not find or kill job {job_id} (slurm_job_id: {job_data.data.get('slurm_job_id')})"
# Use the pre-fetched status for better error message
current_status = None
if slurm_status:
current_status = SlurmExecutor._map_slurm_state_to_execution_state(
slurm_status
)
error_msg = BaseExecutor.get_kill_failure_message(
job_id,
f"slurm_job_id: {job_data.data.get('slurm_job_id')}",
current_status,
)
raise RuntimeError(error_msg)


def _create_slurm_sbatch_script(
Expand Down Expand Up @@ -880,34 +885,47 @@ def _query_slurm_jobs_status(

def _kill_slurm_job(
slurm_job_ids: List[str], username: str, hostname: str, socket: str | None
) -> None:
"""Kill a SLURM job.
) -> tuple[str | None, subprocess.CompletedProcess]:
"""Kill a SLURM job, querying status first in one SSH call for efficiency.

Args:
slurm_job_ids: List of SLURM job IDs to kill.
username: SSH username.
hostname: SSH hostname.
socket: control socket location or None

Returns:
Tuple of (status_string, completed_process) where status_string is the SLURM status or None
"""
if len(slurm_job_ids) == 0:
return {}
kill_command = "scancel {}".format(",".join(slurm_job_ids))
return None, subprocess.CompletedProcess(args=[], returncode=0)

jobs_str = ",".join(slurm_job_ids)
# Combine both commands in one SSH call: query THEN kill
combined_command = (
f"sacct -j {jobs_str} --format='JobID,State%32' --noheader -P 2>/dev/null; "
f"scancel {jobs_str}"
)

ssh_command = ["ssh"]
if socket is not None:
ssh_command.append(f"-S {socket}")
ssh_command.append(f"{username}@{hostname}")
ssh_command.append(kill_command)
ssh_command.append(combined_command)
ssh_command = " ".join(ssh_command)

completed_process = subprocess.run(
args=shlex.split(ssh_command), capture_output=True
)
if completed_process.returncode != 0:
raise RuntimeError(
"failed to kill slurm job\n{}".format(
completed_process.stderr.decode("utf-8")
)
)
return completed_process

# Parse the sacct output (before scancel runs)
sacct_output = completed_process.stdout.decode("utf-8")
sacct_output_lines = sacct_output.strip().split("\n")
slurm_status = None
if sacct_output_lines and len(slurm_job_ids) == 1:
slurm_status = _parse_slurm_job_status(slurm_job_ids[0], sacct_output_lines)

return slurm_status, completed_process


def _parse_slurm_job_status(slurm_job_id: str, sacct_output_lines: List[str]) -> str:
Expand Down
Loading
Loading