Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions docs/programming_guide/timeouts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2519,6 +2519,45 @@ application.conf Settings
# Shutdown
end_run_readiness_timeout = 10.0

# Server startup/dead-job safety flags
strict_start_job_reply_check = false
sync_client_jobs_require_previous_report = true


.. _server_startup_dead_job_safety_flags:

Server Startup and Dead-Job Safety Flags
----------------------------------------

These ``application.conf`` flags are server-side safety controls used during job startup
and client heartbeat synchronization:

.. list-table::
:header-rows: 1
:widths: 36 12 52

* - Parameter
- Default
- Purpose
* - strict_start_job_reply_check
- false
- Enables strict START_JOB reply validation (detects missing/timeout replies and non-OK return codes).
* - sync_client_jobs_require_previous_report
- true
- Requires a prior positive heartbeat report before treating "missing job on client" as a dead-job signal.

Recommended usage:

- ``strict_start_job_reply_check`` defaults to ``false`` for backward compatibility;
  in that mode a timed-out START_JOB reply is treated as a silent success, which can
  mask startup problems. Enable it (``true``) for large-scale or hierarchical
  deployments where startup timeouts are expected, so the server proceeds with the
  subset of clients that responded rather than failing the entire job.
- Keep ``sync_client_jobs_require_previous_report=true`` (default) to prevent false
dead-job reports during startup races and transient heartbeat delays.
- Set ``sync_client_jobs_require_previous_report=false`` only to restore legacy behavior
where the first missing-job heartbeat immediately triggers dead-job detection.


Admin Client Session (Python API)
---------------------------------
Expand Down
11 changes: 11 additions & 0 deletions docs/user_guide/timeout_troubleshooting.rst
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,17 @@ Via Configuration Files
get_task_timeout = 300.0
submit_task_result_timeout = 300.0

# Server startup/dead-job safety flags
strict_start_job_reply_check = false
sync_client_jobs_require_previous_report = true

Guidance for the server-side safety flags (see :ref:`server_startup_dead_job_safety_flags` for full details):

- ``strict_start_job_reply_check`` (default ``false``): keep default for backward-compatible startup behavior;
set to ``true`` to enforce stricter START_JOB reply checks.
- ``sync_client_jobs_require_previous_report`` (default ``true``): keep enabled to avoid false dead-job reports
caused by transient startup or sync races.

**comm_config.json** (system-level, in startup kit):

.. code-block:: json
Expand Down
6 changes: 6 additions & 0 deletions nvflare/apis/fl_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,12 @@ class ConfigVarName:
# server: wait this long since job schedule time before starting to check dead/disconnected clients
DEAD_CLIENT_CHECK_LEAD_TIME = "dead_client_check_lead_time"

# server: require all start-job replies to be non-timeout and OK before considering the run started
STRICT_START_JOB_REPLY_CHECK = "strict_start_job_reply_check"

# server: require prior positive job observation before reporting "missing job on client" as dead-job
SYNC_CLIENT_JOBS_REQUIRE_PREVIOUS_REPORT = "sync_client_jobs_require_previous_report"

# customized nvflare decomposers module name
DECOMPOSER_MODULE = "nvflare_decomposers"

Expand Down
8 changes: 8 additions & 0 deletions nvflare/private/fed/client/client_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,15 @@ def get_job_clients(self, fl_ctx: FLContext):

"""
job_meta = fl_ctx.get_prop(FLContextKey.JOB_META)
if not isinstance(job_meta, dict):
raise RuntimeError(f"invalid job meta type: expected dict but got {type(job_meta)}")

job_clients = job_meta.get(JobMetaKey.JOB_CLIENTS)
if job_clients is None:
raise RuntimeError(f"missing {JobMetaKey.JOB_CLIENTS} in job meta")
if not isinstance(job_clients, list):
raise RuntimeError(f"invalid {JobMetaKey.JOB_CLIENTS} type: expected list but got {type(job_clients)}")

self.all_clients = [from_dict(d) for d in job_clients]
for c in self.all_clients:
self.name_to_clients[c.name] = c
Expand Down
62 changes: 56 additions & 6 deletions nvflare/private/fed/server/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from nvflare.fuel.hci.server.hci import AdminServer
from nvflare.fuel.hci.server.login import LoginModule, SessionManager
from nvflare.fuel.sec.audit import Auditor, AuditService
from nvflare.private.admin_defs import Message
from nvflare.private.admin_defs import Message, MsgHeader, ReturnCode
from nvflare.private.defs import ERROR_MSG_PREFIX, RequestHeader
from nvflare.private.fed.server.message_send import ClientReply, send_requests

Expand Down Expand Up @@ -77,20 +77,70 @@ def __init__(self, client, req: Message):
self.req = req


def check_client_replies(
    replies: List[ClientReply], client_sites: List[str], command: str, strict: bool = False
) -> List[str]:
    """Check client replies for errors.

    Args:
        replies: list of client replies.
        client_sites: list of expected client names.
        command: command description used in error messages.
        strict: if True, detect timed-out clients (reply is None) and return them as
            a list rather than raising. Explicit errors (non-OK return code or an
            error body) always raise regardless of this flag.

    Returns:
        List of client names whose reply was None (timed out). Only populated when
        strict=True; always empty when strict=False.

    Raises:
        RuntimeError: if no replies were received, the reply count mismatches,
            replies are structurally missing (strict mode), or any client returned
            an explicit error.
    """
    display_sites = ", ".join(client_sites)
    if not replies:
        raise RuntimeError(f"Failed to {command} to the clients {display_sites}: no replies.")
    if len(replies) != len(client_sites):
        raise RuntimeError(f"Failed to {command} to the clients {display_sites}: not enough replies.")

    error_msg = ""
    timed_out_clients = []
    # Index replies by client name so each expected site can be checked explicitly.
    # NOTE(review): assumes client names are unique within one command — confirm.
    replies_by_client = {r.client_name: r for r in replies}

    if strict:
        # A client with no ClientReply record at all is a structural failure,
        # distinct from a timeout (a record whose reply attribute is None).
        missing_clients = [c for c in client_sites if c not in replies_by_client]
        if missing_clients:
            raise RuntimeError(
                f"Failed to {command} to the clients {display_sites}: missing replies from {missing_clients}."
            )

        for client_name in client_sites:
            r = replies_by_client[client_name]
            if not r.reply:
                # Timeout: record and continue — caller decides whether to exclude or abort.
                timed_out_clients.append(client_name)
                continue

            return_code = r.reply.get_header(MsgHeader.RETURN_CODE, ReturnCode.OK)
            if return_code != ReturnCode.OK:
                # Prefer the reply body as the error detail; fall back to the code itself.
                detail = r.reply.body if r.reply.body else f"return code {return_code}"
                error_msg += f"\t{client_name}: {detail}\n"
                continue

            if isinstance(r.reply.body, str) and r.reply.body.startswith(ERROR_MSG_PREFIX):
                error_msg += f"\t{client_name}: {r.reply.body}\n"
    else:
        # Non-strict (legacy) mode: only explicit error bodies are reported;
        # a timed-out reply (None) is silently ignored.
        for client_name in client_sites:
            r = replies_by_client.get(client_name)
            if r and r.reply and isinstance(r.reply.body, str) and r.reply.body.startswith(ERROR_MSG_PREFIX):
                error_msg += f"\t{client_name}: {r.reply.body}\n"

    if error_msg:
        raise RuntimeError(f"Failed to {command} to the following clients: \n{error_msg}")

    return timed_out_clients


class FedAdminServer(AdminServer):
def __init__(
Expand Down
73 changes: 61 additions & 12 deletions nvflare/private/fed/server/fed_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,13 @@ def __init__(
self.name_to_reg = {}
self.cred_keeper = CredKeeper()

# Tracks per-job which client tokens have been positively observed running the job.
# Keyed by job_id -> set of client tokens. Used by _sync_client_jobs() to require
# a prior positive heartbeat before classifying a client's missing job as "dead".
# Entries are cleaned up as soon as the job is no longer in run_processes.
self._job_reported_clients: Dict[str, set] = {}
self._job_reported_clients_lock = threading.Lock()

# these are used when the server sends a message to itself.
self.my_own_auth_client_name = "server"
self.my_own_token = "server"
Expand Down Expand Up @@ -795,21 +802,63 @@ def client_heartbeat(self, request: Message) -> Message:
def _sync_client_jobs(self, request, client_token):
    """Reconcile the jobs a client reports as running with the server's view.

    Args:
        request: heartbeat message carrying the client's running job ids in the
            CellMessageHeaderKeys.JOB_IDS header.
        client_token: token identifying the reporting client.

    Returns:
        List of job ids that are running on the client but not on the server;
        the client must abort these.
    """
    client_jobs = request.get_header(CellMessageHeaderKeys.JOB_IDS)
    if not isinstance(client_jobs, (list, tuple, set)):
        # Defensive: a missing or malformed header means "no jobs reported".
        client_jobs = []

    client_jobs = set(client_jobs)
    server_jobs = set(self.engine.run_processes.keys())
    # Jobs that are running on the client but not on the server need to be aborted!
    jobs_need_abort = list(client_jobs.difference(server_jobs))

    require_previous_report = ConfigService.get_bool_var(
        name=ConfigVarName.SYNC_CLIENT_JOBS_REQUIRE_PREVIOUS_REPORT,
        conf=SystemConfigs.APPLICATION_CONF,
        default=True,
    )

    dead_job_notifications = []
    with self._job_reported_clients_lock:
        # Drop tracking entries for jobs no longer running on the server.
        for stale_job_id in list(self._job_reported_clients.keys()):
            if stale_job_id not in server_jobs:
                del self._job_reported_clients[stale_job_id]

        # Record each job this client has positively reported at least once.
        # When require_previous_report is enabled, "missing job on client" is only
        # treated as a dead-job signal after such a positive observation.
        for job_id in server_jobs.intersection(client_jobs):
            job_info = self.engine.run_processes.get(job_id)
            if not job_info:
                continue

            participating_clients = job_info.get(RunProcessKey.PARTICIPANTS, None)
            if not participating_clients or client_token not in participating_clients:
                continue

            self._job_reported_clients.setdefault(job_id, set()).add(client_token)

        # Jobs running on the server but absent from the client's report are
        # dead-job candidates for this client.
        for job_id in server_jobs.difference(client_jobs):
            job_info = self.engine.run_processes.get(job_id)
            if not job_info:
                continue

            participating_clients = job_info.get(RunProcessKey.PARTICIPANTS, None)
            if not participating_clients:
                continue

            # participating_clients is a dict: token => nvflare.apis.client.Client
            client = participating_clients.get(client_token, None)
            if not client:
                continue

            reported_clients = self._job_reported_clients.get(job_id, set())
            if (not require_previous_report) or (client_token in reported_clients):
                dead_job_notifications.append((client, job_id))

    # Notify after releasing the lock so messaging work cannot block other
    # heartbeat syncs. NOTE(review): confirm _notify_dead_job need not run under
    # _job_reported_clients_lock.
    for client, job_id in dead_job_notifications:
        self._notify_dead_job(client, job_id, "missing job on client")

    return jobs_need_abort

Expand Down
72 changes: 67 additions & 5 deletions nvflare/private/fed/server/job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,21 @@
from nvflare.apis.client import Client
from nvflare.apis.event_type import EventType
from nvflare.apis.fl_component import FLComponent
from nvflare.apis.fl_constant import AdminCommandNames, FLContextKey, RunProcessKey, SiteType, SystemComponents
from nvflare.apis.fl_constant import (
AdminCommandNames,
ConfigVarName,
FLContextKey,
RunProcessKey,
SiteType,
SystemComponents,
SystemConfigs,
)
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_def import ALL_SITES, Job, JobMetaKey, RunStatus
from nvflare.apis.job_scheduler_spec import DispatchInfo
from nvflare.apis.workspace import Workspace
from nvflare.fuel.utils.argument_utils import parse_vars
from nvflare.fuel.utils.config_service import ConfigService
from nvflare.lighter.utils import verify_folder_signature
from nvflare.private.admin_defs import Message, MsgHeader, ReturnCode
from nvflare.private.defs import RequestHeader, TrainingTopic
Expand Down Expand Up @@ -214,7 +223,12 @@ def _deploy_job(self, job: Job, sites: dict, fl_ctx: FLContext) -> Tuple[str, li
else:
deploy_detail.append(f"{client_name}: OK")
else:
deploy_detail.append(f"{client_name}: unknown")
# No reply means the client timed out during deployment.
# Count this as a failure so the min_sites / required_sites check
# can decide whether to abort, rather than silently treating a
# timed-out client as successfully deployed.
failed_clients.append(client_name)
deploy_detail.append(f"{client_name}: no reply (deployment timeout)")

# see whether any of the failed clients are required
if failed_clients:
Expand Down Expand Up @@ -248,15 +262,63 @@ def _start_run(self, job_id: str, job: Job, client_sites: Dict[str, DispatchInfo
# job_clients is a dict of: token => Client
assert isinstance(job_clients, dict)
participating_clients = [c.to_dict() for c in job_clients.values()]
# start_client_job serializes job.meta into request headers; make sure
# JOB_CLIENTS is available before client startup.
job.meta[JobMetaKey.JOB_CLIENTS] = participating_clients
err = engine.start_app_on_server(fl_ctx, job=job, job_clients=job_clients)
if err:
raise RuntimeError(f"Could not start the server App for job: {job_id}.")

replies = engine.start_client_job(job, client_sites, fl_ctx)
client_sites_names = list(client_sites.keys())
check_client_replies(replies=replies, client_sites=client_sites_names, command=f"start job ({job_id})")
display_sites = ",".join(client_sites_names)
all_client_sites = list(client_sites.keys())
active_client_sites = list(all_client_sites)
strict_start_reply_check = ConfigService.get_bool_var(
name=ConfigVarName.STRICT_START_JOB_REPLY_CHECK,
conf=SystemConfigs.APPLICATION_CONF,
default=False,
)
timed_out = check_client_replies(
replies=replies,
client_sites=all_client_sites,
command=f"start job ({job_id})",
strict=strict_start_reply_check,
)
if timed_out:
active_count = len(all_client_sites) - len(timed_out)

# A required site timing out is fatal regardless of min_sites, same as deploy phase.
if job.required_sites:
for c in timed_out:
if c in job.required_sites:
raise RuntimeError(f"start job ({job_id}): required client {c} timed out")

if job.min_sites and active_count < job.min_sites:
raise RuntimeError(
f"start job ({job_id}): {len(timed_out)} client(s) timed out and remaining "
f"{active_count} < min_sites {job.min_sites}: {timed_out}"
)
self.log_warning(
fl_ctx,
f"start job ({job_id}): {len(timed_out)} client(s) timed out at start-job: {timed_out}; "
f"{active_count} of {len(all_client_sites)} clients started successfully.",
)
active_client_sites = [c for c in all_client_sites if c not in timed_out]

if not strict_start_reply_check:
# In non-strict mode, check_client_replies() does not return timed-out clients.
# Build active clients directly from actual replies so JOB_CLIENTS stays accurate.
replies_by_client = {r.client_name: r for r in replies}
active_client_sites = []
for client_name in all_client_sites:
client_reply = replies_by_client.get(client_name)
if client_reply and client_reply.reply:
active_client_sites.append(client_name)

# Set metadata once, after any timeout exclusion, so it always reflects active participants.
active_sites = set(active_client_sites)
participating_clients = [c.to_dict() for c in job_clients.values() if c.name in active_sites]
job.meta[JobMetaKey.JOB_CLIENTS] = participating_clients
display_sites = ",".join(active_client_sites)

self.log_info(fl_ctx, f"Started run: {job_id} for clients: {display_sites}")
self.fire_event(EventType.JOB_STARTED, fl_ctx)
Expand Down
13 changes: 13 additions & 0 deletions tests/unit_test/private/fed/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Loading
Loading