Skip to content

Cherry-pick of #4084 and #4132#4286

Open
YuanTingHsieh wants to merge 3 commits intoNVIDIA:mainfrom
YuanTingHsieh:cherry-pick-4084
Open

Cherry-pick of #4084 and #4132#4286
YuanTingHsieh wants to merge 3 commits intoNVIDIA:mainfrom
YuanTingHsieh:cherry-pick-4084

Conversation

@YuanTingHsieh
Copy link
Collaborator

@YuanTingHsieh YuanTingHsieh commented Mar 11, 2026

Description

Cherry-pick 2.7 changes to main #4084 and #4132

Types of changes

  • Non-breaking change (fix or new feature that would not break existing functionality).
  • Breaking change (fix or new feature that would cause existing functionality to change).
  • New tests added to cover the changes.
  • Quick tests passed locally by running ./runtest.sh.
  • In-line docstrings updated.
  • Documentation updated.

Copilot AI review requested due to automatic review settings March 11, 2026 00:43
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a 3-mode ignore_result_error policy (dynamic/strict/resilient) to key FL controllers via shared utilities, and introduces an ExecEnv.stop() hook to ensure execution environments (notably POC) can be cleaned up after a run.

Changes:

  • Added shared utilities for deciding whether to ignore client result errors and for generating consistent log/panic messages.
  • Updated BaseModelController and Scatter-and-Gather workflows to support ignore_result_error=None (dynamic) with per-task failure tracking and special handling for unknown/late tasks.
  • Added ExecEnv.stop() and wired Run.get_result() to invoke environment cleanup; updated POC env stop logic and tests.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
tests/unit_test/recipe/poc_env_test.py Updates POC stop test to reflect idempotent running-state checks.
tests/unit_test/app_common/utils/error_handling_utils_test.py New unit tests covering the 3-mode error handling utilities and controller integration points.
nvflare/recipe/spec.py Adds ExecEnv.stop() default no-op cleanup hook.
nvflare/recipe/run.py Calls exec_env.stop() from Run.get_result() (cleanup integration).
nvflare/recipe/poc_env.py Makes POC stop() more explicitly idempotent and optionally removes workspace when already stopped.
nvflare/app_common/workflows/scatter_and_gather_scaffold.py Changes default ignore_result_error to None and documents 3-mode behavior.
nvflare/app_common/workflows/scatter_and_gather.py Implements 3-mode result error handling with dynamic tracking and unknown-task behavior.
nvflare/app_common/workflows/scaffold.py Updates docstring to describe new ignore_result_error semantics.
nvflare/app_common/workflows/base_model_controller.py Implements dynamic error policy tracking/reset per task and propagates accept/reject from _accept_train_result().
nvflare/app_common/utils/error_handling_utils.py New shared utility functions for ignore decision + message formatting.
nvflare/app_common/utils/__init__.py Exposes the new utility functions via package exports.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +150 to +152
# Store task context for dynamic ignore_result_error mode
num_targets = len(targets) if targets else len(self.engine.get_clients())
self._current_min_responses = min_responses if min_responses > 0 else num_targets
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In broadcast_model(), num_targets = len(targets) if targets else len(self.engine.get_clients()) treats an empty targets=[] the same as targets=None (all clients). That makes the dynamic ignore_result_error=None tolerance math incorrect, and it also conflicts with later logic that passes targets through to broadcast_and_wait. Use an explicit targets is not None check (and consider validating that targets is non-empty if empty is not meaningful).

Copilot uses AI. Check for mistakes.
Comment on lines +256 to +260
# Now try to convert result to FLModel
try:
result_model = FLModelUtils.from_shareable(result)
result_model.meta["props"] = client_task.task.props[AppConstants.META_DATA]
result_model.meta["client_name"] = client_name
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_process_result() converts the raw Shareable to FLModel before calling _accept_train_result(), and then converts it again inside the new try/except block. This duplicates work and (more importantly) the first conversion is unguarded and can raise for errored/invalid results that should have been rejected by _accept_train_result() first. Convert only once, after _accept_train_result() returns True, and keep the conversion inside the try/except.

Copilot uses AI. Check for mistakes.
train_task_name=AppConstants.TASK_TRAIN,
train_timeout: int = 0,
ignore_result_error: bool = False,
ignore_result_error: bool = None,
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter is typed as bool but now defaults to None. Update the annotation to Optional[bool] (or bool | None) to match the new 3-mode behavior and avoid type-checker/IDE inconsistencies.

Copilot uses AI. Check for mistakes.
@greptile-apps
Copy link
Contributor

greptile-apps bot commented Mar 11, 2026

Greptile Summary

This PR cherry-picks two changes from branch 2.7 to main: a new three-mode error handling utility (error_handling_utils.py) with refactored BaseModelController and ScatterAndGather to use it (#4084), and a new Recipe API with PocEnv, Run, and ExecEnv spec classes for local POC execution (#4132). The error-handling refactor correctly centralises should_ignore_result_error and get_error_handling_message logic, and the new recipe classes are well-structured with good test coverage.

Key issues found:

  • Critical — ScatterAndGatherScaffold error tracking never initialised: control_flow overrides the parent without resetting _current_num_targets (stays 0) or _current_failed_clients per round. In dynamic mode (ignore_result_error=None), remaining_good_clients = 0 - 1 = -1 < min_clients on the very first failure, causing an immediate panic regardless of tolerance settings. Stale failures also carry over across rounds.
  • Logic — dead FLModel conversion before error check in BaseModelController._process_result: lines 243–245 create a result_model that is unconditionally overwritten later (lines 257–260) and is never used. The conversion runs outside any try/except before the error-handling check, risking an unhandled exception on malformed errored results.
  • Logic — _PocEnvValidator can raise TypeError on lines 63 and 69 when num_clients=None is passed, because the comparisons None > 0 and None <= 0 are not guarded, producing a TypeError instead of a clean ValueError.
  • Style — Run.get_result holds lock during blocking call: the lock is held for the full duration of the potentially long get_job_result(timeout=…) call, preventing abort() from running concurrently from another thread.

Confidence Score: 2/5

  • Not safe to merge — ScatterAndGatherScaffold has a critical bug that breaks dynamic error-mode tolerance entirely.
  • The ScatterAndGatherScaffold bug means any federated training job using that workflow with ignore_result_error=None (the default) will panic on the very first client error, even when tolerance should be available. This is a regression introduced by the cherry-pick and affects a production workflow class. The redundant FLModel conversion in BaseModelController is a secondary logic issue that could mask errors on malformed results.
  • nvflare/app_common/workflows/scatter_and_gather_scaffold.py (critical missing reset), nvflare/app_common/workflows/base_model_controller.py (dead code before error check)

Important Files Changed

Filename Overview
nvflare/app_common/utils/error_handling_utils.py New utility module implementing three-mode error handling policy (None/dynamic, False/strict, True/resilient); logic is correct and well-tested.
nvflare/app_common/workflows/base_model_controller.py Refactored to use shared error handling utilities; contains a redundant FLModel conversion (lines 243–245) that is dead code and missing error handling before the error check runs.
nvflare/app_common/workflows/scatter_and_gather.py Refactored to use shared error utilities and adds per-round reset of _current_failed_clients/_current_num_targets; logic is correct for the base class.
nvflare/app_common/workflows/scatter_and_gather_scaffold.py Critical bug: control_flow overrides parent without resetting _current_num_targets (stays 0) and _current_failed_clients per round, causing dynamic error mode to always panic on the first client failure.
nvflare/recipe/poc_env.py New PocEnv class for recipe API; validator has TypeError when num_clients=None is explicitly passed alongside None-or-provided clients.
nvflare/recipe/run.py New Run class wrapping job lifecycle; lock is held across the long blocking get_job_result call, preventing concurrent abort() from running during a timed wait.

Sequence Diagram

sequenceDiagram
    participant Controller as ScatterAndGather/<br/>BaseModelController
    participant EHU as error_handling_utils
    participant Client as FL Client

    Controller->>Client: broadcast_and_wait(task)
    Note over Controller: Reset _current_failed_clients = set()<br/>Reset _current_num_targets = N

    Client-->>Controller: result_received_cb (ClientTask)
    Controller->>Controller: _accept_train_result()

    alt rc == OK
        Controller->>Controller: set TRAINING_RESULT in fl_ctx
        Controller-->>Controller: return True (accepted)
    else rc != OK
        Controller->>EHU: should_ignore_result_error(mode, client, failed_set, N, min)
        alt mode == True (Resilient)
            EHU-->>Controller: True (ignore)
        else mode == False (Strict)
            EHU-->>Controller: False (panic)
        else mode == None (Dynamic)
            EHU->>EHU: failed_clients.add(client_name)
            EHU->>EHU: remaining = N - len(failed_clients)
            alt remaining >= min_responses
                EHU-->>Controller: True (ignore)
            else remaining < min_responses
                EHU-->>Controller: False (panic)
            end
        end

        Controller->>EHU: get_error_handling_message(mode, ...)
        EHU-->>Controller: message string

        alt should_ignore == True
            Controller->>Controller: log warning(msg)
            Controller-->>Controller: return False (rejected, error ignored)
        else should_ignore == False
            Controller->>Controller: panic(msg) / system_panic(msg)
            Controller-->>Controller: return False (rejected, panic triggered)
        end
    end
Loading

Comments Outside Diff (4)

  1. nvflare/app_common/workflows/scatter_and_gather_scaffold.py, line 162-168 (link)

    Missing per-round reset of error tracking state

    ScatterAndGatherScaffold.control_flow fully reimplements the training loop without resetting _current_failed_clients and _current_num_targets before each round's broadcast_and_wait. The parent ScatterAndGather.control_flow does this explicitly (lines 264–265):

    self._current_failed_clients = set()
    self._current_num_targets = len(self._engine.get_clients())

    Because ScatterAndGatherScaffold never runs that code, the dynamic mode (ignore_result_error=None) is broken in two ways:

    1. _current_num_targets is always 0 (never updated from its __init__ default). For any client error, remaining_good_clients = 0 - 1 = -1 < min_clients, so should_ignore_result_error always returns False and triggers a panic — even when tolerance should be available.
    2. _current_failed_clients accumulates across rounds. After round 1, failures from that round bleed into round 2, making the remaining-client calculation pessimistic and causing premature panics in later rounds.

    The fix is to add the same reset block used in the parent before calling broadcast_and_wait inside the loop:

    # Reset tracking for dynamic ignore_result_error mode
    self._current_failed_clients = set()
    self._current_num_targets = len(self._engine.get_clients())
    
    self.broadcast_and_wait(
        task=train_task,
        min_responses=self._min_clients,
        ...
    )
  2. nvflare/app_common/workflows/base_model_controller.py, line 243-254 (link)

    Redundant FLModel conversion before error check

    result_model is created at lines 243–245 (before _accept_train_result) and then unconditionally overwritten inside the try block at lines 257–260 (after the check). The first creation is never read or used — its value is discarded whether or not the result is accepted.

    Beyond being dead code, this ordering has a subtle risk: FLModelUtils.from_shareable(result) at line 243 is called outside any try/except, meaning an exception here (e.g. for a malformed errored result) would propagate before _accept_train_result ever runs, bypassing the configured error handling policy entirely.

    The first block (lines 243–245) should be removed:

    # Make round available on callback fl_ctx before contribution-accept handlers run.
    current_round = client_task.task.data.get_header(AppConstants.CURRENT_ROUND, None)
    if current_round is None:
        current_round = result.get_header(AppConstants.CURRENT_ROUND, None)
    if current_round is not None:
        fl_ctx.set_prop(AppConstants.CURRENT_ROUND, current_round, private=True, sticky=True)
    
    self.event(AppEventType.BEFORE_CONTRIBUTION_ACCEPT)
    accepted = self._accept_train_result(client_name=client_name, result=result, fl_ctx=fl_ctx)
    self.event(AppEventType.AFTER_CONTRIBUTION_ACCEPT)
  3. nvflare/recipe/poc_env.py, line 63-70 (link)

    TypeError when num_clients=None in validator

    The validator performs comparisons with self.num_clients on lines 63 and 69 without guarding against None:

    • Line 63: self.clients is not None and self.num_clients > 0 and ... — if clients is provided but num_clients=None, the expression None > 0 raises TypeError rather than a clean ValueError.
    • Line 69: self.clients is None and self.num_clients <= 0 — if both are None, None <= 0 raises TypeError.

    Since num_clients is declared Optional[conint(gt=0)] = None in the validator, these code paths are reachable. Fix by adding explicit None guards:

    # Check if both num_clients and clients are specified and inconsistent
    if self.clients is not None and self.num_clients is not None and self.num_clients > 0 and len(self.clients) != self.num_clients:
        raise ValueError(
            f"Inconsistent: num_clients={self.num_clients} but clients list has {len(self.clients)} entries"
        )
    
    # Check if num_clients is valid when clients is None
    if self.clients is None and (self.num_clients is None or self.num_clients <= 0):
        raise ValueError("num_clients must be greater than 0")
  4. nvflare/recipe/run.py, line 88-113 (link)

    Lock held during long blocking call prevents concurrent abort()

    get_result acquires self._lock at the top and holds it for the entire call, including the potentially long-running self.exec_env.get_job_result(self.job_id, timeout=timeout). Since abort() also acquires self._lock, any thread calling run.abort() while get_result(timeout=60) is in progress will be blocked for up to timeout seconds.

    This defeats the primary concurrent use-case: "wait for result with a timeout, and abort from a watchdog thread if it hangs." The Run class docstring advertises thread safety, but the current design prevents abort from running during get_result.

    Consider restructuring so that only the state mutations (setting _stopped, _cached_status, _cached_result) are performed under the lock, while the blocking get_job_result call is made outside it:

    def get_result(self, timeout: float = 0.0) -> Optional[str]:
        with self._lock:
            if self._stopped:
                return self._cached_result
    
        # Blocking call outside the lock so abort() can run concurrently
        result = None
        try:
            result = self.exec_env.get_job_result(self.job_id, timeout=timeout)
        except Exception as e:
            self.logger.warning(f"Failed to get job result: {e}")
    
        # Re-acquire lock to update shared state
        with self._lock:
            self._cached_result = result
            try:
                self._cached_status = self.exec_env.get_job_status(self.job_id)
            except Exception as e:
                self.logger.warning(f"Failed to get job status: {e}")
            try:
                self.exec_env.stop(clean_up=True)
            except Exception as e:
                self.logger.warning(f"Failed to stop execution environment: {e}")
            finally:
                self._stopped = True
        return result

Last reviewed commit: 9e87f6c

@YuanTingHsieh YuanTingHsieh changed the title Cherry-pick of [2.7] Add dynamic ignore_result_error logic and POC environment cleanup (#4084) Cherry-pick of #4084 and #4132 Mar 11, 2026
chesterxgchen and others added 2 commits March 11, 2026 11:17
…up (NVIDIA#4084)

This PR introduces a flexible 3-mode error handling policy for FL
controllers and adds proper cleanup for POC environment after job
execution.

- **Dynamic `ignore_result_error` mode**: Controllers now support three
modes for handling client result errors:
- `None` (default): **Dynamic mode** - ignore errors if
`min_responses`/`min_clients` can still be reached, panic otherwise
  - `False`: **Strict mode** - always panic on any client error
- `True`: **Resilient mode** - always ignore client errors and continue

- **Shared utility functions**: Created reusable error handling logic to
avoid code duplication across controllers

- Added `stop()` method to `ExecEnv` base class (default no-op)
- `Run.get_result()` now calls `exec_env.stop()` in a `finally` block
- Ensures POC services are always stopped after job execution, making
each run independent

| File | Changes |
|------|---------|
| `nvflare/app_common/utils/error_handling_utils.py` | **New** - Shared
utility functions: `should_ignore_result_error()`,
`get_error_handling_message()` |
| `nvflare/app_common/workflows/base_model_controller.py` | Updated
`ignore_result_error` default to `None`, added 3-mode logic in
`_accept_train_result()`, added tracking for `_current_failed_clients`,
`_current_num_targets`, `_current_min_responses` |
| `nvflare/app_common/workflows/scatter_and_gather.py` | Updated
`ignore_result_error` default to `None`, added 3-mode logic in
`_accept_train_result()`, added tracking variables, updated docstring |
| `nvflare/app_common/workflows/scatter_and_gather_scaffold.py` |
Updated `ignore_result_error` signature and docstring |
| `nvflare/app_common/workflows/scaffold.py` | Updated
`ignore_result_error` docstring |
| `nvflare/recipe/spec.py` | Added `stop()` method to `ExecEnv` base
class |
| `nvflare/recipe/run.py` | Updated `get_result()` to call
`exec_env.stop()` in finally block |
| `tests/unit_test/app_common/utils/error_handling_utils_test.py` |
**New** - 17 unit tests for error handling logic |

Controllers that inherit from `BaseModelController` automatically get
the new behavior:
- `FedAvg`, `BaseFedAvg`, `Scaffold`, `Cyclic`, `PTFedAvg`, `FedOpt`,
etc.

Controllers with direct changes:
- `ScatterAndGather`, `ScatterAndGatherScaffold`

- [x] Added 17 unit tests for `ignore_result_error` logic
- [x] All tests pass
- [x] Code formatted with black, isort, flake8
## Summary

This PR fixes an issue with the `Run` class where calling `get_status()`
after `get_result()` would fail because the POC environment was already
stopped. It also ensures proper cleanup of the POC workspace.

### Changes

**`nvflare/recipe/run.py`**
- Added state caching: `_stopped`, `_cached_status`, `_cached_result`
attributes
- Added proper logging using `get_obj_logger(self)`
- `get_result()` now caches status before stopping POC for later
retrieval
- `get_status()` returns cached status after POC is stopped
- `abort()` is a no-op after POC is stopped

**`nvflare/recipe/spec.py`**
- Updated `ExecEnv.stop()` signature to accept `clean_up: bool = False`
parameter

**`nvflare/recipe/poc_env.py`**
- Renamed parameter `clean_poc` to `clean_up` for consistency
- Workspace removal now only happens when `clean_up=True` (was
unconditional before)

**`tests/unit_test/recipe/run_test.py`**
- Updated 13 unit tests to verify new behavior

## Test Plan

- [x] 13 unit tests pass for `Run` class
- [x] Integration test verified with hello-numpy example using PocEnv
  - Job executes and completes successfully
  - Status is cached correctly (`FINISHED:COMPLETED`)
  - Result is cached for subsequent calls
  - POC workspace is cleaned up after `get_result()`

---------

Co-authored-by: Peter Cnudde <pcnudde@nvidia.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants