Skip to content

Commit ae00e84

Browse files
committed
Simplify JobAlreadyExists and fix stale job ID under auth
JobAlreadyExists no longer carries a Job object: the client-side job_id could differ from the server's canonical ID when auth rewrites the user segment, which made the Job handle point at a nonexistent job. Instead, Fray now passes existing_job_policy=KEEP when adopt_existing is True, so the server returns the canonical ID via LaunchJobResponse on the success path, and no error-path workaround is needed.
1 parent e358984 commit ae00e84

3 files changed

Lines changed: 6 additions & 8 deletions

File tree

lib/fray/src/fray/v2/iris_backend.py

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -508,6 +508,7 @@ def submit(self, request: JobRequest, adopt_existing: bool = True) -> IrisJobHan
508508
replicas = request.replicas or 1
509509
coscheduling = resolve_coscheduling(request.resources.device, replicas)
510510

511+
policy = cluster_pb2.EXISTING_JOB_POLICY_KEEP if adopt_existing else cluster_pb2.EXISTING_JOB_POLICY_UNSPECIFIED
511512
try:
512513
job = self._iris.submit(
513514
entrypoint=iris_entrypoint,
@@ -519,12 +520,10 @@ def submit(self, request: JobRequest, adopt_existing: bool = True) -> IrisJobHan
519520
replicas=replicas,
520521
max_retries_failure=request.max_retries_failure,
521522
max_retries_preemption=request.max_retries_preemption,
523+
existing_job_policy=policy,
522524
)
523525
except IrisJobAlreadyExists as e:
524-
if adopt_existing:
525-
logger.info("Job %s already exists, adopting existing job", request.name)
526-
return IrisJobHandle(e.job)
527-
raise FrayJobAlreadyExists(request.name, handle=IrisJobHandle(e.job)) from e
526+
raise FrayJobAlreadyExists(request.name) from e
528527
return IrisJobHandle(job)
529528

530529
def host_actor(

lib/iris/src/iris/client/client.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -132,8 +132,7 @@ def __init__(self, job_id: JobName, status: cluster_pb2.JobStatus):
132132
class JobAlreadyExists(Exception):
133133
"""Raised when a job with the same name is already running."""
134134

135-
def __init__(self, job: "Job", message: str):
136-
self.job = job
135+
def __init__(self, message: str):
137136
super().__init__(message)
138137

139138

@@ -723,7 +722,7 @@ def submit(
723722
)
724723
except ConnectError as e:
725724
if e.code == Code.ALREADY_EXISTS:
726-
raise JobAlreadyExists(Job(self, job_id), str(e)) from e
725+
raise JobAlreadyExists(str(e)) from e
727726
raise
728727

729728
return Job(self, canonical_id)

scripts/iris/dev_tpu.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -521,7 +521,7 @@ def allocate(
521521
constraints=constraints or None,
522522
)
523523
except JobAlreadyExists as exc:
524-
raise click.ClickException(f"Job already exists for session '{session_name}': {exc.job.job_id}") from exc
524+
raise click.ClickException(f"Job already exists for session '{session_name}': {exc}") from exc
525525

526526
workers = wait_for_workers(job, timeout=timeout, project=project)
527527
for worker in workers:

0 commit comments

Comments
 (0)