Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 162 additions & 51 deletions src/components/tl/ucp/tl_ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@

#include "tl_ucp.h"
#include "tl_ucp_ep.h"
#include "utils/ucc_time.h"

//NOLINTNEXTLINE
// NOLINTNEXTLINE
static void ucc_tl_ucp_err_handler(void *arg, ucp_ep_h ep, ucs_status_t status)
{
    /* Intentionally empty endpoint error callback.
     * When no OOB barrier is available, peer errors are expected during
     * teardown; installing this no-op callback keeps UCX from raising
     * them as hard errors. */
    (void)arg;
    (void)ep;
    (void)status;
}

static inline ucc_status_t ucc_tl_ucp_connect_ep(ucc_tl_ucp_context_t *ctx,
int is_service, ucp_ep_h *ep,
void *ucp_address)
static inline ucc_status_t ucc_tl_ucp_connect_ep(
ucc_tl_ucp_context_t *ctx, int is_service, ucp_ep_h *ep, void *ucp_address)
{
ucp_worker_h worker =
(is_service) ? ctx->service_worker.ucp_worker : ctx->worker.ucp_worker;
ucp_worker_h worker = (is_service) ? ctx->service_worker.ucp_worker
: ctx->worker.ucp_worker;
ucp_ep_params_t ep_params;
ucs_status_t status;
if (*ep) {
Expand All @@ -34,28 +34,33 @@ static inline ucc_status_t ucc_tl_ucp_connect_ep(ucc_tl_ucp_context_t *ctx,
ep_params.err_mode = UCP_ERR_HANDLING_MODE_PEER;
ep_params.err_handler.cb = ucc_tl_ucp_err_handler;
ep_params.err_handler.arg = NULL;
ep_params.field_mask |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
UCP_EP_PARAM_FIELD_ERR_HANDLER;
ep_params.field_mask |= UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE |
UCP_EP_PARAM_FIELD_ERR_HANDLER;
}
status = ucp_ep_create(worker, &ep_params, ep);

if (ucc_unlikely(UCS_OK != status)) {
tl_error(ctx->super.super.lib, "ucp returned connect error: %s",
ucs_status_string(status));
tl_error(
ctx->super.super.lib,
"ucp returned connect error: %s",
ucs_status_string(status));
return ucs_status_to_ucc_status(status);
}
return UCC_OK;
}

ucc_status_t ucc_tl_ucp_connect_team_ep(ucc_tl_ucp_team_t *team,
ucc_rank_t core_rank, ucp_ep_h *ep)
ucc_status_t ucc_tl_ucp_connect_team_ep(
ucc_tl_ucp_team_t *team, ucc_rank_t core_rank, ucp_ep_h *ep)
{
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team);
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team);
int use_service_worker = USE_SERVICE_WORKER(team);
void *addr;

addr = ucc_get_team_ep_addr(UCC_TL_CORE_CTX(team), UCC_TL_CORE_TEAM(team),
core_rank, ucc_tl_ucp.super.super.id);
addr = ucc_get_team_ep_addr(
UCC_TL_CORE_CTX(team),
UCC_TL_CORE_TEAM(team),
core_rank,
ucc_tl_ucp.super.super.id);
addr = use_service_worker ? TL_UCP_EP_ADDR_WORKER_SERVICE(addr)
: TL_UCP_EP_ADDR_WORKER(addr);

Expand All @@ -65,8 +70,8 @@ ucc_status_t ucc_tl_ucp_connect_team_ep(ucc_tl_ucp_team_t *team,
/* Finds next non-NULL ep in the storage and returns that handle
for closure. In case of "hash" storage it pops the item,
in case of "array" sets it to NULL */
static inline ucp_ep_h get_next_ep_to_close(ucc_tl_ucp_worker_t * worker,
ucc_tl_ucp_context_t *ctx, int *i)
static inline ucp_ep_h get_next_ep_to_close(
ucc_tl_ucp_worker_t *worker, ucc_tl_ucp_context_t *ctx, int *i)
{
ucp_ep_h ep = NULL;
ucc_rank_t size;
Expand All @@ -84,39 +89,145 @@ static inline ucp_ep_h get_next_ep_to_close(ucc_tl_ucp_worker_t * worker,
return ep;
}

void ucc_tl_ucp_close_eps(ucc_tl_ucp_worker_t * worker,
ucc_tl_ucp_context_t *ctx)
#define UCC_TL_UCP_EP_CLOSE_TIMEOUT 60.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout constant is hardcoded as a compile-time #define with no corresponding config registration or environment variable. Other UCC parameters are exposed through the standard UCC config mechanism so users can tune them at run time without recompilation.

On large clusters, 60 seconds may be either too generous (delaying shutdown) or too restrictive (e.g., on high-latency interconnects under load). Consider registering this as a configurable parameter so it can be overridden via environment variable or config at run time.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with greptile here


void ucc_tl_ucp_close_eps(
ucc_tl_ucp_worker_t *worker, ucc_tl_ucp_context_t *ctx)
{
int i = 0;
ucp_ep_h ep;
ucs_status_t status;
ucs_status_ptr_t close_req;
ucp_request_param_t param;

param.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS;
param.flags = 0; // 0 means FLUSH
ep = get_next_ep_to_close(worker, ctx, &i);
while (ep) {
close_req = ucp_ep_close_nbx(ep, &param);

if (UCS_PTR_IS_PTR(close_req)) {
do {
ucp_worker_progress(ctx->worker.ucp_worker);
if (ctx->cfg.service_worker != 0) {
ucp_worker_progress(ctx->service_worker.ucp_worker);
}
status = ucp_request_check_status(close_req);
} while (status == UCS_INPROGRESS);
ucp_request_free(close_req);
} else {
status = UCS_PTR_STATUS(close_req);
}
ucc_assert(status <= UCS_OK);
if (status != UCS_OK) {
tl_error(ctx->super.super.lib,
"error during ucp ep close, ep %p, status %s",
ep, ucs_status_string(status));
}
ep = get_next_ep_to_close(worker, ctx, &i);
}
int i = 0;
int n_reqs = 0;
int timed_out = 0;
int n_inflight;
int j;
ucp_ep_h ep;
ucs_status_t status;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable is declared without an initializer. In the sequential-fallback path, if timed_out is already 1 on entry to an iteration, the do/while wait-loop is skipped (line 135–150), so status is never assigned before the guard at line 156. While the guard at line 156 if (!timed_out) prevents use of uninitialized status, static analyzers can still flag this as a potential issue. Initializing at declaration removes the ambiguity:

Suggested change
ucs_status_t status;
ucs_status_t status = UCS_OK;

ucs_status_ptr_t close_req;
ucp_request_param_t param;
size_t max_eps;
ucs_status_ptr_t *reqs;
double deadline;

max_eps = worker->eps
? (size_t)ctx->super.super.ucc_context->params.oob.n_oob_eps
: kh_size(worker->ep_hash);
Comment on lines +110 to +112
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_eps over-counts for array-based worker storage

When worker->eps is non-NULL (OOB/array mode), max_eps is set to n_oob_eps — the total number of context-level OOB endpoints. This is the capacity of the array, not the number of actually connected endpoints. In a large cluster where only a subset of processes have exchanged addresses, most slots in worker->eps will be NULL, so n_reqs will be well below max_eps, but the reqs allocation will be sized for all n_oob_eps entries.

On very large jobs (e.g., 100k-process runs) where a single context closes only a handful of endpoints, this over-allocates a potentially large array during teardown — at precisely the time when memory pressure may already be elevated.

Consider counting actual non-NULL entries in worker->eps before allocating, or using a smaller initial size with reallocation, to avoid an unnecessary large allocation:

if (worker->eps) {
    size_t n_oob = ctx->super.super.ucc_context->params.oob.n_oob_eps;
    for (size_t k = 0; k < n_oob; k++) {
        if (worker->eps[k]) max_eps++;
    }
} else {
    max_eps = kh_size(worker->ep_hash);
}

if (max_eps == 0) {
return;
}

/* Without OOB, peers cannot coordinate graceful shutdown and flush may
hang indefinitely on unreachable peers. Use force-close to avoid
blocking. With OOB, a barrier after close ensures all peers are
reachable, so graceful flush is safe. */
param.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS;
param.flags = UCC_TL_CTX_HAS_OOB(ctx) ? 0 : UCP_EP_CLOSE_FLAG_FORCE;

reqs = (ucs_status_ptr_t *)ucc_calloc(max_eps, sizeof(*reqs), "close_reqs");
if (!reqs) {
tl_warn(
ctx->super.super.lib,
"failed to allocate close_reqs, falling back to sequential "
"close");
deadline = ucc_get_time() + UCC_TL_UCP_EP_CLOSE_TIMEOUT;
ep = get_next_ep_to_close(worker, ctx, &i);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For readability, please consider moving the sequential fallback to its own function

while (ep) {
close_req = ucp_ep_close_nbx(ep, &param);
if (UCS_PTR_IS_PTR(close_req)) {
if (!timed_out) {
do {
if (ucc_unlikely(ucc_get_time() > deadline)) {
tl_warn(
ctx->super.super.lib,
"ep close timed out in sequential "
"fallback");
timed_out = 1;
Comment on lines +138 to +142
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fallback timeout warning omits remaining-endpoint count

The main batch path logs "ep close timed out, %d requests still in-flight" with a concrete count of how many requests are pending. The sequential fallback just says "ep close timed out in sequential fallback" with no count. After the timeout fires, the loop continues iterating remaining endpoints, so an approximate count of those not yet waited-on would help operators diagnose how many endpoints were left unacknowledged:

tl_warn(
    ctx->super.super.lib,
    "ep close timed out in sequential fallback, remaining eps not waited");

Or, track a counter of skipped endpoints and include it in the message, matching the verbosity of the main path.

break;
}
ucp_worker_progress(ctx->worker.ucp_worker);
if (ctx->cfg.service_worker != 0) {
ucp_worker_progress(ctx->service_worker.ucp_worker);
}
status = ucp_request_check_status(close_req);
} while (status == UCS_INPROGRESS);
}
ucp_request_free(close_req);
} else {
status = UCS_PTR_STATUS(close_req);
}
if (!timed_out) {
ucc_assert(status <= UCS_OK);
if (status != UCS_OK) {
tl_error(
ctx->super.super.lib,
"error during ucp ep close, ep %p, status %s",
ep,
ucs_status_string(status));
}
}
ep = get_next_ep_to_close(worker, ctx, &i);
}
return;
}

ep = get_next_ep_to_close(worker, ctx, &i);
while (ep) {
close_req = ucp_ep_close_nbx(ep, &param);
if (UCS_PTR_IS_PTR(close_req)) {
reqs[n_reqs++] = close_req;
} else {
status = UCS_PTR_STATUS(close_req);
ucc_assert(status <= UCS_OK);
if (status != UCS_OK) {
tl_error(
ctx->super.super.lib,
"error during ucp ep close, ep %p, status %s",
ep,
ucs_status_string(status));
}
}
ep = get_next_ep_to_close(worker, ctx, &i);
}

n_inflight = n_reqs;
deadline = ucc_get_time() + UCC_TL_UCP_EP_CLOSE_TIMEOUT;
while (n_inflight > 0) {
if (ucc_unlikely(ucc_get_time() > deadline)) {
tl_warn(
ctx->super.super.lib,
"ep close timed out, %d requests still in-flight",
n_inflight);
break;
}
ucp_worker_progress(ctx->worker.ucp_worker);
if (ctx->cfg.service_worker != 0) {
ucp_worker_progress(ctx->service_worker.ucp_worker);
}
n_inflight = 0;
for (j = 0; j < n_reqs; j++) {
if (!reqs[j]) {
continue;
}
status = ucp_request_check_status(reqs[j]);
if (status != UCS_INPROGRESS) {
ucc_assert(status <= UCS_OK);
if (status != UCS_OK) {
tl_error(
ctx->super.super.lib,
"error during ucp ep close, status %s",
ucs_status_string(status));
}
ucp_request_free(reqs[j]);
reqs[j] = NULL;
} else {
n_inflight++;
}
}
}

for (j = 0; j < n_reqs; j++) {
if (reqs[j]) {
ucp_request_free(reqs[j]);
}
}

ucc_free(reqs);
}
4 changes: 2 additions & 2 deletions src/components/tl/ucp/tl_ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ typedef struct ucc_tl_ucp_team ucc_tl_ucp_team_t;
ucc_status_t ucc_tl_ucp_connect_team_ep(ucc_tl_ucp_team_t *team,
ucc_rank_t team_rank, ucp_ep_h *ep);

void ucc_tl_ucp_close_eps(ucc_tl_ucp_worker_t * worker,
ucc_tl_ucp_context_t *ctx);
void ucc_tl_ucp_close_eps(
ucc_tl_ucp_worker_t *worker, ucc_tl_ucp_context_t *ctx);
Comment on lines +44 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The continuation line is indented with 12 leading spaces, which does not align with the opening parenthesis and is inconsistent with the style used for the function declaration above it (lines 41–42). For consistency with the surrounding code style:

Suggested change
void ucc_tl_ucp_close_eps(
ucc_tl_ucp_worker_t *worker, ucc_tl_ucp_context_t *ctx);
void ucc_tl_ucp_close_eps(ucc_tl_ucp_worker_t *worker,
ucc_tl_ucp_context_t *ctx);

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


static inline ucc_context_addr_header_t *
ucc_tl_ucp_get_team_ep_header(ucc_tl_ucp_team_t *team, ucc_rank_t core_rank)
Expand Down