
Fix pull_task to enforce server-required minimum timeout #4198

Open
YuanTingHsieh wants to merge 4 commits into NVIDIA:main from
YuanTingHsieh:enforce-min-get-task-timeout

Conversation

@YuanTingHsieh
Collaborator

Summary

  • When tensor streaming is active, the server sets a minimum get_task_timeout via MIN_GET_TASK_TIMEOUT. If the caller passed a smaller config-based timeout, it was used as-is, causing fast clients to time out while the server was still waiting for all clients to receive pushed tensors.
  • Use `max(timeout, self.timeout)` so the caller's value is respected when larger, but never falls below the server-required minimum.
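As a hedged illustration of the fix described above (the function name and values are standalone assumptions, not the actual communicator code), the clamping rule reduces to:

```python
# Standalone sketch of the timeout clamping rule; names are illustrative,
# not taken from nvflare/private/fed/client/communicator.py.
def effective_timeout(caller_timeout, server_min):
    """Pick the timeout actually used for the get_task request."""
    if not caller_timeout:                   # None or 0: fall back to the server value
        return server_min
    return max(caller_timeout, server_min)   # never drop below the server minimum

print(effective_timeout(30.0, 360.0))   # 360.0 (clamped up to the minimum)
print(effective_timeout(400.0, 360.0))  # 400.0 (larger caller value respected)
```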

Types of changes

  • Non-breaking change (fix or new feature that would not break existing functionality).
  • Breaking change (fix or new feature that would cause existing functionality to change).
  • New tests added to cover the changes.
  • Quick tests passed locally by running ./runtest.sh.
  • In-line docstrings updated.
  • Documentation updated.

Copilot AI review requested due to automatic review settings February 17, 2026 23:31
@greptile-apps
Contributor

greptile-apps bot commented Feb 17, 2026

Greptile Summary

This PR fixes a bug in pull_task where a caller-supplied timeout smaller than the server-required minimum (self.timeout, bumped via MIN_GET_TASK_TIMEOUT during tensor streaming) was used as-is, causing clients to time out prematurely while the server was still pushing tensors to slower clients.

Changes made:

  • nvflare/private/fed/client/communicator.py: Adds an else branch so that an explicit caller timeout is clamped upward to self.timeout via max(timeout, self.timeout), while still respecting larger caller-provided values.
  • tests/unit_test/app_opt/tensor_stream/timeout_management_test.py: Adds test_explicit_timeout_enforces_server_minimum with 5 parametrised cases covering caller-below, caller-equal, and caller-above the server minimum.
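A hedged sketch of what such a parametrised test might look like; the helper function and the specific boundary values below are assumptions for illustration, while the real test in the PR exercises the communicator directly:

```python
# Illustrative parametrised test of the clamping rule in isolation;
# SERVER_MIN and the case values are assumed, not copied from the PR.
import pytest

SERVER_MIN = 360.0  # assumed server-required minimum


def effective_timeout(caller_timeout, server_min):
    if not caller_timeout:
        return server_min
    return max(caller_timeout, server_min)


@pytest.mark.parametrize(
    "caller,expected",
    [
        (30.0, 360.0),   # caller below the minimum: clamped up
        (359.0, 360.0),  # just below: clamped up
        (360.0, 360.0),  # equal: unchanged
        (400.0, 400.0),  # above: caller value wins
        (None, 360.0),   # unset: fall back to the server value
    ],
)
def test_explicit_timeout_enforces_server_minimum(caller, expected):
    assert effective_timeout(caller, SERVER_MIN) == expected
```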

One minor observation:

  • self.timeout is read on line 377 without holding _state_lock, while writes to self.timeout (line 416) are guarded by the lock. This was already true for the pre-existing if not timeout: timeout = self.timeout branch (line 375) and is not newly introduced by this PR, but the inconsistency is worth being aware of in a multithreaded setting.

Confidence Score: 4/5

  • This PR is safe to merge; the fix is minimal, correct, and well-tested.
  • The logic change is a single max() call in exactly the right place, directly addressing the described timeout race. Five parametrised test cases cover all interesting boundary conditions. The only concern is a pre-existing inconsistency in lock usage around self.timeout reads, which is a best-practice issue rather than an active bug.
  • No files require special attention beyond the minor locking note in nvflare/private/fed/client/communicator.py.

Important Files Changed

  • nvflare/private/fed/client/communicator.py — Adds else: timeout = max(timeout, self.timeout) to enforce the server-required minimum when a caller-supplied timeout is smaller; the change is minimal and correct, though self.timeout is read outside _state_lock.
  • tests/unit_test/app_opt/tensor_stream/timeout_management_test.py — Adds test_explicit_timeout_enforces_server_minimum with 5 well-chosen parametrised cases covering caller-below, caller-equal, and caller-above the server minimum; the tests are clear and correctly validate the new behaviour.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant Communicator
    participant Server

    Note over Communicator: self.timeout = 5.0 (initial)

    Caller->>Communicator: pull_task(timeout=5.0)
    Communicator->>Server: send_request(timeout=5.0)
    Server-->>Communicator: response with MIN_GET_TASK_TIMEOUT=360.0
    Note over Communicator: self.timeout bumped → 360.0

    Caller->>Communicator: pull_task(timeout=30.0)
    Note over Communicator: max(30.0, 360.0) = 360.0 ✅
    Communicator->>Server: send_request(timeout=360.0)

    Caller->>Communicator: pull_task(timeout=400.0)
    Note over Communicator: max(400.0, 360.0) = 400.0 ✅
    Communicator->>Server: send_request(timeout=400.0)

    Caller->>Communicator: pull_task(timeout=None)
    Note over Communicator: timeout = self.timeout = 360.0 ✅
    Communicator->>Server: send_request(timeout=360.0)
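The self.timeout bump shown in the diagram can be sketched as follows; the class and method names here are assumptions that mirror the review's description of the write path, not NVFlare's actual API:

```python
# Minimal sketch of recording a server-required minimum timeout under a lock,
# mirroring the guarded write path the review describes in communicator.py.
import threading


class TimeoutTracker:
    def __init__(self, initial_timeout=5.0):
        self.timeout = initial_timeout
        self._state_lock = threading.Lock()

    def on_task_response(self, headers):
        # If the response carries MIN_GET_TASK_TIMEOUT, bump the local
        # minimum under the lock so concurrent pulls see a consistent value.
        min_timeout = headers.get("MIN_GET_TASK_TIMEOUT")
        if min_timeout is not None:
            with self._state_lock:
                self.timeout = max(self.timeout, float(min_timeout))


tracker = TimeoutTracker()
tracker.on_task_response({"MIN_GET_TASK_TIMEOUT": "360.0"})
print(tracker.timeout)  # 360.0
```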

Last reviewed commit: fc2d291

Contributor

@greptile-apps greptile-apps bot left a comment


2 files reviewed, no comments


@YuanTingHsieh
Collaborator Author

/build

Contributor

Copilot AI left a comment


Pull request overview

This pull request fixes a timeout enforcement bug in the client communicator's pull_task method. When tensor streaming is active, the server communicates a minimum required timeout (MIN_GET_TASK_TIMEOUT) to ensure all clients have sufficient time to receive pushed tensors. Previously, if a caller passed a smaller config-based timeout value to pull_task(), it was used as-is, causing fast clients to time out while the server was still waiting. The fix uses max(timeout, self.timeout) to ensure larger caller-supplied timeouts are respected while never falling below the server-required minimum.

Changes:

  • Modified timeout handling in pull_task() to enforce server-required minimum timeout
  • Added explanatory inline comment documenting the rationale for the new behavior


Contributor

@greptile-apps greptile-apps bot left a comment


2 files reviewed, no comments


@greptile-apps
Contributor

greptile-apps bot commented Feb 27, 2026

Additional Comments (1)

nvflare/private/fed/client/communicator.py, line 377
self.timeout read outside _state_lock

self.timeout is written under _state_lock (line ~413) to guard against concurrent updates from the MIN_GET_TASK_TIMEOUT response path, but both reads here (line 375 and 377) happen without the lock. In practice CPython's GIL makes a plain attribute read atomic, so this is unlikely to cause a real bug today, but for consistency and correctness it is worth reading under the same lock:

        with self._state_lock:
            effective_timeout = self.timeout
        if not timeout:
            timeout = effective_timeout
        else:
            timeout = max(timeout, effective_timeout)

This matches the locking discipline already applied to the write site and makes the intent clearer.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
