Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 96 additions & 49 deletions src/components/tl/ucp/tl_ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@
#include "tl_ucp.h"
#include "tl_ucp_ep.h"

//NOLINTNEXTLINE
// NOLINTNEXTLINE
static void ucc_tl_ucp_err_handler(void *arg, ucp_ep_h ep, ucs_status_t status)
{
    /* Intentionally a no-op: without an OOB barrier, endpoint errors are
     * expected (e.g. during teardown), so this callback absorbs them to
     * keep UCX from reporting spurious failures. */
    (void)arg;
    (void)ep;
    (void)status;
}

static inline ucc_status_t ucc_tl_ucp_connect_ep(ucc_tl_ucp_context_t *ctx,
int is_service, ucp_ep_h *ep,
void *ucp_address)
static inline ucc_status_t ucc_tl_ucp_connect_ep(
ucc_tl_ucp_context_t *ctx, int is_service, ucp_ep_h *ep, void *ucp_address)
{
ucp_worker_h worker =
(is_service) ? ctx->service_worker.ucp_worker : ctx->worker.ucp_worker;
ucp_worker_h worker = (is_service) ? ctx->service_worker.ucp_worker
: ctx->worker.ucp_worker;
ucp_ep_params_t ep_params;
ucs_status_t status;
if (*ep) {
Expand All @@ -34,28 +33,33 @@ static inline ucc_status_t ucc_tl_ucp_connect_ep(ucc_tl_ucp_context_t *ctx,
ep_params.err_mode = UCP_ERR_HANDLING_MODE_PEER;
ep_params.err_handler.cb = ucc_tl_ucp_err_handler;
ep_params.err_handler.arg = NULL;
ep_params.field_mask |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
UCP_EP_PARAM_FIELD_ERR_HANDLER;
ep_params.field_mask |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
UCP_EP_PARAM_FIELD_ERR_HANDLER;
}
status = ucp_ep_create(worker, &ep_params, ep);

if (ucc_unlikely(UCS_OK != status)) {
tl_error(ctx->super.super.lib, "ucp returned connect error: %s",
ucs_status_string(status));
tl_error(
ctx->super.super.lib,
"ucp returned connect error: %s",
ucs_status_string(status));
return ucs_status_to_ucc_status(status);
}
return UCC_OK;
}

ucc_status_t ucc_tl_ucp_connect_team_ep(ucc_tl_ucp_team_t *team,
ucc_rank_t core_rank, ucp_ep_h *ep)
ucc_status_t ucc_tl_ucp_connect_team_ep(
ucc_tl_ucp_team_t *team, ucc_rank_t core_rank, ucp_ep_h *ep)
{
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team);
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team);
int use_service_worker = USE_SERVICE_WORKER(team);
void *addr;

addr = ucc_get_team_ep_addr(UCC_TL_CORE_CTX(team), UCC_TL_CORE_TEAM(team),
core_rank, ucc_tl_ucp.super.super.id);
addr = ucc_get_team_ep_addr(
UCC_TL_CORE_CTX(team),
UCC_TL_CORE_TEAM(team),
core_rank,
ucc_tl_ucp.super.super.id);
addr = use_service_worker ? TL_UCP_EP_ADDR_WORKER_SERVICE(addr)
: TL_UCP_EP_ADDR_WORKER(addr);

Expand All @@ -65,8 +69,8 @@ ucc_status_t ucc_tl_ucp_connect_team_ep(ucc_tl_ucp_team_t *team,
/* Finds next non-NULL ep in the storage and returns that handle
for closure. In case of "hash" storage it pops the item,
in case of "array" sets it to NULL */
static inline ucp_ep_h get_next_ep_to_close(ucc_tl_ucp_worker_t * worker,
ucc_tl_ucp_context_t *ctx, int *i)
static inline ucp_ep_h get_next_ep_to_close(
ucc_tl_ucp_worker_t *worker, ucc_tl_ucp_context_t *ctx, int *i)
{
ucp_ep_h ep = NULL;
ucc_rank_t size;
Expand All @@ -84,39 +88,82 @@ static inline ucp_ep_h get_next_ep_to_close(ucc_tl_ucp_worker_t * worker,
return ep;
}

void ucc_tl_ucp_close_eps(ucc_tl_ucp_worker_t * worker,
ucc_tl_ucp_context_t *ctx)
void ucc_tl_ucp_close_eps(
ucc_tl_ucp_worker_t *worker, ucc_tl_ucp_context_t *ctx)
{
int i = 0;
ucp_ep_h ep;
ucs_status_t status;
ucs_status_ptr_t close_req;
ucp_request_param_t param;
int i = 0;
int n_reqs = 0;
int n_inflight;
int j;
ucp_ep_h ep;
ucs_status_t status;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable is declared without an initializer. In the sequential-fallback path, if timed_out is already 1 on entry to an iteration, the do/while wait-loop is skipped (line 135–150), so status is never assigned before the guard at line 156. While the guard at line 156 if (!timed_out) prevents use of uninitialized status, static analyzers can still flag this as a potential issue. Initializing at declaration removes the ambiguity:

Suggested change
ucs_status_t status;
ucs_status_t status = UCS_OK;

ucs_status_ptr_t close_req;
ucp_request_param_t param;
size_t max_eps;
ucs_status_ptr_t *reqs;

param.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS;
param.flags = 0; // 0 means FLUSH
ep = get_next_ep_to_close(worker, ctx, &i);
while (ep) {
close_req = ucp_ep_close_nbx(ep, &param);
max_eps = worker->eps
? (size_t)ctx->super.super.ucc_context->params.oob.n_oob_eps
: kh_size(worker->ep_hash);
Comment on lines +110 to +112
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_eps over-counts for array-based worker storage

When worker->eps is non-NULL (OOB/array mode), max_eps is set to n_oob_eps — the total number of context-level OOB endpoints. This is the capacity of the array, not the number of actually connected endpoints. In a large cluster where only a subset of processes have exchanged addresses, most slots in worker->eps will be NULL, so n_reqs will be well below max_eps, but the reqs allocation will be sized for all n_oob_eps entries.

On very large jobs (e.g., 100k-process runs) where a single context closes only a handful of endpoints, this over-allocates a potentially large array during teardown — at precisely the time when memory pressure may already be elevated.

Consider counting actual non-NULL entries in worker->eps before allocating, or using a smaller initial size with reallocation, to avoid an unnecessary large allocation:

if (worker->eps) {
    size_t n_oob = ctx->super.super.ucc_context->params.oob.n_oob_eps;
    for (size_t k = 0; k < n_oob; k++) {
        if (worker->eps[k]) max_eps++;
    }
} else {
    max_eps = kh_size(worker->ep_hash);
}

if (max_eps == 0) {
return;
}
reqs = (ucs_status_ptr_t *)ucc_calloc(max_eps, sizeof(*reqs), "close_reqs");
if (!reqs) {
tl_error(
ctx->super.super.lib, "failed to allocate close requests array");
return;
}

/* Use graceful flush with OOB, force close otherwise */
param.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS;
param.flags = UCC_TL_CTX_HAS_OOB(ctx) ? 0 : UCP_EP_CLOSE_FLAG_FORCE;
ep = get_next_ep_to_close(worker, ctx, &i);
while (ep) {
close_req = ucp_ep_close_nbx(ep, &param);
if (UCS_PTR_IS_PTR(close_req)) {
reqs[n_reqs++] = close_req;
} else {
status = UCS_PTR_STATUS(close_req);
ucc_assert(status <= UCS_OK);
if (status != UCS_OK) {
tl_error(
ctx->super.super.lib,
"error during ucp ep close, ep %p, status %s",
ep,
ucs_status_string(status));
}
}
ep = get_next_ep_to_close(worker, ctx, &i);
}

n_inflight = n_reqs;
while (n_inflight > 0) {
ucp_worker_progress(ctx->worker.ucp_worker);
if (ctx->cfg.service_worker != 0) {
ucp_worker_progress(ctx->service_worker.ucp_worker);
}
n_inflight = 0;
for (j = 0; j < n_reqs; j++) {
if (!reqs[j]) {
continue;
}
status = ucp_request_check_status(reqs[j]);
if (status != UCS_INPROGRESS) {
ucc_assert(status <= UCS_OK);
if (status != UCS_OK) {
tl_error(
ctx->super.super.lib,
"error during ucp ep close, status %s",
ucs_status_string(status));
}
ucp_request_free(reqs[j]);
reqs[j] = NULL;
} else {
n_inflight++;
}
}
}

if (UCS_PTR_IS_PTR(close_req)) {
do {
ucp_worker_progress(ctx->worker.ucp_worker);
if (ctx->cfg.service_worker != 0) {
ucp_worker_progress(ctx->service_worker.ucp_worker);
}
status = ucp_request_check_status(close_req);
} while (status == UCS_INPROGRESS);
ucp_request_free(close_req);
} else {
status = UCS_PTR_STATUS(close_req);
}
ucc_assert(status <= UCS_OK);
if (status != UCS_OK) {
tl_error(ctx->super.super.lib,
"error during ucp ep close, ep %p, status %s",
ep, ucs_status_string(status));
}
ep = get_next_ep_to_close(worker, ctx, &i);
}
ucc_free(reqs);
}