diff --git a/docs/design/JobLauncher_and_JobHandle.md b/docs/design/JobLauncher_and_JobHandle.md new file mode 100644 index 0000000000..48fea1d147 --- /dev/null +++ b/docs/design/JobLauncher_and_JobHandle.md @@ -0,0 +1,605 @@ +# JobLauncher and JobHandle Design Document + +## 1. Overview + +NVFlare runs each federated job as an isolated execution unit -- a subprocess, Docker container, or Kubernetes pod. Two abstractions govern this: + +- **JobLauncherSpec** -- starts a job and returns a handle. +- **JobHandleSpec** -- represents the running job and provides lifecycle control (poll, wait, terminate). + +The upper layers (server engine, client executor) program exclusively against these two interfaces. The concrete backend is selected at runtime through an event-based mechanism, so the engine never imports or names a specific launcher type. + +``` +┌──────────────────────────────────────────────────────────┐ +│ Upper Layer │ +│ ServerEngine / ClientExecutor │ +│ │ +│ 1. Build JOB_PROCESS_ARGS │ +│ 2. get_job_launcher(job_meta, fl_ctx) → launcher │ +│ 3. launcher.launch_job(job_meta, fl_ctx) → job_handle │ +│ 4. job_handle.wait() / job_handle.terminate() │ +└──────────┬──────────────────────┬────────────────────────┘ + │ BEFORE_JOB_LAUNCH │ + │ event selects one │ + ▼ ▼ +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ ProcessJob │ │ DockerJob │ │ K8sJob │ +│ Launcher │ │ Launcher │ │ Launcher │ +│ ─────────────── │ │ ─────────────── │ │ ─────────────── │ +│ ProcessHandle │ │ DockerJobHandle │ │ K8sJobHandle │ +└─────────────────┘ └─────────────────┘ └─────────────────┘ + subprocess container pod +``` + +--- + +## 2. Specification Layer (`nvflare/apis/job_launcher_spec.py`) + +### 2.1 JobHandleSpec + +Abstract base class representing a running job. All methods are `@abstractmethod`. + +| Method | Signature | Semantics | +|--------|-----------|-----------| +| `terminate()` | `() -> None` | Stop the job immediately. | +| `poll()` | `() -> JobReturnCode` | Non-blocking query for the job's current return code. Returns `UNKNOWN` while still running. | +| `wait()` | `() -> None` | Block until the job finishes (or is terminated). | + +### 2.2 JobLauncherSpec + +Abstract base class for launching jobs. Extends `FLComponent`, which gives it access to the event system. + +| Method | Signature | Semantics | +|--------|-----------|-----------| +| `launch_job(job_meta, fl_ctx)` | `(dict, FLContext) -> JobHandleSpec` | Start a job and return its handle. Returns `None` on failure. | + +### 2.3 Supporting Types + +**JobProcessArgs** -- String constants for the keys the upper layer places in `FLContextKey.JOB_PROCESS_ARGS`. These are the standardized parameters every job process needs (workspace path, auth token, job ID, parent URL, etc.). + +| Constant | Value | Used by | +|----------|-------|---------| +| `EXE_MODULE` | `"exe_module"` | Server, Client | +| `WORKSPACE` | `"workspace"` | Server, Client | +| `STARTUP_DIR` | `"startup_dir"` | Client | +| `APP_ROOT` | `"app_root"` | Server | +| `AUTH_TOKEN` | `"auth_token"` | Client | +| `TOKEN_SIGNATURE` | `"auth_signature"` | Server, Client | +| `SSID` | `"ssid"` | Server, Client | +| `JOB_ID` | `"job_id"` | Server, Client | +| `CLIENT_NAME` | `"client_name"` | Client | +| `ROOT_URL` | `"root_url"` | Server | +| `PARENT_URL` | `"parent_url"` | Server, Client | +| `PARENT_CONN_SEC` | `"parent_conn_sec"` | Client | +| `SERVICE_HOST` | `"service_host"` | Server | +| `SERVICE_PORT` | `"service_port"` | Server | +| `HA_MODE` | `"ha_mode"` | Server | +| `TARGET` | `"target"` | Client | +| `SCHEME` | `"scheme"` | Client | +| `STARTUP_CONFIG_FILE` | `"startup_config_file"` | Server, Client | +| `RESTORE_SNAPSHOT` | `"restore_snapshot"` | Server | +| `OPTIONS` | `"options"` | Server, Client | + +**JobReturnCode** -- Standard exit semantics (extends `ProcessExitCode`): + +| Code | Value | Meaning | +|------|-------|---------| +| `SUCCESS` | 0 | Job completed successfully. | +| `EXECUTION_ERROR` | 1 | Job failed during execution. | +| `ABORTED` | 9 | Job was terminated/aborted. | +| `UNKNOWN` | 127 | Status cannot be determined (still running, or lost). | + +**`add_launcher(launcher, fl_ctx)`** -- Appends a launcher to the `FLContextKey.JOB_LAUNCHER` list on `fl_ctx`. Called by launchers during the `BEFORE_JOB_LAUNCH` event to volunteer for the current job. + +--- + +## 3. How the Upper Layer Uses Launchers + +### 3.1 Event-Based Launcher Selection + +The engine never directly instantiates a launcher. Instead, it calls `get_job_launcher()` from `nvflare/private/fed/utils/fed_utils.py`: + +```python +def get_job_launcher(job_meta, fl_ctx) -> JobLauncherSpec: + engine = fl_ctx.get_engine() + with engine.new_context() as job_launcher_ctx: + job_launcher_ctx.remove_prop(FLContextKey.JOB_LAUNCHER) + job_launcher_ctx.set_prop(FLContextKey.JOB_META, job_meta, ...) + engine.fire_event(EventType.BEFORE_JOB_LAUNCH, job_launcher_ctx) + job_launcher = job_launcher_ctx.get_prop(FLContextKey.JOB_LAUNCHER) + ... + return job_launcher[0] +``` + +Every registered `FLComponent` receives the `BEFORE_JOB_LAUNCH` event. Each launcher inspects `job_meta` and, if it can handle the job, calls `add_launcher(self, fl_ctx)`. The first launcher to register wins. + +**Selection rule in practice:** + +| Condition | Launcher selected | +|-----------|-------------------| +| `extract_job_image(job_meta, site_name)` returns `None` | **ProcessJobLauncher** (no container image → run as subprocess) | +| `extract_job_image(job_meta, site_name)` returns an image | **DockerJobLauncher** or **K8sJobLauncher** (whichever is configured as a component) | + +### 3.2 Server Side (`ServerEngine`) + +Location: `nvflare/private/fed/server/server_engine.py` + +``` +_start_runner_process(job, job_clients, snapshot, fl_ctx) +│ +├─ 1. Build job_args dict with server-specific JobProcessArgs +│ (WORKSPACE, APP_ROOT, PARENT_URL, AUTH_TOKEN, HA_MODE, ...) +│ +├─ 2. fl_ctx.set_prop(FLContextKey.JOB_PROCESS_ARGS, job_args) +│ +├─ 3. job_launcher = get_job_launcher(job.meta, fl_ctx) +│ +├─ 4. job_handle = job_launcher.launch_job(job.meta, fl_ctx) +│ +├─ 5. Store in run_processes[job_id][RunProcessKey.JOB_HANDLE] +│ +└─ 6. Start background thread → wait_for_complete(workspace, job_id, job_handle) + │ + ├─ job_handle.wait() # blocks until job finishes + └─ get_return_code(job_handle, job_id, workspace, logger) +``` + +**Abort path** (`abort_app_on_server`): + +1. Attempt to send an abort command to the child via the cell messaging system. +2. On failure, retrieve `job_handle` from `run_processes` and call `job_handle.terminate()`. + +### 3.3 Client Side (`ClientExecutor`) + +Location: `nvflare/private/fed/client/client_executor.py` + +``` +start_app(job_id, job_meta, ...) +│ +├─ 1. Build job_args dict with client-specific JobProcessArgs +│ (WORKSPACE, STARTUP_DIR, CLIENT_NAME, PARENT_URL, AUTH_TOKEN, ...) +│ +├─ 2. fl_ctx.set_prop(FLContextKey.JOB_PROCESS_ARGS, job_args) +│ +├─ 3. job_launcher = get_job_launcher(job_meta, fl_ctx) +│ +├─ 4. job_handle = job_launcher.launch_job(job_meta, fl_ctx) +│ +├─ 5. Store in run_processes[job_id][RunProcessKey.JOB_HANDLE] +│ +└─ 6. Start background thread → _wait_child_process_finish(...) + │ + ├─ job_handle.wait() + └─ get_return_code(job_handle, job_id, workspace, logger) +``` + +**Abort path** (`_terminate_job`): + +1. Wait up to 10 seconds for the child to exit gracefully (polling `job_handle.poll()`). +2. Call `job_handle.terminate()`. + +### 3.4 Return Code Resolution + +`get_return_code()` in `fed_utils.py` uses a two-tier strategy: + +1. **File-based** -- Check for `FLMetaKey.PROCESS_RC_FILE` in the job's run directory. The child process writes its own return code to this file before exiting. This is the preferred source because it carries the child's own assessment. +2. **Handle-based** -- Fall back to `job_handle.poll()`, which maps the underlying execution unit's status to a `JobReturnCode`. + +--- + +## 4. The Three Implementations + +### 4.1 Process Launcher (Subprocess) + +**Files:** + +| File | Class | +|------|-------| +| `nvflare/app_common/job_launcher/process_launcher.py` | `ProcessHandle`, `ProcessJobLauncher` | +| `nvflare/app_common/job_launcher/server_process_launcher.py` | `ServerProcessJobLauncher` | +| `nvflare/app_common/job_launcher/client_process_launcher.py` | `ClientProcessJobLauncher` | + +**Class hierarchy:** + +``` +JobHandleSpec + └── ProcessHandle + +JobLauncherSpec (FLComponent) + └── ProcessJobLauncher + ├── ServerProcessJobLauncher + └── ClientProcessJobLauncher +``` + +#### ProcessHandle + +Wraps a `ProcessAdapter` (from `nvflare/utils/process_utils.py`) that manages a `subprocess.Popen` or a PID. + +| Method | Implementation | +|--------|---------------| +| `terminate()` | Delegates to `adapter.terminate()` (sends SIGTERM/SIGKILL). | +| `poll()` | Calls `adapter.poll()`. Maps exit code 0 → `SUCCESS`, 1 → `EXECUTION_ERROR`, 9 → `ABORTED`, other → `EXECUTION_ERROR`, `None` → `UNKNOWN`. | +| `wait()` | Delegates to `adapter.wait()` (blocks on `subprocess.Popen.wait()`). | + +#### ProcessJobLauncher + +| Step | Action | +|------|--------| +| 1 | Copy `os.environ` and add `app_custom_folder` to `PYTHONPATH`. | +| 2 | Call `self.get_command(job_meta, fl_ctx)` (abstract -- implemented by server/client subclasses). | +| 3 | Parse command with `shlex.split()`, spawn the process via `spawn_process(argv, new_env)`. | +| 4 | Return `ProcessHandle(process_adapter=...)`. | + +**Event registration:** + +```python +def handle_event(self, event_type, fl_ctx): + if event_type == EventType.BEFORE_JOB_LAUNCH: + job_image = extract_job_image(job_meta, fl_ctx.get_identity_name()) + if not job_image: # no container image → use subprocess + add_launcher(self, fl_ctx) +``` + +**Server/Client subclasses** only override `get_command()`: + +- `ServerProcessJobLauncher.get_command()` → `generate_server_command(fl_ctx)` → `python -m -w ...` +- `ClientProcessJobLauncher.get_command()` → `generate_client_command(fl_ctx)` → `python -m -w -n ...` + +--- + +### 4.2 Docker Launcher + +**File:** `nvflare/app_opt/job_launcher/docker_launcher.py` + +**Class hierarchy:** + +``` +JobHandleSpec + └── DockerJobHandle + +JobLauncherSpec (FLComponent) + └── DockerJobLauncher + ├── ClientDockerJobLauncher + └── ServerDockerJobLauncher +``` + +#### DockerJobHandle + +Wraps a Docker SDK `Container` object. + +| Method | Implementation | +|--------|---------------| +| `terminate()` | `container.stop()`. | +| `poll()` | Re-fetches container via `docker.from_env().containers.get(id)`. Maps status: `EXITED` → `SUCCESS`, `DEAD` → `ABORTED`, all others → `UNKNOWN`. Removes the container on terminal states. | +| `wait()` | `enter_states([EXITED, DEAD], timeout)` -- polls container status in a 1-second loop until a terminal state is reached. | + +Docker container states and their mappings: + +| Docker Status | JobReturnCode | +|---------------|---------------| +| `created` | `UNKNOWN` | +| `restarting` | `UNKNOWN` | +| `running` | `UNKNOWN` | +| `paused` | `UNKNOWN` | +| `exited` | `SUCCESS` | +| `dead` | `ABORTED` | + +#### DockerJobLauncher + +Constructor parameters: + +| Parameter | Default | Purpose | +|-----------|---------|---------| +| `mount_path` | `"/workspace"` | Container-side mount point for the host workspace. | +| `network` | `"nvflare-network"` | Docker network the container joins. | +| `timeout` | `None` | Maximum seconds to wait for the container to reach `RUNNING`. | + +Launch sequence: + +| Step | Action | +|------|--------| +| 1 | Extract `job_image` from `job_meta` via `extract_job_image()`. | +| 2 | Build `PYTHONPATH` with `app_custom_folder`. | +| 3 | Call `self.get_command(job_meta, fl_ctx)` → `(container_name, command_string)`. | +| 4 | Read `NVFL_DOCKER_WORKSPACE` env var for the host-side workspace path. | +| 5 | `docker_client.containers.run(image, command, name, network, volumes, detach=True)`. | +| 6 | `DockerJobHandle(container).enter_states([RUNNING], timeout)`. | +| 7 | If timeout or error → `handle.terminate()`, return `None`. Otherwise return handle. | + +**Event registration:** Same pattern as Process but with the opposite condition -- registers when `extract_job_image()` returns a truthy value. + +**Server/Client subclasses** override `get_command()`: + +- `ClientDockerJobLauncher` → returns `("{client_name}-{job_id}", generate_client_command(fl_ctx))`. +- `ServerDockerJobLauncher` → returns `("server-{job_id}", generate_server_command(fl_ctx))`. + +--- + +### 4.3 Kubernetes Launcher + +**File:** `nvflare/app_opt/job_launcher/k8s_launcher.py` + +**Class hierarchy:** + +``` +JobHandleSpec + └── K8sJobHandle + +JobLauncherSpec (FLComponent) + └── K8sJobLauncher + ├── ClientK8sJobLauncher + └── ServerK8sJobLauncher +``` + +#### K8sJobHandle + +Wraps a Kubernetes Pod managed through the `CoreV1Api`. + +| Method | Implementation | +|--------|---------------| +| `terminate()` | `delete_namespaced_pod(name, namespace, grace_period_seconds=0)` then `enter_states([TERMINATED], timeout)`. Sets `terminal_state = TERMINATED`. | +| `poll()` | If `terminal_state` is set, return it directly. Otherwise call `_query_state()` to read the pod phase and map it to `JobReturnCode`. | +| `wait()` | `enter_states([SUCCEEDED, TERMINATED])` -- polls pod phase in a 1-second loop. | + +Pod phase mapping: + +| Pod Phase | JobState | JobReturnCode | +|-----------|----------|---------------| +| `Pending` | `STARTING` | `UNKNOWN` | +| `Running` | `RUNNING` | `UNKNOWN` | +| `Succeeded` | `SUCCEEDED` | `SUCCESS` | +| `Failed` | `TERMINATED` | `ABORTED` | +| `Unknown` | `UNKNOWN` | `UNKNOWN` | + +**Stuck detection:** If a pod stays in `Pending` for more than `timeout` poll cycles (initialized to `-10` so it has a grace window), `_stuck()` returns `True` and `_query_state()` automatically calls `terminate()`. This prevents pods that cannot be scheduled from blocking indefinitely. + +#### K8sJobHandle Pod Manifest + +The handle constructs the pod manifest internally from a `job_config` dict: + +```yaml +apiVersion: v1 +kind: Pod +metadata: + name: +spec: + restartPolicy: Never + containers: + - name: container- + image: + command: ["/usr/local/bin/python"] + args: ["-u", "-m", "", "-w", "", ...] + volumeMounts: + - name: nvflws + mountPath: /var/tmp/nvflare/workspace + - name: nvfldata + mountPath: /var/tmp/nvflare/data + - name: nvfletc + mountPath: /var/tmp/nvflare/etc + resources: + limits: + nvidia.com/gpu: # omitted if None + imagePullPolicy: Always + volumes: + - name: nvflws + persistentVolumeClaim: + claimName: + - name: nvfldata + persistentVolumeClaim: + claimName: + - name: nvfletc + persistentVolumeClaim: + claimName: +``` + +#### K8sJobLauncher + +Constructor parameters: + +| Parameter | Purpose | +|-----------|---------| +| `config_file_path` | Path to kubeconfig file. Loaded via `config.load_kube_config()`. | +| `workspace_pvc` | PVC claim name for the NVFlare workspace. | +| `etc_pvc` | PVC claim name for configuration/etc data. | +| `data_pvc_file_path` | Path to a YAML file mapping PVC names to mount paths for training data. | +| `timeout` | Maximum seconds to wait for pod to reach `Running` (also used as stuck threshold). | +| `namespace` | Kubernetes namespace (default: `"default"`). | + +Launch sequence: + +| Step | Action | +|------|--------| +| 1 | Extract `job_image`, `site_name`, and optional `num_of_gpus` from `job_meta`. | +| 2 | Read `JOB_PROCESS_ARGS` from `fl_ctx`; extract `EXE_MODULE` as the container command. | +| 3 | Build `job_config` dict: name, image, container name, command, volume mounts/PVCs, `module_args` from `get_module_args()`, set list, GPU resources. | +| 4 | Create `K8sJobHandle(job_id, core_v1, job_config, namespace, timeout)` which builds the pod manifest. | +| 5 | `core_v1.create_namespaced_pod(body=pod_manifest, namespace)`. | +| 6 | `job_handle.enter_states([RUNNING], timeout)`. | +| 7 | On `ApiException` → `job_handle.terminate()` then return the handle (caller treats as failed). | + +**Server/Client subclasses** override `get_module_args()`: + +- `ClientK8sJobLauncher` → Filters `JOB_PROCESS_ARGS` through `get_client_job_args(include_exe_module=False, include_set_options=False)` to produce the dict of `-flag value` pairs for the container args list. +- `ServerK8sJobLauncher` → Same pattern with `get_server_job_args(...)`. + +**Key difference from Process/Docker:** The K8s launcher does not build a shell command string. Instead, it passes the Python executable as `command` and constructs a structured `args` list (`["-u", "-m", "", "-w", "", ...]`) directly in the pod spec. + +--- + +## 5. Object-Oriented Design Summary + +### 5.1 Full Class Hierarchy + +``` +JobHandleSpec (abstract) +├── ProcessHandle (wraps ProcessAdapter / subprocess.Popen) +├── DockerJobHandle (wraps docker.Container) +└── K8sJobHandle (wraps CoreV1Api + pod name) + +JobLauncherSpec (abstract, extends FLComponent) +├── ProcessJobLauncher (abstract: get_command) +│ ├── ServerProcessJobLauncher +│ └── ClientProcessJobLauncher +├── DockerJobLauncher (abstract: get_command) +│ ├── ServerDockerJobLauncher +│ └── ClientDockerJobLauncher +└── K8sJobLauncher (abstract: get_module_args) + ├── ServerK8sJobLauncher + └── ClientK8sJobLauncher +``` + +### 5.2 Design Patterns + +**Strategy Pattern** -- Each launcher is a strategy for running jobs. The engine programs against `JobLauncherSpec`; the concrete strategy is selected at runtime through the event system. + +**Template Method Pattern** -- Each base launcher (`ProcessJobLauncher`, `DockerJobLauncher`, `K8sJobLauncher`) implements `launch_job()` with a fixed algorithm, delegating the variable part to an abstract method: + +| Base Launcher | Template method calls | Abstract hook | +|---------------|----------------------|---------------| +| `ProcessJobLauncher` | `launch_job()` → `get_command()` | `get_command(job_meta, fl_ctx) -> str` | +| `DockerJobLauncher` | `launch_job()` → `get_command()` | `get_command(job_meta, fl_ctx) -> (str, str)` | +| `K8sJobLauncher` | `launch_job()` → `get_module_args()` | `get_module_args(job_id, fl_ctx) -> dict` | + +Server and client subclasses provide the implementation of these hooks, producing the correct command-line arguments for each role. + +**Observer Pattern** -- Launchers register for the `BEFORE_JOB_LAUNCH` event through the `FLComponent` event system. This decouples launcher registration from the engine's control flow entirely. + +--- + +## 6. Comparison: Process vs Docker vs Kubernetes + +| Aspect | Process | Docker | Kubernetes | +|--------|---------|--------|------------| +| **When selected** | No `job_image` for site | `job_image` present | `job_image` present | +| **Execution unit** | OS subprocess | Docker container | Kubernetes Pod | +| **Isolation** | Shared host, inherited env | Container isolation, mounted workspace | Pod isolation, PVC-backed volumes | +| **Command format** | Shell command string (`python -m ...`) | Shell command inside `/bin/bash -c` | Structured `command` + `args` list in pod spec | +| **Workspace access** | Direct filesystem (same host) | Host directory bind-mounted to container | PersistentVolumeClaims | +| **Data access** | Direct filesystem | Via bind mount | Via PVC (configured in YAML) | +| **Start verification** | None (spawn returns immediately) | Poll for `RUNNING` state with timeout | Poll for `Running` phase with timeout + stuck detection | +| **Wait mechanism** | `subprocess.Popen.wait()` (OS-level block) | Poll container status for `EXITED`/`DEAD` | Poll pod phase for `Succeeded`/`Terminated` | +| **Terminate** | `SIGTERM`/`SIGKILL` via `ProcessAdapter` | `container.stop()` | `delete_namespaced_pod(grace_period=0)` | +| **Return code source** | Process exit code or RC file | Container status mapping or RC file | Pod phase mapping or RC file | +| **GPU support** | Inherited from host environment | Not explicitly managed | `nvidia.com/gpu` resource limit in pod spec | +| **Dependencies** | stdlib only | `docker` Python SDK | `kubernetes` Python client + kubeconfig | +| **Typical use case** | Simulator, single-machine POC | Multi-container on single host | Production cluster with shared storage | + +--- + +## 7. Sequence Diagram + +The following shows the end-to-end flow for launching and managing a job, applicable to both server and client: + +``` + Engine fed_utils Launcher(s) Handle + │ │ │ │ + │ get_job_launcher() │ │ │ + │───────────────────────>│ │ │ + │ │ fire BEFORE_JOB_LAUNCH │ + │ │─────────────────────>│ │ + │ │ │ check job_meta │ + │ │ │ add_launcher(self) │ + │ │<─────────────────────│ │ + │ return launcher │ │ │ + │<───────────────────────│ │ │ + │ │ │ │ + │ launcher.launch_job(job_meta, fl_ctx) │ │ + │─────────────────────────────────────────────->│ │ + │ │ │ create exec unit │ + │ │ │ (process/container │ + │ │ │ /pod) │ + │ │ │─────────────────────>│ + │ │ │ return handle │ + │<─────────────────────────────────────────────────────────────────────│ + │ │ │ │ + │ store handle in run_processes │ │ + │ │ │ │ + │ [background thread] │ │ │ + │ handle.wait() │ │ │ + │─────────────────────────────────────────────────────────────────────>│ + │ │ │ blocks/polls │ + │ │ │ │ + │ ... (on abort) ... │ │ │ + │ handle.terminate() │ │ │ + │─────────────────────────────────────────────────────────────────────>│ + │ │ │ │ + │ ... (on completion) . │ │ │ + │ get_return_code() │ │ │ + │───────────────────────>│ │ │ + │ │ check RC file │ │ + │ │ or handle.poll() │ │ + │ │─────────────────────────────────────────────>│ + │ return_code │ │ │ + │<───────────────────────│ │ │ +``` + +--- + +## 8. Configuration + +Launchers are registered as FL components in the site's `resources.json`. The configurator loads them at startup so they receive events. + +### 8.1 Process Launcher (default) + +```json +{ + "id": "job_launcher", + "path": "nvflare.app_common.job_launcher.server_process_launcher.ServerProcessJobLauncher", + "args": {} +} +``` + +### 8.2 Docker Launcher + +```json +{ + "id": "job_launcher", + "path": "nvflare.app_opt.job_launcher.docker_launcher.ClientDockerJobLauncher", + "args": { + "mount_path": "/workspace", + "network": "nvflare-network", + "timeout": 60 + } +} +``` + +Requires the `NVFL_DOCKER_WORKSPACE` environment variable to be set on the host to identify the workspace directory to bind-mount. + +### 8.3 Kubernetes Launcher + +```json +{ + "id": "job_launcher", + "path": "nvflare.app_opt.job_launcher.k8s_launcher.ClientK8sJobLauncher", + "args": { + "config_file_path": "/path/to/kubeconfig", + "workspace_pvc": "nvflare-workspace-pvc", + "etc_pvc": "nvflare-etc-pvc", + "data_pvc_file_path": "/path/to/data_pvc.yaml", + "timeout": 120, + "namespace": "nvflare" + } +} +``` + +The `data_pvc_file_path` YAML file maps PVC names to mount paths: + +```yaml +my-data-pvc: /var/tmp/nvflare/data +``` + +--- + +## 9. Future Improvements + +1. **Explicit launcher selection** -- Today "has image" → Docker or K8s, "no image" → Process. Allow an explicit `launcher_type` field in job meta or deploy map so a site can support multiple container backends or provide fallback ordering (e.g., try K8s, fall back to Docker). + +2. **Consistent GPU handling** -- The K8s launcher reads `num_of_gpus` from the resource spec; the Docker and Process launchers do not. Standardize resource declaration so job definitions remain portable across backends. + +3. **Unified cleanup** -- Standardize container/pod cleanup policy across launchers (auto-remove on exit, configurable retention for debugging) and centralize it in the handle or engine. + +4. **Consistent timeout policy** -- The Process launcher has no start timeout while Docker and K8s both support configurable timeouts. Consider adding a start-timeout to the Process launcher and clearer error semantics when `launch_job()` returns `None` vs. a handle that immediately polls as failed. + +5. **Observability** -- Add an optional `get_info()` method to `JobHandleSpec` so the engine can log launcher-specific details (container ID, pod name, namespace, PID) for debugging and operations. + +6. **Testing** -- Provide `MockJobLauncher` and `MockJobHandle` implementations for unit tests that verify server/client flow without starting real processes or containers. diff --git a/nvflare/apis/job_launcher_spec.py b/nvflare/apis/job_launcher_spec.py index cb75130e6a..8fedfb866e 100644 --- a/nvflare/apis/job_launcher_spec.py +++ b/nvflare/apis/job_launcher_spec.py @@ -61,7 +61,7 @@ class JobHandleSpec: def terminate(self): """To terminate the job run. - Returns: the job run return code. + Returns: None """ raise NotImplementedError() @@ -94,7 +94,7 @@ def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec: job_meta: job metadata fl_ctx: FLContext - Returns: boolean to indicates the job launch success or fail. + Returns: a JobHandle instance. """ raise NotImplementedError() diff --git a/nvflare/app_common/resource_consumers/BE_resource_consumer.py b/nvflare/app_common/resource_consumers/BE_resource_consumer.py new file mode 100644 index 0000000000..08059ec621 --- /dev/null +++ b/nvflare/app_common/resource_consumers/BE_resource_consumer.py @@ -0,0 +1,21 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nvflare.apis.resource_manager_spec import ResourceConsumerSpec + + +class BEResourceConsumer(ResourceConsumerSpec): + + def consume(self, resources: dict): + pass diff --git a/nvflare/app_common/resource_managers/BE_resource_manager.py b/nvflare/app_common/resource_managers/BE_resource_manager.py new file mode 100644 index 0000000000..2a090dad76 --- /dev/null +++ b/nvflare/app_common/resource_managers/BE_resource_manager.py @@ -0,0 +1,40 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid + +from nvflare.apis.fl_component import FLComponent +from nvflare.apis.fl_context import FLContext +from nvflare.apis.resource_manager_spec import ResourceManagerSpec + + +class BEResourceManager(ResourceManagerSpec, FLComponent): + def check_resources(self, resource_requirement: dict, fl_ctx: FLContext): + if not isinstance(resource_requirement, dict): + raise TypeError(f"resource_requirement should be of type dict, but got {type(resource_requirement)}.") + + token = str(uuid.uuid4()) + return True, token + + def cancel_resources(self, resource_requirement: dict, token: str, fl_ctx: FLContext): + return None + + def allocate_resources(self, resource_requirement: dict, token: str, fl_ctx: FLContext) -> dict: + return True + + def free_resources(self, resources: dict, token: str, fl_ctx: FLContext): + pass + + def report_resources(self, fl_ctx): + return True diff --git a/nvflare/app_common/resource_managers/gpu_resource_manager.py b/nvflare/app_common/resource_managers/gpu_resource_manager.py index f14f2f0ccd..56b32d9343 100644 --- a/nvflare/app_common/resource_managers/gpu_resource_manager.py +++ b/nvflare/app_common/resource_managers/gpu_resource_manager.py @@ -35,6 +35,7 @@ def __init__( num_gpu_key: str = "num_of_gpus", gpu_mem_key: str = "mem_per_gpu_in_GiB", expiration_period: Union[int, float] = 30, + ignore_host=False, ): """Resource manager for GPUs. @@ -62,17 +63,18 @@ def __init__( if expiration_period < 0: raise ValueError("expiration_period should be greater than or equal to 0.") - if num_of_gpus > 0: - num_host_gpus = len(get_host_gpu_ids()) - if num_of_gpus > num_host_gpus: - raise ValueError(f"num_of_gpus specified ({num_of_gpus}) exceeds available GPUs: {num_host_gpus}.") - - host_gpu_mem = get_host_gpu_memory_total() - for i in host_gpu_mem: - if mem_per_gpu_in_GiB * 1024 > i: - raise ValueError( - f"Memory per GPU specified ({mem_per_gpu_in_GiB * 1024}) exceeds available GPU memory: {i}." - ) + if not ignore_host: + if num_of_gpus > 0: + num_host_gpus = len(get_host_gpu_ids()) + if num_of_gpus > num_host_gpus: + raise ValueError(f"num_of_gpus specified ({num_of_gpus}) exceeds available GPUs: {num_host_gpus}.") + + host_gpu_mem = get_host_gpu_memory_total() + for i in host_gpu_mem: + if mem_per_gpu_in_GiB * 1024 > i: + raise ValueError( + f"Memory per GPU specified ({mem_per_gpu_in_GiB * 1024}) exceeds available GPU memory: {i}." + ) self.num_gpu_key = num_gpu_key self.gpu_mem_key = gpu_mem_key diff --git a/nvflare/app_opt/job_launcher/k8s_launcher.py b/nvflare/app_opt/job_launcher/k8s_launcher.py index 19e6716bc2..175fde591b 100644 --- a/nvflare/app_opt/job_launcher/k8s_launcher.py +++ b/nvflare/app_opt/job_launcher/k8s_launcher.py @@ -16,6 +16,7 @@ from abc import abstractmethod from enum import Enum +import yaml from kubernetes import config from kubernetes.client import Configuration from kubernetes.client.api import core_v1_api @@ -24,6 +25,7 @@ from nvflare.apis.event_type import EventType from nvflare.apis.fl_constant import FLContextKey, JobConstants from nvflare.apis.fl_context import FLContext +from nvflare.apis.job_def import JobMetaKey from nvflare.apis.job_launcher_spec import JobHandleSpec, JobLauncherSpec, JobProcessArgs, JobReturnCode, add_launcher from nvflare.utils.job_launcher_utils import extract_job_image, get_client_job_args, get_server_job_args @@ -36,12 +38,22 @@ class JobState(Enum): UNKNOWN = "unknown" +class POD_Phase(Enum): + PENDING = "Pending" + RUNNING = "Running" + SUCCEEDED = "Succeeded" + FAILED = "Failed" + TERMINATED = "Terminated" + UNKNOWN = "Unknown" + + POD_STATE_MAPPING = { - "Pending": JobState.STARTING, - "Running": JobState.RUNNING, - "Succeeded": JobState.SUCCEEDED, - "Failed": JobState.TERMINATED, - "Unknown": JobState.UNKNOWN, + POD_Phase.PENDING.value: JobState.STARTING, + POD_Phase.RUNNING.value: JobState.RUNNING, + POD_Phase.SUCCEEDED.value: JobState.SUCCEEDED, + POD_Phase.FAILED.value: JobState.TERMINATED, + POD_Phase.TERMINATED.value: JobState.TERMINATED, + POD_Phase.UNKNOWN.value: JobState.UNKNOWN, } JOB_RETURN_CODE_MAPPING = { @@ -52,13 +64,39 @@ class JobState(Enum): JobState.UNKNOWN: JobReturnCode.UNKNOWN, } +DEFAULT_CONTAINER_ARGS_MODULE_ARGS_DICT = { + "-m": None, + "-w": None, + "-t": None, + "-d": None, + "-n": None, + "-c": None, + "-p": None, + "-g": None, + "-scheme": None, + "-s": None, +} + + +class PV_NAME(Enum): + WORKSPACE = "nvflws" + DATA = "nvfldata" + ETC = "nvfletc" + + +VOLUME_MOUNT_LIST = [ + {"name": PV_NAME.WORKSPACE.value, "mountPath": "/var/tmp/nvflare/workspace"}, + {"name": PV_NAME.DATA.value, "mountPath": "/var/tmp/nvflare/data"}, + {"name": PV_NAME.ETC.value, "mountPath": "/var/tmp/nvflare/etc"}, +] + class K8sJobHandle(JobHandleSpec): def __init__(self, job_id: str, api_instance: core_v1_api, job_config: dict, namespace="default", timeout=None): super().__init__() self.job_id = job_id self.timeout = timeout - + self.terminal_state = None self.api_instance = api_instance self.namespace = namespace self.pod_manifest = { @@ -68,14 +106,16 @@ def __init__(self, job_id: str, api_instance: core_v1_api, job_config: dict, nam "spec": { "containers": None, # link to container_list "volumes": None, # link to volume_list - "restartPolicy": "OnFailure", + "restartPolicy": "Never", }, } - self.volume_list = [{"name": None, "hostPath": {"path": None, "type": "Directory"}}] + self.volume_list = [] + self.container_list = [ { "image": None, "name": None, + "resources": None, "command": ["/usr/local/bin/python"], "args": None, # args_list + args_dict + args_sets "volumeMounts": None, # volume_mount_list @@ -83,63 +123,34 @@ def __init__(self, job_id: str, api_instance: core_v1_api, job_config: dict, nam } ] self.container_args_python_args_list = ["-u", "-m", job_config.get("command")] - self.container_args_module_args_dict = { - "-m": None, - "-w": None, - "-t": None, - "-d": None, - "-n": None, - "-c": None, - "-p": None, - "-g": None, - "-scheme": None, - "-s": None, - } - self.container_volume_mount_list = [ - { - "name": None, - "mountPath": None, - } - ] + self.container_volume_mount_list = [] self._make_manifest(job_config) + self._last_phase = None + self._stuck_count = -10 + self._max_stuck_count = self.timeout + self.logger = logging.getLogger(self.__class__.__name__) def _make_manifest(self, job_config): - self.container_volume_mount_list = job_config.get( - "volume_mount_list", [{"name": "workspace-nvflare", "mountPath": "/workspace/nvflare"}] - ) + self.container_volume_mount_list.extend(job_config.get("volume_mount_list", [])) set_list = job_config.get("set_list") if set_list is None: self.container_args_module_args_sets = list() else: self.container_args_module_args_sets = ["--set"] + set_list - self.container_args_module_args_dict = job_config.get( - "module_args", - { - "-m": None, - "-w": None, - "-t": None, - "-d": None, - "-n": None, - "-c": None, - "-p": None, - "-g": None, - "-scheme": None, - "-s": None, - }, - ) + if job_config.get("module_args") is None: + self.container_args_module_args_dict = DEFAULT_CONTAINER_ARGS_MODULE_ARGS_DICT + else: + self.container_args_module_args_dict = job_config.get("module_args") self.container_args_module_args_dict_as_list = list() for k, v in self.container_args_module_args_dict.items(): self.container_args_module_args_dict_as_list.append(k) self.container_args_module_args_dict_as_list.append(v) - self.volume_list = job_config.get( - "volume_list", [{"name": None, "hostPath": {"path": None, "type": "Directory"}}] - ) - + self.volume_list.extend(job_config.get("volume_list", [])) self.pod_manifest["metadata"]["name"] = job_config.get("name") self.pod_manifest["spec"]["containers"] = self.container_list self.pod_manifest["spec"]["volumes"] = self.volume_list - self.container_list[0]["image"] = job_config.get("image", "nvflare/nvflare:2.5.0") + self.container_list[0]["image"] = job_config.get("image", "nvflare/nvflare:2.8.0") self.container_list[0]["name"] = job_config.get("container_name", "nvflare_job") self.container_list[0]["args"] = ( self.container_args_python_args_list @@ -147,6 +158,8 @@ def _make_manifest(self, job_config): + self.container_args_module_args_sets ) self.container_list[0]["volumeMounts"] = self.container_volume_mount_list + if job_config.get("resources", {}).get("limits", {}).get("nvidia.com/gpu") is not None: + self.container_list[0]["resources"] = job_config.get("resources") def get_manifest(self): return self.pod_manifest @@ -169,9 +182,13 @@ def terminate(self): resp = self.api_instance.delete_namespaced_pod( name=self.job_id, namespace=self.namespace, grace_period_seconds=0 ) - return self.enter_states([JobState.TERMINATED], timeout=self.timeout) + self.enter_states([JobState.TERMINATED], timeout=self.timeout) + self.terminal_state = JobState.TERMINATED + return None def poll(self): + if self.terminal_state is not None: + return self.terminal_state job_state = self._query_state() return JOB_RETURN_CODE_MAPPING.get(job_state, JobReturnCode.UNKNOWN) @@ -180,8 +197,20 @@ def _query_state(self): resp = self.api_instance.read_namespaced_pod(name=self.job_id, namespace=self.namespace) except ApiException: return JobState.UNKNOWN + if self._stuck(resp.status.phase): + self.terminate() + return JobState.TERMINATED return POD_STATE_MAPPING.get(resp.status.phase, JobState.UNKNOWN) + def _stuck(self, current_phase): + if self._max_stuck_count is None: + return False + if current_phase == POD_Phase.PENDING.value: + self._stuck_count += 1 + if self._stuck_count > self._max_stuck_count: + return True + return False + def wait(self): self.enter_states([JobState.SUCCEEDED, JobState.TERMINATED]) @@ -189,19 +218,27 @@ def wait(self): class K8sJobLauncher(JobLauncherSpec): def __init__( self, - config_file_path, - root_hostpath: str, - workspace: str, - mount_path: str, + config_file_path: str, + workspace_pvc: str, + etc_pvc: str, + data_pvc_file_path: str, timeout=None, namespace="default", ): super().__init__() + self.logger = logging.getLogger(self.__class__.__name__) - self.root_hostpath = root_hostpath - self.workspace = workspace - self.mount_path = mount_path + self.workspace_pvc = workspace_pvc + self.etc_pvc = etc_pvc + self.data_pvc_file_path = data_pvc_file_path self.timeout = timeout + self.namespace = namespace + with open(data_pvc_file_path, "rt") as f: + data_pvc_dict = yaml.safe_load(f) + # data_pvc_dict will be pvc: mountPath + # currently, support one pvc and always mount to /var/tmp/nvflare/data + # ie, ignore the mountPath in data_pvc_dict + self.data_pvc = list(data_pvc_dict.keys())[0] config.load_kube_config(config_file_path) try: @@ -211,17 +248,15 @@ def __init__( c.assert_hostname = False Configuration.set_default(c) self.core_v1 = core_v1_api.CoreV1Api() - self.namespace = namespace - self.job_handle = None - self.logger = logging.getLogger(self.__class__.__name__) def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec: - + site_name = fl_ctx.get_identity_name() job_id = job_meta.get(JobConstants.JOB_ID) args = fl_ctx.get_prop(FLContextKey.ARGS) - job_image = extract_job_image(job_meta, fl_ctx.get_identity_name()) - self.logger.info(f"launch job use image: {job_image}") + job_image = extract_job_image(job_meta, site_name) + site_resources = job_meta.get(JobMetaKey.RESOURCE_SPEC.value, {}).get(site_name, {}) + job_resource = site_resources.get("num_of_gpus", None) job_args = fl_ctx.get_prop(FLContextKey.JOB_PROCESS_ARGS) if not job_args: @@ -233,25 +268,27 @@ def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec: "image": job_image, "container_name": f"container-{job_id}", "command": job_cmd, - "volume_mount_list": [{"name": self.workspace, "mountPath": self.mount_path}], - "volume_list": [{"name": self.workspace, "hostPath": {"path": self.root_hostpath, "type": "Directory"}}], + "volume_mount_list": VOLUME_MOUNT_LIST, + "volume_list": [ + {"name": PV_NAME.WORKSPACE.value, "persistentVolumeClaim": {"claimName": self.workspace_pvc}}, + {"name": PV_NAME.DATA.value, "persistentVolumeClaim": {"claimName": self.data_pvc}}, + {"name": PV_NAME.ETC.value, "persistentVolumeClaim": {"claimName": self.etc_pvc}}, + ], "module_args": self.get_module_args(job_id, fl_ctx), "set_list": args.set, + "resources": {"limits": {"nvidia.com/gpu": job_resource}}, } - self.logger.info(f"launch job with k8s_launcher. Job_id:{job_id}") - job_handle = K8sJobHandle(job_id, self.core_v1, job_config, namespace=self.namespace, timeout=self.timeout) + pod_manifest = job_handle.get_manifest() + self.logger.info(f"launch job with k8s_launcher. {pod_manifest=}") try: - self.core_v1.create_namespaced_pod(body=job_handle.get_manifest(), namespace=self.namespace) - if job_handle.enter_states([JobState.RUNNING], timeout=self.timeout): - return job_handle - else: - job_handle.terminate() - return None - except ApiException: + self.core_v1.create_namespaced_pod(body=pod_manifest, namespace=self.namespace) + job_handle.enter_states([JobState.RUNNING], timeout=self.timeout) + return job_handle + except ApiException as e: job_handle.terminate() - return None + return job_handle def handle_event(self, event_type: str, fl_ctx: FLContext): if event_type == EventType.BEFORE_JOB_LAUNCH: diff --git a/nvflare/private/fed/client/communicator.py b/nvflare/private/fed/client/communicator.py index 1387f9ca9c..50d47f7005 100644 --- a/nvflare/private/fed/client/communicator.py +++ b/nvflare/private/fed/client/communicator.py @@ -273,9 +273,9 @@ def client_registration(self, client_name, project_name, fl_ctx: FLContext): start = time.time() while not self.cell: self.logger.info("Waiting for the client cell to be created.") - if time.time() - start > 15.0: + if time.time() - start > 600: raise RuntimeError("Client cell could not be created. Failed to login the client.") - time.sleep(0.5) + time.sleep(10) shared_fl_ctx = gen_new_peer_ctx(fl_ctx) private_key_file = None