Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 75 additions & 17 deletions src/components/tl/ucp/alltoall/alltoall_onesided.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "alltoall.h"
#include "core/ucc_progress_queue.h"
#include "utils/ucc_math.h"
#include "tl_ucp_coll.h"
#include "tl_ucp_sendrecv.h"

#define CONGESTION_THRESHOLD 8
Expand Down Expand Up @@ -100,13 +101,29 @@ void ucc_tl_ucp_alltoall_onesided_get_progress(ucc_coll_task_t *ctask)
ucc_rank_t peer = (grank + *posted + 1) % gsize;
ucc_mem_map_mem_h src_memh;
size_t nelems;
ucc_status_t status;

nelems = TASK_ARGS(task).src.info.count;
nelems = (nelems / gsize) * ucc_dt_size(TASK_ARGS(task).src.info.datatype);
src_memh = (TASK_ARGS(task).flags & UCC_COLL_ARGS_FLAG_DST_MEMH_GLOBAL)
if (task->flags & UCC_TL_UCP_TASK_FLAG_USE_DYN_SEG) {
status = ucc_tl_ucp_test_dynamic_segment(task);
if (status == UCC_INPROGRESS) {
return;
}
if (UCC_OK != status) {
task->super.status = status;
tl_error(UCC_TL_TEAM_LIB(team),
"failed to exchange dynamic segments");
return;
}
src_memh = task->dynamic_segments.dst_local;
dst_memh = (ucc_mem_map_mem_h *)task->dynamic_segments.src_global;
} else {
src_memh = (TASK_ARGS(task).flags & UCC_COLL_ARGS_FLAG_DST_MEMH_GLOBAL)
? TASK_ARGS(task).dst_memh.global_memh[grank]
: TASK_ARGS(task).dst_memh.local_memh;
}

nelems = TASK_ARGS(task).src.info.count;
nelems = (nelems / gsize) * ucc_dt_size(TASK_ARGS(task).src.info.datatype);
for (; *posted < gsize; peer = (peer + 1) % gsize) {
UCPCHECK_GOTO(ucc_tl_ucp_get_nb(PTR_OFFSET(dest, peer * nelems),
PTR_OFFSET(src, grank * nelems),
Expand All @@ -122,7 +139,10 @@ void ucc_tl_ucp_alltoall_onesided_get_progress(ucc_coll_task_t *ctask)

alltoall_onesided_wait_completion(task, npolls);
out:
return;
if (task->super.status != UCC_INPROGRESS &&
(task->flags & UCC_TL_UCP_TASK_FLAG_USE_DYN_SEG)) {
task->super.status = ucc_tl_ucp_coll_dynamic_segment_finalize(task);
}
Comment on lines +148 to +151
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: dynamic segment finalization happens after completion/error but the finalize call's error status overwrites the original task status. If the task completed successfully (UCC_OK) but finalization fails, the error is propagated; however if the task already failed, the finalization error replaces it, losing the original failure reason. Should finalization errors be logged separately while preserving the original task failure status?

}

void ucc_tl_ucp_alltoall_onesided_put_progress(ucc_coll_task_t *ctask)
Expand All @@ -142,12 +162,28 @@ void ucc_tl_ucp_alltoall_onesided_put_progress(ucc_coll_task_t *ctask)
ucc_rank_t peer = (grank + *posted + 1) % gsize;
ucc_mem_map_mem_h src_memh;
size_t nelems;
ucc_status_t status;

nelems = TASK_ARGS(task).src.info.count;
nelems = (nelems / gsize) * ucc_dt_size(TASK_ARGS(task).src.info.datatype);
src_memh = (TASK_ARGS(task).flags & UCC_COLL_ARGS_FLAG_SRC_MEMH_GLOBAL)
if (task->flags & UCC_TL_UCP_TASK_FLAG_USE_DYN_SEG) {
status = ucc_tl_ucp_test_dynamic_segment(task);
if (status == UCC_INPROGRESS) {
return;
}
if (UCC_OK != status) {
task->super.status = status;
tl_error(UCC_TL_TEAM_LIB(team),
"failed to exchange dynamic segments");
return;
}
src_memh = task->dynamic_segments.src_local;
dst_memh = (ucc_mem_map_mem_h *)task->dynamic_segments.dst_global;
} else {
src_memh = (TASK_ARGS(task).flags & UCC_COLL_ARGS_FLAG_SRC_MEMH_GLOBAL)
? TASK_ARGS(task).src_memh.global_memh[grank]
: TASK_ARGS(task).src_memh.local_memh;
}

for (; *posted < gsize; peer = (peer + 1) % gsize) {
UCPCHECK_GOTO(
Expand All @@ -165,15 +201,30 @@ void ucc_tl_ucp_alltoall_onesided_put_progress(ucc_coll_task_t *ctask)

alltoall_onesided_wait_completion(task, npolls);
out:
return;
if (task->super.status != UCC_INPROGRESS &&
(task->flags & UCC_TL_UCP_TASK_FLAG_USE_DYN_SEG)) {
task->super.status = ucc_tl_ucp_coll_dynamic_segment_finalize(task);
}
Comment on lines +210 to +213
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: same issue as GET progress: finalization status overwrites task status

}

ucc_status_t ucc_tl_ucp_alltoall_onesided_start(ucc_coll_task_t *ctask)
{
ucc_tl_ucp_task_t *task = ucc_derived_of(ctask, ucc_tl_ucp_task_t);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_status_t status;

ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
if (task->flags & UCC_TL_UCP_TASK_FLAG_USE_DYN_SEG) {
status = ucc_tl_ucp_coll_dynamic_segment_exchange_nb(task);
if (UCC_OK != status && UCC_INPROGRESS != status) {
task->super.status = status;
tl_error(UCC_TL_TEAM_LIB(team),
"failed to exchange dynamic segments");
return task->super.status;
}
}
Comment on lines +223 to +231
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: when dynamic segment exchange completes synchronously (returns UCC_OK), the function continues to line 223 and enqueues the task. However, the progress functions (GET/PUT) will call ucc_tl_ucp_test_dynamic_segment again at lines 106 and 164, which may not handle the already-completed case correctly. Does ucc_tl_ucp_test_dynamic_segment return UCC_OK immediately when called again after successful completion?


/* Start the onesided operations */
return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}

Expand All @@ -190,7 +241,7 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
};
size_t perc_bw =
UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.alltoall_onesided_percent_bw;
ucc_tl_ucp_alltoall_onesided_alg_t alg =
ucc_tl_ucp_onesided_alg_type alg =
UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.alltoall_onesided_alg;
ucc_tl_ucp_schedule_t *tl_schedule = NULL;
ucc_rank_t group_size = 1;
Expand All @@ -208,15 +259,6 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
ucc_sbgp_t *sbgp;

ALLTOALL_TASK_CHECK(coll_args->args, tl_team);
if (!(coll_args->args.mask & UCC_COLL_ARGS_FIELD_FLAGS) ||
(coll_args->args.mask & UCC_COLL_ARGS_FIELD_FLAGS &&
(!(coll_args->args.flags &
UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS)))) {
tl_error(UCC_TL_TEAM_LIB(tl_team),
"non memory mapped buffers are not supported");
status = UCC_ERR_NOT_SUPPORTED;
return status;
}

if (!(coll_args->args.mask & UCC_COLL_ARGS_FIELD_MEM_MAP_SRC_MEMH)) {
coll_args->args.src_memh.global_memh = NULL;
Expand All @@ -228,7 +270,6 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
return status;
}
}

if (!(coll_args->args.mask & UCC_COLL_ARGS_FIELD_MEM_MAP_DST_MEMH)) {
coll_args->args.dst_memh.global_memh = NULL;
} else {
Expand All @@ -239,6 +280,8 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
return status;
}
}


status = ucc_tl_ucp_get_schedule(tl_team, coll_args,
(ucc_tl_ucp_schedule_t **)&tl_schedule);
if (ucc_unlikely(UCC_OK != status)) {
Expand All @@ -264,6 +307,20 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
task->super.finalize = ucc_tl_ucp_alltoall_onesided_finalize;
a2a_task = &task->super;

/* initialize dynamic segments */
if (alg == UCC_TL_UCP_ALLTOALL_ONESIDED_GET ||
(alg == UCC_TL_UCP_ALLTOALL_ONESIDED_AUTO &&
sbgp->group_size >= CONGESTION_THRESHOLD)) {
alg = UCC_TL_UCP_ALLTOALL_ONESIDED_GET;
}
status = ucc_tl_ucp_coll_dynamic_segment_init(&coll_args->args, alg, task);
if (UCC_OK != status) {
tl_error(UCC_TL_TEAM_LIB(tl_team),
"failed to initialize dynamic segments");
ucc_tl_ucp_coll_finalize(&task->super);
goto out;
}
Comment on lines +328 to +336
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: if dynamic segment initialization fails, the code jumps to out: but the task object has already been allocated (line 296) and is never cleaned up, causing a memory leak


status = ucc_tl_ucp_coll_init(&barrier_coll_args, team, &barrier_task);
if (status != UCC_OK) {
goto out;
Expand Down Expand Up @@ -313,6 +370,7 @@ ucc_status_t ucc_tl_ucp_alltoall_onesided_init(ucc_base_coll_args_t *coll_args,
ucc_task_subscribe_dep(a2a_task, barrier_task,
UCC_EVENT_COMPLETED);
*task_h = &schedule->super;

return status;
out:
if (tl_schedule) {
Expand Down
33 changes: 24 additions & 9 deletions src/components/tl/ucp/alltoallv/alltoallv_onesided.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "utils/ucc_math.h"
#include "tl_ucp_sendrecv.h"

void ucc_tl_ucp_alltoallv_onesided_progress(ucc_coll_task_t *ctask);

ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask)
{
ucc_tl_ucp_task_t *task = ucc_derived_of(ctask, ucc_tl_ucp_task_t);
Expand All @@ -32,6 +34,12 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask)

ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);

if (TASK_ARGS(task).mask & UCC_COLL_ARGS_FIELD_MEM_MAP_SRC_MEMH) {
if (TASK_ARGS(task).flags & UCC_COLL_ARGS_FLAG_SRC_MEMH_GLOBAL) {
src_memh = TASK_ARGS(task).src_memh.global_memh[grank];
}
}

/* perform a put to each member peer using the peer's index in the
* destination displacement. */
for (peer = (grank + 1) % gsize; task->onesided.put_posted < gsize;
Expand All @@ -43,8 +51,8 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask)
ucc_coll_args_get_displacement(&TASK_ARGS(task), d_disp, peer) *
rdt_size;
data_size =
ucc_coll_args_get_count(
&TASK_ARGS(task), TASK_ARGS(task).src.info_v.counts, peer) *
ucc_coll_args_get_count(&TASK_ARGS(task),
TASK_ARGS(task).src.info_v.counts, peer) *
sdt_size;

UCPCHECK_GOTO(ucc_tl_ucp_put_nb(PTR_OFFSET(src, sd_disp),
Expand All @@ -56,22 +64,22 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_start(ucc_coll_task_t *ctask)
dst_memh, team),
task, out);
}

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
out:
return task->super.status;
}

void ucc_tl_ucp_alltoallv_onesided_progress(ucc_coll_task_t *ctask)
{
ucc_tl_ucp_task_t *task = ucc_derived_of(ctask, ucc_tl_ucp_task_t);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_rank_t gsize = UCC_TL_TEAM_SIZE(team);
long *pSync = TASK_ARGS(task).global_work_buffer;
ucc_tl_ucp_task_t *task = ucc_derived_of(ctask, ucc_tl_ucp_task_t);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
long *pSync = TASK_ARGS(task).global_work_buffer;
ucc_rank_t gsize = UCC_TL_TEAM_SIZE(team);

if (ucc_tl_ucp_test_onesided(task, gsize) == UCC_INPROGRESS) {
return;
}

pSync[0] = 0;
task->super.status = UCC_OK;
}
Expand All @@ -81,8 +89,8 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_init(ucc_base_coll_args_t *coll_args,
ucc_coll_task_t **task_h)
{
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_status_t status = UCC_OK;
ucc_tl_ucp_task_t *task;
ucc_status_t status;

ALLTOALLV_TASK_CHECK(coll_args->args, tl_team);
if (!(coll_args->args.mask & UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER)) {
Expand All @@ -104,13 +112,20 @@ ucc_status_t ucc_tl_ucp_alltoallv_onesided_init(ucc_base_coll_args_t *coll_args,
}
if (!(coll_args->args.mask & UCC_COLL_ARGS_FIELD_MEM_MAP_DST_MEMH)) {
coll_args->args.dst_memh.global_memh = NULL;
} else {
if (!(coll_args->args.flags & UCC_COLL_ARGS_FLAG_DST_MEMH_GLOBAL)) {
tl_error(UCC_TL_TEAM_LIB(tl_team),
"onesided alltoallv requires global memory handles for dst "
"buffers");
status = UCC_ERR_INVALID_PARAM;
goto out;
}
}
Comment on lines 110 to 123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: the destination memory-handle check (lines 106-116) has been moved after the global work buffer check, while the source handle (lines 103-105) is still silently set to NULL when missing. Consider validating that the source memory-handle flag is also set and is global when provided (mirroring the destination logic) to avoid silent failures later. Is there a reason source memory handles are always accepted as local or missing, while destination handles require the global flag? Should source handles also be validated when present?


task = ucc_tl_ucp_init_task(coll_args, team);
*task_h = &task->super;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 Missing NULL check for ucc_tl_ucp_init_task

ucc_tl_ucp_init_task can return NULL on OOM, but the next line immediately dereferences the result via *task_h = &task->super. This would cause a NULL pointer dereference and crash.

Notably, alltoall_onesided_init was updated in this same PR to include the NULL guard (see the alltoall init where if (ucc_unlikely(!task)) { status = UCC_ERR_NO_MEMORY; goto out; } was added), but the same fix was not applied here.

Suggested change
task = ucc_tl_ucp_init_task(coll_args, team);
*task_h = &task->super;
task = ucc_tl_ucp_init_task(coll_args, team);
if (ucc_unlikely(!task)) {
status = UCC_ERR_NO_MEMORY;
goto out;
}
*task_h = &task->super;

task->super.post = ucc_tl_ucp_alltoallv_onesided_start;
task->super.progress = ucc_tl_ucp_alltoallv_onesided_progress;
status = UCC_OK;
out:
return status;
}
8 changes: 4 additions & 4 deletions src/components/tl/ucp/tl_ucp.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,10 @@ static ucs_config_field_t ucc_tl_ucp_context_config_table[] = {
ucc_offsetof(ucc_tl_ucp_context_config_t, memtype_copy_enable),
UCC_CONFIG_TYPE_BOOL},

{"EXPORTED_MEMORY_HANDLE", "n",
"If set to yes, initialize UCP context with the exported memory handle "
"feature, which is useful for offload devices such as a DPU. Otherwise "
"disable the use of this feature.",
{"EXPORTED_MEMORY_HANDLE", "0",
"If set to 1, initialize UCP context with the exported memory handle "
"feature, which is useful for offload devices such as a DPU. Set to 0 "
"to disable this feature (default is 0).",
ucc_offsetof(ucc_tl_ucp_context_config_t, exported_memory_handle),
UCC_CONFIG_TYPE_BOOL},

Expand Down
23 changes: 18 additions & 5 deletions src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ typedef struct ucc_tl_ucp_iface {
/* Extern iface should follow the pattern: ucc_tl_<tl_name> */
extern ucc_tl_ucp_iface_t ucc_tl_ucp;

typedef enum ucc_tl_ucp_alltoall_onesided_alg_type {
typedef enum ucc_tl_ucp_onesided_alg_type {
UCC_TL_UCP_ALLTOALL_ONESIDED_PUT,
UCC_TL_UCP_ALLTOALL_ONESIDED_GET,
UCC_TL_UCP_ALLTOALL_ONESIDED_AUTO,
UCC_TL_UCP_ALLTOALL_ONESIDED_LAST
} ucc_tl_ucp_alltoall_onesided_alg_t;
} ucc_tl_ucp_onesided_alg_type;

typedef struct ucc_tl_ucp_lib_config {
ucc_tl_lib_config_t super;
Expand Down Expand Up @@ -88,7 +88,7 @@ typedef struct ucc_tl_ucp_lib_config {
ucc_ternary_auto_value_t use_topo;
int use_reordering;
uint32_t alltoall_onesided_percent_bw;
ucc_tl_ucp_alltoall_onesided_alg_t alltoall_onesided_alg;
ucc_tl_ucp_onesided_alg_type alltoall_onesided_alg;
} ucc_tl_ucp_lib_config_t;

typedef enum ucc_tl_ucp_local_copy_type {
Expand Down Expand Up @@ -168,8 +168,6 @@ typedef struct ucc_tl_ucp_team {
ucc_status_t status;
uint32_t seq_num;
ucc_tl_ucp_task_t *preconnect_task;
void * va_base[MAX_NR_SEGMENTS];
size_t base_length[MAX_NR_SEGMENTS];
ucc_tl_ucp_worker_t * worker;
ucc_tl_ucp_team_config_t cfg;
const char * tuning_str;
Expand Down Expand Up @@ -313,4 +311,19 @@ void ucc_tl_ucp_pre_register_mem(ucc_tl_ucp_team_t *team, void *addr,
ucc_status_t ucc_tl_ucp_ctx_remote_populate(ucc_tl_ucp_context_t *ctx,
ucc_mem_map_params_t map,
ucc_team_oob_coll_t oob);

ucc_status_t ucc_tl_ucp_mem_map(const ucc_base_context_t *context,
ucc_mem_map_mode_t mode,
ucc_mem_map_memh_t *memh,
ucc_mem_map_tl_t *tl_h);

ucc_status_t ucc_tl_ucp_memh_pack(const ucc_base_context_t *context,
ucc_mem_map_mode_t mode,
ucc_mem_map_tl_t *tl_h,
void **pack_buffer);

ucc_status_t ucc_tl_ucp_mem_unmap(const ucc_base_context_t *context,
ucc_mem_map_mode_t mode,
ucc_mem_map_tl_t *memh);

#endif
Loading