Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions ucm/transport/kv/asu/test/case/buffer_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,32 @@ TEST_F(BufferManagerTest, InitWithProviderRegistersMemory)
ASSERT_NE(provider.lastAddr, 0);
ASSERT_EQ(provider.lastSize, 1024 * 10);
ASSERT_EQ(mgr.GetTokenId(), 42);

ScatterGatherEntry sge;
ASSERT_TRUE(mgr.Allocate(64, sge).ok());
ASSERT_EQ(sge.addr, sge.device_addr);
}

// Checks that a HOST_PINNED BufferManager initialised with a provider registers
// its region under the device-visible address and that each allocation exposes
// both a CPU address (addr) and a distinct device address (device_addr).
TEST_F(BufferManagerTest, HostPinnedRegistersDeviceAddress)
{
StubTransProvider provider;

BufferManager mgr;
// A single 4096-byte slot, registered through the stub provider.
auto status = mgr.Init("test_rdma_pinned", MemoryType::HOST_PINNED, 4096, 1, &provider);
ASSERT_TRUE(status.ok()) << status.message;
// Init must have registered exactly once, tagging the region as device memory.
ASSERT_EQ(provider.registerCount, 1);
ASSERT_EQ(provider.lastMemType, TransProvider::MemType::MEM_DEVICE);

ScatterGatherEntry sge;
ASSERT_TRUE(mgr.Allocate(64, sge).ok());
// Both views must be populated, and for host-pinned memory they must differ.
ASSERT_NE(sge.addr, 0);
ASSERT_NE(sge.device_addr, 0);
ASSERT_NE(sge.addr, sge.device_addr);
// The provider was handed the device-visible address, not the CPU one.
// NOTE(review): this equality presumably relies on the only slot sitting at
// offset 0 of the region -- confirm against IndexPool's first index.
ASSERT_EQ(provider.lastAddr, sge.device_addr);

// The CPU writes through addr while HCOMM and remote RDMA use device_addr.
std::memset(reinterpret_cast<void*>(sge.addr), 0x5A, sge.length);
ASSERT_EQ(*reinterpret_cast<unsigned char*>(sge.addr), 0x5A);
}

TEST_F(BufferManagerTest, InitWithProviderAllocateReturnsTokenId)
Expand Down
15 changes: 9 additions & 6 deletions ucm/transport/kv/asu/trans/src/asu_submit_flow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,15 @@ Status AsuTransportImpl::BuildSubBatchSendBuffers(
continue;
}

if (subBatchContext.flagBuffer.addr == 0 || subBatchContext.flagBuffer.length == 0) {
if (subBatchContext.sendSge.device_addr == 0 || subBatchContext.flagBuffer.addr == 0 ||

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about sendSge.addr and sendSge.length?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Critical] Missing validation for sendSge.length

The validation now checks sendSge.device_addr, flagBuffer.addr, flagBuffer.device_addr, and flagBuffer.length, but sendSge.length is not validated. If sendSge.length is 0 or invalid, the SendIoBatch will have an incorrect length parameter, potentially causing buffer overflow or underflow in RDMA operations.

Add: sendSge.length == 0 to the validation condition.

subBatchContext.flagBuffer.device_addr == 0 || subBatchContext.flagBuffer.length == 0) {
const auto status =
Status::Error(StatusCode::NOT_INITIALIZED, "sub-batch flag buffer is not ready");
Status::Error(StatusCode::NOT_INITIALIZED, "sub-batch transport buffers are not ready");
UC_ERROR(
"Sub-batch flag buffer is not ready index={} cid={} flag_addr={} flag_length={}",
index, subBatchContext.cid, subBatchContext.flagBuffer.addr,
"Sub-batch transport buffers are not ready index={} cid={} send_device_addr={} "
"flag_addr={} flag_device_addr={} flag_length={}",
index, subBatchContext.cid, subBatchContext.sendSge.device_addr,
subBatchContext.flagBuffer.addr, subBatchContext.flagBuffer.device_addr,
subBatchContext.flagBuffer.length);
SetSubBatchSendFailed(subBatchContext, status);
if (finalStatus.ok()) { finalStatus = status; }
Expand All @@ -134,8 +137,8 @@ Status AsuTransportImpl::BuildSubBatchSendBuffers(

ioBatches.push_back(
TransProvider::SendIoBatch{subBatchContext.channel->GetConnection(),
reinterpret_cast<void*>(subBatchContext.sendSge.addr),
reinterpret_cast<void*>(subBatchContext.flagBuffer.addr),
reinterpret_cast<void*>(subBatchContext.sendSge.device_addr),
reinterpret_cast<void*>(subBatchContext.flagBuffer.device_addr),
subBatchContext.sendSge.length});
subBatchIndexes.emplace_back(index);
}
Expand Down
16 changes: 10 additions & 6 deletions ucm/transport/kv/asu/trans/src/asu_transport_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,18 @@ Status AsuTransportImpl::Init(const TransportConfig& config)
auto status = ValidateSqeRequestAttrs();
if (!status.ok()) { return status; }

status =
sendBufferManager_.Init("asu send buffer", MemoryType::HOST, config_.sendBufferSlotSize,
config_.sendBufferSlotNum, transProvider_.get());
// SQEs are packed through the CPU mapping, while AICPU/HCOMM sends from the
// device-visible mapping of the same host-pinned allocation.
status = sendBufferManager_.Init("asu send buffer", MemoryType::HOST_PINNED,
config_.sendBufferSlotSize, config_.sendBufferSlotNum,
transProvider_.get());
if (!status.ok()) { return status; }

status =
flagBufferManager_.Init("asu flag buffer", MemoryType::HOST, config_.flagBufferSlotSize,
config_.flagBufferSlotNum, transProvider_.get());
// The server writes the completion to device_addr via RDMA; CompletionLoop
// observes that write through the coherent CPU mapping in flagBuffer.addr.
status = flagBufferManager_.Init("asu flag buffer", MemoryType::HOST_PINNED,
config_.flagBufferSlotSize, config_.flagBufferSlotNum,
transProvider_.get());
if (!status.ok()) { return status; }
protocolManager_ = std::make_unique<ProtocolManager>();

Expand Down
105 changes: 88 additions & 17 deletions ucm/transport/kv/asu/trans/src/buffer_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,79 @@
#include "trans/ascend/ascend_buffer.h"

namespace UC::ASU {
namespace {

// Describes one backing allocation produced by BufferRegionCreator::MakeRegion.
// The field order is relied on by the positional brace-initialisation used to
// fill this struct, so do not reorder members.
struct BufferRegion {
// Owns the allocation; its deleter is whatever the allocating path attached.
std::shared_ptr<void> owner;
// Base pointer of the allocation; every MakeRegion path sets it to the same
// pointer that owner holds.
void* localAddr{nullptr};
// Base pointer that BufferManager registers with the transport provider.
// Equal to localAddr for HOST and ASCEND_DEVICE regions; for HOST_PINNED it is
// the pointer returned by aclrtHostGetDevicePointer.
void* deviceAddr{nullptr};
// Memory type reported to the provider when the region is registered.
TransProvider::MemType providerMemType{TransProvider::MemType::MEM_HOST};
};

class BufferRegionCreator : public Trans::AscendBuffer {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming problem: the parent class is a Buffer, but the child class is named a Creator?

public:
Status MakeRegion(MemoryType type, std::size_t size, BufferRegion& region)
{
switch (type) {
case MemoryType::HOST: {
auto owner = MakeHostBuffer(size);
if (!owner) { return AllocationFailed("host"); }
region = {owner, owner.get(), owner.get(), TransProvider::MemType::MEM_HOST};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion] device_addr semantic inconsistency for HOST memory type

For MemoryType::HOST, deviceAddr is set equal to localAddr (owner.get()). This is semantically ambiguous because plain host memory does not have a device-visible mapping. The comment in buffer_manager.h states device_addr is for device-visible address used by HCOMM/RDMA, but setting it equal to addr for HOST type contradicts this.

Consider:

  1. Setting device_addr = 0 for HOST type (forcing callers to handle the case)
  2. Or explicitly documenting that addr == device_addr for HOST memory means the same physical address is used

return Status::OK();
}
case MemoryType::HOST_PINNED: return MakeHostPinnedBuffer(size, region);
case MemoryType::ASCEND_DEVICE: {
auto owner = MakeDeviceBuffer(size);
if (!owner) { return AllocationFailed("device"); }
region = {owner, owner.get(), owner.get(), TransProvider::MemType::MEM_DEVICE};
return Status::OK();
}
default:
return Status::Error(StatusCode::INVALID_ARGUMENT, "unsupported memory type");
}
}

private:
// Builds the INTERNAL_ERROR status returned when allocating the named kind of
// memory fails. `type` is spliced into the message verbatim and must be a
// non-null C string.
static Status AllocationFailed(const char* type)
{
    std::string message{"failed to allocate "};
    message += type;
    message += " memory";
    return Status::Error(StatusCode::INTERNAL_ERROR, std::move(message));
}

Status MakeHostPinnedBuffer(std::size_t size, BufferRegion& region)
{
void* hostAddr = nullptr;
auto ret = aclrtMallocHost(&hostAddr, size);
if (ret != ACL_SUCCESS) { return AllocationFailed("host-pinned"); }

ret = aclrtHostRegisterV2(hostAddr, size, ACL_HOST_REG_MAPPED | ACL_HOST_REG_PINNED);
if (ret != ACL_SUCCESS) {
aclrtFreeHost(hostAddr);
return Status::Error(StatusCode::INTERNAL_ERROR,
"failed to register host-pinned memory with ACL");
}

void* deviceAddr = nullptr;
ret = aclrtHostGetDevicePointer(hostAddr, &deviceAddr, 0);
if (ret != ACL_SUCCESS) {
aclrtHostUnregister(hostAddr);
aclrtFreeHost(hostAddr);
return Status::Error(StatusCode::INTERNAL_ERROR,
"failed to get host-pinned device address");
}

// The owner keeps the ACL registration alive until after HCOMM has
// unregistered the region in BufferManager's destructor.
auto owner = std::shared_ptr<void>(hostAddr, [](void* addr) {
aclrtHostUnregister(addr);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Warning] Custom deleter lacks error handling

The custom deleter calls aclrtHostUnregister(addr) and aclrtFreeHost(addr) without checking return values. If aclrtHostUnregister fails, calling aclrtFreeHost on still-registered memory may cause undefined behavior or resource leaks.

Consider:

  1. Logging errors in the deleter (even if silent)
  2. Ensuring the cleanup order is safe per ACL documentation
  3. Using aclrtFreeHost only if unregister succeeded

aclrtFreeHost(addr);
});
region = {owner, hostAddr, deviceAddr, TransProvider::MemType::MEM_DEVICE};
return Status::OK();
}
};

} // namespace

BufferManager::~BufferManager()
{
Expand All @@ -38,6 +111,7 @@ BufferManager::~BufferManager()
provider_->UnregisterMemory(descs);
}
memory_.reset();
device_memory_ = nullptr;
slot_size_ = 0;
slot_num_ = 0;
}
Expand All @@ -60,22 +134,18 @@ Status BufferManager::Init(std::string name, MemoryType type, std::size_t slot_s

std::size_t total = slot_size * slot_num;

Trans::AscendBuffer allocator;
switch (memory_type_) {
case MemoryType::HOST: memory_ = allocator.MakeHostBuffer(total); break;
case MemoryType::HOST_PINNED: memory_ = allocator.MakeHostBuffer4DirectIo(total); break;
case MemoryType::ASCEND_DEVICE: memory_ = allocator.MakeDeviceBuffer(total); break;
default:
return Status::Error(StatusCode::INVALID_ARGUMENT, name_ + ": unsupported memory type");
}

if (!memory_) {
return Status::Error(StatusCode::INTERNAL_ERROR, name_ + ": failed to allocate memory");
}
BufferRegionCreator regionCreator;
BufferRegion region;
auto allocStatus = regionCreator.MakeRegion(memory_type_, total, region);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This violates the spirit of inheritance: the child class adds its own public function, which is then called directly.

if (!allocStatus.ok()) { return allocStatus; }
memory_ = std::move(region.owner);
device_memory_ = region.deviceAddr;
provider_mem_type_ = region.providerMemType;

if (memory_type_ == MemoryType::ASCEND_DEVICE) {
if (aclrtMemset(memory_.get(), total, 0, total) != ACL_SUCCESS) {
memory_.reset();
device_memory_ = nullptr;
return Status::Error(StatusCode::INTERNAL_ERROR,
name_ + ": failed to zero device memory");
}
Expand All @@ -91,6 +161,7 @@ Status BufferManager::Init(std::string name, MemoryType type, std::size_t slot_s
if (!regStatus.ok()) {
provider_ = nullptr;
memory_.reset();
device_memory_ = nullptr;
return regStatus;
}
}
Expand All @@ -100,11 +171,9 @@ Status BufferManager::Init(std::string name, MemoryType type, std::size_t slot_s

Status BufferManager::RegisterMemory()
{
auto memType = (memory_type_ == MemoryType::ASCEND_DEVICE) ? TransProvider::MemType::MEM_DEVICE
: TransProvider::MemType::MEM_HOST;
std::size_t total = slot_size_ * slot_num_;
std::vector<TransProvider::RegisterMemoryDesc> descs{
{memType, reinterpret_cast<uintptr_t>(memory_.get()), total}
{provider_mem_type_, reinterpret_cast<uintptr_t>(device_memory_), total}
};
std::vector<TransProvider::MemHandle> memHandles;
auto regStatus = provider_->RegisterMemory(nullptr, descs, memHandles);
Expand Down Expand Up @@ -141,8 +210,10 @@ Status BufferManager::Allocate(std::size_t size, ScatterGatherEntry& sge)
if (idx == IndexPool::npos) {
return Status::Error(StatusCode::RESOURCE_BUSY, name_ + ": no free slots");
}
void* addr = static_cast<char*>(memory_.get()) + idx * slot_size_;
sge.addr = reinterpret_cast<std::uint64_t>(addr);
const auto offset = idx * slot_size_;
sge.addr = reinterpret_cast<std::uint64_t>(static_cast<char*>(memory_.get()) + offset);
sge.device_addr =
reinterpret_cast<std::uint64_t>(static_cast<char*>(device_memory_) + offset);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Warning] Potential null pointer arithmetic if device_memory_ is null

The device_addr calculation uses static_cast<char*>(device_memory_) + offset. If device_memory_ is nullptr (e.g., due to a failed Init that did not properly set it), this pointer arithmetic produces a garbage address value.

While Allocate() checks memory_ is valid at line 201, it does not verify device_memory_ is non-null. Consider adding: if (!device_memory_) return Status::Error(...)

sge.length = static_cast<std::uint32_t>(size);
sge.tokenId = tokenId_;
sge.slot_index = idx;
Expand Down
5 changes: 5 additions & 0 deletions ucm/transport/kv/asu/trans/src/buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
namespace UC::ASU {

struct ScatterGatherEntry {
// Local address used by CPU code for SQE packing and completion polling.
std::uint64_t addr{0};
// Device-visible address used by HCOMM/HIXL and remote RDMA operations.
std::uint64_t device_addr{0};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Performance] Struct size increase may impact cache efficiency

Adding device_addr increases ScatterGatherEntry from ~24 bytes to ~32 bytes (assuming 64-bit system). In high-throughput scenarios with many pending SQEs, this 33% size increase could:

  1. Increase memory footprint for sub-batch contexts
  2. Reduce cache efficiency when processing batches
  3. Impact memory bandwidth in tight loops

This is acceptable given the feature requirement, but worth noting for performance-sensitive deployments.

std::uint32_t length{0};
std::uint32_t tokenId{0};
std::uint32_t slot_index{UINT32_MAX};
Expand Down Expand Up @@ -66,6 +69,8 @@ class BufferManager {
MemoryType memory_type_{MemoryType::HOST};

std::shared_ptr<void> memory_;
void* device_memory_{nullptr};
TransProvider::MemType provider_mem_type_{TransProvider::MemType::MEM_HOST};
IndexPool index_pool_;

TransProvider* provider_{nullptr};
Expand Down
10 changes: 5 additions & 5 deletions ucm/transport/kv/asu/trans/src/sqe_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ KvBatchStoreRequest BuildBatchStoreRequest(
request.kv_ns_id = GetTransportConfigAttr<std::uint32_t>(attrs, "kv_ns_id");
request.dtype = GetTransportConfigAttr<std::uint8_t>(attrs, "dtype");
request.dspec = GetTransportConfigAttr<std::uint8_t>(attrs, "dspec");
request.response_buffer_addr = flagBuffer.addr;
request.response_buffer_addr = flagBuffer.device_addr;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion] Architectural consistency for user-provided buffer addresses

The response_buffer_addr now correctly uses flagBuffer.device_addr for RDMA operations. However, the entry.buffer_addr at line 212 uses entries[index].buffer.region.addr (CPU address). If user-provided KVBuffer regions are also host-pinned memory, they should similarly use device addresses for RDMA operations.

This creates a semantic inconsistency: internal buffers use device_addr for RDMA, but user buffers use addr. Consider:

  1. Adding a device_addr field to MemoryRegion struct
  2. Or documenting that user-provided buffers must use device-visible addresses if they are host-pinned
  3. Or providing a registration API that captures both addresses

request.response_mr_key = flagBuffer.tokenId;
request.lr = GetTransportConfigAttr<bool>(attrs, "lr");
request.rflag = true;
Expand All @@ -224,7 +224,7 @@ KvBatchRetrieveRequest BuildBatchRetrieveRequest(
KvBatchRetrieveRequest request;
request.cid = cid;
request.kv_ns_id = GetTransportConfigAttr<std::uint32_t>(attrs, "kv_ns_id");
request.response_buffer_addr = flagBuffer.addr;
request.response_buffer_addr = flagBuffer.device_addr;
request.response_mr_key = flagBuffer.tokenId;
request.lr = GetTransportConfigAttr<bool>(attrs, "lr");
request.rflag = true;
Expand Down Expand Up @@ -259,7 +259,7 @@ KvDeleteRequest BuildDeleteRequest(const BatchView<CacheKey>& keys,
KvDeleteRequest request;
request.cid = cid;
request.kv_ns_id = GetTransportConfigAttr<std::uint32_t>(attrs, "kv_ns_id");
request.response_buffer_addr = flagBuffer.addr;
request.response_buffer_addr = flagBuffer.device_addr;
request.response_mr_key = flagBuffer.tokenId;
request.rflag = true;
request.keys = CopyKeys(keys);
Expand All @@ -274,7 +274,7 @@ KvExistRequest BuildExistRequest(const BatchView<CacheKey>& keys,
KvExistRequest request;
request.cid = cid;
request.kv_ns_id = GetTransportConfigAttr<std::uint32_t>(attrs, "kv_ns_id");
request.response_buffer_addr = flagBuffer.addr;
request.response_buffer_addr = flagBuffer.device_addr;
request.response_mr_key = flagBuffer.tokenId;
request.rflag = true;
request.sc = GetTransportConfigAttr<bool>(attrs, "sc");
Expand All @@ -287,7 +287,7 @@ KvKeepAliveRequest BuildKeepAliveRequest(std::uint16_t cid, const ScatterGatherE
{
KvKeepAliveRequest request;
request.cid = cid;
request.response_buffer_addr = flagBuffer.addr;
request.response_buffer_addr = flagBuffer.device_addr;
request.response_mr_key = flagBuffer.tokenId;
request.rflag = true;
return request;
Expand Down
39 changes: 38 additions & 1 deletion ucm/transport/kv/asu/trans/test/asu_submit_flow_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,43 @@ TEST_F(AsuSubmitFlowBufferTest, BuildSubBatchSendBuffersMarksMissingFlagBufferFa
}
}

// Checks that BuildSubBatchSendBuffers fills each SendIoBatch with the
// device_addr of the send and flag buffers (not the CPU addr) when both buffer
// managers are backed by HOST_PINNED memory.
TEST_F(AsuSubmitFlowBufferTest, BuildSubBatchSendBuffersUsesHostPinnedDeviceAddresses)
{
// One slot per manager; no provider argument is passed to Init here.
ASSERT_TRUE(transport_->sendBufferManager_
.Init("test send buffer", MemoryType::HOST_PINNED, 4096, 1)
.ok());
ASSERT_TRUE(transport_->flagBufferManager_
.Init("test flag buffer", MemoryType::HOST_PINNED, 128, 1)
.ok());

// A connection group is needed so the sub-batch can be given a channel.
transport_->connManager_ =
std::make_unique<ConnectionManager>(*transport_->transProvider_, "", 5000);
ASSERT_TRUE(transport_->connManager_->AddGroup(AsuEndpoint{}, 1).ok());

// Hand-build a single PENDING sub-batch with freshly allocated buffers.
std::vector<TransportSubBatchContext> subBatchContexts(1);
auto& subBatchContext = subBatchContexts[0];
subBatchContext.state = TransportSubBatchState::PENDING;
subBatchContext.channel = transport_->connManager_->SelectConnection();
subBatchContext.entryStatus.assign(1, Status::OK());
ASSERT_NE(subBatchContext.channel, nullptr);
ASSERT_TRUE(transport_->sendBufferManager_.Allocate(64, subBatchContext.sendSge).ok());
ASSERT_TRUE(transport_->flagBufferManager_.Allocate(64, subBatchContext.flagBuffer).ok());
// Precondition: the two views differ, so the checks below can tell them apart.
ASSERT_NE(subBatchContext.sendSge.addr, subBatchContext.sendSge.device_addr);
ASSERT_NE(subBatchContext.flagBuffer.addr, subBatchContext.flagBuffer.device_addr);

std::vector<TransProvider::SendIoBatch> ioBatches;
std::vector<std::size_t> subBatchIndexes;
const auto status =
transport_->BuildSubBatchSendBuffers(subBatchContexts, ioBatches, subBatchIndexes);

// Exactly one batch is expected, carrying the device-visible pointers.
ASSERT_TRUE(status.ok()) << status.message;
ASSERT_EQ(ioBatches.size(), std::size_t{1});
EXPECT_EQ(ioBatches[0].sendBuffer,
reinterpret_cast<void*>(subBatchContext.sendSge.device_addr));
EXPECT_EQ(ioBatches[0].flagBuffer,
reinterpret_cast<void*>(subBatchContext.flagBuffer.device_addr));
}

TEST(AsuSubmitFlowTest, SendSubBatchBuffersFailsAllSentSubBatchesWhenStatusCountMismatches)
{
g_sendStatuses = {Status::OK()};
Expand Down Expand Up @@ -290,4 +327,4 @@ TEST(AsuSubmitFlowTest, SendSubBatchBuffersFailsAllSentSubBatchesWhenStatusCount
}

} // namespace
} // namespace UC::ASU
} // namespace UC::ASU
20 changes: 14 additions & 6 deletions ucm/transport/kv/asu/trans/test/sqe_request_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,13 @@ class SqeRequestTest : public ::testing::Test {
transport_->config_.attrs = DefaultAttrs();
transport_->nextRequestCid_.store(1, std::memory_order_relaxed);
auto* provider = transport_->transProvider_.get();
auto status =
transport_->flagBufferManager_.Init("test flag buffer", MemoryType::HOST,
kFlagBufferSlotSize, kFlagBufferSlotNum, provider);
auto status = transport_->flagBufferManager_.Init(
"test flag buffer", MemoryType::HOST_PINNED, kFlagBufferSlotSize,
kFlagBufferSlotNum, provider);
ASSERT_TRUE(status.ok()) << status.message;
status = transport_->sendBufferManager_.Init("test send buffer", MemoryType::HOST,
kTestSendBufferSlotSize,
kTestSendBufferSlotNum, provider);
status = transport_->sendBufferManager_.Init(
"test send buffer", MemoryType::HOST_PINNED, kTestSendBufferSlotSize,
kTestSendBufferSlotNum, provider);
ASSERT_TRUE(status.ok()) << status.message;
transport_->protocolManager_ = std::make_unique<ProtocolManager>();
}
Expand Down Expand Up @@ -190,6 +190,14 @@ TEST_F(SqeRequestTest, SubmitBatchStoreAllocatesFlagBufferAndBuildsRequest)
EXPECT_EQ(subBatchContext.state, TransportSubBatchState::PENDING);
EXPECT_TRUE(subBatchContext.status.ok());
EXPECT_NE(subBatchContext.sendSge.addr, std::uint64_t{0});
EXPECT_NE(subBatchContext.sendSge.addr, subBatchContext.sendSge.device_addr);
EXPECT_NE(subBatchContext.flagBuffer.addr, subBatchContext.flagBuffer.device_addr);

const auto* packedSqe = reinterpret_cast<const std::uint32_t*>(subBatchContext.sendSge.addr);
const auto packedResponseAddr =
static_cast<std::uint64_t>(packedSqe[3]) |
(static_cast<std::uint64_t>(packedSqe[4]) << 32);
EXPECT_EQ(packedResponseAddr, subBatchContext.flagBuffer.device_addr);
ASSERT_EQ(subBatchContext.entryStatus.size(), entries.size());
for (const auto& entryStatus : subBatchContext.entryStatus) { EXPECT_TRUE(entryStatus.ok()); }
}
Expand Down