
Added origin to the stream lookup key #4032

Open
nvidianz wants to merge 6 commits into NVIDIA:main from nvidianz:add_origin_in_stream_matching

Conversation

@nvidianz
Collaborator

Description

Added origin to the stream lookup key to further reduce the chance of a collision.

Types of changes

  • Non-breaking change (fix or new feature that would not break existing functionality).
  • Breaking change (fix or new feature that would cause existing functionality to change).
  • New tests added to cover the changes.
  • Quick tests passed locally by running ./runtest.sh.
  • In-line docstrings updated.
  • Documentation updated.

@greptile-apps
Contributor

greptile-apps bot commented Jan 23, 2026

Greptile Overview

Greptile Summary

Changed the stream lookup key in RxTask.rx_task_map from using only stream ID (sid) to a composite key of (origin, sid) to reduce collision risk when multiple senders use the same stream IDs.

  • Updated rx_task_map type annotation to Dict[Tuple[str, int], "RxTask"]
  • Modified all dictionary operations (get, set, pop) to use the (origin, sid) tuple key
  • Added new test file stream_utils_test.py to validate gen_stream_id function behavior
  • All four references to rx_task_map in byte_receiver.py updated consistently

The change is backward-compatible in behavior since origin was already being tracked in RxTask instances.
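The composite-key scheme the summary describes can be sketched as follows. This is a minimal illustration, not the actual `byte_receiver.py` code: the class body, lock name, and method signatures here are assumptions made only to show how keying the map by `(origin, sid)` keeps streams from different senders separate.

```python
# Hypothetical sketch of the (origin, sid) composite lookup key.
# Names mirror the PR (RxTask, rx_task_map); the body is illustrative only.
from threading import Lock
from typing import Dict, Tuple


class RxTask:
    # Keyed by (origin, sid) so two senders reusing the same sid cannot collide.
    rx_task_map: Dict[Tuple[str, int], "RxTask"] = {}
    map_lock = Lock()

    def __init__(self, sid: int, origin: str):
        self.sid = sid
        self.origin = origin

    @classmethod
    def find_or_create_task(cls, sid: int, origin: str) -> "RxTask":
        key = (origin, sid)
        with cls.map_lock:
            task = cls.rx_task_map.get(key)
            if not task:
                task = RxTask(sid, origin)
                cls.rx_task_map[key] = task
        return task

    def stop(self):
        # The same composite key is used when the finished task is popped.
        with RxTask.map_lock:
            RxTask.rx_task_map.pop((self.origin, self.sid), None)
```

With this keying, `find_or_create_task(123, "origin_A")` and `find_or_create_task(123, "origin_B")` return two distinct tasks, and stopping one leaves the other in the map.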

Confidence Score: 5/5

  • This PR is safe to merge - simple refactoring with consistent updates across all usage points
  • The changes are straightforward and well-contained: updating a dictionary key from a single value to a tuple. All four references to rx_task_map have been updated consistently (type annotation, get, set, pop operations). The logic remains unchanged except for the improved collision avoidance. The new test file has minor cleanup needed but doesn't affect the core change.
  • No files require special attention - all changes are correctly implemented

Important Files Changed

Filename Overview
nvflare/fuel/f3/streaming/byte_receiver.py Changed rx_task_map lookup key from sid alone to (origin, sid) tuple to prevent stream ID collisions across different origins
tests/unit_test/fuel/f3/streaming/stream_utils_test.py New test file for gen_stream_id function with basic tests; unused helper function generate_stream_ids leftover from removed multiprocess tests

Sequence Diagram

sequenceDiagram
    participant Sender as Stream Sender (Origin A)
    participant Cell as CoreCell
    participant ByteReceiver
    participant RxTask
    participant Map as rx_task_map

    Note over Sender,Map: First chunk arrives (seq=0)
    Sender->>Cell: Send stream message (SID=123, Origin=A)
    Cell->>ByteReceiver: _data_handler(message)
    ByteReceiver->>RxTask: find_or_create_task(message)
    RxTask->>Map: get((origin_A, 123))
    Map-->>RxTask: None (task not found)
    RxTask->>RxTask: Create new RxTask(sid=123, origin=A)
    RxTask->>Map: Store task at key (origin_A, 123)
    RxTask-->>ByteReceiver: Return new task
    ByteReceiver->>ByteReceiver: Invoke stream callback

    Note over Sender,Map: Subsequent chunks arrive
    Sender->>Cell: Send stream message (SID=123, Origin=A)
    Cell->>ByteReceiver: _data_handler(message)
    ByteReceiver->>RxTask: find_or_create_task(message)
    RxTask->>Map: get((origin_A, 123))
    Map-->>RxTask: Return existing task
    RxTask->>RxTask: process_chunk(message)

    Note over Sender,Map: Final chunk processed
    RxTask->>RxTask: stop() called
    RxTask->>Map: pop((origin_A, 123))
    Map-->>RxTask: Task removed

    Note over Map: Key structure prevents collision:<br/>Different origins with same SID<br/>are now stored separately
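The collision the diagram's final note refers to can be shown with a toy example: with sid-only keys, two senders that happen to pick the same stream ID overwrite each other's entry, while the composite key keeps them distinct. The string values stand in for RxTask instances.

```python
# With sid-only keys, a second sender reusing sid 123 clobbers the first task.
sid_only: dict = {}
sid_only[123] = "task from origin_A"
sid_only[123] = "task from origin_B"  # overwrites origin_A's task

# With (origin, sid) keys, the two streams get separate entries.
composite: dict = {}
composite[("origin_A", 123)] = "task from origin_A"
composite[("origin_B", 123)] = "task from origin_B"  # distinct entry

assert len(sid_only) == 1
assert len(composite) == 2
```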

@greptile-apps bot left a comment

No files reviewed, no comments

@nvidianz
Collaborator Author

/build

@greptile-apps bot left a comment

No files reviewed, no comments

@greptile-apps bot left a comment

1 file reviewed, 1 comment

@nvidianz
Collaborator Author

/build

@greptile-apps bot left a comment

1 file reviewed, 1 comment

@chesterxgchen
Collaborator

/build

@greptile-apps bot left a comment

1 file reviewed, 1 comment

Comment on lines +20 to +30

def generate_stream_ids(num_ids: int, result_queue: mp.Queue) -> None:
    """Worker function to generate stream IDs in a separate process.

    Args:
        num_ids: Number of stream IDs to generate
        result_queue: Queue to put the generated IDs
    """
    ids = []
    for _ in range(num_ids):
        ids.append(gen_stream_id())
    result_queue.put(ids)

greptile-apps bot: Unused helper function, left over from removed multiprocess tests.

Suggested change: delete the generate_stream_ids function above.
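The property the new test file checks — that gen_stream_id yields unique IDs — can be illustrated with a self-contained stand-in. The real gen_stream_id lives in the NVFlare streaming utilities; the lock-protected counter below is a hypothetical substitute used only to demonstrate the uniqueness check, not the actual implementation.

```python
# Hypothetical stand-in for gen_stream_id: a process-wide, lock-protected
# counter that hands out unique, monotonically increasing stream IDs.
from threading import Lock

_lock = Lock()
_next_id = 0


def gen_stream_id_stub() -> int:
    """Return a unique, monotonically increasing stream ID."""
    global _next_id
    with _lock:
        _next_id += 1
        return _next_id


# Uniqueness check in the spirit of the new test file.
ids = [gen_stream_id_stub() for _ in range(1000)]
assert len(set(ids)) == len(ids)  # no duplicates
```

Even with unique IDs per process, two different senders can still emit the same sid value, which is exactly why the PR adds origin to the lookup key.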

