TL/UCP: add allreduce ring algorithm #1258
wfaderhold21 wants to merge 1 commit into openucx:master
Conversation
| Filename | Overview |
|---|---|
| src/components/tl/ucp/allreduce/allreduce_ring.c | New ring allreduce implementation using a schedule that chains reduce_scatter_ring + allgather_ring. Error handling in the init path deviates from the established codebase pattern and may cause issues when tasks have already been added to the schedule before a failure. |
| src/components/tl/ucp/allreduce/allreduce.h | Adds UCC_TL_UCP_ALLREDUCE_ALG_RING enum value (placed before LAST) and function declaration for ucc_tl_ucp_allreduce_ring_init. Changes look correct. |
| src/components/tl/ucp/allreduce/allreduce.c | Registers the new ring algorithm info entry with correct id, name, and description using designated-initializer syntax. |
| src/components/tl/ucp/tl_ucp_coll.c | Adds RING case to the alg_id_to_init switch, wiring UCC_TL_UCP_ALLREDUCE_ALG_RING to ucc_tl_ucp_allreduce_ring_init. Straightforward and correct. |
| src/components/tl/ucp/Makefile.am | Adds allreduce_ring.c to the build system. Correct. |
| test/gtest/coll/test_allreduce.cc | Adds ring algorithm tests (ring, ring_edge_cases, ring_persistent) and moves rab/rab_pipelined further down. Counts in new ring tests are divisible by their respective team sizes. ring_persistent only covers the non-inplace code path, missing coverage of the inplace branch in allreduce_ring_start. |
Last reviewed commit: 096bae9
test/gtest/coll/test_allreduce.cc
Outdated
}

// Test with various data sizes: small, medium, large
for (auto count : {8, 65536, 123567}) {
test counts not divisible by team size (n_procs=15)
The ring algorithm requires count % tsize == 0 (enforced at allreduce_ring.c:101-105). With n_procs=15:
- count 8: 8 % 15 = 8 (fails)
- count 65536: 65536 % 15 = 1 (fails)
- count 123567: 123567 % 15 = 12 (fails)
Use counts divisible by 15, e.g. {15, 65520, 123570}
- for (auto count : {8, 65536, 123567}) {
+ for (auto count : {15, 65520, 123570}) {
test/gtest/coll/test_allreduce.cc
Outdated
UccTeam_h team = job.create_team(team_size);
UccCollCtxVec ctxs;

for (auto count : {0, 1, 3, 17}) {
most test counts not divisible by team sizes
The ring algorithm requires count % tsize == 0. Most combinations will fail:
- team_size=3: only count 0 and 3 work (1, 17 fail)
- team_size=7: only count 0 works (1, 3, 17 fail)
- team_size=13: only count 0 works (1, 3, 17 fail)
Use counts that are multiples of all team sizes (e.g., LCM(3,7,13)=273):
- for (auto count : {0, 1, 3, 17}) {
+ for (auto count : {0, 273, 546, 819}) {
src/components/tl/ucp/tl_ucp_task.h
Outdated
struct {
    void *scratch;
    ucc_mc_buffer_header_t *scratch_mc_header;
    ucc_ee_executor_task_t *etask;
    ucc_ee_executor_t *executor;
    ucc_tl_ucp_copy_task_t *copy_task;
    int phase;
    int step;
    size_t chunk_size;
    int p2p_posted;
unused struct - schedule-based implementation doesn't use task state
The schedule-based ring allreduce (allreduce_ring.c) uses ucc_schedule_t and doesn't access task->allreduce_ring. This struct appears to be leftover from the original non-schedule implementation (PR #1082).
- struct {
-     void *scratch;
-     ucc_mc_buffer_header_t *scratch_mc_header;
-     ucc_ee_executor_task_t *etask;
-     ucc_ee_executor_t *executor;
-     ucc_tl_ucp_copy_task_t *copy_task;
-     int phase;
-     int step;
-     size_t chunk_size;
-     int p2p_posted;
Sergei-Lebedev
left a comment
looks good to me, but the requirement of count%tsize is too strict imho. We can modify ring reduce scatter and ring allgather so that they can handle remainder correctly similar to knomial algorithms. This can be improved in next PR though
Pls fix greptile comments
@@ -0,0 +1,196 @@
/**
 * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
Force-pushed: 6cb06b1 to ce2cde7
 * For reduce_scatter_ring:
 * - dst.info.count should be the per-rank output count (count/tsize)
 * - The algorithm internally computes total = dst.info.count * tsize
The comment is misleading: it states the count should be per-rank (count/tsize), but the code passes the total count at line 132. This is actually correct because the IN_PLACE flag is set at line 134, and reduce_scatter_ring expects the total count when in-place. Update the comment to clarify this.
if (UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.reduce_avg_pre_op &&
    coll_args->args.op == UCC_OP_AVG) {
    return UCC_ERR_NOT_SUPPORTED;
}

/* Check that count is divisible by team size for ring algorithm */
if (count % tsize != 0) {
    tl_debug(team->context->lib,
             "ring requires count (%zu) divisible by team size (%u)",
             count, tsize);
    return UCC_ERR_NOT_SUPPORTED;
}
Wrong count for ring
ucc_tl_ucp_allreduce_ring_init enforces count % tsize == 0 (where count is dst.info.count, i.e., total elements per rank), but then passes that same total count as rs_args.args.dst.info.count while also forcing UCC_COLL_ARGS_FLAG_IN_PLACE. In reduce_scatter_ring the in-place path interprets dst.info.count as per-rank block count and internally computes total count as dst.info.count * size (reduce_scatter/reduce_scatter_ring.c:95-114, 213-231, 364-366), so this will run with an effective total of count*tsize and produce incorrect offsets/results for normal allreduce inputs.
Also note that count=0 passes the count % tsize check (0 % tsize == 0), and reduce_scatter_ring supports it (it just results in no work), so the zero-count edge case is affected only by the count mismatch described above.
test/gtest/coll/test_allreduce.cc
Outdated
for (auto count : {0, 273, 546, 819}) {
    SET_MEM_TYPE(UCC_MEMORY_TYPE_HOST);
    this->set_inplace(TEST_NO_INPLACE);
    this->data_init(team_size, TypeParam::dt, count, ctxs, false);
    UccReq req(team, ctxs);

    req.start();
    req.wait();
    EXPECT_EQ(true, this->data_validate(ctxs));
Zero-count ring test fails
ring_edge_cases includes count=0, but the ring allreduce init currently returns UCC_ERR_NOT_SUPPORTED when count % team_size != 0 (and 0 % team_size is 0, but earlier validation paths in data_init/coll selection can still make this fail depending on how count=0 is handled). More importantly, with the current allreduce_ring wiring, even if init succeeded, the reduce-scatter/allgather composition assumes a meaningful block size; it’s safer to avoid count=0 here unless the algorithm explicitly supports it end-to-end.
TEST: add tests for allreduce ring algorithm
TL/UCP: convert allreduce ring to schedule rs+ag
TEST: update and reorder gtest allreduce tests
Force-pushed: e1229b6 to 096bae9
/* Add reduce-scatter to schedule and subscribe to schedule start */
UCC_CHECK_GOTO(ucc_schedule_add_task(schedule, rs_task), err_ag, status);
UCC_CHECK_GOTO(
    ucc_event_manager_subscribe(
        &schedule->super,
        UCC_EVENT_SCHEDULE_STARTED,
        rs_task,
        ucc_task_start_handler),
    err_ag,
    status);

/* Add allgather to schedule and subscribe to reduce-scatter completion */
UCC_CHECK_GOTO(ucc_schedule_add_task(schedule, ag_task), err_ag, status);
UCC_CHECK_GOTO(
    ucc_event_manager_subscribe(
        rs_task, UCC_EVENT_COMPLETED, ag_task, ucc_task_start_handler),
    err_ag,
    status);

schedule->super.post     = ucc_tl_ucp_allreduce_ring_start;
schedule->super.finalize = ucc_tl_ucp_allreduce_ring_finalize;

*task_h = &schedule->super;
return UCC_OK;

err_ag:
    ag_task->finalize(ag_task);
err_rs:
    rs_task->finalize(rs_task);
err_schedule:
    ucc_tl_ucp_put_schedule(schedule);
out:
Error handling inconsistent with codebase — potential double-finalize of scheduled tasks
Every other schedule-based TL/UCP algorithm (bcast_sag_knomial_init, reduce_scatter_ring_init, allreduce_sra_knomial_frag_init) handles errors in a single out: label that only calls ucc_tl_ucp_put_schedule(schedule). This PR introduces err_ag/err_rs labels that manually call task->finalize() before calling put_schedule.
The problem is that ucc_schedule_finalize (called in the normal allreduce_ring_finalize path) iterates schedule->tasks[] and calls each task's finalize. If an error occurs after ucc_schedule_add_task(schedule, rs_task) succeeds (e.g., the ucc_event_manager_subscribe for rs_task fails), we jump to err_ag:, which calls:
- ag_task->finalize(ag_task): correct, ag_task was not yet added
- rs_task->finalize(rs_task): but rs_task was already added to schedule->tasks[]
- ucc_tl_ucp_put_schedule(schedule): returns the schedule to the pool
ucc_tl_ucp_put_schedule is ucc_mpool_put, which does not call ucc_schedule_finalize, so there is no double-finalize in the error path itself. However, schedule->tasks[0] now holds a dangling pointer to the freed rs_task. If the mpool reuses the schedule object (via ucc_schedule_init, which correctly resets n_tasks = 0), the dangling pointer is never accessed. This is technically safe, but fragile.
More importantly, after ucc_schedule_add_task(schedule, rs_task) is called, rs_task->schedule = schedule is set and an internal UCC_EVENT_COMPLETED_SCHEDULE subscription is added to rs_task's event manager. Manually calling rs_task->finalize while the schedule still references it is architecturally inconsistent with how UCC schedules take ownership of their tasks.
The established pattern is:
err_schedule:
    ucc_tl_ucp_put_schedule(schedule);
out:
    return status;

Tasks that were not added to the schedule (only rs_task and ag_task before any ucc_schedule_add_task call) should be finalized before put_schedule. Once tasks are added, they are owned by the schedule and should not be manually finalized in the error path.
                UccReq req(team, ctxs);

                req.start();
                req.wait();
                EXPECT_EQ(true, this->data_validate(ctxs));
                this->data_fini(ctxs);
            }
        }
    }

TYPED_TEST(test_allreduce_alg, ring_persistent) {
    // Test persistent operation - results should be consistent across multiple calls
    int n_procs = 8;
    ucc_job_env_t env = {{"UCC_CL_BASIC_TUNE", "inf"},
                         {"UCC_TL_UCP_TUNE", "allreduce:0-inf:@ring"}};
    UccJob job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL, env);
    UccTeam_h team = job.create_team(n_procs);
    int repeat = 3;
    UccCollCtxVec ctxs;
    std::vector<ucc_memory_type_t> mt = {UCC_MEMORY_TYPE_HOST};

    if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA)) { // add cuda_managed for cl hier?
        mt.push_back(UCC_MEMORY_TYPE_CUDA);
    }

    for (auto count : {65536, 123567}) {
        for (auto inplace : {TEST_NO_INPLACE, TEST_INPLACE}) {
            for (auto m : mt) {
                SET_MEM_TYPE(m);
                this->set_inplace(inplace);
                this->data_init(n_procs, TypeParam::dt, count, ctxs, true);
                UccReq req(team, ctxs);

                for (auto i = 0; i < repeat; i++) {
                    req.start();
                    req.wait();
                    EXPECT_EQ(true, this->data_validate(ctxs));
                    this->reset(ctxs);
                }
                this->data_fini(ctxs);
            }
        }
        SET_MEM_TYPE(UCC_MEMORY_TYPE_HOST);
        this->set_inplace(TEST_NO_INPLACE);
        // Use a larger buffer for persistent test
        size_t count = 1024;
ring_persistent does not cover the inplace code path
allreduce_ring_start has two distinct branches: when UCC_IS_INPLACE is true it skips the ucc_mc_memcpy, and when false it performs the copy. The ring_persistent test hard-codes this->set_inplace(TEST_NO_INPLACE), so the inplace branch is never exercised here.
While the basic ring test does cover TEST_INPLACE, ring_persistent specifically validates correct behaviour across multiple iterations of the same request — a scenario where a stale inplace buffer or a mis-handled inplace flag could silently produce wrong results only on later iterations. Consider adding a second loop or a separate ring_persistent_inplace variant:
// also test inplace persistent
this->set_inplace(TEST_INPLACE);
this->data_init(n_procs, TypeParam::dt, count, ctxs, true);
UccReq req_ip(team, ctxs);
for (int i = 0; i < 5; i++) {
    req_ip.start();
    req_ip.wait();
    EXPECT_EQ(true, this->data_validate(ctxs));
    this->reset(ctxs);
}
this->data_fini(ctxs);
What
This is a reproduction of PR #1082 with some changes. The original algorithm was an implementation of a ring-based reduce-scatter + allgather algorithm to perform allreduce. This version is the same but has been converted to a schedule-based approach to reuse the reduce-scatter and allgather ring algorithms. This improves performance compared to the original approach.
Allreduce Performance Comparison
Configuration: Thor cluster, 16 Nodes, 32 PPN (512 processes total)
See the attached PDFs for a visual comparison of the results in the table above.
allreduce_comparison.pdf
allreduce_comparison_large.pdf