-
Notifications
You must be signed in to change notification settings - Fork 91
IB host-no-atomic: GDRCopy + mlx5dv Data Direct for memory-consistent low-latency signaling
#753
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 14 commits
e711b62
c881bc5
e227fdc
1e32e17
eafa6fb
3d8a2e7
a10aff5
1818709
5657e4a
0eae34c
4823583
403b2fb
305d157
b1f458e
6da12fa
7e4365f
b59196b
ba0451a
50f6a24
e26f8ab
7003fec
30b9891
b6ce0f2
d2efc2f
4afbf78
e40c72b
bed85b5
4d9acea
b693d1b
2b4adcc
b64536f
dcdd3fe
caeec75
b9609f8
41695ba
febdbf9
c4afbe1
04ebd9b
54e46ba
6c2bc8f
d0c709e
edda25d
2f02d38
2adf4a4
98b023a
22e5efb
2f27d7d
d88ee8d
11e27e2
25f31b4
75dfdd9
ac4d713
ac022c3
72407af
8effd97
fd7358d
67d1706
060982d
6b2f819
eb99a26
8c3a436
f4b8574
3b56b08
448ceb6
7ce841b
bbb9c10
60ff32c
00583da
c699b8a
284d913
75ac8be
e0c7ddb
c40a233
375bc13
bcb392f
ea1dd65
d6a6fa2
a9cf938
6647338
7a87c2c
cf505d7
757c0ec
e2a5be4
2a705f5
a38bd9d
e2a9692
2c4bab8
a937ce4
d66d7e4
5a65cc7
2297a3d
2756221
bff76d5
6082648
79a0149
0200532
80f554e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
|
|
||
| # Find the GDRCopy libraries | ||
| # | ||
| # The following variables are optionally searched for defaults | ||
| # GDRCOPY_ROOT_DIR: Base directory where all GDRCopy components are found | ||
| # GDRCOPY_INCLUDE_DIR: Directory where GDRCopy headers are found | ||
| # GDRCOPY_LIB_DIR: Directory where GDRCopy libraries are found | ||
|
|
||
| # The following are set after configuration is done: | ||
| # GDRCOPY_FOUND | ||
| # GDRCOPY_INCLUDE_DIRS | ||
| # GDRCOPY_LIBRARIES | ||
|
|
||
| find_path(GDRCOPY_INCLUDE_DIRS | ||
| NAMES gdrapi.h | ||
| HINTS | ||
| ${GDRCOPY_INCLUDE_DIR} | ||
| ${GDRCOPY_ROOT_DIR} | ||
| ${GDRCOPY_ROOT_DIR}/include | ||
| /usr/local/include | ||
| /usr/include) | ||
|
|
||
| find_library(GDRCOPY_LIBRARIES | ||
| NAMES gdrapi | ||
| HINTS | ||
| ${GDRCOPY_LIB_DIR} | ||
| ${GDRCOPY_ROOT_DIR} | ||
| ${GDRCOPY_ROOT_DIR}/lib | ||
| /usr/local/lib | ||
| /usr/lib | ||
| /usr/lib/x86_64-linux-gnu) | ||
|
|
||
| include(FindPackageHandleStandardArgs) | ||
| find_package_handle_standard_args(GDRCopy DEFAULT_MSG GDRCOPY_INCLUDE_DIRS GDRCOPY_LIBRARIES) | ||
| mark_as_advanced(GDRCOPY_INCLUDE_DIRS GDRCOPY_LIBRARIES) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -197,13 +197,12 @@ void IBConnection::recvThreadFunc() { | |
| } | ||
| } | ||
|
|
||
| // Host-side buffer to receive newValue from imm_data (need 64-bit for cudaMemcpy) | ||
| uint64_t newValueHost = 0; | ||
|
|
||
| while (!stopRecvThread_.load(std::memory_order_relaxed)) { | ||
| auto qp = qp_.lock(); | ||
| if (!qp) break; | ||
| auto qp = qp_.lock(); | ||
| if (!qp) return; | ||
|
|
||
| while (!stopRecvThread_.load(std::memory_order_relaxed)) { | ||
| int wcNum = qp->pollRecvCq(); | ||
| if (wcNum < 0) { | ||
| WARN(NET, "IBConnection recvThreadFunc: pollRecvCq failed"); | ||
|
|
@@ -220,22 +219,32 @@ void IBConnection::recvThreadFunc() { | |
| continue; | ||
| } | ||
|
|
||
| // The imm_data contains newValue (32-bit, extended to 64-bit) | ||
| // Note: getRecvWcImmData already converts from network byte order via ntohl | ||
| unsigned int immData = qp->getRecvWcImmData(i); | ||
| newValueHost = static_cast<uint64_t>(immData); | ||
| // Read the token value written by the remote sender. | ||
| #if defined(DEBUG_CUFLUSH) && defined(MSCCLPP_USE_CUDA) | ||
| // cuFlush path: read from imm_data then flush NIC->GPU write pipeline for visibility. | ||
| newValueHost = static_cast<uint64_t>(qp->getRecvWcImmData(i)); | ||
| MSCCLPP_CUTHROW(cuFlushGPUDirectRDMAWrites(CU_FLUSH_GPU_DIRECT_RDMA_WRITES_TARGET_CURRENT_CTX, | ||
| CU_FLUSH_GPU_DIRECT_RDMA_WRITES_TO_OWNER)); | ||
| #else | ||
| // Read the 64-bit token from the local signal GPU buffer via volatile load. | ||
| // localSignalGpuPtr_ points to either a GDRCopy BAR1 mapping (CUDA) or the | ||
| // GPU buffer directly (ROCm system-coherent/uncached memory). volatile is not | ||
| // strictly needed here (uncacheable memory and intervening function calls prevent | ||
| // stale reads), but is kept as a convention for NIC-written memory. | ||
| newValueHost = *static_cast<volatile uint64_t*>(localSignalGpuPtr_); | ||
| #endif | ||
|
|
||
| // Read dstGpuAddr from the local stored address (set by setRemoteUpdateDstAddr) | ||
| uint64_t dstGpuAddr = remoteUpdateDstAddr_; | ||
|
||
| if (dstGpuAddr != 0) { | ||
| uint64_t* dstPtr = reinterpret_cast<uint64_t*>(dstGpuAddr); | ||
|
|
||
| // Use cudaMemcpyAsync with our dedicated stream to avoid blocking on the default stream | ||
| MSCCLPP_CUDATHROW( | ||
| cudaMemcpyAsync(dstPtr, &newValueHost, sizeof(uint64_t), cudaMemcpyHostToDevice, signalStream_)); | ||
|
|
||
| INFO(CONN, "IBConnection recvThreadFunc: updated GPU ptr ", dstPtr, " to ", newValueHost, " (immData=", immData, | ||
| ")"); | ||
| if (remoteUpdateDstAddrMap_ && remoteUpdateDstAddrMap_->valid()) { | ||
| // Direct host-side write to GPU memory via GDRCopy BAR1 mapping | ||
| remoteUpdateDstAddrMap_->copyTo(&newValueHost, sizeof(uint64_t)); | ||
| } else { | ||
| *dstPtr = newValueHost; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this valid for CUDA? Maybe we can throw an error if the dstAddrMap is invalid in a CUDA environment. |
||
| } | ||
| } | ||
|
|
||
| // Post another recv for future messages | ||
|
|
@@ -250,22 +259,63 @@ IBConnection::IBConnection(std::shared_ptr<Context> context, const Endpoint& loc | |
| : BaseConnection(context, localEndpoint), | ||
| transport_(localEndpoint.transport()), | ||
| remoteTransport_(remoteEndpoint.transport()), | ||
| dummyAtomicSource_(std::make_unique<uint64_t>(0)), | ||
| atomicSrc_(std::make_unique<uint64_t>(0)), | ||
| ibNoAtomic_(getImpl(localEndpoint).ibNoAtomic_), | ||
| stopRecvThread_(false), | ||
| localGpuDeviceId_(localEndpoint.device().id), | ||
| signalStream_(nullptr), | ||
| remoteUpdateDstAddr_(0) { | ||
| remoteUpdateDstAddr_(0), | ||
| remoteSignalGpuMrInfo_{0, 0}, | ||
| localSignalGpuPtr_(nullptr) { | ||
| qp_ = getImpl(localEndpoint).ibQp_; | ||
| qp_.lock()->rtr(getImpl(remoteEndpoint).ibQpInfo_); | ||
| qp_.lock()->rts(); | ||
| dummyAtomicSourceMem_ = context->registerMemory(dummyAtomicSource_.get(), sizeof(uint64_t), transport_); | ||
| validateTransport(dummyAtomicSourceMem_, transport_); | ||
| dstTransportInfo_ = getImpl(dummyAtomicSourceMem_).getTransportInfo(transport_); | ||
| atomicSrcMem_ = context->registerMemory(atomicSrc_.get(), sizeof(uint64_t), transport_); | ||
| validateTransport(atomicSrcMem_, transport_); | ||
| atomicSrcTransportInfo_ = getImpl(atomicSrcMem_).getTransportInfo(transport_); | ||
|
|
||
| if (ibNoAtomic_) { | ||
| // Create a CUDA stream for async memory copies | ||
| MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&signalStream_, cudaStreamNonBlocking)); | ||
| #if defined(MSCCLPP_USE_CUDA) | ||
| if (!gdrEnabled()) { | ||
| std::string reason = "unknown"; | ||
| switch (gdrStatus()) { | ||
| case GdrStatus::NotBuilt: | ||
| reason = "mscclpp was not built with GDRCopy support (MSCCLPP_USE_GDRCOPY not set)"; | ||
| break; | ||
| case GdrStatus::Disabled: | ||
| reason = "GDRCopy is disabled via MSCCLPP_FORCE_DISABLE_GDR environment variable"; | ||
| break; | ||
| case GdrStatus::DriverMissing: | ||
| reason = "GDRCopy kernel driver is not loaded (/dev/gdrdrv not found)"; | ||
| break; | ||
| case GdrStatus::OpenFailed: | ||
| reason = "gdr_open() failed; GDRCopy driver may be misconfigured"; | ||
| break; | ||
| default: | ||
| break; | ||
| } | ||
| THROW(CONN, Error, ErrorCode::InvalidUsage, "IB host-no-atomic mode on CUDA requires GDRCopy: ", reason); | ||
| } | ||
| #endif | ||
|
|
||
| // Extract remote endpoint's signal GPU buffer MR info for write-with-imm destination | ||
| const auto& remoteImpl = getImpl(remoteEndpoint); | ||
| remoteSignalGpuMrInfo_ = remoteImpl.ibSignalGpuMrInfo_; | ||
|
|
||
| // Create a GDR mapping of the local signal GPU buffer. recvThreadFunc reads the | ||
| // 64-bit token via localSignalGpuPtr_, which points to the BAR1-mapped host address | ||
| // (CUDA/GDRCopy) or the GPU buffer directly (ROCm system-coherent memory). | ||
| const auto& localImpl = getImpl(localEndpoint); | ||
| if (gdrEnabled() && localImpl.ibSignalGpuBuffer_) { | ||
| localSignalGpuMap_ = | ||
| std::make_unique<GdrMap>(std::static_pointer_cast<void>(localImpl.ibSignalGpuBuffer_), localGpuDeviceId_); | ||
| } | ||
| if (localSignalGpuMap_ && localSignalGpuMap_->valid()) { | ||
| // Use the BAR1-mapped host pointer; uncacheable MMIO ensures ordered volatile reads. | ||
| localSignalGpuPtr_ = localSignalGpuMap_->hostPtr(); | ||
| } else if (localImpl.ibSignalGpuBuffer_) { | ||
| // ROCm: GPU memory is system-coherent, so direct volatile read is safe. | ||
| localSignalGpuPtr_ = reinterpret_cast<uint64_t*>(localImpl.ibSignalGpuBuffer_.get()); | ||
| } | ||
|
|
||
| // Pre-post receive requests for incoming write-with-imm | ||
| auto qp = qp_.lock(); | ||
|
|
@@ -288,22 +338,25 @@ IBConnection::~IBConnection() { | |
| if (recvThread_.joinable()) { | ||
| recvThread_.join(); | ||
| } | ||
| if (signalStream_ != nullptr) { | ||
| // Synchronize stream to ensure all async copies are complete before destruction | ||
| // Ignore errors during teardown (CUDA context may already be destroyed) | ||
| MSCCLPP_CUDATHROW_IGNORE_TEARDOWN(cudaStreamSynchronize(signalStream_)); | ||
| MSCCLPP_CUDATHROW_IGNORE_TEARDOWN(cudaStreamDestroy(signalStream_)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Transport IBConnection::transport() const { return transport_; } | ||
|
|
||
| Transport IBConnection::remoteTransport() const { return remoteTransport_; } | ||
|
|
||
| void IBConnection::setRemoteUpdateDstAddr(uint64_t addr) { | ||
| remoteUpdateDstAddr_ = addr; | ||
| INFO(CONN, "IBConnection setRemoteUpdateDstAddr: ", (void*)addr); | ||
| bool IBConnection::usesRecvThread() const { return ibNoAtomic_; } | ||
|
|
||
| void IBConnection::setRemoteUpdateDstAddr(std::shared_ptr<uint64_t> gpuMem) { | ||
| remoteUpdateDstAddr_ = reinterpret_cast<uint64_t>(gpuMem.get()); | ||
| if (gdrEnabled()) { | ||
| if (gpuMem) { | ||
| remoteUpdateDstAddrMap_ = std::make_unique<GdrMap>(std::move(gpuMem), localGpuDeviceId_); | ||
| } else { | ||
| remoteUpdateDstAddrMap_.reset(); | ||
| } | ||
| } | ||
| INFO(CONN, "IBConnection setRemoteUpdateDstAddr: ", (void*)remoteUpdateDstAddr_); | ||
| } | ||
|
|
||
| void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, | ||
|
|
@@ -356,22 +409,24 @@ void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint6 | |
| *src = newValue; | ||
|
|
||
| if (ibNoAtomic_) { | ||
| // Use RDMA write-with-imm instead of atomic operation | ||
| // Send only newValue in imm_data (0-byte write) | ||
| // The remote's recvThreadFunc will use its stored remoteUpdateDstAddr_ to write | ||
| // Use RDMA write-with-imm instead of atomic operation. | ||
| // Write the token value (8 bytes) from the local host buffer to the remote signal GPU buffer, | ||
| // with newValue also in imm_data (32-bit). The remote's recvThreadFunc reads the token from | ||
| // the signal GPU buffer and forwards it to the semaphore's inbound token address. | ||
|
|
||
| // Put newValue in imm_data (truncated to 32-bit; semaphore counters should fit) | ||
| unsigned int immData = static_cast<unsigned int>(newValue); | ||
|
|
||
| // Send 0-byte write-with-imm; use dstMrInfo as target (we don't actually write anything) | ||
| qp_.lock()->stageSendWriteWithImm(nullptr, dstMrInfo, | ||
| /*size=*/0, /*wrId=*/0, | ||
| // Write the real token value into the host buffer, then RDMA write host->remote GPU | ||
| *atomicSrc_ = newValue; | ||
| qp_.lock()->stageSendWriteWithImm(atomicSrcTransportInfo_.ibMr, remoteSignalGpuMrInfo_, | ||
| /*size=*/sizeof(uint64_t), /*wrId=*/0, | ||
| /*srcOffset=*/0, /*dstOffset=*/0, | ||
| /*signaled=*/true, /*immData=*/immData); | ||
| qp_.lock()->postSend(); | ||
| INFO(CONN, "IBConnection write-with-imm: value ", oldValue, " -> ", newValue); | ||
| } else { | ||
| qp_.lock()->stageSendAtomicAdd(dstTransportInfo_.ibMr, dstMrInfo, /*wrId=*/0, dstOffset, newValue - oldValue, | ||
| qp_.lock()->stageSendAtomicAdd(atomicSrcTransportInfo_.ibMr, dstMrInfo, /*wrId=*/0, dstOffset, newValue - oldValue, | ||
| /*signaled=*/true); | ||
| qp_.lock()->postSend(); | ||
| INFO(CONN, "IBConnection atomic Write: from ", src, " to ", (uint8_t*)dstMrInfo.addr + dstOffset, ", ", oldValue, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to keep this code here?