Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
492 changes: 492 additions & 0 deletions docs/design/multiproject.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions nvflare/apis/job_def.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class JobMetaKey(str, Enum):
CUSTOM_PROPS = "custom_props"
EDGE_METHOD = "edge_method"
JOB_CLIENTS = "job_clients" # clients that participated the job
PROJECT = "project"

def __repr__(self):
return self.value
Expand Down
1 change: 1 addition & 0 deletions nvflare/apis/utils/format_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"email": r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}$",
"org": r"^[A-Za-z0-9_]+$",
"simple_name": r"^[A-Za-z0-9_]+$",
"project": r"^[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?$",
}


Expand Down
10 changes: 9 additions & 1 deletion nvflare/app_opt/job_launcher/docker_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import FLContextKey, JobConstants
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_def import JobMetaKey
from nvflare.apis.job_launcher_spec import JobHandleSpec, JobLauncherSpec, JobReturnCode, add_launcher
from nvflare.apis.workspace import Workspace
from nvflare.utils.job_launcher_utils import extract_job_image, generate_client_command, generate_server_command
Expand Down Expand Up @@ -117,8 +118,15 @@ def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:
command = f' /bin/bash -c "export PYTHONPATH={python_path};{cmd}"'
self.logger.info(f"Launch image:{job_image}, run command: {command}")

project = job_meta.get(JobMetaKey.PROJECT.value, "")
docker_workspace = os.environ.get("NVFL_DOCKER_WORKSPACE")
self.logger.info(f"launch_job {job_id} in docker_workspace: {docker_workspace}")
# Keep legacy jobs on the existing workspace root; only non-default projects
# get a project-specific subdirectory under the configured Docker workspace.
if docker_workspace and isinstance(project, str) and project and project != "default":
docker_workspace = os.path.join(docker_workspace, project)

self.logger.info(f"launch_job {job_id} in docker_workspace: {docker_workspace} (project={project})")

docker_client = docker.from_env()
try:
container = docker_client.containers.run(
Expand Down
7 changes: 7 additions & 0 deletions nvflare/app_opt/job_launcher/k8s_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:
raise RuntimeError(f"missing {FLContextKey.JOB_PROCESS_ARGS} in FLContext")

_, job_cmd = job_args[JobProcessArgs.EXE_MODULE]
# TODO: Make the K8s launcher project-aware with minimal code churn.
# The intended change is only to read the optional job_meta["project"]
# and use it to resolve project-specific Kubernetes settings before pod
# launch. That settings lookup may include workspace volume/path plus
# any other K8s deployment settings required for the selected project.
# Keep the existing launch flow unchanged; only the settings resolution
# should become project-aware.
job_config = {
"name": job_id,
"image": job_image,
Expand Down
50 changes: 42 additions & 8 deletions nvflare/fuel/flare_api/flare_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from nvflare.apis.fl_constant import AdminCommandNames
from nvflare.apis.job_def import JobMetaKey
from nvflare.apis.utils.format_check import name_check
from nvflare.apis.workspace import Workspace
from nvflare.fuel.common.excepts import ConfigError
from nvflare.fuel.hci.client.api import AdminAPI, APIStatus, ResultKey
Expand Down Expand Up @@ -69,6 +70,7 @@ def __init__(
startup_path: str,
secure_mode: bool = True,
debug: bool = False,
project: Optional[str] = None,
):
"""Initializes a session with the NVFLARE system.

Expand All @@ -77,11 +79,11 @@ def __init__(
startup_path (str): path to the provisioned startup kit, which contains endpoint of the system
secure_mode (bool): whether to log in with secure mode
debug (bool): turn on debug or not
project (Optional[str]): project name to tag submitted/cloned jobs; None keeps existing behavior
"""
assert isinstance(username, str), "username must be str"
assert isinstance(startup_path, str), "startup_path must be str"
assert os.path.isdir(startup_path), f"startup kit does not exist at {startup_path}"

workspace = Workspace(root_dir=startup_path)
conf = secure_load_admin_config(workspace)
admin_config = conf.get_admin_config()
Expand All @@ -105,6 +107,11 @@ def __init__(
)
self.upload_dir = upload_dir
self.download_dir = download_dir
if project is not None:
err, reason = name_check(project, "project")
if err:
raise ValueError(reason)
self._project = project

def close(self):
"""Close the session."""
Expand Down Expand Up @@ -209,7 +216,8 @@ def clone_job(self, job_id: str) -> str:

"""
self._validate_job_id(job_id)
result = self._do_command(AdminCommandNames.CLONE_JOB + " " + job_id)
props = {JobMetaKey.PROJECT.value: self._project} if self._project else None
result = self._do_command(AdminCommandNames.CLONE_JOB + " " + job_id, props=props)
meta = result[ResultKey.META]
job_id = meta.get(MetaKey.JOB_ID, None)
info = meta.get(MetaKey.INFO, "")
Expand Down Expand Up @@ -241,7 +249,8 @@ def submit_job(self, job_definition_path: str) -> str:
else:
raise InvalidJobDefinition(f"job_definition_path '{job_definition_path}' is not a valid folder")

result = self._do_command(AdminCommandNames.SUBMIT_JOB + " " + job_definition_path)
props = {JobMetaKey.PROJECT.value: self._project} if self._project else None
result = self._do_command(AdminCommandNames.SUBMIT_JOB + " " + job_definition_path, props=props)
meta = result[ResultKey.META]
job_id = meta.get(MetaKey.JOB_ID, None)
if not job_id:
Expand Down Expand Up @@ -935,40 +944,65 @@ def new_session(
secure_mode: bool = True,
debug: bool = False,
timeout: float = 10.0,
project: Optional[str] = None,
) -> Session:
session = Session(username=username, startup_path=startup_kit_location, debug=debug, secure_mode=secure_mode)
session = Session(
username=username,
startup_path=startup_kit_location,
debug=debug,
secure_mode=secure_mode,
project=project,
)
session.try_connect(timeout)
return session


def new_secure_session(username: str, startup_kit_location: str, debug: bool = False, timeout: float = 10.0) -> Session:
def new_secure_session(
username: str,
startup_kit_location: str,
debug: bool = False,
timeout: float = 10.0,
project: Optional[str] = None,
) -> Session:
"""Create a new secure FLARE API session with the NVFLARE system.

Args:
username (str): username assigned to the user
startup_kit_location (str): path to the provisioned startup folder, the root admin dir containing the startup folder
debug (bool): enable debug mode
timeout (float): how long to try to establish the session, in seconds
project (Optional[str]): project name to tag submitted/cloned jobs

Returns: a Session object

"""
return new_session(username, startup_kit_location, True, debug, timeout)
return new_session(username, startup_kit_location, True, debug, timeout, project=project)


def new_insecure_session(startup_kit_location: str, debug: bool = False, timeout: float = 10.0) -> Session:
def new_insecure_session(
startup_kit_location: str,
debug: bool = False,
timeout: float = 10.0,
project: Optional[str] = None,
) -> Session:
"""Create a new insecure FLARE API session with the NVFLARE system.

Args:
startup_kit_location (str): path to the provisioned startup folder
debug (bool): enable debug mode
timeout (float): how long to try to establish the session, in seconds
project (Optional[str]): project name to tag submitted/cloned jobs

Returns: a Session object

The username for insecure session is always "admin".

"""
return new_session(
username="", startup_kit_location=startup_kit_location, secure_mode=False, debug=debug, timeout=timeout
username="",
startup_kit_location=startup_kit_location,
secure_mode=False,
debug=debug,
timeout=timeout,
project=project,
)
36 changes: 36 additions & 0 deletions nvflare/private/fed/server/job_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from nvflare.apis.job_def_manager_spec import JobDefManagerSpec, RunStatus
from nvflare.apis.shareable import Shareable
from nvflare.apis.storage import DATA, JOB_ZIP, META, META_JSON, WORKSPACE, WORKSPACE_ZIP, StorageSpec
from nvflare.apis.utils.format_check import name_check
from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.proto import ConfirmMethod, MetaKey, MetaStatusValue, make_meta
from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec
Expand Down Expand Up @@ -53,8 +54,11 @@
JobMetaKey.MIN_CLIENTS.value,
JobMetaKey.MANDATORY_CLIENTS.value,
JobMetaKey.DATA_STORAGE_FORMAT.value,
JobMetaKey.PROJECT.value,
}

PROJECT_CMD_PROP_KEY = JobMetaKey.PROJECT.value


def _create_list_job_cmd_parser():
parser = SafeArgumentParser(prog=AdminCommandNames.LIST_JOBS)
Expand All @@ -78,6 +82,34 @@ def __init__(self):
super().__init__()
self.logger = get_obj_logger(self)

@staticmethod
def _add_project_to_meta(meta: dict, conn: Connection) -> bool:
"""Validate optional project from command props and persist it into job metadata."""

cmd_props = conn.get_prop(ConnProps.CMD_PROPS)
project = ""
error = ""

if isinstance(cmd_props, dict):
candidate = cmd_props.get(PROJECT_CMD_PROP_KEY)
if candidate:
if not isinstance(candidate, str):
error = f"project must be str but got {type(candidate)}"
else:
invalid, reason = name_check(candidate, "project")
if invalid:
error = reason
else:
project = candidate

if error:
conn.append_error(error, meta=make_meta(MetaStatusValue.INVALID_JOB_DEFINITION, error))
return False

if project:
meta[JobMetaKey.PROJECT.value] = project
return True

def get_spec(self):
return CommandModuleSpec(
name="job_mgmt",
Expand Down Expand Up @@ -502,6 +534,8 @@ def clone_job(self, conn: Connection, args: List[str]):
job_meta[JobMetaKey.SUBMITTER_ORG.value] = conn.get_prop(ConnProps.USER_ORG)
job_meta[JobMetaKey.SUBMITTER_ROLE.value] = conn.get_prop(ConnProps.USER_ROLE)
job_meta[JobMetaKey.CLONED_FROM.value] = job_id
if not self._add_project_to_meta(job_meta, conn):
return

meta = job_def_manager.clone(from_jid=job_id, meta=job_meta, fl_ctx=fl_ctx)
new_job_id = meta.get(JobMetaKey.JOB_ID)
Expand Down Expand Up @@ -599,6 +633,8 @@ def submit_job(self, conn: Connection, args: List[str]):
meta[JobMetaKey.SUBMITTER_ORG.value] = conn.get_prop(ConnProps.USER_ORG, "")
meta[JobMetaKey.SUBMITTER_ROLE.value] = conn.get_prop(ConnProps.USER_ROLE, "")
meta[JobMetaKey.JOB_FOLDER_NAME.value] = folder_name
if not self._add_project_to_meta(meta, conn):
return
custom_props = conn.get_prop(ConnProps.CUSTOM_PROPS)
if custom_props:
meta[JobMetaKey.CUSTOM_PROPS.value] = custom_props
Expand Down
11 changes: 11 additions & 0 deletions nvflare/recipe/poc_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from pydantic import BaseModel, conint, model_validator

from nvflare.apis.utils.format_check import name_check
from nvflare.job_config.api import FedJob
from nvflare.recipe.spec import ExecEnv
from nvflare.recipe.utils import _collect_non_local_scripts
Expand Down Expand Up @@ -50,6 +51,7 @@ class _PocEnvValidator(BaseModel):
docker_image: Optional[str] = None
project_conf_path: str = ""
username: str = DEFAULT_ADMIN_USER
project: Optional[str] = None

@model_validator(mode="after")
def check_client_configuration(self):
Expand All @@ -67,6 +69,10 @@ def check_client_configuration(self):
if self.clients is None and self.num_clients <= 0:
raise ValueError("num_clients must be greater than 0")

if self.project is not None:
err, reason = name_check(self.project, "project")
if err:
raise ValueError(reason)
return self


Expand All @@ -87,6 +93,7 @@ def __init__(
docker_image: Optional[str] = None,
project_conf_path: str = "",
username: str = DEFAULT_ADMIN_USER,
project: Optional[str] = None,
extra: Optional[dict] = None,
):
"""Initialize POC execution environment.
Expand All @@ -101,6 +108,7 @@ def __init__(
project_conf_path (str, optional): Path to the project configuration file. Defaults to "".
If specified, 'number_of_clients','clients' and 'docker' specific options will be ignored.
username (str, optional): Admin user. Defaults to "admin@nvidia.com".
project (Optional[str]): Project name to tag submitted/cloned jobs.
extra: extra env info.
"""
super().__init__(extra)
Expand All @@ -113,6 +121,7 @@ def __init__(
docker_image=docker_image,
project_conf_path=project_conf_path,
username=username,
project=project,
)

self.clients = v.clients
Expand All @@ -123,6 +132,7 @@ def __init__(
self.project_conf_path = v.project_conf_path
self.docker_image = v.docker_image
self.username = v.username
self.project = v.project
self._session_manager = None # Lazy initialization

def deploy(self, job: FedJob):
Expand Down Expand Up @@ -257,6 +267,7 @@ def _get_session_manager(self):
"username": self.username,
"startup_kit_location": self._get_admin_startup_kit_path(),
"timeout": self.get_extra_prop("login_timeout", 10),
"project": self.project,
}
self._session_manager = SessionManager(session_params)
return self._session_manager
11 changes: 11 additions & 0 deletions nvflare/recipe/prod_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from pydantic import BaseModel, PositiveFloat, model_validator

from nvflare.apis.utils.format_check import name_check
from nvflare.job_config.api import FedJob
from nvflare.recipe.spec import ExecEnv
from nvflare.recipe.utils import _collect_non_local_scripts
Expand All @@ -34,11 +35,16 @@ class _ProdEnvValidator(BaseModel):
startup_kit_location: str
login_timeout: PositiveFloat = 5.0
username: str = DEFAULT_ADMIN_USER
project: Optional[str] = None

@model_validator(mode="after")
def check_startup_kit_location_exists(self) -> "_ProdEnvValidator":
if not os.path.exists(self.startup_kit_location):
raise ValueError(f"startup_kit_location path does not exist: {self.startup_kit_location}")
if self.project is not None:
err, reason = name_check(self.project, "project")
if err:
raise ValueError(reason)
return self


Expand All @@ -48,6 +54,7 @@ def __init__(
startup_kit_location: str,
login_timeout: float = 5.0,
username: str = DEFAULT_ADMIN_USER,
project: Optional[str] = None,
extra: Optional[dict] = None,
):
"""Production execution environment for submitting and monitoring NVFlare jobs.
Expand All @@ -58,6 +65,7 @@ def __init__(
startup_kit_location (str): Path to the admin's startup kit directory.
login_timeout (float): Timeout (in seconds) for logging into the Flare API session. Must be > 0.
username (str): Username to log in with.
project (Optional[str]): Project name to tag submitted/cloned jobs.
extra: extra env info.
"""
super().__init__(extra)
Expand All @@ -66,11 +74,13 @@ def __init__(
startup_kit_location=startup_kit_location,
login_timeout=login_timeout,
username=username,
project=project,
)

self.startup_kit_location = v.startup_kit_location
self.login_timeout = v.login_timeout
self.username = v.username
self.project = v.project
self._session_manager = None # Lazy initialization

def get_job_status(self, job_id: str) -> Optional[str]:
Expand Down Expand Up @@ -103,6 +113,7 @@ def _get_session_manager(self):
"username": self.username,
"startup_kit_location": self.startup_kit_location,
"timeout": self.login_timeout,
"project": self.project,
}
self._session_manager = SessionManager(session_params)
return self._session_manager
Loading
Loading