fabtests/efa/multi_ep_stress: Fix thread safety and simplify architecture#11843
fabtests/efa/multi_ep_stress: Fix thread safety and simplify architecture#11843alekswn merged 9 commits intoofiwg:mainfrom
Conversation
4f1c781 to
6422718
Compare
| @@ -0,0 +1,125 @@ | |||
| /* | |||
| * Copyright (c) 2026 Amazon.com, Inc. or its affiliates. All rights reserved. | |||
| * | |||
There was a problem hiding this comment.
If we want this to be in the common header folder, can you make it as a separate commit for community to review
fabtests/include/ft_random.h
Outdated
| #include "shared.h" | ||
|
|
||
| #ifdef __GNUC__ | ||
| #define likely(x) __builtin_expect(!!(x), 1) |
There was a problem hiding this comment.
Prefer FT_UNLIKELY and FT_LIKELY similar to the OFI_LIKELY naming pattern
There was a problem hiding this comment.
Also, I think it should be in shared.h for general
| } | ||
|
|
||
| static int wait_for_comp(struct fid_cq *cq, int num_completions, int64_t timeout_ns) | ||
| static int wait_for_comp(struct fid_cq *cq, int num_completions) |
There was a problem hiding this comment.
Now we have a signature change of the function, can you add docstring to explain the newest behavior
|
|
||
| static int calculate_worker_distribution(int sender_id, int *num_peers, | ||
| int *peer_ids) | ||
| static int calculate_worker_distribution(uint16_t current_worker_id, |
There was a problem hiding this comment.
Since you are here, add documentation?
| while (true) { | ||
| struct ep_message msg; | ||
| ret = ep_message_queue_try_pop(ctx->control_queue, &msg); | ||
| if (ret == 0) { |
There was a problem hiding this comment.
did we remove av before this PR, I could not find it
There was a problem hiding this comment.
No we did not, should not we?
There was a problem hiding this comment.
I think making it optional with a cmd line arg would be good
| AC_CHECK_TYPE([__int128], | ||
| [AC_DEFINE(HAVE___INT128, 1, [Set to 1 to use 128-bit ints])]) | ||
|
|
||
| dnl Check for random_r function |
There was a problem hiding this comment.
similar comment, common code change should be in a separate commit
| ret = pthread_mutex_lock(&q->mutex); | ||
| if (ret) | ||
| return ret; | ||
| while (q->size == EP_MESSAGE_QUEUE_CAPACITY) { |
There was a problem hiding this comment.
It's not clear to me how such message queue works.... The goal is for receiver to send its new EP address to sender which is on a different node... How does the queue realize it?? does the message in the queue finally get sent over the wire?
There was a problem hiding this comment.
There is one queue on receiver side and multiple queues on sender side.
Receiver workers push their messages to the common queue. Receiver's main thread pushes these messages to fabtests's OOB socket.
Sender's main thread pulls messages from the OOB socket and routes them to individual worker queues.
Sender threads pull messages from their queues and apply AV updates.
There was a problem hiding this comment.
Then a sender thread can start as soon as it receives one address. Why wait for all addresses?
There was a problem hiding this comment.
Will add that in the coming revision
| - **Receiver EP Cycles**: Controlled by `--receiver-ep-cycles` (default: 1) | ||
|
|
||
| Within each cycle: | ||
| 1. Endpoint is created with fresh CQ/AV resources |
There was a problem hiding this comment.
Test has shared AV and shared CQ options
| - Workers are assigned specific peer connections based on distribution logic | ||
| - Workers operate independently with thread-safe coordination | ||
|
|
||
| ## Key Features |
There was a problem hiding this comment.
Random sleeps with a configurable seed and thread safe per thread random number generation to make reproductions easy is a key feature
|
|
||
| For each sender worker thread: | ||
|
|
||
| 1. **Wait for peer addresses**: Block until all receiver endpoint addresses arrive via control queue |
There was a problem hiding this comment.
Can we have each sender thread start as soon as it receives the receiver address? Why block for all addresses? Each sender may not even talk to all of the receivers
That can help catch issues with overlapping control and data plane calls
There was a problem hiding this comment.
Will be done in coming revision
| - Post receive operations (for msg operations) | ||
| - For RMA writedata, skip posting (writes are one-sided) | ||
| 3. **Cycle completion**: | ||
| - For RMA: always wait for completions |
There was a problem hiding this comment.
It was a special condition I had introduced for writedata operation. Actually now I can see it is not necessary. Removing in next revision.
| while (true) { | ||
| struct ep_message msg; | ||
| ret = ep_message_queue_try_pop(ctx->control_queue, &msg); | ||
| if (ret == 0) { |
There was a problem hiding this comment.
I think making it optional with a cmd line arg would be good
| ret = pthread_mutex_lock(&q->mutex); | ||
| if (ret) | ||
| return ret; | ||
| while (q->size == EP_MESSAGE_QUEUE_CAPACITY) { |
There was a problem hiding this comment.
Then a sender thread can start as soon as it receives one address. Why wait for all addresses?
6422718 to
aed1bae
Compare
|
|
||
| // Worker status tracking | ||
| struct worker_status { | ||
| // Multy producer single consumer thread-safe queue |
There was a problem hiding this comment.
Nit: spelling "multy producer-> multi-producer". No need to re-spin the patch just for this though
| b.tv_sec += timeout; | ||
| ret = pthread_mutex_timedlock(&shared_cq_lock, &b); | ||
| if (ret == ETIMEDOUT) { | ||
| fprintf(stderr, |
There was a problem hiding this comment.
Nit: shouldn't this be one line so that we can search for "timeout expired while waiting" in the code base?
|
bot:aws:retest |
| int buffer_multiplier) | ||
| { | ||
| int ret; | ||
| const struct ep_message terminator = {0}; |
There was a problem hiding this comment.
This assumes EP_MESSAGE_TYPE_TERMINATOR is 0, can we make it assigned explicitly so it won't be broken accidentally when someone change EP_MESSAGE_TYPE_TERMINATOR to 1
| // Dispatch control messages from OOB channel to worker's control channels | ||
| struct ep_message msg; | ||
| while (true) { | ||
| ret = ft_sock_recv(oob_sock, (void*)&msg, sizeof(msg)); |
There was a problem hiding this comment.
When ret is non zero that means the receiver closed its socket? need some comments here
| } | ||
| } | ||
| struct ep_message terminator = {0}; | ||
| ret = ft_sock_send(oob_sock, &terminator, sizeof(terminator)); |
There was a problem hiding this comment.
similar comments on terminator explicit assignment
|
|
||
| for (size_t i = 0; i < ctx->num_peers; i++) { | ||
| msg.info.worker_id = ctx->peer_ids[i]; | ||
| msg.info.peer_idx = ctx->worker_id / topts.num_sender_workers; |
There was a problem hiding this comment.
IIUC, worker_id here is the sender's ctx id (worker_id), peer_idx is the index of this receiver worker in sender worker's peer_ids array. Right? Some more documentation is appreciated, otherwise the reader won't know what they refer ti
| ret = pthread_create(&threads[i], NULL, run_sender_worker, | ||
| &workers[i]); | ||
| assert(msg.type == EP_MESSAGE_TYPE_TERMINATOR); | ||
| ret = ep_message_queue_push(workers[i].control_queue, &msg); |
There was a problem hiding this comment.
This means sender got a terminating signal from the receiver (the whole), right? Need a comment to explain what it means
| 2. **Enter EP cycle loop**: | ||
| - Create new endpoint with CQ/AV | ||
| - Insert all peer addresses into AV | ||
| - Insert all cached peer addresses into AV |
There was a problem hiding this comment.
what does cached mean here?
There was a problem hiding this comment.
I meant restoring AV vector addresses saved at ep_addr array. I think it's referenced as cache in code comments as well.
Added reference to "caching" AV updates to p.2
| For each sender worker thread: | ||
|
|
||
| 1. **Wait for peer addresses**: Block until all receiver endpoint addresses arrive via control queue | ||
| 2. **Enter EP cycle loop**: |
There was a problem hiding this comment.
since you remove #1, you need to change 2 to 1 here?
| || ops_posted_for_peer[peer_idx] == topts.msgs_per_sender) { | ||
| if (++peer_idx == ctx->num_peers) | ||
| peer_idx = 0; | ||
| sched_yield(); |
There was a problem hiding this comment.
what does sched_yield do here?
There was a problem hiding this comment.
It's useful to allow main thread to make progress faster if we're running more workers than CPU cores.
Added a comment.
| printf("Sender %d: All cycles completed, total ops: %lu\n", | ||
| ctx->worker_id, total_ops); | ||
|
|
||
| do { |
There was a problem hiding this comment.
what does this loop of pops do?
| printf("Sender %d: All cycles completed, total ops: %lu\n", | ||
| ctx->worker_id, total_ops); | ||
|
|
||
| do { |
There was a problem hiding this comment.
So the goal of the loop is to waiting for a terminator message from the main thread? Need a comment
There was a problem hiding this comment.
Yes, also there might be some late updates which need to be drained. That can happen if sender had been skipping completion waiting frequently, but receiver did wait for most completions.
Added a comment
There was a problem hiding this comment.
also there might be some late updates which need to be drained. That can happen if sender had been skipping completion waiting frequently, but receiver did wait for most completions.
Does sender care about these late updates (except for terminator) after it is done? It doesn't care, right?
There was a problem hiding this comment.
No, it does not care.
it's too late to apply these updates, so sender just skips them while waiting for terminator.
Replace non-thread-safe rand() calls in multi_ep_stress.c with a new thread-safe random number generation API. The new ft_random.h header provides: - Thread-local storage for random state using _Thread_local - Fallback to rand_r() when random_r() is not available - Autoconf detection for random_r() function availability - Per-thread seeded random number generators to eliminate race conditions This fixes potential race conditions in multi-threaded stress tests where multiple threads were sharing the global rand() state. Signed-off-by: Alexey Novikov <nalexey@amazon.com>
…ture The multi_ep_stress test had multiple race conditions when running with many worker threads and endpoint recycling. This commit redesigns the test architecture to eliminate these races while simplifying the code. Thread Safety Fixes: - Add thread-safe random number generation using ft_random.h with _Thread_local storage. The previous rand() calls caused races when multiple threads accessed the shared global state. - Replace socket-based control channel with MPSC message queue using condition variables. The socket approach had complex error handling and race conditions during endpoint updates. The new queue provides clean thread-safe communication between workers. - Introduce context_pool for pre-registered memory buffers and fi_context2 structures. This eliminates repeated memory registration overhead and ensures proper cleanup without double-free issues. Architecture Simplification: - Unify sender_context and receiver_context into single worker_context structure. Both sides now use the same code paths, reducing duplication and making the test easier to maintain. - Remove --sender-addr command line option. The test now uses the existing OOB (out-of-band) mechanism for address exchange, eliminating the need for manual address specification and making pytest tests simpler. Signed-off-by: Alexey Novikov <nalexey@amazon.com>
Document the architecture, workflow, and stress testing methodology of the multi_ep_stress test including endpoint recycling, dynamic address updates, and resource sharing modes. Signed-off-by: Alexey Novikov <nalexey@amazon.com>
Added new command line argument `--remove_av` to enable removing old AV on sender side if an AV update has been recived from reciver. Do not remove old AV by default. All old AVs are discarded on sender recycling independently of the flag value. Signed-off-by: Alexey Novikov <nalexey@amazon.com>
There is a possability for sender workers to exit before applying all updates if they skip waiting for completions. This can cause main thread to hung while trying to push an update message to control queue. After present commit workers will wait for terminator message before exiting. Main thread sends terminate message to each thread if one recived on OOB channel. Signed-off-by: Alexey Novikov <nalexey@amazon.com>
…edata The restriction was for writedata specifically and is not necessary. Signed-off-by: Alexey Novikov <nalexey@amazon.com>
Previously sender had been waiting for initial updates from all peers before posting any operations. This commit allows sender worker to start posting operations once the first update received. Signed-off-by: Alexey Novikov <nalexey@amazon.com>
Signed-off-by: Alexey Novikov <nalexey@amazon.com>
Signed-off-by: Alexey Novikov <nalexey@amazon.com>
10e9b4b to
1b152f0
Compare
The multi_ep_stress test had multiple race conditions when running with many worker threads and endpoint recycling. This commit redesigns the test architecture to eliminate these races while simplifying the code.
Thread Safety Fixes:
Add thread-safe random number generation using ft_random.h with _Thread_local storage. The previous rand() calls caused races when multiple threads accessed the shared global state.
Replace socket-based control channel with MPSC message queue using condition variables. The socket approach had complex error handling and race conditions during endpoint updates. The new queue provides clean thread-safe communication between workers.
Introduce context_pool for pre-registered memory buffers and fi_context2 structures. This eliminates repeated memory registration overhead and ensures proper cleanup without double-free issues.
Architecture Simplification:
Unify sender_context and receiver_context into single worker_context structure. Both sides now use the same code paths, reducing duplication and making the test easier to maintain.
Remove --sender-addr command line option. The test now uses the existing OOB (out-of-band) mechanism for address exchange, eliminating the need for manual address specification and making pytest tests simpler.