Design document for Job Launcher and Job Handle (TA: NVFlare developers)#4282
IsaacYangSLA wants to merge 1 commit into NVIDIA:main
Conversation
Pull request overview
Adds a new design document describing NVFlare’s JobLauncherSpec / JobHandleSpec abstractions and the Process/Docker/K8s launcher implementations used by the server and client runtime.
Changes:
- Introduces a comprehensive design doc for job launching/handling interfaces and lifecycle flow.
- Documents launcher selection via the `BEFORE_JOB_LAUNCH` event and how return codes are resolved.
- Describes Process/Docker/K8s launcher implementations and example configurations.
**Abort path** (`_terminate_job`):

1. Wait up to 10 seconds for the child to exit gracefully (polling `job_handle.poll()`).
2. Call `job_handle.terminate()`.
The client abort path is described as polling job_handle.poll() for up to 10 seconds before terminating, but _terminate_job() currently waits (by checking run_processes removal) and then unconditionally calls job_handle.terminate()—it does not poll the handle. Please update this section to reflect the current behavior, or adjust the implementation if polling is the intended design.
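The poll-then-terminate flow this comment says the document describes could be sketched roughly as below. All names here are illustrative stand-ins, not NVFlare's actual classes; the sketch only assumes the `poll()`/`terminate()` semantics stated in the design doc (poll returns `UNKNOWN` while the job is still running).

```python
import time

class _FakeHandle:
    """Stand-in for a JobHandleSpec-style handle (illustration only)."""
    def __init__(self, exits_after_polls):
        self._polls_left = exits_after_polls
        self.terminated = False

    def poll(self):
        # UNKNOWN while still running; an exit code once done
        self._polls_left -= 1
        return "UNKNOWN" if self._polls_left > 0 else 0

    def terminate(self):
        self.terminated = True

def terminate_job(job_handle, grace_period=10.0, poll_interval=0.02):
    """Poll for up to grace_period seconds, then force-terminate."""
    deadline = time.time() + grace_period
    while time.time() < deadline:
        if job_handle.poll() != "UNKNOWN":
            return True  # child exited gracefully
        time.sleep(poll_interval)
    job_handle.terminate()
    return False

graceful_handle = _FakeHandle(exits_after_polls=3)
graceful = terminate_job(graceful_handle, grace_period=1.0)
stuck_handle = _FakeHandle(exits_after_polls=10**9)
forced = terminate_job(stuck_handle, grace_period=0.1)
```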
Greptile Summary: This PR introduces the Job Launcher / Job Handle design document. Key concerns:

Confidence Score: 1/5

Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant UL as Upper Layer (ServerEngine / ClientExecutor)
    participant GJL as get_job_launcher()
    participant EV as Event System (BEFORE_JOB_LAUNCH)
    participant JL as JobLauncherSpec (concrete)
    participant JH as JobHandleSpec (concrete)
    participant BG as Background Thread
    UL->>UL: build JOB_PROCESS_ARGS in FLContext
    UL->>GJL: get_job_launcher(job_meta, fl_ctx)
    GJL->>EV: fire BEFORE_JOB_LAUNCH
    EV-->>JL: handle_event → add_launcher(self, fl_ctx)
    GJL-->>UL: return first registered launcher
    UL->>JL: launch_job(job_meta, fl_ctx)
    JL->>JH: create handle (Process / Docker / K8s)
    JL->>JH: enter_states([RUNNING], timeout)
    JL-->>UL: return job_handle (or None on failure)
    UL->>BG: spawn thread → job_handle.wait()
    BG->>JH: wait() — blocks until SUCCEEDED / TERMINATED
    alt job aborted
        UL->>JH: terminate()
        JH-->>UL: None
    end
    BG->>JH: poll() → JobReturnCode
    BG-->>UL: notify completion with return code
```
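The background-thread portion of the diagram (spawn a thread, block on `wait()`, report `poll()` on completion) can be sketched with plain threading. The class and function names below are illustrative, not NVFlare's real API; only the wait/poll contract comes from the diagram.

```python
import threading

class _DemoHandle:
    """Illustrative handle: wait() blocks until the job finishes."""
    def __init__(self):
        self._done = threading.Event()
        self._return_code = "UNKNOWN"

    def wait(self):
        self._done.wait()

    def poll(self):
        return self._return_code

    def _finish(self, code):  # simulates the job process exiting
        self._return_code = code
        self._done.set()

completions = []

def monitor(handle):
    # BG thread: wait() blocks, then poll() yields the return code
    handle.wait()
    completions.append(handle.poll())

handle = _DemoHandle()
worker = threading.Thread(target=monitor, args=(handle,), daemon=True)
worker.start()              # UL spawns thread -> job_handle.wait()
handle._finish("SUCCESS")   # job completes
worker.join(timeout=5)
```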
| Method | Signature | Semantics |
|--------|-----------|-----------|
| `terminate()` | `() -> None` | Stop the job immediately. |
| `poll()` | `() -> JobReturnCode` | Non-blocking query for the job's current return code. Returns `UNKNOWN` while still running. |
Why does a running instance return `UNKNOWN` instead of `RUNNING`?
This is the 'exit code' of a handle. In the process case, before the process exits there is no exit code, so it will be `UNKNOWN`. If the process exits normally, the exit code will be 0 and the high-level status code will be `SUCCESS`. In K8s there is no clear exit code, so once we detect the pod is in SUCCEEDED or TERMINATED state, we know it has exited and then set the high-level status code (`SUCCESS` or `TERMINATED`).
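For the process case, the mapping described in this reply could look roughly like the sketch below. The string constants (`UNKNOWN`, `SUCCESS`, `EXECUTION_EXCEPTION`) are assumed placeholder names, not confirmed `JobReturnCode` members; the `None`/0/nonzero split follows `subprocess.Popen.poll()` semantics.

```python
def process_return_code(popen_returncode):
    """Map Popen.poll()-style results to a high-level job return code.

    None means the process has not exited yet, so the handle reports
    UNKNOWN; 0 maps to SUCCESS; any other exit code to an error.
    """
    if popen_returncode is None:
        return "UNKNOWN"
    if popen_returncode == 0:
        return "SUCCESS"
    return "EXECUTION_EXCEPTION"
```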
```python
def allocate_resources(self, resource_requirement: dict, token: str, fl_ctx: FLContext) -> dict:
    return True
```
allocate_resources returns bool instead of dict
ResourceManagerSpec.allocate_resources is declared -> dict (it should return the allocated resources for later de-allocation). Returning True here violates the type contract; callers that try to iterate or look up keys in the returned value will raise a TypeError or AttributeError.
For a bare-environment no-op implementation the correct stub value is an empty dict:

```diff
 def allocate_resources(self, resource_requirement: dict, token: str, fl_ctx: FLContext) -> dict:
-    return True
+    return {}
```
| Docker Status | JobReturnCode |
|---------------|---------------|
| `created` | `UNKNOWN` |
Why can't the status reflect the real status?
Pod phase mapping:

| Pod Phase | JobState | JobReturnCode |
|-----------|----------|---------------|
Same here: why can't the status be `starting`/`running` instead of `UNKNOWN`?
There are two concepts. One is the state the job is in; the other is the exit code (status code) it returns once the job exits. The pod only has states, so we have to map the state to both concepts.
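The dual mapping described in this reply (one pod phase drives both the job state and the eventual return code) could be expressed as a simple table. The exact enum values below are assumptions for illustration; only the Succeeded/Failed rows follow the `SUCCESS`/`TERMINATED` wording used above.

```python
# Hypothetical mapping: one K8s pod phase -> (JobState, JobReturnCode)
POD_PHASE_MAP = {
    "Pending":   ("STARTING",   "UNKNOWN"),
    "Running":   ("RUNNING",    "UNKNOWN"),
    "Succeeded": ("SUCCEEDED",  "SUCCESS"),
    "Failed":    ("TERMINATED", "TERMINATED"),
}

def map_pod_phase(phase):
    """Return (JobState, JobReturnCode) for a pod phase."""
    return POD_PHASE_MAP.get(phase, ("UNKNOWN", "UNKNOWN"))
```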
```diff
 start = time.time()
 while not self.cell:
     self.logger.info("Waiting for the client cell to be created.")
-    if time.time() - start > 15.0:
+    if time.time() - start > 600:
         raise RuntimeError("Client cell could not be created. Failed to login the client.")
```
Cell-creation timeout increased from 15 s → 600 s, sleep from 0.5 s → 10 s
The new values change the failure-detection latency from ~15 seconds to up to 10 minutes. While longer startup times are expected in K8s environments, this change affects all environments (process and Docker launchers included). A client whose cell genuinely fails to initialize (network misconfiguration, port conflict, etc.) will now block silently for 10 minutes before surfacing an error, which significantly degrades the developer/operator experience.
Consider either:
- making the timeout configurable via a constructor parameter so that each deployment can tune it, or
- only applying the longer timeout on the K8s code path (e.g., gated by a flag or a subclass override).
These are needed defaults for large models; Docker and K8s environments can override with a shorter timeout if needed. @greptileai
Understood that large models need longer initialization, but the current implementation doesn't support overriding this timeout—it's hardcoded at 600s in client_registration.
The Communicator.__init__ already has a timeout parameter (used for message timeouts), but the cell-creation timeout is a separate hardcoded value. If the design intent is to allow environment-specific overrides, consider:
```diff
 start = time.time()
 while not self.cell:
     self.logger.info("Waiting for the client cell to be created.")
-    if time.time() - start > 15.0:
+    if time.time() - start > 600:
         raise RuntimeError("Client cell could not be created. Failed to login the client.")
```

```python
def __init__(
    self,
    client_config=None,
    secure_train=False,
    client_state_processors: Optional[List[Filter]] = None,
    compression=None,
    cell: Cell = None,
    client_register_interval=2,
    timeout=5.0,
    maint_msg_timeout=5.0,
    cell_creation_timeout=600.0,
):
```
Then use `self.cell_creation_timeout` in the registration loop. This would actually enable the configurability you're describing.
```python
if job_config.get("module_args") is None:
    self.container_args_module_args_dict = DEFAULT_CONTAINER_ARGS_MODULE_ARGS_DICT
else:
    self.container_args_module_args_dict = job_config.get("module_args")
self.container_args_module_args_dict_as_list = list()
for k, v in self.container_args_module_args_dict.items():
    self.container_args_module_args_dict_as_list.append(k)
    self.container_args_module_args_dict_as_list.append(v)
```
None values in pod container args when default module args are used
When `job_config.get("module_args")` is None, `DEFAULT_CONTAINER_ARGS_MODULE_ARGS_DICT` is used. Its values are all None:

```python
DEFAULT_CONTAINER_ARGS_MODULE_ARGS_DICT = {"-m": None, "-w": None, ...}
```

The loop on lines 144–147 appends both keys and their None values to `container_args_module_args_dict_as_list`:

```python
self.container_args_module_args_dict_as_list.append(k)  # "-m"
self.container_args_module_args_dict_as_list.append(v)  # None
```

This produces a list like `["-m", None, "-w", None, ...]` that is later placed into `container["args"]`. The Kubernetes API will reject a pod spec whose args array contains null entries, causing pod creation to fail immediately. The default dict should either not be used (raise an error if `module_args` is missing) or its None values should be filtered out before constructing the args list:

```python
for k, v in self.container_args_module_args_dict.items():
    if v is not None:
        self.container_args_module_args_dict_as_list.append(k)
        self.container_args_module_args_dict_as_list.append(v)
```
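The None-filtering fix discussed above can be checked in isolation. The helper name and the dict contents below are illustrative; the logic mirrors the suggested loop.

```python
def build_args_list(module_args):
    """Flatten {flag: value} into [flag, value, ...], skipping None values."""
    args = []
    for flag, value in module_args.items():
        if value is not None:
            args.append(flag)
            args.append(value)
    return args

# "-m" has no value, so both the flag and its None value are dropped
filtered = build_args_list({"-m": None, "-w": "/workspace", "-n": "site-1"})
```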
```diff
 try:
-    self.core_v1.create_namespaced_pod(body=job_handle.get_manifest(), namespace=self.namespace)
-    if job_handle.enter_states([JobState.RUNNING], timeout=self.timeout):
-        return job_handle
-    else:
-        job_handle.terminate()
-        return None
-except ApiException:
+    self.core_v1.create_namespaced_pod(body=pod_manifest, namespace=self.namespace)
+    job_handle.enter_states([JobState.RUNNING], timeout=self.timeout)
+except ApiException as e:
     job_handle.terminate()
     return None
 return job_handle
```
enter_states timeout not handled — pod-not-RUNNING path silently returns a handle
The previous implementation explicitly checked the return value of `enter_states([JobState.RUNNING], ...)` and returned None on timeout:

```python
# old code
if job_handle.enter_states([JobState.RUNNING], timeout=self.timeout):
    return job_handle
else:
    job_handle.terminate()
    return None
```

The new code drops that check entirely:

```python
job_handle.enter_states([JobState.RUNNING], timeout=self.timeout)
return job_handle  # returned even when enter_states() returned False (timeout)
```

`enter_states` returns False when timeout is not None and the pod never reaches RUNNING within that window (see `K8sJobHandle.enter_states`, lines 177–178). The caller (ServerEngine / ClientExecutor) stores the returned handle and starts a background thread calling `handle.wait()`, which would then loop indefinitely on a stuck/unschedulable pod. Meanwhile, the engine believes the job is running normally.
Consider restoring the explicit check, for example:

```python
if not job_handle.enter_states([JobState.RUNNING], timeout=self.timeout):
    job_handle.terminate()
    return None
return job_handle
```

```python
with open(data_pvc_file_path, "rt") as f:
    data_pvc_dict = yaml.safe_load(f)
    # data_pvc_dict will be pvc: mountPath
    # currently, support one pvc and always mount to /var/tmp/nvflare/data
    # ie, ignore the mountPath in data_pvc_dict
    self.data_pvc = list(data_pvc_dict.keys())[0]
```
data_pvc_dict can be None or empty, causing an unhandled crash
yaml.safe_load() returns None when the file is empty. Calling list(None.keys()) will raise AttributeError. Even if the file is non-empty but has no top-level keys (e.g., just a YAML comment), data_pvc_dict will be None or {}, and list({}.keys())[0] raises IndexError. Neither case is caught, so the launcher crashes at construction time with an opaque traceback rather than a clear error message.
A simple guard would surface the misconfiguration immediately:

```python
with open(data_pvc_file_path, "rt") as f:
    data_pvc_dict = yaml.safe_load(f)
if not data_pvc_dict:
    raise ValueError(f"data_pvc_file_path '{data_pvc_file_path}' is empty or contains no PVC entries.")
self.data_pvc = list(data_pvc_dict.keys())[0]
```
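The guard's behavior on both failure modes (`None` from an empty file, `{}` from a contentless one) can be checked without touching the filesystem or PyYAML, by passing the already-parsed value directly. The helper name is hypothetical.

```python
def first_pvc(data_pvc_dict, source="data_pvc_file"):
    """Return the first PVC name, rejecting None/{} parse results.

    yaml.safe_load() returns None for an empty file; a comment-only
    file parses to None as well, and a keyless document to {}.
    """
    if not data_pvc_dict:
        raise ValueError(f"{source} is empty or contains no PVC entries.")
    return next(iter(data_pvc_dict))

pvc = first_pvc({"my-pvc": "/var/tmp/nvflare/data"})
```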
```python
        "imagePullPolicy": "Always",
    }
]
self.container_args_python_args_list = ["-u", "-m", job_config.get("command")]
```
None silently injected into python args when command is absent
job_config.get("command") returns None if the key is missing, producing ["-u", "-m", None] in container_args_python_args_list. This list is later concatenated directly into container["args"] (line 155–159), which the Kubernetes API will reject because pod args cannot contain null entries.
While K8sJobLauncher.launch_job always passes an explicit "command" key, anyone constructing K8sJobHandle directly (or writing a subclass) will hit this silently. Consider adding an explicit guard:
```python
command = job_config.get("command")
if not command:
    raise ValueError("job_config must contain a non-empty 'command' key")
self.container_args_python_args_list = ["-u", "-m", command]
```

```python
def allocate_resources(self, resource_requirement: dict, token: str, fl_ctx: FLContext) -> dict:
    return True
```
allocate_resources return type violates ResourceManagerSpec contract
ResourceManagerSpec.allocate_resources is declared -> dict and callers are expected to iterate or store the returned dict for later de-allocation via free_resources. Returning True here breaks the type contract; any caller that passes the return value to free_resources(resources, ...) will receive a TypeError when it tries to iterate over a boolean.
For a bare-environment no-op, the correct stub is an empty dict:
```diff
 def allocate_resources(self, resource_requirement: dict, token: str, fl_ctx: FLContext) -> dict:
-    return True
+    return {}
```
```python
self._stuck_count = -10
self._max_stuck_count = self.timeout
```
Magic number _stuck_count = -10 unexplained
_stuck_count is initialized to -10, which means the first 10 times a pod is observed in the Pending phase do not count toward the stuck threshold. This acts as an implicit grace period, but the logic is undocumented. Readers must reverse-engineer the intent. Consider extracting this into a named constant or constructor parameter with a comment explaining the grace-period semantics:
```python
_STUCK_COUNT_GRACE_PERIOD = -10  # allow 10 Pending-phase polls before declaring stuck

# in __init__:
self._stuck_count = _STUCK_COUNT_GRACE_PERIOD
```
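The grace-period semantics the comment asks to document could behave like the sketch below. The class, the reset-on-progress behavior, and the threshold value are illustrative assumptions, not the actual K8s handle implementation; the sketch only shows why starting the counter at -10 skips the first 10 Pending observations.

```python
_STUCK_COUNT_GRACE_PERIOD = -10  # allow 10 Pending-phase polls before counting

class StuckDetector:
    """Counts consecutive Pending observations after an initial grace period."""
    def __init__(self, max_stuck_count):
        self._stuck_count = _STUCK_COUNT_GRACE_PERIOD
        self._max_stuck_count = max_stuck_count

    def observe(self, phase):
        """Return True once the pod has been Pending past the threshold."""
        if phase == "Pending":
            self._stuck_count += 1
        else:
            self._stuck_count = _STUCK_COUNT_GRACE_PERIOD  # reset on progress
        return self._stuck_count > self._max_stuck_count

detector = StuckDetector(max_stuck_count=2)
# 10 grace polls + 3 counted polls are needed before the threshold trips
verdicts = [detector.observe("Pending") for _ in range(13)]
```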
Description
The abstract layers for JobLauncher/JobHandle to support three environments, and how the upper layers (client and server) call the concrete classes to launch job.
Types of changes
- Quick tests passed locally by running `./runtest.sh`.