Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
af63614
UCP/RMA: Add RNDV PUT protocol
tvegas1 May 21, 2026
0048063
UCP/RMA: Add RNDV GET protocol
tvegas1 May 22, 2026
cc950cb
UCP/RMA: Add RNDV GET push protocol
tvegas1 May 22, 2026
0e3b87b
UCP/RMA: Cleanup code
tvegas1 May 22, 2026
316b334
UCP/RMA: Merge common code
tvegas1 May 22, 2026
309dcdd
UCP/RMA: Keep only push-based RNDV GET
tvegas1 May 22, 2026
7d8a0fe
UCP/RMA: Reuse RTS for RMA case
tvegas1 May 22, 2026
2eb3d8e
UCP/RMA: Add GTEST for RMA rendezvous
tvegas1 May 22, 2026
22a4fc1
UCP/RMA: Block flush when Get RMA rendezvous is in progress
tvegas1 May 23, 2026
44c2a73
UCP/RMA: Get RMA rendezvous waits for endpoint
tvegas1 May 24, 2026
7bf40d1
UCP/RMA: Put rendezvous blocks flush
tvegas1 May 24, 2026
33859eb
UCP/RMA: Fix build failure
tvegas1 May 24, 2026
9354481
UCP/RMA/RNDV: Flush in RNDV code
tvegas1 May 29, 2026
4c0fe95
UCP/RMA/RNDV: Fix static analysis failure
tvegas1 Jun 1, 2026
9a0da0d
Merge remote-tracking branch 'upstream/master' into rma_rndv
tvegas1 Jun 1, 2026
9be1795
UCP/PROTO: Update maximum number of protocols and DT support
tvegas1 Jun 1, 2026
38d65cc
UCP/RMA/RNDV: GET operations supports all rendezvous schemes
tvegas1 Jun 8, 2026
1961abc
UCP/RMA/RNDV: Address review comments remote_mem_info
tvegas1 Jun 10, 2026
7142147
UCP/RMA/RNDV: Address review comments: why needed
tvegas1 Jun 10, 2026
98e5e80
UCP/RMA/RNDV: Address review comments: cleanups
tvegas1 Jun 10, 2026
749beeb
UCP/RMA/RNDV: Address review comments: typos
tvegas1 Jun 10, 2026
c243fd7
UCP/RMA/RNDV: Address review comments: revert op_id_flags
tvegas1 Jun 10, 2026
2892bb5
UCP/RMA/RNDV: Address review comments: common dt_iter init
tvegas1 Jun 10, 2026
9ce21f1
UCP/RMA/RNDV: Address review comments: simpler put rndv init
tvegas1 Jun 10, 2026
763e8a6
UCP/RMA/RNDV: Address review comments: merge rma query functions
tvegas1 Jun 10, 2026
005bcdb
UCP/RMA/RNDV: Address review comments: typo
tvegas1 Jun 10, 2026
91c92da
Merge remote-tracking branch 'upstream/master' into rma_rndv
tvegas1 Jun 12, 2026
da84568
UCP/RMA/RNDV: Fix GET protocol selection
tvegas1 Jun 12, 2026
9931da3
UCP/RMA/RNDV: Fix GET protocol selection
tvegas1 Jun 12, 2026
99bd273
GTEST/MOCK: Fix cfg index lookup
tvegas1 Jun 12, 2026
c408474
UCP/RMA/RNDV: Improve proving
tvegas1 Jun 12, 2026
9353ebc
UCP/PROTO: Fix coverity
tvegas1 Jun 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ noinst_HEADERS = \
proto/proto.h \
rma/rma.h \
rma/rma.inl \
rma/rma_rndv.h \
rndv/proto_rndv.h \
rndv/proto_rndv.inl \
rndv/rndv_mtype.inl \
Expand Down Expand Up @@ -141,6 +142,7 @@ libucp_la_SOURCES = \
rma/get_offload.c \
rma/put_am.c \
rma/put_offload.c \
rma/rma_rndv.c \
rma/rma_send.c \
rma/rma_sw.c \
rma/flush.c \
Expand Down
6 changes: 6 additions & 0 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -3696,6 +3696,12 @@ void ucp_ep_req_purge(ucp_ep_h ucp_ep, ucp_request_t *req,
}

ucp_request_put(req);
} else if (req->flags & UCP_REQUEST_FLAG_RNDV_SEND_INTERNAL) {
ucs_assert(req->send.ep == ucp_ep);

ucp_datatype_iter_cleanup(&req->send.state.dt_iter, 1,
UCP_DT_MASK_ALL);
ucp_request_complete_send(req, status);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need to clean up the nested protocol too?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed: the cleanup is now handled using the proto abort.

} else if (req->send.uct.func == ucp_amo_sw_proto.progress_fetch) {
/* Currently we don't support UCP EP request purging for proto mode */
ucs_assert(!ucp_ep->worker->context->config.ext.proto_enable);
Expand Down
7 changes: 6 additions & 1 deletion src/ucp/core/ucp_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@ static const char *ucp_request_flag_names[] = {
[ucs_ilog2(UCP_REQUEST_FLAG_RECV_TAG)] = "rcv_tag",
[ucs_ilog2(UCP_REQUEST_FLAG_RKEY_INUSE)] = "rk_use",
[ucs_ilog2(UCP_REQUEST_FLAG_USER_HEADER_COPIED)] = "hdr_copy",
[ucs_ilog2(UCP_REQUEST_FLAG_RNDV_RECV_INTERNAL)] = "rndv_rcv_int",

#if UCS_ENABLE_ASSERT
[ucs_ilog2(UCP_REQUEST_FLAG_STREAM_RECV)] = "strm_rcv",
[ucs_ilog2(UCP_REQUEST_DEBUG_FLAG_EXTERNAL)] = "extrn",
[ucs_ilog2(UCP_REQUEST_FLAG_SUPER_VALID)] = "spr_vld",
#endif
[ucs_ilog2(UCP_REQUEST_FLAG_RNDV_SEND_INTERNAL)] = "rndv_snd_int",
[ucs_ilog2(UCP_REQUEST_FLAG_RNDV_RTR_REQ)] = "rndv_rtr_req",
[ucs_ilog2(UCP_REQUEST_FLAG_RNDV_FLUSH)] = "rndv_flush",
};

static ucs_memory_type_t ucp_request_get_mem_type(ucp_request_t *req)
Expand All @@ -59,7 +63,8 @@ static ucs_memory_type_t ucp_request_get_mem_type(ucp_request_t *req)
} else if (req->flags & (UCP_REQUEST_FLAG_SEND_AM | UCP_REQUEST_FLAG_SEND_TAG)) {
return req->send.mem_type;
} else if (req->flags &
(UCP_REQUEST_FLAG_RECV_AM | UCP_REQUEST_FLAG_RECV_TAG)) {
(UCP_REQUEST_FLAG_RECV_AM | UCP_REQUEST_FLAG_RECV_TAG |
UCP_REQUEST_FLAG_RNDV_RECV_INTERNAL)) {
return req->recv.dt_iter.mem_info.type;
} else {
return UCS_MEMORY_TYPE_UNKNOWN;
Expand Down
22 changes: 18 additions & 4 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,19 @@ enum {
UCP_REQUEST_FLAG_USER_HEADER_COPIED = UCS_BIT(19),
UCP_REQUEST_FLAG_USAGE_TRACKED = UCS_BIT(20),
UCP_REQUEST_FLAG_FENCE_REQUIRED = UCS_BIT(21),
UCP_REQUEST_FLAG_RNDV_RECV_INTERNAL = UCS_BIT(22),
#if UCS_ENABLE_ASSERT
UCP_REQUEST_FLAG_STREAM_RECV = UCS_BIT(22),
UCP_REQUEST_DEBUG_FLAG_EXTERNAL = UCS_BIT(23),
UCP_REQUEST_FLAG_SUPER_VALID = UCS_BIT(24),
UCP_REQUEST_FLAG_STREAM_RECV = UCS_BIT(23),
UCP_REQUEST_DEBUG_FLAG_EXTERNAL = UCS_BIT(24),
UCP_REQUEST_FLAG_SUPER_VALID = UCS_BIT(25),
#else
UCP_REQUEST_FLAG_STREAM_RECV = 0,
UCP_REQUEST_DEBUG_FLAG_EXTERNAL = 0,
UCP_REQUEST_FLAG_SUPER_VALID = 0
UCP_REQUEST_FLAG_SUPER_VALID = 0,
#endif
UCP_REQUEST_FLAG_RNDV_SEND_INTERNAL = UCS_BIT(26),
UCP_REQUEST_FLAG_RNDV_RTR_REQ = UCS_BIT(27),
UCP_REQUEST_FLAG_RNDV_FLUSH = UCS_BIT(28)
};


Expand Down Expand Up @@ -261,6 +265,9 @@ struct ucp_request {
/* Remote buffer address for get/put operation */
uint64_t remote_address;

/* Remote buffer memory info for RTR_REQ */
ucp_memory_info_t remote_mem_info;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need it here, or can we create a separate struct in the union below?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, as we should be able to use the rkey for that, even with fragment requests.

/* Key for remote buffer operation */
ucp_rkey_h rkey;

Expand Down Expand Up @@ -477,6 +484,13 @@ struct ucp_request {
size_t length; /* Completion info to fill */
} stream;

struct {
/* Remote endpoint ID used to send internal completions */
uint64_t ep_id;
/* Completion callback for internal RNDV receives */
ucp_request_callback_t complete_cb;
} rndv;

struct {
ucp_am_recv_data_nbx_callback_t cb; /* Completion callback */
ucp_recv_desc_t *desc; /* Receive desc */
Expand Down
11 changes: 11 additions & 0 deletions src/ucp/core/ucp_request.inl
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,17 @@ ucp_request_put(ucp_request_t *req)
ucs_mpool_put_inline(req);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_rndv_flush_complete(ucp_request_t *req)
{
    /* Nothing to release unless this request is still marked as holding the
     * extra flush operation kept by a RNDV wrapper until the RNDV data path
     * completes. */
    if (!ucs_unlikely(req->flags & UCP_REQUEST_FLAG_RNDV_FLUSH)) {
        return;
    }

    /* Clear the marker so a repeated call becomes a no-op, then decrement the
     * flush operations count on the worker of the request's endpoint. */
    req->flags &= ~UCP_REQUEST_FLAG_RNDV_FLUSH;
    ucp_worker_flush_ops_count_add(req->send.ep->worker, -1);
}

static UCS_F_ALWAYS_INLINE void
ucp_request_complete_send(ucp_request_t *req, ucs_status_t status)
{
Expand Down
2 changes: 2 additions & 0 deletions src/ucp/proto/proto.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
_macro(ucp_get_am_bcopy_proto) \
_macro(ucp_get_offload_bcopy_proto) \
_macro(ucp_get_offload_zcopy_proto) \
_macro(ucp_get_rndv_proto) \
_macro(ucp_put_am_bcopy_proto) \
_macro(ucp_put_offload_short_proto) \
_macro(ucp_put_offload_bcopy_proto) \
_macro(ucp_put_offload_zcopy_proto) \
_macro(ucp_put_rndv_proto) \
_macro(ucp_eager_bcopy_multi_proto) \
_macro(ucp_eager_sync_bcopy_multi_proto) \
_macro(ucp_eager_zcopy_multi_proto) \
Expand Down
13 changes: 12 additions & 1 deletion src/ucp/proto/proto_common.inl
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ ucp_proto_request_zcopy_clean(ucp_request_t *req, unsigned dt_mask)
}

static UCS_F_ALWAYS_INLINE void
ucp_proto_request_zcopy_complete(ucp_request_t *req, ucs_status_t status)
ucp_proto_request_zcopy_complete_cb(ucp_request_t *req, ucs_status_t status,
ucp_request_callback_t complete_cb)
{
ucp_datatype_iter_cleanup(&req->send.state.dt_iter, 1,
UCP_DT_MASK_CONTIG_IOV);
Expand All @@ -108,10 +109,20 @@ ucp_proto_request_zcopy_complete(ucp_request_t *req, ucs_status_t status)
!(req->send.ep->flags & UCP_EP_FLAG_FAILED)) {
ucp_proto_request_restart(req);
} else {
if (complete_cb != NULL) {
complete_cb(req);
}

ucp_request_complete_send(req, status);
}
}

/*
 * Complete a zcopy request without an additional completion hook.
 *
 * Forwards to ucp_proto_request_zcopy_complete_cb() with a NULL callback, so
 * no extra callback is invoked before the request is completed.
 *
 * @param req     Request to complete.
 * @param status  Completion status passed through to the common helper.
 */
static UCS_F_ALWAYS_INLINE void
ucp_proto_request_zcopy_complete(ucp_request_t *req, ucs_status_t status)
{
ucp_proto_request_zcopy_complete_cb(req, status, NULL);
}

static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_proto_request_zcopy_complete_success(ucp_request_t *req)
{
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/proto/proto_debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ void ucp_proto_select_param_str(const ucp_proto_select_param_t *select_param,
[ucs_ilog2(UCP_OP_ATTR_FLAG_MULTI_SEND)] = "multi",
};
static const char *rndv_flag_names[] = {
[ucs_ilog2(UCP_PROTO_SELECT_OP_FLAG_PPLN_FRAG)] = "frag"
[ucs_ilog2(UCP_PROTO_SELECT_OP_FLAG_PPLN_FRAG)] = "frag",
[ucs_ilog2(UCP_PROTO_SELECT_OP_FLAG_RNDV_PUSH)] = "push"
};
static const char *am_flag_names[] = {
[ucs_ilog2(UCP_PROTO_SELECT_OP_FLAG_AM_EAGER)] = "egr",
Expand Down
13 changes: 11 additions & 2 deletions src/ucp/proto/proto_select.c
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,18 @@ ucp_proto_select_lookup_slow(ucp_worker_h worker,
return NULL;
}

/* add to hash after initializing the temp element, since calling
* ucp_proto_select_elem_init() can recursively modify the hash
/* Add to hash after initializing the temp element, since calling
* ucp_proto_select_elem_init() can recursively modify the hash.
* Re-check the key because recursive lookup may have initialized this
* exact selection already.
*/
khiter = kh_get(ucp_proto_select_hash, proto_select->hash, key.u64);
Comment thread
brminich marked this conversation as resolved.
if (khiter != kh_end(proto_select->hash)) {
ucp_proto_select_elem_cleanup(&tmp_select_elem);
select_elem = &kh_value(proto_select->hash, khiter);
goto out;
}

khiter = kh_put(ucp_proto_select_hash, proto_select->hash, key.u64,
&khret);
ucs_assert_always(khret == UCS_KH_PUT_BUCKET_EMPTY);
Expand Down
3 changes: 3 additions & 0 deletions src/ucp/proto/proto_select.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
* Relevant for UCP_OP_ID_RNDV_SEND and UCP_OP_ID_RNDV_RECV. */
#define UCP_PROTO_SELECT_OP_FLAG_PPLN_FRAG (UCP_PROTO_SELECT_OP_FLAGS_BASE << 1)

/* Select only push-based rendezvous receive protocols. */
#define UCP_PROTO_SELECT_OP_FLAG_RNDV_PUSH (UCP_PROTO_SELECT_OP_FLAGS_BASE << 3)


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: this change can be avoided.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restored

/* Select eager/rendezvous protocol for Active Message sends.
* Relevant for UCP_OP_ID_AM_SEND and UCP_OP_ID_AM_SEND_REPLY. */
Expand Down
Loading
Loading