From 0619d98e31ded2dd547c1df12cff46f5470d6e63 Mon Sep 17 00:00:00 2001 From: hogura Date: Mon, 6 Jan 2025 09:11:36 +0000 Subject: [PATCH 1/8] Refactoring comm --- csrc/bindings.cpp | 11 +- csrc/include/binding_helper.h | 35 ---- csrc/include/binding_tests.hpp | 324 ++++++++++++++++----------------- csrc/include/comm.h | 31 ++-- csrc/include/muhelper.h | 4 +- csrc/muhelper/comm.cpp | 124 +++++++++---- csrc/muhelper/embedding.cpp | 2 +- csrc/muhelper/muhelper.cpp | 47 +++-- 8 files changed, 300 insertions(+), 278 deletions(-) delete mode 100644 csrc/include/binding_helper.h diff --git a/csrc/bindings.cpp b/csrc/bindings.cpp index 5bfcc54..1b8bf13 100644 --- a/csrc/bindings.cpp +++ b/csrc/bindings.cpp @@ -10,7 +10,6 @@ #include "datatypes.hpp" #include "block_manager.h" #include "permute.h" -#include "binding_helper.h" #include "binding_tests.hpp" #include "profiler.hpp" @@ -179,12 +178,12 @@ PYBIND11_MODULE(disagmoe_c, m) { /******** Test functions ********/ - m.def("test_nccl_p2p", &test_nccl_p2p); - m.def("test_nccl_group", &test_nccl_group); - m.def("test_parallel_attn_scheduler", &test_parallel_attn_scheduler); - m.def("test_multi_launch", &test_multi_launch); + // m.def("test_nccl_p2p", &test_nccl_p2p); + // m.def("test_nccl_group", &test_nccl_group); + // m.def("test_parallel_attn_scheduler", &test_parallel_attn_scheduler); + // m.def("test_multi_launch", &test_multi_launch); - REGISTER_FUNC(test_op_overlap); + // REGISTER_FUNC(test_op_overlap); // m.def("test_zmq_sub_pub", &test_zmq_sub_pub); // m.def("test_attn_dispatcher", &test_attn_dispatcher); // m.def("test_expert_dispatcher", &test_expert_dispatcher); diff --git a/csrc/include/binding_helper.h b/csrc/include/binding_helper.h deleted file mode 100644 index b07f380..0000000 --- a/csrc/include/binding_helper.h +++ /dev/null @@ -1,35 +0,0 @@ -#pragma once - -#include "muhelper.h" -#include "comm.h" - -#include -#include - -class PyMuHelper: MuHelper { -public: - using MuHelper::MuHelper; - using MuHelper::start; - using MuHelper::terminate; - - void run() override { - PYBIND11_OVERRIDE_PURE(void, MuHelper, run); - } -}; - -class PyChannel: Channel { -public: - using Channel::Channel; - - void instantiate() override { - PYBIND11_OVERRIDE_PURE(void, Channel, instantiate); - } - - void send(uintptr_t data, const Metadata& metadata) override { - PYBIND11_OVERRIDE_PURE(void, Channel, send); - } - - void recv(uintptr_t data, const Metadata& metadata) override { - PYBIND11_OVERRIDE_PURE(void, Channel, recv); - } -}; \ No newline at end of file diff --git a/csrc/include/binding_tests.hpp b/csrc/include/binding_tests.hpp index c59e0e5..6c9b579 100644 --- a/csrc/include/binding_tests.hpp +++ b/csrc/include/binding_tests.hpp @@ -12,22 +12,22 @@ #include #include -void test_nccl_p2p(Channel_t c1, uintptr_t p1, Channel_t c2, uintptr_t p2, const Metadata& metadata) { - auto t1 = std::thread([&]{c1->send(p1, metadata);}); - auto t2 = std::thread([&]{c2->recv(p2, metadata);}); - t1.join(); - t2.join(); -} - -std::pair _init_channel(int s = 0, int r = 1) { - auto uid = get_nccl_unique_id(); - auto c1 = create_channel(s, r, uid); - auto c2 = create_channel(r, s, uid); - - auto t1 = std::thread([&]{ c1->instantiate(); }), t2 = std::thread([&]{ c2->instantiate(); }); - t1.join(); t2.join(); - return std::make_pair(c1, c2); -} +// void test_nccl_p2p(Channel_t c1, uintptr_t p1, Channel_t c2, uintptr_t p2, const Metadata& metadata) { +// auto t1 = std::thread([&]{c1->send(p1, metadata);}); +// auto t2 = std::thread([&]{c2->recv(p2, metadata);}); +// 
t1.join(); +// t2.join(); +// } + +// std::pair _init_channel(int s = 0, int r = 1) { +// auto uid = get_nccl_unique_id(); +// auto c1 = create_channel(s, r, uid); +// auto c2 = create_channel(r, s, uid); + +// auto t1 = std::thread([&]{ c1->instantiate(); }), t2 = std::thread([&]{ c2->instantiate(); }); +// t1.join(); t2.join(); +// return std::make_pair(c1, c2); +// } // void test_zmq_sub_pub() { // auto pr = _init_channel(); @@ -347,150 +347,150 @@ std::pair _init_channel(int s = 0, int r = 1) { // exit(0); // } -void test_nccl_group(int rank, std::vector ranks, std::string uid) { - auto c_raw = create_nccl_group_channel(rank, ranks, (void*) uid.c_str()); - auto c = static_cast(c_raw.get()); - c->instantiate(); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - DMOE_LOG(INFO) << "rank " << rank << " instantiated" << LEND; +// void test_nccl_group(int rank, std::vector ranks, std::string uid) { +// auto c_raw = create_nccl_group_channel(rank, ranks, (void*) uid.c_str()); +// auto c = static_cast(c_raw.get()); +// c->instantiate(); +// std::this_thread::sleep_for(std::chrono::milliseconds(1000)); +// DMOE_LOG(INFO) << "rank " << rank << " instantiated" << LEND; - if (rank == 0) { - Metadata meta = Metadata { - /*shape=*/ std::vector({1, 4}), - /*dtype=*/ "fp16", - /*layer_id=*/ 1, - /*req_ids=*/ std::vector({2}), - /*exp_ids=*/ std::vector({3}), - /*prefill_poss=*/ std::vector({4}), - }; - uintptr_t buf = alloc_cuda_tensor(4, 0); - c->send_metadata(meta); - DMOE_LOG(INFO) << "Send metadata." << LEND; - c->send(buf, meta); - } else { - Metadata meta; - c->recv_metadata(meta); - DMOE_LOG(INFO) << "Got metadata: " << meta << LEND; - ASSERT(meta.num_element() == 4); - uintptr_t buf = alloc_cuda_tensor(meta.num_element(), 0); - c->recv(buf, meta); - ASSERT(meta.req_ids[0] == 2); - ASSERT(meta.exp_ids[0] == 3); - ASSERT(meta.prefill_poss[0] == 4); - } - - DMOE_LOG(INFO) << "rank " << rank << " passed" << LEND; -} - -void test_parallel_attn_scheduler(int rank, std::vector ranks, std::string uid) { - auto c_raw = create_nccl_group_channel(rank, ranks, (void*) uid.c_str()); - auto c = static_cast(c_raw.get()); - c->instantiate(); - DMOE_LOG(INFO) << "rank " << rank << " instantiated" << LEND; - - std::vector layer_ids{0, 1}; - std::vector channels{}; - mu_attn_pool_t pool = std::make_shared( - layer_ids, - rank, - channels - ); - - torch::Tensor dummy_tensor = torch::empty({0}, torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCUDA, 0)); - - std::vector> data_queue(2); - data_queue[0] = std::vector{ - AttentionBatch{dummy_tensor, std::make_shared( - AttentionBatchMetadata{0, {1, 4}, "fp16", 1, 1, 0, /*seq_ids=*/ {0}, {1}, {1}, {}} - )}, - AttentionBatch{dummy_tensor, std::make_shared( - AttentionBatchMetadata{0, {1, 4}, "fp16", 1, 1, 0, /*seq_ids=*/ {1}, {1}, {1}, {}} - )}, - }; - data_queue[1] = std::vector{ - AttentionBatch{dummy_tensor, std::make_shared( - AttentionBatchMetadata{0, {1, 4}, "fp16", 1, 1, 0, /*seq_ids=*/ {2}, {1}, {1}, {}} - )} - }; - std::vector token_per_layer {2, 1}; - pool->__set_attn_data_queue(data_queue, token_per_layer, 0); - - AttentionBatch result; - - if (rank == 0) { - // driver scheduler - AttentionDriverScheduler scheduler(pool, layer_ids, c_raw, c_raw); - result = scheduler.schedule(); - } else { - // worker scheduler - AttentionWorkerScheduler scheduler(pool, layer_ids, c_raw, c_raw); - result = scheduler.schedule(); - } - - ASSERT(result.metadata.get() != nullptr); - auto &seq_ids = result.metadata->seq_ids; - - for (int i: seq_ids) - 
DMOE_LOG(DEBUG) << "seq_id: " << i << LEND; - - ASSERT(seq_ids.size() == 2); - ASSERT(seq_ids[0] == 0 && seq_ids[1] == 1); - - DMOE_LOG(INFO) << "rank " << rank << " passed" << LEND; -} - -void test_multi_launch(int rank, std::vector ranks, std::vector uids) { - std::vector threads; - for (int i = 0; i < uids.size(); i ++) { - threads.push_back(std::thread( - [&](std::string uid, int i) { - auto c_raw = create_nccl_group_channel(rank, ranks, (void*) uid.c_str()); - auto c = std::dynamic_pointer_cast(c_raw); - c->instantiate(); - cudaStream_t stream; - cudaStreamCreateWithPriority(&stream, cudaStreamNonBlocking, -2); - if (i == 0) { - if (rank == 0) { - DMOE_LOG(DEBUG) << "sending metadata" << LEND; - c->send_metadata(Metadata { - /*shape=*/ std::vector({1, 4}), - /*dtype=*/ "fp16", - /*layer_id=*/ 1, - /*req_ids=*/ std::vector({rank * 10 + 0}), - /*exp_ids=*/ std::vector({3}), - /*prefill_poss=*/ std::vector({4}), - }); - DMOE_LOG(DEBUG) << "thread " << i << "sleeping" << LEND; - std::this_thread::sleep_for(std::chrono::milliseconds(10000)); - // c->send_metadata(Metadata { - // /*shape=*/ std::vector({1, 4}), - // /*dtype=*/ "fp16", - // /*layer_id=*/ 1, - // /*req_ids=*/ std::vector({rank * 10 + 1}), - // /*exp_ids=*/ std::vector({3}), - // /*prefill_poss=*/ std::vector({4}), - // }); - } else { - Metadata meta; - DMOE_LOG(DEBUG) << "receiving metadata" << LEND; - c->recv_metadata(meta); - DMOE_LOG(DEBUG) << "get " << meta << LEND; - c->recv_metadata(meta); - DMOE_LOG(DEBUG) << "get " << meta << LEND; - } - } else { - std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - DMOE_LOG(WARNING) << "thread " << i << " trying to allocate tensor" << LEND; - void* data; - CUDACHECK(cudaMallocAsync(&data, 4096, stream)); - CUDACHECK(cudaStreamSynchronize(stream)); - // c->all_reduce(data, {1, 4096}); - DMOE_LOG(WARNING) << "allocated tensor" << LEND; - } - }, uids[i], i - )); - } - for (auto &t: threads) - t.join(); - DMOE_LOG(INFO) << "rank " << rank << " passed" << LEND; -} \ No newline at end of file +// if (rank == 0) { +// Metadata meta = Metadata { +// /*shape=*/ std::vector({1, 4}), +// /*dtype=*/ "fp16", +// /*layer_id=*/ 1, +// /*req_ids=*/ std::vector({2}), +// /*exp_ids=*/ std::vector({3}), +// /*prefill_poss=*/ std::vector({4}), +// }; +// uintptr_t buf = alloc_cuda_tensor(4, 0); +// c->send_metadata(meta); +// DMOE_LOG(INFO) << "Send metadata." 
<< LEND; +// c->send(buf, meta); +// } else { +// Metadata meta; +// c->recv_metadata(meta); +// DMOE_LOG(INFO) << "Got metadata: " << meta << LEND; +// ASSERT(meta.num_element() == 4); +// uintptr_t buf = alloc_cuda_tensor(meta.num_element(), 0); +// c->recv(buf, meta); +// ASSERT(meta.req_ids[0] == 2); +// ASSERT(meta.exp_ids[0] == 3); +// ASSERT(meta.prefill_poss[0] == 4); +// } + +// DMOE_LOG(INFO) << "rank " << rank << " passed" << LEND; +// } + +// void test_parallel_attn_scheduler(int rank, std::vector ranks, std::string uid) { +// auto c_raw = create_nccl_group_channel(rank, ranks, (void*) uid.c_str()); +// auto c = static_cast(c_raw.get()); +// c->instantiate(); +// DMOE_LOG(INFO) << "rank " << rank << " instantiated" << LEND; + +// std::vector layer_ids{0, 1}; +// std::vector channels{}; +// mu_attn_pool_t pool = std::make_shared( +// layer_ids, +// rank, +// channels +// ); + +// torch::Tensor dummy_tensor = torch::empty({0}, torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCUDA, 0)); + +// std::vector> data_queue(2); +// data_queue[0] = std::vector{ +// AttentionBatch{dummy_tensor, std::make_shared( +// AttentionBatchMetadata{0, {1, 4}, "fp16", 1, 1, 0, /*seq_ids=*/ {0}, {1}, {1}, {}} +// )}, +// AttentionBatch{dummy_tensor, std::make_shared( +// AttentionBatchMetadata{0, {1, 4}, "fp16", 1, 1, 0, /*seq_ids=*/ {1}, {1}, {1}, {}} +// )}, +// }; +// data_queue[1] = std::vector{ +// AttentionBatch{dummy_tensor, std::make_shared( +// AttentionBatchMetadata{0, {1, 4}, "fp16", 1, 1, 0, /*seq_ids=*/ {2}, {1}, {1}, {}} +// )} +// }; +// std::vector token_per_layer {2, 1}; +// pool->__set_attn_data_queue(data_queue, token_per_layer, 0); + +// AttentionBatch result; + +// if (rank == 0) { +// // driver scheduler +// AttentionDriverScheduler scheduler(pool, layer_ids, c_raw, c_raw); +// result = scheduler.schedule(); +// } else { +// // worker scheduler +// AttentionWorkerScheduler scheduler(pool, layer_ids, c_raw, c_raw); +// result = scheduler.schedule(); +// } + +// ASSERT(result.metadata.get() != nullptr); +// auto &seq_ids = result.metadata->seq_ids; + +// for (int i: seq_ids) +// DMOE_LOG(DEBUG) << "seq_id: " << i << LEND; + +// ASSERT(seq_ids.size() == 2); +// ASSERT(seq_ids[0] == 0 && seq_ids[1] == 1); + +// DMOE_LOG(INFO) << "rank " << rank << " passed" << LEND; +// } + +// void test_multi_launch(int rank, std::vector ranks, std::vector uids) { +// std::vector threads; +// for (int i = 0; i < uids.size(); i ++) { +// threads.push_back(std::thread( +// [&](std::string uid, int i) { +// auto c_raw = create_nccl_group_channel(rank, ranks, (void*) uid.c_str()); +// auto c = std::dynamic_pointer_cast(c_raw); +// c->instantiate(); +// cudaStream_t stream; +// cudaStreamCreateWithPriority(&stream, cudaStreamNonBlocking, -2); +// if (i == 0) { +// if (rank == 0) { +// DMOE_LOG(DEBUG) << "sending metadata" << LEND; +// c->send_metadata(Metadata { +// /*shape=*/ std::vector({1, 4}), +// /*dtype=*/ "fp16", +// /*layer_id=*/ 1, +// /*req_ids=*/ std::vector({rank * 10 + 0}), +// /*exp_ids=*/ std::vector({3}), +// /*prefill_poss=*/ std::vector({4}), +// }); +// DMOE_LOG(DEBUG) << "thread " << i << "sleeping" << LEND; +// std::this_thread::sleep_for(std::chrono::milliseconds(10000)); +// // c->send_metadata(Metadata { +// // /*shape=*/ std::vector({1, 4}), +// // /*dtype=*/ "fp16", +// // /*layer_id=*/ 1, +// // /*req_ids=*/ std::vector({rank * 10 + 1}), +// // /*exp_ids=*/ std::vector({3}), +// // /*prefill_poss=*/ std::vector({4}), +// // }); +// } else { +// Metadata meta; +// 
DMOE_LOG(DEBUG) << "receiving metadata" << LEND; +// c->recv_metadata(meta); +// DMOE_LOG(DEBUG) << "get " << meta << LEND; +// c->recv_metadata(meta); +// DMOE_LOG(DEBUG) << "get " << meta << LEND; +// } +// } else { +// std::this_thread::sleep_for(std::chrono::milliseconds(2000)); +// DMOE_LOG(WARNING) << "thread " << i << " trying to allocate tensor" << LEND; +// void* data; +// CUDACHECK(cudaMallocAsync(&data, 4096, stream)); +// CUDACHECK(cudaStreamSynchronize(stream)); +// // c->all_reduce(data, {1, 4096}); +// DMOE_LOG(WARNING) << "allocated tensor" << LEND; +// } +// }, uids[i], i +// )); +// } +// for (auto &t: threads) +// t.join(); +// DMOE_LOG(INFO) << "rank " << rank << " passed" << LEND; +// } \ No newline at end of file diff --git a/csrc/include/comm.h b/csrc/include/comm.h index a50ce54..a2c3b24 100644 --- a/csrc/include/comm.h +++ b/csrc/include/comm.h @@ -27,12 +27,17 @@ class Channel { return local < other ? 1 : 0; } + void _delay_release_tensor(torch::Tensor tensor, cudaStream_t stream) {ASSERT(0);} + + void _release_tensor_loop() {ASSERT(0);} + public: Channel(int party_local, int party_other): local(party_local), other(party_other) {} + ~Channel() {} virtual void instantiate() = 0; - virtual void send(uintptr_t data, const Metadata& metadata) = 0; - virtual void recv(uintptr_t data, const Metadata& metadata) = 0; + virtual void send(torch::Tensor tensor, const Metadata& metadata) = 0; + virtual void recv(torch::Tensor tensor, const Metadata& metadata) = 0; void _debug_print() { printf("%d %d\n", local, other); @@ -66,9 +71,9 @@ class NcclChannel: public Channel { void instantiate() override; - void send(uintptr_t data, const Metadata& metadata) override; + void send(torch::Tensor tensor, const Metadata& metadata) override; - void recv(uintptr_t data, const Metadata& metadata) override; + void recv(torch::Tensor tensor, const Metadata& metadata) override; void sync() override; }; @@ -88,16 +93,20 @@ class ZmqChannel: public Channel { int rank_offset; + void* pin_buffer; + void* _tensor_copy(uintptr_t src, const Metadata& metadata, bool to_gpu, uintptr_t dst = 0); + void _pipeline_comm(void* data, size_t num_tokens, size_t token_size, cudaMemcpyKind flag); + public: ZmqChannel(int party_local, int party_other, bool is_sender, int rank = 0); void instantiate() override; - void send(uintptr_t data, const Metadata& metadata) override; + void send(torch::Tensor tensor, const Metadata& metadata) override; - void recv(uintptr_t data, const Metadata &metadata) override; + void recv(torch::Tensor tensor, const Metadata &metadata) override; }; class NcclGroupChannel: public NcclChannel { @@ -117,20 +126,20 @@ class NcclGroupChannel: public NcclChannel { int root() const; - void broadcast(void* send_buf, void* recv_buf, size_t count, ncclDataType_t type, cudaStream_t stream=nullptr); + void broadcast(torch::Tensor send_tensor, torch::Tensor recv_tensor, size_t count, ncclDataType_t type, cudaStream_t stream=nullptr); public: NcclGroupChannel(int party_local, const std::vector &party_all, ncclUniqueId comm_id, cudaStream_t stream = nullptr); void instantiate() override; - void send(uintptr_t data, const Metadata& metadata) override; + void send(torch::Tensor tensor, const Metadata& metadata) override; - void recv(uintptr_t data, const Metadata& metadata) override; + void recv(torch::Tensor tensor, const Metadata& metadata) override; void synchronize(); - void send_recv(uintptr_t data, const Metadata& metadata); + void send_recv(torch::Tensor tensor, const Metadata& metadata); void 
bcast_obj(void* &buf, size_t &size); @@ -138,7 +147,7 @@ class NcclGroupChannel: public NcclChannel { void recv_metadata(Metadata& metadata); - void all_reduce(uintptr_t data, const std::vector &shape); + void all_reduce(torch::Tensor tensor, const std::vector &shape); }; Channel_t create_channel(int party_local, int party_other, void *nccl_id_raw); diff --git a/csrc/include/muhelper.h b/csrc/include/muhelper.h index cdb061d..65e8048 100644 --- a/csrc/include/muhelper.h +++ b/csrc/include/muhelper.h @@ -59,7 +59,7 @@ class MuDispatcher: public MuHelper { virtual void _send_once(TensorBatch batch) = 0; - void _send_batch(int cid, uintptr_t buf, const Metadata& meta); + void _send_batch(int cid, torch::Tensor tensor, const Metadata& meta); void run() override; @@ -146,7 +146,7 @@ class MuPool: public MuHelper { void recv_metadata(int &peer_id, metadata_t &meta); - void recv_tensor(int peer_id, uintptr_t tensor_buf, metadata_t &meta); + void recv_tensor(int peer_id, torch::Tensor tensor, metadata_t &meta); virtual void process_batch(torch::Tensor tensor, metadata_t &meta, bool send_from_zmq=true); diff --git a/csrc/muhelper/comm.cpp b/csrc/muhelper/comm.cpp index 23562fd..1799f42 100644 --- a/csrc/muhelper/comm.cpp +++ b/csrc/muhelper/comm.cpp @@ -6,6 +6,8 @@ #include #include +using Tensor = torch::Tensor; + NcclChannel::NcclChannel(int party_local, int party_other, ncclUniqueId comm_id, cudaStream_t stream): Channel::Channel(party_local, party_other), comm_id(comm_id) { @@ -48,25 +50,25 @@ void NcclChannel::instantiate() { )); } -void NcclChannel::send(uintptr_t data_ptr, const Metadata& metadata) { +void NcclChannel::send(Tensor tensor, const Metadata& metadata) { // DMOE_LOG(INFO) << "NCCL sending: " << local << " " << other << LEND; tx_range _{"NcclChannel::send"}; - void* data = reinterpret_cast(data_ptr); + void* data = reinterpret_cast(tensor.data_ptr()); NCCLCHECK(ncclSend( - data, + data, /*count=*/ metadata.num_element(), /*datatype=*/ metadata.get_nccl_datatype(), /*peer=*/ this->m_other(), this->comm, this->stream )); - // CUDACHECK(cudaStreamSynchronize(this->stream)); + _delay_release_tensor(tensor, this->stream); // DMOE_LOG(INFO) << "NCCL sent " << local << " " << other << LEND; } -void NcclChannel::recv(uintptr_t data_ptr, const Metadata& metadata) { +void NcclChannel::recv(Tensor tensor, const Metadata& metadata) { tx_range _{"NcclChannel::recv"}; - void* data = reinterpret_cast(data_ptr); + void* data = reinterpret_cast(tensor.data_ptr()); NCCLCHECK(ncclRecv( data, /*count=*/ metadata.num_element(), @@ -108,6 +110,9 @@ void ZmqChannel::instantiate() { this->mq->connect(get_zmq_addr(other, /*is_gpu=*/ false, /*manual_port=*/ -1, /*offset=*/ this->rank_offset)); } DMOE_LOG(INFO) << "ZmqChannel instantiated " << this->local << LEND; + + // CUDACHECK(cudaMallocHost(&this->pin_buffer, 512 * 4096 * 4)); + this->pin_buffer = (void*) std::malloc(512 * 4096 * 4); } void* ZmqChannel::_tensor_copy(uintptr_t data, const Metadata& metadata, bool to_gpu, uintptr_t dst) { @@ -127,7 +132,7 @@ void* ZmqChannel::_tensor_copy(uintptr_t data, const Metadata& metadata, bool to { tx_range __{"ZmqChannel::_tensor_copy_memcpy_submit"}; - const size_t step = 4; + const size_t step = 2; const size_t dim_stride = metadata.get_datatype_size() * metadata.token_hidden_dim(); size_t num_tokens = metadata.shape[0]; for (size_t i = 0; i < metadata.shape[0]; i += step) { @@ -146,33 +151,79 @@ void* ZmqChannel::_tensor_copy(uintptr_t data, const Metadata& metadata, bool to return (void*) buf; } -void 
ZmqChannel::send(uintptr_t data, const Metadata& metadata) { +void ZmqChannel::_pipeline_comm(void* data, size_t num_tokens, size_t token_size, cudaMemcpyKind flag) { + tx_range _{"ZmqChannel::_pipeline_comm"}; + + const size_t step = num_tokens; + void* cpu_buf = std::malloc(num_tokens * token_size); + for (size_t i = 0; i < num_tokens; i += step) { + size_t cur_step = std::min(step, num_tokens - i); + + void* src_buf = data + i * token_size; + void* dst_buf = cpu_buf; + if (flag == cudaMemcpyHostToDevice) { + std::swap(src_buf, dst_buf); + zmq::message_t msg(src_buf, cur_step * token_size); + this->mq->recv(msg); + } + + CUDACHECK(cudaMemcpyAsync( + src_buf, + dst_buf, + cur_step * token_size, + flag, + this->stream + )); + + if (flag == cudaMemcpyDeviceToHost) { + // TODO(hogura|20250106): optimize this stream + CUDACHECK(cudaStreamSynchronize(this->stream)); + zmq::message_t msg(dst_buf, cur_step * token_size); + this->mq->send(msg); + } + } + std::free(cpu_buf); +} + +void ZmqChannel::send(Tensor tensor, const Metadata& metadata) { tx_range _{"ZmqChannel::send"}; // DMOE_LOG(DEBUG) << "ZmqChannel Sending to " << get_peer_id() << LEND; - void* buf = this->_tensor_copy(data, metadata, /*to_gpu=*/ false); - size_t size = metadata.num_element() * metadata.get_datatype_size(); - // DMOE_LOG(DEBUG) << "send size: " << size << " rank: " << this->rank_offset << LEND; - this->mq->send(zmq::buffer(buf, size)); - - // if (data != (uintptr_t) buf) - // std::free(buf); + if (is_embedding_node(this->local)) { + void* buf = (void*) tensor.data_ptr(); + size_t size = metadata.num_element() * metadata.get_datatype_size(); + this->mq->send(zmq::buffer(buf, size)); + } else { + this->_pipeline_comm( + (void*) tensor.data_ptr(), + metadata.num_tokens(), + metadata.token_hidden_dim() * metadata.get_datatype_size(), + cudaMemcpyKind::cudaMemcpyDeviceToHost + ); + } // DMOE_LOG(DEBUG) << "ZMQ Sent." 
<< LEND; } -void ZmqChannel::recv(uintptr_t data, const Metadata &metadata) { +void ZmqChannel::recv(Tensor tensor, const Metadata &metadata) { tx_range _{"ZmqChannel::recv"}; // DMOE_LOG(DEBUG) << "ZMQ Recving from " << get_peer_id() << LEND; - size_t size = metadata.num_element() * metadata.get_datatype_size(); - zmq::message_t msg(size); - // DMOE_LOG(DEBUG) << "recv size: " << size << " rank: " << this->rank_offset << LEND; - auto err = this->mq->recv(msg, zmq::recv_flags::none); - this->_tensor_copy((uintptr_t) msg.data(), metadata, - /*to_gpu=*/ !is_embedding_node(local), data); + if (is_embedding_node(this->local)) { + size_t size = metadata.num_element() * metadata.get_datatype_size(); + zmq::message_t msg(size); + auto err = this->mq->recv(msg, zmq::recv_flags::none); + memcpy((void*) tensor.data_ptr(), msg.data(), size); + } else { + this->_pipeline_comm( + (void*) tensor.data_ptr(), + metadata.num_tokens(), + metadata.token_hidden_dim() * metadata.get_datatype_size(), + cudaMemcpyKind::cudaMemcpyHostToDevice + ); + } // DMOE_LOG(DEBUG) << "ZMQ Recved" << LEND; } @@ -251,12 +302,12 @@ int NcclGroupChannel::root() const { return 0; } -void NcclGroupChannel::broadcast(void* send_buf, void* recv_buf, size_t count, ncclDataType_t type, cudaStream_t stream) { +void NcclGroupChannel::broadcast(Tensor send_tensor, Tensor recv_tensor, size_t count, ncclDataType_t type, cudaStream_t stream) { tx_range _{"NcclGroupChannel::broadcast"}; // DMOE_LOG(DEBUG) << "broadcasting " << root() << " " << local_rank << " " << count << " on the stream " << this->stream << LEND; NCCLCHECK(ncclBroadcast( - send_buf, - recv_buf, + (void*) send_tensor.data_ptr(), + (void*) recv_tensor.data_ptr(), count, type, root(), @@ -267,25 +318,24 @@ void NcclGroupChannel::broadcast(void* send_buf, void* recv_buf, size_t count, n // DMOE_LOG(DEBUG) << "finished broadcast " << root() << " " << local_rank << " " << count << " on the stream " << this->stream << LEND; } -void NcclGroupChannel::send(uintptr_t data_ptr, const Metadata& metadata) { +void NcclGroupChannel::send(Tensor tensor, const Metadata& metadata) { tx_range _{"NcclGroupChannel::send"}; ASSERT(is_root()); // DMOE_LOG(DEBUG) << "NcclGroupChannel Sending from " << this->local << LEND; - send_recv(data_ptr, metadata); + send_recv(std::move(tensor), metadata); // DMOE_LOG(DEBUG) << "NcclGroupChannel Sent." << LEND; } -void NcclGroupChannel::recv(uintptr_t data_ptr, const Metadata& metadata) { +void NcclGroupChannel::recv(Tensor tensor, const Metadata& metadata) { tx_range _{"NcclGroupChannel::recv"}; ASSERT(!is_root()); // DMOE_LOG(DEBUG) << "NcclGroupChannel Recving from " << this->local << LEND; - send_recv(data_ptr, metadata); + send_recv(std::move(tensor), metadata); // DMOE_LOG(DEBUG) << "NcclGroupChannel Recved." 
<< LEND; } -void NcclGroupChannel::send_recv(uintptr_t data_ptr, const Metadata& metadata) { - broadcast(reinterpret_cast(data_ptr), reinterpret_cast(data_ptr), - metadata.num_element(), metadata.get_nccl_datatype()); +void NcclGroupChannel::send_recv(Tensor tensor, const Metadata& metadata) { + broadcast(tensor, tensor, metadata.num_element(), metadata.get_nccl_datatype()); } void NcclGroupChannel::bcast_obj(void* &buf, size_t &size) { @@ -311,7 +361,7 @@ void NcclGroupChannel::bcast_obj(void* &buf, size_t &size) { void* data_buf = (void*) this->buffer_gpu.data_ptr(); CUDACHECK(cudaMemcpy(data_buf, buf, size, cudaMemcpyKind::cudaMemcpyHostToDevice)); - broadcast(data_buf, data_buf, size, ncclInt8); + broadcast(this->buffer_gpu, this->buffer_gpu, size, ncclInt8); } else { // first recv size // [option 1] use zmq to recv @@ -332,7 +382,7 @@ void NcclGroupChannel::bcast_obj(void* &buf, size_t &size) { // then recv data void* data_buf = (void*) this->buffer_gpu.data_ptr(); - broadcast(data_buf, data_buf, size, ncclInt8); + broadcast(this->buffer_gpu, this->buffer_gpu, size, ncclInt8); buf = std::malloc(size); CUDACHECK(cudaMemcpy(buf, data_buf, size, cudaMemcpyKind::cudaMemcpyDeviceToHost)); // DMOE_LOG(DEBUG) << "received metadata " << *decerealize((char*) buf, size) << LEND; @@ -363,9 +413,9 @@ void NcclGroupChannel::recv_metadata(Metadata& metadata) { // DMOE_LOG(DEBUG) << "NcclGroupChannel Recved metadata." << LEND; } -void NcclGroupChannel::all_reduce(uintptr_t data, const std::vector &shape) { +void NcclGroupChannel::all_reduce(Tensor tensor, const std::vector &shape) { tx_range _{"NcclGroupChannel::all_reduce"}; - void* buf = reinterpret_cast(data); + void* buf = reinterpret_cast(tensor.data_ptr()); int count = 1; for (int i: shape) count *= i; @@ -374,7 +424,7 @@ void NcclGroupChannel::all_reduce(uintptr_t data, const std::vector &shape) buf, buf, count, - ncclBfloat16, // !FIXME(hogura|20241106): remove this hardcode + ncclBfloat16, ncclSum, this->comm, this->stream diff --git a/csrc/muhelper/embedding.cpp b/csrc/muhelper/embedding.cpp index a8d3c31..fc6c074 100644 --- a/csrc/muhelper/embedding.cpp +++ b/csrc/muhelper/embedding.cpp @@ -62,7 +62,7 @@ void Sampler::run() { ); // auto tensor_buf = (uintptr_t) std::malloc(metadata->num_element() * metadata->get_datatype_size()); - this->peer_channels[peer_id]->recv((uintptr_t)tensor.data_ptr(), *metadata); + this->peer_channels[peer_id]->recv(tensor, *metadata); this->process_batch(tensor, metadata); } diff --git a/csrc/muhelper/muhelper.cpp b/csrc/muhelper/muhelper.cpp index b1d392c..ce42aee 100644 --- a/csrc/muhelper/muhelper.cpp +++ b/csrc/muhelper/muhelper.cpp @@ -21,6 +21,8 @@ #include +using Tensor = torch::Tensor; + // MuHelper MuHelper::MuHelper(std::vector layer_ids, int device_id, std::vector channels): @@ -83,7 +85,7 @@ bool MuDispatcher::_is_group_channel(int cid) const { return is_group_channels[cid]; } -void MuDispatcher::_send_batch(int cid, uintptr_t buf, const Metadata& meta) { +void MuDispatcher::_send_batch(int cid, Tensor tensor, const Metadata& meta) { tx_range _{"MuDispatcher::_send_batch"}; // DMOE_LOG(DEBUG) << "sending batch to channel " << cid << " current device: " << this->device_id_str << LEND; @@ -91,10 +93,10 @@ void MuDispatcher::_send_batch(int cid, uintptr_t buf, const Metadata& meta) { auto data = cerealize(std::make_shared(meta)); this->peer_mq[cid].send(zmq::str_buffer(this->device_id_str), zmq::send_flags::sndmore); this->peer_mq[cid].send(zmq::buffer(data.c_str(), data.size())); - 
this->channels[cid]->send(buf, meta); + this->channels[cid]->send(std::move(tensor), meta); } else { this->group_channels[cid]->send_metadata(meta); - this->group_channels[cid]->send(buf, meta); + this->group_channels[cid]->send(std::move(tensor), meta); } // DMOE_LOG(DEBUG) << "sent batch to channel " << cid << LEND; @@ -202,16 +204,15 @@ void MuAttnDispatcher::_send_once(TensorBatch batch) { // a faster path this->_send_batch( this->exp_channels[cid], - (uintptr_t)batch.data.data_ptr(), + batch.data, *batch.metadata ); break; } - auto buf = tensor_at((uintptr_t)batch.data.data_ptr(), *batch.metadata, i); this->_send_batch( this->exp_channels[cid], - buf, + batch.data.index({torch::indexing::Slice(i, j)}), batch.metadata->slice(i, j) ); i = j; @@ -282,7 +283,7 @@ void MuExpertDispatcher::_send_once(TensorBatch batch) { if (this->attn_channel[0].size() == 1 || layer_id >= this->attn_channel.size()) { this->_send_batch( _get_attn_channel(layer_id, 0), - (uintptr_t) batch.data.data_ptr(), + batch.data, *meta ); } else { @@ -297,14 +298,13 @@ void MuExpertDispatcher::_send_once(TensorBatch batch) { if (i == 0 && j == n) { this->_send_batch( channels[rank], - (uintptr_t) batch.data.data_ptr(), + batch.data, *meta ); } else { - auto buf = tensor_at((uintptr_t) batch.data.data_ptr(), *batch.metadata, i); this->_send_batch( channels[rank], - buf, + batch.data.index({torch::indexing::Slice(i, j)}), batch.metadata->slice(i, j) ); } @@ -367,16 +367,15 @@ void MuPool::recv_metadata(int &peer_id, metadata_t &meta) { meta = decerealize((char*) recv_msgs[1].data(), recv_msgs[1].size()); } -void MuPool::recv_tensor(int peer_id, uintptr_t tensor_buf, metadata_t &meta) { +void MuPool::recv_tensor(int peer_id, Tensor tensor, metadata_t &meta) { // DMOE_LOG(DEBUG) << "peer_id " << peer_id << " channelsize " << this->peer_channels.size() << LEND; ASSERT(0 <= peer_id && peer_id < this->peer_channels.size()); ASSERT(this->peer_channels[peer_id].get() != nullptr); ASSERT(meta.get() != nullptr); - ASSERT(tensor_buf != 0); - this->peer_channels[peer_id]->recv(tensor_buf, *meta); + this->peer_channels[peer_id]->recv(std::move(tensor), *meta); } -void MuPool::process_batch(torch::Tensor tensor, metadata_t &meta, bool send_from_zmq) { +void MuPool::process_batch(Tensor tensor, metadata_t &meta, bool send_from_zmq) { // TODO(hogura|20241014): sync prefill sequences /* TODO(shaoyuw|20241011): separate sequences into waiting queue and running queue @@ -433,12 +432,12 @@ void MuPool::run() { MuPool::recv_metadata(peer_id, meta); - torch::Tensor tensor = torch::empty( + Tensor tensor = torch::empty( {meta->num_tokens(), meta->token_hidden_dim()}, torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCUDA, 0) ); - MuPool::recv_tensor(peer_id, (uintptr_t)tensor.data_ptr(), meta); + MuPool::recv_tensor(peer_id, tensor, meta); this->process_batch(tensor, meta, /*send_from_zmq=*/ true); } @@ -675,16 +674,16 @@ void MuAttentionPool::run() { Metadata meta; group_comm->recv_metadata(meta); - torch::Tensor tensor = torch::empty( + Tensor tensor = torch::empty( {meta.num_tokens(), meta.token_hidden_dim()}, torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCUDA, 0) ); // DMOE_LOG(DEBUG) << "Worker AttnPool fetched result:" << meta << LEND; - group_comm->recv((uintptr_t) tensor.data_ptr(), meta); + group_comm->recv(tensor, meta); // DMOE_LOG(DEBUG) << "Worker AttnPool broadcast finished" << LEND; auto t_meta = std::make_shared(meta); - process_batch(tensor, t_meta, /*send_from_zmq=*/ false); + 
process_batch(std::move(tensor), t_meta, /*send_from_zmq=*/ false); } })); } @@ -710,13 +709,13 @@ void MuAttentionPool::run() { c->recv_metadata(meta); // DMOE_LOG(DEBUG) << "AttnPool fetched in stream " << c10_stream.stream() << " " << meta << LEND; - torch::Tensor tensor = torch::empty( + Tensor tensor = torch::empty( {meta.num_tokens(), meta.token_hidden_dim()}, torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCUDA, 0) ); // DMOE_LOG(DEBUG) << "AttnPool created tensor" << LEND; - c->recv((uintptr_t)tensor.data_ptr(), meta); + c->recv(tensor, meta); // DMOE_LOG(DEBUG) << "AttnPool broadcast finished" << LEND; auto meta_t = std::make_shared(meta); @@ -736,7 +735,7 @@ void MuAttentionPool::terminate() { pool_thread.join(); } -AttentionBatch MuAttentionPool::pack_attn_batch(torch::Tensor tensor, metadata_t meta) { +AttentionBatch MuAttentionPool::pack_attn_batch(Tensor tensor, metadata_t meta) { // for a simple case we consider prefill sequences can only have 1 token, // so all sequences in tensor are complete and can be scheduled immediately ASSERT(meta.get() != nullptr); @@ -787,14 +786,14 @@ AttentionBatch MuAttentionPool::pack_attn_batch(torch::Tensor tensor, metadata_t return AttentionBatch {tensor, attn_meta}; } -void MuAttentionPool::process_batch(torch::Tensor tensor, metadata_t &meta, bool send_from_zmq) { +void MuAttentionPool::process_batch(Tensor tensor, metadata_t &meta, bool send_from_zmq) { // DMOE_LOG(DEBUG) << "AttnPool processing batch: " << meta << LEND; if (send_from_zmq && meta->layer_id == 0 && group_comm.get() != nullptr) { // since only driver can have the pool, we can send the data from layer 0 to other workers here. // NOTE(hogura|20241110): group_comm is only used when send_from_zmq, so it should be thread-safe // DMOE_LOG(DEBUG) << "Broadcasting attn batch to workers" << LEND; group_comm->send_metadata(*meta); - group_comm->send((uintptr_t) tensor.data_ptr(), *meta); + group_comm->send(tensor, *meta); // DMOE_LOG(DEBUG) << "Broadcast finished." << LEND; } From cc758beb759468990be47a727dcbc95f6af7879b Mon Sep 17 00:00:00 2001 From: hogura Date: Mon, 6 Jan 2025 10:04:59 +0000 Subject: [PATCH 2/8] Added delay_release_tensor --- csrc/include/comm.h | 6 ++---- csrc/muhelper/comm.cpp | 13 ++++++++++++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/csrc/include/comm.h b/csrc/include/comm.h index a2c3b24..67c284d 100644 --- a/csrc/include/comm.h +++ b/csrc/include/comm.h @@ -27,10 +27,6 @@ class Channel { return local < other ? 
1 : 0; } - void _delay_release_tensor(torch::Tensor tensor, cudaStream_t stream) {ASSERT(0);} - - void _release_tensor_loop() {ASSERT(0);} - public: Channel(int party_local, int party_other): local(party_local), other(party_other) {} ~Channel() {} @@ -64,6 +60,8 @@ class NcclChannel: public Channel { ncclComm_t comm; cudaStream_t stream; + void _delay_release_tensor(torch::Tensor tensor, cudaStream_t stream); + public: NcclChannel(int party_local, int party_other, ncclUniqueId comm_id, cudaStream_t stream = nullptr); diff --git a/csrc/muhelper/comm.cpp b/csrc/muhelper/comm.cpp index 1799f42..e71c800 100644 --- a/csrc/muhelper/comm.cpp +++ b/csrc/muhelper/comm.cpp @@ -50,6 +50,17 @@ void NcclChannel::instantiate() { )); } +void NcclChannel::_delay_release_tensor(Tensor tensor, cudaStream_t stream) { + struct TensorWarp { + Tensor tensor; + }; + TensorWarp* warp = new TensorWarp{std::move(tensor)}; + CUDACHECK(cudaStreamAddCallback(stream, [](cudaStream_t stream, cudaError_t status, void* data) { + TensorWarp* warp = (TensorWarp*) data; + delete warp; + }, (void*) warp, 0)); +} + void NcclChannel::send(Tensor tensor, const Metadata& metadata) { // DMOE_LOG(INFO) << "NCCL sending: " << local << " " << other << LEND; tx_range _{"NcclChannel::send"}; @@ -168,8 +179,8 @@ void ZmqChannel::_pipeline_comm(void* data, size_t num_tokens, size_t token_size } CUDACHECK(cudaMemcpyAsync( - src_buf, dst_buf, + src_buf, cur_step * token_size, flag, this->stream From 86834356e750e38cedb96db957d41f9748d845b9 Mon Sep 17 00:00:00 2001 From: hogura Date: Tue, 7 Jan 2025 09:38:56 +0000 Subject: [PATCH 3/8] Added zmq overlap & tests --- csrc/bindings.cpp | 1 + csrc/include/constants.h | 3 ++ csrc/include/distributed.hpp | 7 ++++- csrc/include/tests.h | 4 ++- csrc/muhelper/comm.cpp | 17 ++++++++--- csrc/tests/test_zmq_overlap.cpp | 51 +++++++++++++++++++++++++++++++++ tests/comm/test_zmq_overlap.py | 23 +++++++++++++++ 7 files changed, 100 insertions(+), 6 deletions(-) create mode 100644 csrc/tests/test_zmq_overlap.cpp create mode 100644 tests/comm/test_zmq_overlap.py diff --git a/csrc/bindings.cpp b/csrc/bindings.cpp index 1b8bf13..aa851ab 100644 --- a/csrc/bindings.cpp +++ b/csrc/bindings.cpp @@ -183,6 +183,7 @@ PYBIND11_MODULE(disagmoe_c, m) { // m.def("test_parallel_attn_scheduler", &test_parallel_attn_scheduler); // m.def("test_multi_launch", &test_multi_launch); + REGISTER_FUNC(test_zmq_overlap); // REGISTER_FUNC(test_op_overlap); // m.def("test_zmq_sub_pub", &test_zmq_sub_pub); // m.def("test_attn_dispatcher", &test_attn_dispatcher); diff --git a/csrc/include/constants.h b/csrc/include/constants.h index a178fe7..324cf23 100644 --- a/csrc/include/constants.h +++ b/csrc/include/constants.h @@ -40,7 +40,10 @@ const int ZMQ_OFFSET_BASE = 16; #ifndef MAX_N_EXPERTS #define MAX_N_EXPERTS 8 +#endif +#ifndef PIPE_COMM_STEP +#define PIPE_COMM_STEP 2 #endif #define ASSERT(condition) do {if (!(condition)) { \ diff --git a/csrc/include/distributed.hpp b/csrc/include/distributed.hpp index 810c0f2..2a979bd 100644 --- a/csrc/include/distributed.hpp +++ b/csrc/include/distributed.hpp @@ -46,7 +46,12 @@ static std::string get_ip_of_device(int device_id) { } ifs.close(); } - return device_id_2_ip.at(device_id); + if (device_id_2_ip.find(device_id) == device_id_2_ip.end()) { + DMOE_LOG(ERROR) << "device_id " << device_id << " not found" << LEND; + return "0.0.0.0"; + } else { + return device_id_2_ip.at(device_id); + } } inline std::string get_zmq_addr(int device_id, bool is_gpu = true, int manual_port = -1, int offset = 0) { 
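
[Review note] The comm.cpp hunks that follow split every ZMQ transfer into
PIPE_COMM_STEP-token chunks: the embedding-node path sends one message per
chunk, and the GPU path feeds the same chunks through _pipeline_comm so the
cudaMemcpyAsync of one chunk can overlap the socket I/O of the previous one.
A minimal sketch of the chunked send loop, assuming a connected
zmq::socket_t `sock` (the helper name and signature are illustrative, not
part of this patch):

    #include <algorithm>
    #include <cstddef>
    #include <zmq.hpp>

    // Sketch only: ship a [num_tokens, hidden] host buffer as a sequence of
    // at-most-`step`-token messages; the peer must recv() in the same order.
    void chunked_send(zmq::socket_t& sock, const char* buf,
                      size_t num_tokens, size_t token_size, size_t step) {
        size_t total = num_tokens * token_size;  // whole payload in bytes
        size_t step_size = step * token_size;    // bytes per message
        for (size_t i = 0; i < total; i += step_size)
            sock.send(zmq::buffer(buf + i, std::min(step_size, total - i)),
                      zmq::send_flags::none);
    }

The receiving side mirrors this loop, calling recv() once per chunk and
copying each message into its offset of the destination tensor.
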
diff --git a/csrc/include/tests.h b/csrc/include/tests.h index 0bf612c..270d0f0 100644 --- a/csrc/include/tests.h +++ b/csrc/include/tests.h @@ -1,4 +1,6 @@ #include #include -void test_op_overlap(int rank, std::vector ranks, std::string uid); \ No newline at end of file +void test_op_overlap(int rank, std::vector ranks, std::string uid); + +void test_zmq_overlap(int rank); \ No newline at end of file diff --git a/csrc/muhelper/comm.cpp b/csrc/muhelper/comm.cpp index e71c800..49e7f12 100644 --- a/csrc/muhelper/comm.cpp +++ b/csrc/muhelper/comm.cpp @@ -165,7 +165,7 @@ void* ZmqChannel::_tensor_copy(uintptr_t data, const Metadata& metadata, bool to void ZmqChannel::_pipeline_comm(void* data, size_t num_tokens, size_t token_size, cudaMemcpyKind flag) { tx_range _{"ZmqChannel::_pipeline_comm"}; - const size_t step = num_tokens; + const size_t step = PIPE_COMM_STEP; void* cpu_buf = std::malloc(num_tokens * token_size); for (size_t i = 0; i < num_tokens; i += step) { size_t cur_step = std::min(step, num_tokens - i); @@ -194,6 +194,7 @@ void ZmqChannel::_pipeline_comm(void* data, size_t num_tokens, size_t token_size } } std::free(cpu_buf); + cudaStreamSynchronize(this->stream); } void ZmqChannel::send(Tensor tensor, const Metadata& metadata) { @@ -202,9 +203,13 @@ void ZmqChannel::send(Tensor tensor, const Metadata& metadata) { // DMOE_LOG(DEBUG) << "ZmqChannel Sending to " << get_peer_id() << LEND; if (is_embedding_node(this->local)) { + const int step = PIPE_COMM_STEP; void* buf = (void*) tensor.data_ptr(); size_t size = metadata.num_element() * metadata.get_datatype_size(); - this->mq->send(zmq::buffer(buf, size)); + size_t step_size = step * metadata.token_hidden_dim() * metadata.get_datatype_size(); + for (size_t i = 0; i < size; i += step_size) { + this->mq->send(zmq::buffer(buf + i, std::min(step_size, size - i))); + } } else { this->_pipeline_comm( (void*) tensor.data_ptr(), @@ -224,9 +229,13 @@ void ZmqChannel::recv(Tensor tensor, const Metadata &metadata) { if (is_embedding_node(this->local)) { size_t size = metadata.num_element() * metadata.get_datatype_size(); + size_t step_size = PIPE_COMM_STEP * metadata.token_hidden_dim() * metadata.get_datatype_size(); zmq::message_t msg(size); - auto err = this->mq->recv(msg, zmq::recv_flags::none); - memcpy((void*) tensor.data_ptr(), msg.data(), size); + void* buf = (void*) tensor.data_ptr(); + for (size_t i = 0; i < size; i += step_size) { + auto err = this->mq->recv(msg, zmq::recv_flags::none); + memcpy(buf + i, msg.data(), std::min(step_size, size - i)); + } } else { this->_pipeline_comm( (void*) tensor.data_ptr(), diff --git a/csrc/tests/test_zmq_overlap.cpp b/csrc/tests/test_zmq_overlap.cpp new file mode 100644 index 0000000..19a5752 --- /dev/null +++ b/csrc/tests/test_zmq_overlap.cpp @@ -0,0 +1,51 @@ +#include "tests.h" +#include "comm.h" + +using Tensor = torch::Tensor; + +void test_zmq_overlap(int rank) { + const int bs = 256; + const int hs = 4096; + + Metadata meta = Metadata { + {bs, hs}, + "bf16", + 0, + std::vector(bs, 0), + std::vector(bs, 0), + std::vector(bs, 0), + std::vector(bs, 0) + }; + if (rank == 82) { + Tensor a = torch::empty( + {bs, hs}, + torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCPU) + ); + auto c_send = create_zmq_channel(rank, 0, true); + auto c_recv = create_zmq_channel(rank, 1, false); + c_send->instantiate(); + c_recv->instantiate(); + c_recv->recv(a, meta); + DMOE_LOG(INFO) << "ZMQ Received from 1" << LEND; + c_send->send(a, meta); + DMOE_LOG(INFO) << "ZMQ Sent to 0" << LEND; + } else { + 
Tensor a = torch::ones( + {bs, hs}, + torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCUDA, 0) + ) * (rank + 1); + if (rank == 0) { + // receive from zmq + auto c = create_zmq_channel(0, SAMPLER_DEV_ID, false); + c->instantiate(); + c->recv(a, meta); + DMOE_LOG(INFO) << "Received from sampler " << a.mean().item() << LEND; + } else if (rank == 1) { + // send to zmq + auto c = create_zmq_channel(1, SAMPLER_DEV_ID, true); + c->instantiate(); + c->send(a, meta); + DMOE_LOG(INFO) << "Sent to sampler " << a.mean().item() << LEND; + } + } +} \ No newline at end of file diff --git a/tests/comm/test_zmq_overlap.py b/tests/comm/test_zmq_overlap.py new file mode 100644 index 0000000..2c3da00 --- /dev/null +++ b/tests/comm/test_zmq_overlap.py @@ -0,0 +1,23 @@ +import ray +import os + +def func(rank: int): + import torch + from disagmoe_c import test_zmq_overlap, set_hosts + set_hosts(os.getpid(), { + 0: "0.0.0.0", + 1: "0.0.0.0", + 82: "0.0.0.0" + }) + test_zmq_overlap(rank) + +def main(): + ray.init("auto") + tasks = [ + ray.remote(func).options(num_gpus=1).remote(0), + ray.remote(func).options(num_gpus=1).remote(1), + ray.remote(func).options(num_gpus=0).remote(82) + ] + ray.get(tasks) + +main() \ No newline at end of file From 75c08d75ebf5eabaf0d0c7df6866caf8aa663941 Mon Sep 17 00:00:00 2001 From: hogura Date: Tue, 7 Jan 2025 12:59:40 +0000 Subject: [PATCH 4/8] Fixed zmq overlap correctness --- csrc/muhelper/comm.cpp | 5 +++-- csrc/tests/test_zmq_overlap.cpp | 8 ++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/csrc/muhelper/comm.cpp b/csrc/muhelper/comm.cpp index 49e7f12..774c957 100644 --- a/csrc/muhelper/comm.cpp +++ b/csrc/muhelper/comm.cpp @@ -171,11 +171,12 @@ void ZmqChannel::_pipeline_comm(void* data, size_t num_tokens, size_t token_size size_t cur_step = std::min(step, num_tokens - i); void* src_buf = data + i * token_size; - void* dst_buf = cpu_buf; + void* dst_buf = cpu_buf + i * token_size; if (flag == cudaMemcpyHostToDevice) { std::swap(src_buf, dst_buf); - zmq::message_t msg(src_buf, cur_step * token_size); + zmq::message_t msg; this->mq->recv(msg); + src_buf = msg.data(); } CUDACHECK(cudaMemcpyAsync( diff --git a/csrc/tests/test_zmq_overlap.cpp b/csrc/tests/test_zmq_overlap.cpp index 19a5752..27348ba 100644 --- a/csrc/tests/test_zmq_overlap.cpp +++ b/csrc/tests/test_zmq_overlap.cpp @@ -26,9 +26,9 @@ void test_zmq_overlap(int rank) { c_send->instantiate(); c_recv->instantiate(); c_recv->recv(a, meta); - DMOE_LOG(INFO) << "ZMQ Received from 1" << LEND; + DMOE_LOG(INFO) << "Sampler Received from <1>: " << a.mean().item() << LEND; c_send->send(a, meta); - DMOE_LOG(INFO) << "ZMQ Sent to 0" << LEND; + DMOE_LOG(INFO) << "Sampler Sent to <0>" << LEND; } else { Tensor a = torch::ones( {bs, hs}, @@ -39,13 +39,13 @@ void test_zmq_overlap(int rank) { auto c = create_zmq_channel(0, SAMPLER_DEV_ID, false); c->instantiate(); c->recv(a, meta); - DMOE_LOG(INFO) << "Received from sampler " << a.mean().item() << LEND; + DMOE_LOG(INFO) << "<0> Received from sampler: " << a.mean().item() << LEND; } else if (rank == 1) { // send to zmq auto c = create_zmq_channel(1, SAMPLER_DEV_ID, true); c->instantiate(); c->send(a, meta); - DMOE_LOG(INFO) << "Sent to sampler " << a.mean().item() << LEND; + DMOE_LOG(INFO) << "<1> Sent to sampler: " << a.mean().item() << LEND; } } } \ No newline at end of file From 3886137ef188c74e9195c4dd2c0ef689abd1d7bd Mon Sep 17 00:00:00 2001 From: hogura Date: Tue, 7 Jan 2025 15:08:29 +0000 Subject: [PATCH 5/8] Add memcpy async --- 
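Notes: this patch turns ZmqChannel's GPU send path into a producer/consumer
pipeline. A copy thread stages each chunk from device to host with
cudaMemcpyAsync and records a CUDA event; the sending loop waits for that
event and ships the chunk over ZMQ, so the copy of chunk i+1 overlaps the
send of chunk i. Below is a minimal sketch of the handoff, assuming the
repo's CUDACHECK macro and a connected zmq::socket_t (names are
illustrative; the sketch blocks on the event with cudaEventSynchronize, the
form the later "Fixed async overlap" commit uses, since cudaStreamWaitEvent
only orders later stream work and never blocks the host):

    #include <condition_variable>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <cuda_runtime.h>
    #include <zmq.hpp>

    // One staged chunk: host pointer, byte count, and the event signalling
    // that its device-to-host copy has completed.
    struct Chunk { void* host; size_t bytes; cudaEvent_t done; };

    // Sketch only: overlap D2H copies with ZMQ sends. `dev` is device
    // memory, `host` a staging buffer of n_chunks * bytes.
    void overlapped_send(zmq::socket_t& sock, const char* dev, char* host,
                         size_t n_chunks, size_t bytes, cudaStream_t stream) {
        std::queue<Chunk> q;
        std::mutex m;
        std::condition_variable cv;
        std::thread copier([&] {  // producer: issue async copies in order
            for (size_t i = 0; i < n_chunks; i++) {
                CUDACHECK(cudaMemcpyAsync(host + i * bytes, dev + i * bytes,
                                          bytes, cudaMemcpyDeviceToHost,
                                          stream));
                cudaEvent_t ev;
                CUDACHECK(cudaEventCreate(&ev));
                CUDACHECK(cudaEventRecord(ev, stream));
                std::lock_guard<std::mutex> lk(m);
                q.push({host + i * bytes, bytes, ev});
                cv.notify_one();
            }
        });
        for (size_t i = 0; i < n_chunks; i++) {  // consumer: send in order
            Chunk c;
            {
                std::unique_lock<std::mutex> lk(m);
                cv.wait(lk, [&] { return !q.empty(); });
                c = q.front(); q.pop();
            }
            CUDACHECK(cudaEventSynchronize(c.done));  // copy has landed
            sock.send(zmq::buffer(c.host, c.bytes), zmq::send_flags::none);
            CUDACHECK(cudaEventDestroy(c.done));
        }
        copier.join();
    }
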
csrc/include/comm.h | 6 +- csrc/include/constants.h | 2 +- csrc/include/cuda_utils.h | 4 +- csrc/muhelper/comm.cpp | 173 ++++++++++++++++++++------------ csrc/tests/test_zmq_overlap.cpp | 23 +++-- tests/comm/test_zmq_overlap.py | 15 ++- 6 files changed, 143 insertions(+), 80 deletions(-) diff --git a/csrc/include/comm.h b/csrc/include/comm.h index 67c284d..c15e47e 100644 --- a/csrc/include/comm.h +++ b/csrc/include/comm.h @@ -83,7 +83,7 @@ class ZmqChannel: public Channel { static std::map global_mq; zmq::context_t ctx; mq_t mq; - cudaStream_t stream; + cudaStream_t stream_send, stream_recv; std::string other_ip; bool is_sender; @@ -93,10 +93,6 @@ class ZmqChannel: public Channel { void* pin_buffer; - void* _tensor_copy(uintptr_t src, const Metadata& metadata, bool to_gpu, uintptr_t dst = 0); - - void _pipeline_comm(void* data, size_t num_tokens, size_t token_size, cudaMemcpyKind flag); - public: ZmqChannel(int party_local, int party_other, bool is_sender, int rank = 0); diff --git a/csrc/include/constants.h b/csrc/include/constants.h index 324cf23..28fc3f6 100644 --- a/csrc/include/constants.h +++ b/csrc/include/constants.h @@ -43,7 +43,7 @@ const int ZMQ_OFFSET_BASE = 16; #endif #ifndef PIPE_COMM_STEP -#define PIPE_COMM_STEP 2 +#define PIPE_COMM_STEP 4 #endif #define ASSERT(condition) do {if (!(condition)) { \ diff --git a/csrc/include/cuda_utils.h b/csrc/include/cuda_utils.h index c3cf85f..cadc008 100644 --- a/csrc/include/cuda_utils.h +++ b/csrc/include/cuda_utils.h @@ -101,8 +101,8 @@ inline cudaStream_t get_current_torch_stream(int device_id = 0) { #include "nvtx3/nvtx3.hpp" #include "profiler.hpp" -// using tx_range = nvtx3::scoped_range; -using tx_range = ScopedRange; +using tx_range = nvtx3::scoped_range; +// using tx_range = ScopedRange; #define AUTO_TX_RANGE tx_range __{__FUNCTION__} diff --git a/csrc/muhelper/comm.cpp b/csrc/muhelper/comm.cpp index 774c957..23e5ce8 100644 --- a/csrc/muhelper/comm.cpp +++ b/csrc/muhelper/comm.cpp @@ -3,8 +3,12 @@ #include "utils.hpp" #include "distributed.hpp" +#include #include +#include +#include #include +#include using Tensor = torch::Tensor; @@ -99,9 +103,15 @@ ZmqChannel::ZmqChannel(int party_local, int party_other, bool is_sender, int ran Channel(party_local, party_other), is_sender(is_sender), rank_offset(rank) { sprintf(device_id_str, "%d", party_local); if (!is_embedding_node(party_local)) { - CUDACHECK(cudaStreamCreateWithPriority(&this->stream, cudaStreamNonBlocking, 10)); + CUDACHECK(cudaStreamCreateWithPriority(&this->stream_send, cudaStreamNonBlocking, 10)); + CUDACHECK(cudaStreamCreateWithPriority(&this->stream_recv, cudaStreamNonBlocking, 10)); } else { - this->stream = 0; + this->stream_send = 0; + this->stream_recv = 0; + } + + if (!is_embedding_node(local)) { + CUDACHECK(cudaMallocHost(&this->pin_buffer, 1024 * 4096 * 4)); } } @@ -121,81 +131,114 @@ void ZmqChannel::instantiate() { this->mq->connect(get_zmq_addr(other, /*is_gpu=*/ false, /*manual_port=*/ -1, /*offset=*/ this->rank_offset)); } DMOE_LOG(INFO) << "ZmqChannel instantiated " << this->local << LEND; - - // CUDACHECK(cudaMallocHost(&this->pin_buffer, 512 * 4096 * 4)); - this->pin_buffer = (void*) std::malloc(512 * 4096 * 4); -} - -void* ZmqChannel::_tensor_copy(uintptr_t data, const Metadata& metadata, bool to_gpu, uintptr_t dst) { - if (is_embedding_node(this->local)) - return (void*) data; - tx_range _{"ZmqChannel::_tensor_copy"}; - uintptr_t buf; - cudaMemcpyKind flag; - if (!to_gpu) { - size_t size = metadata.num_element() * metadata.get_datatype_size(); - buf = 
!dst ? (uintptr_t) std::malloc(size) : dst; - flag = cudaMemcpyKind::cudaMemcpyDeviceToHost; - } else { - buf = dst; - flag = cudaMemcpyKind::cudaMemcpyHostToDevice; - } - - { - tx_range __{"ZmqChannel::_tensor_copy_memcpy_submit"}; - const size_t step = 2; - const size_t dim_stride = metadata.get_datatype_size() * metadata.token_hidden_dim(); - size_t num_tokens = metadata.shape[0]; - for (size_t i = 0; i < metadata.shape[0]; i += step) { - size_t cur_step = std::min(step, metadata.shape[0] - i); - CUDACHECK(cudaMemcpyAsync( - (void*) (buf + i * dim_stride), - (void*) (data + i * dim_stride), - cur_step * dim_stride, - flag, - this->stream - )); - } - } - CUDACHECK(cudaStreamSynchronize(this->stream)); - - return (void*) buf; } -void ZmqChannel::_pipeline_comm(void* data, size_t num_tokens, size_t token_size, cudaMemcpyKind flag) { +void _pipeline_recv(void* data, void* pin_buf, size_t num_tokens, size_t token_size, cudaStream_t stream, mq_t &mq) { tx_range _{"ZmqChannel::_pipeline_comm"}; const size_t step = PIPE_COMM_STEP; - void* cpu_buf = std::malloc(num_tokens * token_size); + + zmq::message_t msg[(num_tokens + step - 1) / step]; for (size_t i = 0; i < num_tokens; i += step) { size_t cur_step = std::min(step, num_tokens - i); - void* src_buf = data + i * token_size; - void* dst_buf = cpu_buf + i * token_size; - if (flag == cudaMemcpyHostToDevice) { - std::swap(src_buf, dst_buf); - zmq::message_t msg; - this->mq->recv(msg); - src_buf = msg.data(); + void* dst_buf = data + i * token_size; + void* src_buf; + { + tx_range __{"ZmqChannel::_pipeline_comm_recv"}; + mq->recv(msg[i / step], zmq::recv_flags::none); + src_buf = msg[i / step].data(); } CUDACHECK(cudaMemcpyAsync( - dst_buf, + pin_buf + i * token_size, src_buf, cur_step * token_size, - flag, - this->stream + cudaMemcpyHostToHost, + stream + )); + CUDACHECK(cudaMemcpyAsync( + dst_buf, + pin_buf + i * token_size, + cur_step * token_size, + cudaMemcpyHostToDevice, + stream )); + } + + tx_range __{"ZmqChannel::_pipeline_comm_sync"}; + cudaStreamSynchronize(stream); +} + +void _pipeline_send(void* data, void* pin_buf, size_t num_tokens, size_t token_size, cudaStream_t stream, mq_t &mq) { + tx_range _{"ZmqChannel::_pipeline_comm"}; + + const size_t step = PIPE_COMM_STEP; + void* cpu_buf = std::malloc(step * token_size); + + struct SendItem { + void* buf; + size_t size; + cudaEvent_t event; + }; + + std::queue send_queue; + std::mutex mutex; + std::condition_variable cv; + std::thread t_copy([&] { + for (size_t i = 0; i < num_tokens; i += step) { + size_t cur_step = std::min(step, num_tokens - i); + + void* src_buf = data + i * token_size; + void* dst_buf = cpu_buf + i * token_size; - if (flag == cudaMemcpyDeviceToHost) { - // TODO(hogura|20250106): optimize this stream - CUDACHECK(cudaStreamSynchronize(this->stream)); - zmq::message_t msg(dst_buf, cur_step * token_size); - this->mq->send(msg); + DMOE_LOG(DEBUG) << "{A} copying " << i << " " << cur_step << LEND; + CUDACHECK(cudaMemcpyAsync( + pin_buf + i * token_size, + src_buf, + cur_step * token_size, + cudaMemcpyDeviceToHost, + stream + )); + DMOE_LOG(DEBUG) << "{B} copying " << i << " " << cur_step << LEND; + CUDACHECK(cudaMemcpyAsync( + dst_buf, + pin_buf + i * token_size, + cur_step * token_size, + cudaMemcpyHostToHost, + stream + )); + + cudaEvent_t event; + CUDACHECK(cudaEventCreate(&event)); + CUDACHECK(cudaEventRecord(event, stream)); + + { + std::lock_guard lock(mutex); + send_queue.push({dst_buf, cur_step * token_size, event}); + cv.notify_one(); + } } + }); + + SendItem 
item; + for (size_t i = 0; i < num_tokens; i += step) { + { + std::unique_lock lock(mutex); + cv.wait(lock, [&] { return !send_queue.empty(); }); + item = send_queue.front(); + send_queue.pop(); + } + DMOE_LOG(DEBUG) << "{C} waiting stream " << i << LEND; + CUDACHECK(cudaStreamWaitEvent(stream, item.event)); + DMOE_LOG(DEBUG) << "{C} copying " << i << LEND; + tx_range __{"ZmqChannel::_pipeline_comm_send"}; + mq->send(zmq::buffer(item.buf, item.size)); + CUDACHECK(cudaEventDestroy(item.event)); } + std::free(cpu_buf); - cudaStreamSynchronize(this->stream); + t_copy.join(); } void ZmqChannel::send(Tensor tensor, const Metadata& metadata) { @@ -212,11 +255,13 @@ void ZmqChannel::send(Tensor tensor, const Metadata& metadata) { this->mq->send(zmq::buffer(buf + i, std::min(step_size, size - i))); } } else { - this->_pipeline_comm( + _pipeline_send( (void*) tensor.data_ptr(), + this->pin_buffer, metadata.num_tokens(), metadata.token_hidden_dim() * metadata.get_datatype_size(), - cudaMemcpyKind::cudaMemcpyDeviceToHost + this->stream_send, + this->mq ); } @@ -238,11 +283,13 @@ void ZmqChannel::recv(Tensor tensor, const Metadata &metadata) { memcpy(buf + i, msg.data(), std::min(step_size, size - i)); } } else { - this->_pipeline_comm( + _pipeline_recv( (void*) tensor.data_ptr(), + this->pin_buffer, metadata.num_tokens(), metadata.token_hidden_dim() * metadata.get_datatype_size(), - cudaMemcpyKind::cudaMemcpyHostToDevice + this->stream_recv, + this->mq ); } diff --git a/csrc/tests/test_zmq_overlap.cpp b/csrc/tests/test_zmq_overlap.cpp index 27348ba..e28e7b2 100644 --- a/csrc/tests/test_zmq_overlap.cpp +++ b/csrc/tests/test_zmq_overlap.cpp @@ -25,10 +25,12 @@ void test_zmq_overlap(int rank) { auto c_recv = create_zmq_channel(rank, 1, false); c_send->instantiate(); c_recv->instantiate(); - c_recv->recv(a, meta); - DMOE_LOG(INFO) << "Sampler Received from <1>: " << a.mean().item() << LEND; - c_send->send(a, meta); - DMOE_LOG(INFO) << "Sampler Sent to <0>" << LEND; + for (int i = 0; i < 5; i ++) { + c_recv->recv(a, meta); + DMOE_LOG(INFO) << "Sampler Received from <1>: " << a.mean().item() << LEND; + c_send->send(a, meta); + DMOE_LOG(INFO) << "Sampler Sent to <0>" << LEND; + } } else { Tensor a = torch::ones( {bs, hs}, @@ -38,14 +40,19 @@ void test_zmq_overlap(int rank) { // receive from zmq auto c = create_zmq_channel(0, SAMPLER_DEV_ID, false); c->instantiate(); - c->recv(a, meta); - DMOE_LOG(INFO) << "<0> Received from sampler: " << a.mean().item() << LEND; + for (int i = 0; i < 5; i ++) { + c->recv(a, meta); + DMOE_LOG(INFO) << "<0> Received from sampler: " << a.mean().item() << LEND; + } } else if (rank == 1) { // send to zmq auto c = create_zmq_channel(1, SAMPLER_DEV_ID, true); c->instantiate(); - c->send(a, meta); - DMOE_LOG(INFO) << "<1> Sent to sampler: " << a.mean().item() << LEND; + for (int i = 0; i < 5; i ++) { + c->send(a, meta); + DMOE_LOG(INFO) << "<1> Sent to sampler: " << a.mean().item() << LEND; + } } } + std::this_thread::sleep_for(std::chrono::seconds(1)); } \ No newline at end of file diff --git a/tests/comm/test_zmq_overlap.py b/tests/comm/test_zmq_overlap.py index 2c3da00..f356a51 100644 --- a/tests/comm/test_zmq_overlap.py +++ b/tests/comm/test_zmq_overlap.py @@ -9,15 +9,28 @@ def func(rank: int): 1: "0.0.0.0", 82: "0.0.0.0" }) + profiler = torch.profiler.profile( + activities=[ + torch.profiler.ProfilerActivity.CPU, + torch.profiler.ProfilerActivity.CUDA, + ], + # with_stack=True, + on_trace_ready=torch.profiler.tensorboard_trace_handler( + dir_name="./reports/", + 
worker_name=f"engine-{rank}", + use_gzip=True,)) + profiler.start() test_zmq_overlap(rank) + profiler.stop() def main(): ray.init("auto") tasks = [ ray.remote(func).options(num_gpus=1).remote(0), ray.remote(func).options(num_gpus=1).remote(1), - ray.remote(func).options(num_gpus=0).remote(82) + ray.remote(func).options(num_gpus=0).remote(82) ] ray.get(tasks) + ray.shutdown() main() \ No newline at end of file From 2966dcd4b4fce2ffe57297ef61a86e6743471e9f Mon Sep 17 00:00:00 2001 From: hogura Date: Wed, 8 Jan 2025 02:55:37 +0000 Subject: [PATCH 6/8] Fixed async overlap --- csrc/muhelper/comm.cpp | 8 ++------ tests/comm/test_zmq_overlap.py | 16 ++++++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/csrc/muhelper/comm.cpp b/csrc/muhelper/comm.cpp index 23e5ce8..89e7b14 100644 --- a/csrc/muhelper/comm.cpp +++ b/csrc/muhelper/comm.cpp @@ -174,7 +174,7 @@ void _pipeline_send(void* data, void* pin_buf, size_t num_tokens, size_t token_s tx_range _{"ZmqChannel::_pipeline_comm"}; const size_t step = PIPE_COMM_STEP; - void* cpu_buf = std::malloc(step * token_size); + void* cpu_buf = std::malloc(num_tokens * token_size); struct SendItem { void* buf; @@ -192,7 +192,6 @@ void _pipeline_send(void* data, void* pin_buf, size_t num_tokens, size_t token_s void* src_buf = data + i * token_size; void* dst_buf = cpu_buf + i * token_size; - DMOE_LOG(DEBUG) << "{A} copying " << i << " " << cur_step << LEND; CUDACHECK(cudaMemcpyAsync( pin_buf + i * token_size, src_buf, @@ -200,7 +199,6 @@ void _pipeline_send(void* data, void* pin_buf, size_t num_tokens, size_t token_s cudaMemcpyDeviceToHost, stream )); - DMOE_LOG(DEBUG) << "{B} copying " << i << " " << cur_step << LEND; CUDACHECK(cudaMemcpyAsync( dst_buf, pin_buf + i * token_size, @@ -229,9 +227,7 @@ void _pipeline_send(void* data, void* pin_buf, size_t num_tokens, size_t token_s item = send_queue.front(); send_queue.pop(); } - DMOE_LOG(DEBUG) << "{C} waiting stream " << i << LEND; - CUDACHECK(cudaStreamWaitEvent(stream, item.event)); - DMOE_LOG(DEBUG) << "{C} copying " << i << LEND; + CUDACHECK(cudaEventSynchronize(item.event)); tx_range __{"ZmqChannel::_pipeline_comm_send"}; mq->send(zmq::buffer(item.buf, item.size)); CUDACHECK(cudaEventDestroy(item.event)); diff --git a/tests/comm/test_zmq_overlap.py b/tests/comm/test_zmq_overlap.py index f356a51..e6c0b3a 100644 --- a/tests/comm/test_zmq_overlap.py +++ b/tests/comm/test_zmq_overlap.py @@ -16,8 +16,8 @@ def func(rank: int): ], # with_stack=True, on_trace_ready=torch.profiler.tensorboard_trace_handler( - dir_name="./reports/", - worker_name=f"engine-{rank}", + dir_name="/home/hogura1999/DisagMoE/reports/zmq_overlap/", + worker_name=f"worker-{rank}", use_gzip=True,)) profiler.start() test_zmq_overlap(rank) @@ -25,12 +25,16 @@ def func(rank: int): def main(): ray.init("auto") + env = { + "runtime_env": { + # "CUDA_LAUNCH_BLOCKING": "1", + } + } tasks = [ - ray.remote(func).options(num_gpus=1).remote(0), - ray.remote(func).options(num_gpus=1).remote(1), - ray.remote(func).options(num_gpus=0).remote(82) + ray.remote(func).options(num_gpus=1, runtime_env=env).remote(0), + ray.remote(func).options(num_gpus=1, runtime_env=env).remote(1), + ray.remote(func).options(num_gpus=0, runtime_env=env).remote(82) ] ray.get(tasks) - ray.shutdown() main() \ No newline at end of file From d8599c345c3afaf4668798e6c8d6fb74e196b512 Mon Sep 17 00:00:00 2001 From: hogura Date: Wed, 8 Jan 2025 03:19:47 +0000 Subject: [PATCH 7/8] Profiling zmq overlap --- csrc/include/cuda_utils.h | 4 ++-- csrc/muhelper/comm.cpp | 
From d8599c345c3afaf4668798e6c8d6fb74e196b512 Mon Sep 17 00:00:00 2001
From: hogura
Date: Wed, 8 Jan 2025 03:19:47 +0000
Subject: [PATCH 7/8] Profiling zmq overlap

---
 csrc/include/cuda_utils.h      |  4 ++--
 csrc/muhelper/comm.cpp         | 21 ++++--------------
 tests/comm/test_zmq_overlap.py | 39 ++++++++++++++++++++++++++++++----
 3 files changed, 41 insertions(+), 23 deletions(-)

diff --git a/csrc/include/cuda_utils.h b/csrc/include/cuda_utils.h
index cadc008..c3cf85f 100644
--- a/csrc/include/cuda_utils.h
+++ b/csrc/include/cuda_utils.h
@@ -101,8 +101,8 @@ inline cudaStream_t get_current_torch_stream(int device_id = 0) {
 #include "nvtx3/nvtx3.hpp"
 #include "profiler.hpp"
 
-using tx_range = nvtx3::scoped_range;
-// using tx_range = ScopedRange;
+// using tx_range = nvtx3::scoped_range;
+using tx_range = ScopedRange;
 
 #define AUTO_TX_RANGE tx_range __{__FUNCTION__}
diff --git a/csrc/muhelper/comm.cpp b/csrc/muhelper/comm.cpp
index 89e7b14..2624db7 100644
--- a/csrc/muhelper/comm.cpp
+++ b/csrc/muhelper/comm.cpp
@@ -179,7 +179,6 @@ void _pipeline_send(void* data, void* pin_buf, size_t num_tokens, size_t token_s
     struct SendItem {
         void* buf;
         size_t size;
-        cudaEvent_t event;
     };
 
     std::queue<SendItem> send_queue;
@@ -187,33 +186,23 @@ void _pipeline_send(void* data, void* pin_buf, size_t num_tokens, size_t token_s
     std::condition_variable cv;
     std::thread t_copy([&] {
         for (size_t i = 0; i < num_tokens; i += step) {
+            tx_range __{"ZmqChannel::_pipeline_comm_async_copy"};
             size_t cur_step = std::min(step, num_tokens - i);
 
             void* src_buf = data + i * token_size;
             void* dst_buf = cpu_buf + i * token_size;
 
             CUDACHECK(cudaMemcpyAsync(
-                pin_buf + i * token_size,
+                dst_buf,
                 src_buf,
                 cur_step * token_size,
                 cudaMemcpyDeviceToHost,
                 stream
             ));
-            CUDACHECK(cudaMemcpyAsync(
-                dst_buf,
-                pin_buf + i * token_size,
-                cur_step * token_size,
-                cudaMemcpyHostToHost,
-                stream
-            ));
-
-            cudaEvent_t event;
-            CUDACHECK(cudaEventCreate(&event));
-            CUDACHECK(cudaEventRecord(event, stream));
 
             {
                 std::lock_guard<std::mutex> lock(mutex);
-                send_queue.push({dst_buf, cur_step * token_size, event});
+                send_queue.push({dst_buf, cur_step * token_size});
                 cv.notify_one();
             }
         }
@@ -227,10 +216,8 @@ void _pipeline_send(void* data, void* pin_buf, size_t num_tokens, size_t token_s
             item = send_queue.front();
             send_queue.pop();
         }
-        CUDACHECK(cudaEventSynchronize(item.event));
-        tx_range __{"ZmqChannel::_pipeline_comm_send"};
+        tx_range __{"ZmqChannel::_pipeline_comm_async_send"};
         mq->send(zmq::buffer(item.buf, item.size));
-        CUDACHECK(cudaEventDestroy(item.event));
     }
diff --git a/tests/comm/test_zmq_overlap.py b/tests/comm/test_zmq_overlap.py
index e6c0b3a..b9e33c5 100644
--- a/tests/comm/test_zmq_overlap.py
+++ b/tests/comm/test_zmq_overlap.py
@@ -3,7 +3,8 @@
 
 def func(rank: int):
     import torch
-    from disagmoe_c import test_zmq_overlap, set_hosts
+    from disagmoe_c import test_zmq_overlap, set_hosts, recorder_create, recorder_output
+    from disagmoe.frontend.datatypes import TraceContext
     set_hosts(os.getpid(), {
         0: "0.0.0.0",
         1: "0.0.0.0",
@@ -19,9 +20,15 @@ def func(rank: int):
             dir_name="/home/hogura1999/DisagMoE/reports/zmq_overlap/",
             worker_name=f"worker-{rank}",
             use_gzip=True,))
-    profiler.start()
+    recorder_create()
+    # profiler.start()
     test_zmq_overlap(rank)
-    profiler.stop()
+    # profiler.stop()
+    output = recorder_output()
+    result = {}
+    for key in output:
+        result[key] = [TraceContext.from_c(c) for c in output[key]]
+    return result
 
 def main():
     ray.init("auto")
@@ -35,6 +42,30 @@ def main():
         ray.remote(func).options(num_gpus=1, runtime_env=env).remote(1),
         ray.remote(func).options(num_gpus=0, runtime_env=env).remote(82)
     ]
-    ray.get(tasks)
+    outputs = ray.get(tasks)
+    events = []
+    ms_to_us = lambda ms: ms * 1000
+    for pid, p_traces in enumerate(outputs):
+        for tid, t_traces in p_traces.items():
+            print("outputting thread", tid)
+            tid = tid % (1 << 32)
+            for trace in t_traces:
+                if "schedule" in trace.msg and ms_to_us(trace.t_dur) < 10:
+                    continue
+                events.append({
+                    "name": trace.msg,
+                    "cat": "trace",
+                    "ph": "X",
+                    "ts": ms_to_us(trace.t_start),
+                    "dur": ms_to_us(trace.t_dur),
+                    "pid": pid,
+                    "tid": (tid * 10 + trace.track_id) % (1 << 31),
+                })
+
+    import gzip
+    import json
+    with gzip.open(f"trace_zmq_overlap.json.gz", "w") as f:
+        f.write(json.dumps(events).encode("utf-8"))
+
 
 main()
\ No newline at end of file
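The send path that patch 7 leaves behind is a classic single-producer/single-consumer pipeline: a copy thread stages the tensor into host memory chunk by chunk, while the calling thread ships each chunk as soon as it is ready, so copy i+1 overlaps with send i. A self-contained sketch of that pattern (names are illustrative; a plain memcpy stands in for the cudaMemcpyAsync plus stream ordering the real code relies on):

    #include <algorithm>
    #include <condition_variable>
    #include <cstring>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    struct Chunk { const char* buf; size_t size; };

    // Producer copies chunks into a staging buffer; the consumer "sends"
    // each one as soon as it lands, overlapping copy and send.
    int main() {
        const size_t total = 1 << 20, step = 1 << 16;
        std::vector<char> src(total, 'x'), staging(total);

        std::queue<Chunk> q;
        std::mutex m;
        std::condition_variable cv;

        std::thread producer([&] {
            for (size_t i = 0; i < total; i += step) {
                size_t n = std::min(step, total - i);
                std::memcpy(&staging[i], &src[i], n);  // stands in for the D2H copy
                std::lock_guard<std::mutex> lock(m);
                q.push({&staging[i], n});
                cv.notify_one();
            }
        });

        size_t sent = 0;
        for (size_t i = 0; i < total; i += step) {
            Chunk c;
            {
                std::unique_lock<std::mutex> lock(m);
                cv.wait(lock, [&] { return !q.empty(); });
                c = q.front();
                q.pop();
            }
            sent += c.size;                            // stands in for mq->send(...)
        }
        producer.join();
        return sent == total ? 0 : 1;
    }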
From 493677e665118289dd3fd290aeb62d24b4de60d3 Mon Sep 17 00:00:00 2001
From: hogura
Date: Wed, 8 Jan 2025 12:35:00 +0000
Subject: [PATCH 8/8] Add comm helper; testing random blocking @ non-pipeline send

---
 csrc/include/comm.h             |  58 +++++++++++
 csrc/include/constants.h        |   6 +-
 csrc/include/profiler.hpp       |   8 +-
 csrc/muhelper/comm.cpp          | 174 +++++++++++++++++++++-----------
 csrc/tests/test_zmq_overlap.cpp |   2 +-
 tests/comm/test_zmq_overlap.py  |   5 +-
 6 files changed, 189 insertions(+), 64 deletions(-)

diff --git a/csrc/include/comm.h b/csrc/include/comm.h
index c15e47e..a73eb72 100644
--- a/csrc/include/comm.h
+++ b/csrc/include/comm.h
@@ -5,6 +5,8 @@
 #include
 #include
 #include
+#include <mutex>
+#include <condition_variable>
 
 #include "cuda_runtime.h"
 #include "zmq.hpp"
@@ -78,12 +80,67 @@ class NcclChannel: public Channel {
 
 typedef std::shared_ptr<zmq::socket_t> mq_t;
 
+class ZmqChannelHelper {
+public:
+    struct Item;
+
+protected:
+    std::thread t;
+    bool end_flag;
+    mq_t mq;
+    cudaStream_t stream;
+
+    std::queue<Item> queue;
+    std::mutex mutex, mtx_end;
+    std::condition_variable cv, cv_end;
+
+    virtual void run(Item &item) = 0;
+
+public:
+    struct Item {
+        void* dst {0};
+        size_t size {0};
+        void* src {0};
+        void* event {0};
+    };
+
+    ZmqChannelHelper(mq_t mq, cudaStream_t stream);
+
+    void terminate();
+
+    void put(void* dst, size_t size, void* src = nullptr, void* event = 0);
+
+    Item get();
+
+    void start();
+
+    void sync();
+};
+
+class ZmqChannelSender: public ZmqChannelHelper {
+protected:
+    void run(Item &item) override;
+
+public:
+    ZmqChannelSender(mq_t mq, cudaStream_t stream): ZmqChannelHelper(mq, stream) {}
+};
+
+class ZmqChannelRecver: public ZmqChannelHelper {
+protected:
+    void run(Item &item) override;
+
+public:
+    ZmqChannelRecver(mq_t mq, cudaStream_t stream): ZmqChannelHelper(mq, stream) {}
+};
+
 class ZmqChannel: public Channel {
+
 protected:
     static std::map<int, mq_t> global_mq;
     zmq::context_t ctx;
     mq_t mq;
     cudaStream_t stream_send, stream_recv;
+    ZmqChannelHelper* helper;
 
     std::string other_ip;
     bool is_sender;
@@ -95,6 +152,7 @@ class ZmqChannel: public Channel {
 
 public:
     ZmqChannel(int party_local, int party_other, bool is_sender, int rank = 0);
+    ~ZmqChannel();
 
     void instantiate() override;
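The contract implied by this header: put() enqueues an Item for the worker thread, a zero-size Item is the shutdown sentinel that terminate() relies on, and a non-null event on the final chunk of a transfer is what lets run() wake a caller blocked in sync(). A condensed, self-contained analogue of that protocol follows; note that it adds a done predicate to the end-of-transfer wait, which the patch itself does not have (see the note after the comm.cpp diff below):

    #include <condition_variable>
    #include <cstddef>
    #include <mutex>
    #include <queue>
    #include <thread>

    // Condensed analogue of ZmqChannelHelper: a worker thread consumes Items;
    // size == 0 is the shutdown sentinel, event != nullptr marks the last
    // chunk of a transfer and wakes whoever is blocked in sync().
    struct Helper {
        struct Item { void* dst; size_t size; void* src; void* event; };

        std::queue<Item> q;
        std::mutex m, m_end;
        std::condition_variable cv, cv_end;
        std::thread t;
        bool done = false;  // predicate guarding the end-of-transfer wait

        void put(void* dst, size_t size, void* src = nullptr, void* event = nullptr) {
            std::lock_guard<std::mutex> lock(m);
            q.push({dst, size, src, event});
            cv.notify_one();
        }

        void start() {
            t = std::thread([this] {
                for (;;) {
                    Item item;
                    {
                        std::unique_lock<std::mutex> lock(m);
                        cv.wait(lock, [this] { return !q.empty(); });
                        item = q.front(); q.pop();
                    }
                    if (item.size == 0) return;       // sentinel from terminate()
                    // ... process item: socket send or H2D copy ...
                    if (item.event != nullptr) {      // last chunk of a transfer
                        std::lock_guard<std::mutex> lock(m_end);
                        done = true;
                        cv_end.notify_one();
                    }
                }
            });
        }

        void sync() {                                 // wait for the last chunk
            std::unique_lock<std::mutex> lock(m_end);
            cv_end.wait(lock, [this] { return done; });
            done = false;                             // re-arm for the next transfer
        }

        void terminate() { put(nullptr, 0); t.join(); }
    };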
diff --git a/csrc/include/constants.h b/csrc/include/constants.h
index 28fc3f6..f582542 100644
--- a/csrc/include/constants.h
+++ b/csrc/include/constants.h
@@ -43,7 +43,11 @@ const int ZMQ_OFFSET_BASE = 16;
 #endif
 
 #ifndef PIPE_COMM_STEP
-#define PIPE_COMM_STEP 4
+#define PIPE_COMM_STEP 256
+#endif
+
+#ifndef TRACER_GAP_US
+#define TRACER_GAP_US 0
 #endif
 
 #define ASSERT(condition) do {if (!(condition)) { \
diff --git a/csrc/include/profiler.hpp b/csrc/include/profiler.hpp
index 8ac6d46..2f1595f 100644
--- a/csrc/include/profiler.hpp
+++ b/csrc/include/profiler.hpp
@@ -46,6 +46,7 @@ class Recorder {
 
 public:
     Recorder(const char *enabled) {
+        DMOE_LOG(DEBUG) << "Recorder enable status: " << enabled << LEND;
         this->enabled = enabled && strcmp(enabled, "1") == 0;
     }
 
@@ -54,7 +55,10 @@ class Recorder {
             return;
         auto tid = std::this_thread::get_id();
         DMOE_LOG(DEBUG) << "Creating thread " << tid << LEND;
-        ASSERT(ctx.find(tid) == ctx.end());
+        if (ctx.find(tid) != ctx.end()) {
+            DMOE_LOG(WARNING) << "Thread " << tid << " already exists" << LEND;
+            return;
+        }
         ctx[tid] = std::vector<TraceContext>();
         stack[tid] = std::stack<std::pair<double, std::string>>();
     }
@@ -78,7 +82,7 @@ class Recorder {
 
         auto top = stack.at(tid).top();
         stack.at(tid).pop();
-        if ((ts - top.first) * 1000 > 10) // only > 10us is commited
+        if ((ts - top.first) * 1000 > TRACER_GAP_US) // only spans longer than the gap are committed
             ctx.at(tid).push_back(TraceContext{top.second, top.first, ts - top.first, (int) stack.at(tid).size()});
     }
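For orientation: the Recorder above keeps, per thread, a stack of open spans and a vector of committed ones, and a span is committed only if it outlives TRACER_GAP_US (now 0, i.e. keep everything, where the old hard-coded gate dropped anything at or under 10us). A minimal sketch of the same push/pop/commit discipline, with thread_local state standing in for the per-tid maps (names are illustrative, not the repository's):

    #include <chrono>
    #include <stack>
    #include <string>
    #include <utility>
    #include <vector>

    struct Span { std::string msg; double t_start_ms; double t_dur_ms; int track_id; };

    static thread_local std::vector<Span> committed;
    static thread_local std::stack<std::pair<double, std::string>> open_spans;

    static double now_ms() {
        using namespace std::chrono;
        return duration<double, std::milli>(steady_clock::now().time_since_epoch()).count();
    }

    // RAII span: push on entry; on exit, commit only if the span lasted
    // longer than the configured gap (in microseconds).
    struct ScopedSpan {
        explicit ScopedSpan(std::string msg) { open_spans.push({now_ms(), std::move(msg)}); }
        ~ScopedSpan() {
            const double gap_us = 0;                 // stands in for TRACER_GAP_US
            auto top = open_spans.top(); open_spans.pop();
            double dur = now_ms() - top.first;
            if (dur * 1000 > gap_us)                 // same gate as the patch
                committed.push_back({top.second, top.first, dur, (int) open_spans.size()});
        }
    };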
diff --git a/csrc/muhelper/comm.cpp b/csrc/muhelper/comm.cpp
index 2624db7..3fc1963 100644
--- a/csrc/muhelper/comm.cpp
+++ b/csrc/muhelper/comm.cpp
@@ -2,6 +2,7 @@
 #include "logging.h"
 #include "utils.hpp"
 #include "distributed.hpp"
+#include "profiler.hpp"
 
 #include
 #include
@@ -105,15 +106,19 @@ ZmqChannel::ZmqChannel(int party_local, int party_other, bool is_sender, int ran
     if (!is_embedding_node(party_local)) {
         CUDACHECK(cudaStreamCreateWithPriority(&this->stream_send, cudaStreamNonBlocking, 10));
         CUDACHECK(cudaStreamCreateWithPriority(&this->stream_recv, cudaStreamNonBlocking, 10));
+        CUDACHECK(cudaMallocHost(&this->pin_buffer, 1024 * 4096 * 4));
     } else {
         this->stream_send = 0;
         this->stream_recv = 0;
     }
+}
 
-    if (!is_embedding_node(local)) {
-        CUDACHECK(cudaMallocHost(&this->pin_buffer, 1024 * 4096 * 4));
+ZmqChannel::~ZmqChannel() {
+    if (!is_embedding_node(local)) {
+        this->helper->terminate();
+        delete this->helper;
     }
 }
 
 std::map<int, mq_t> ZmqChannel::global_mq = {};
 std::mutex global_mutex;
@@ -131,67 +136,132 @@ void ZmqChannel::instantiate() {
         this->mq->connect(get_zmq_addr(other, /*is_gpu=*/ false, /*manual_port=*/ -1, /*offset=*/ this->rank_offset));
     }
     DMOE_LOG(INFO) << "ZmqChannel instantiated " << this->local << LEND;
+
+    if (!is_embedding_node(local)) {
+        if (is_sender) {
+            this->helper = new ZmqChannelSender(this->mq, this->stream_send);
+        } else {
+            this->helper = new ZmqChannelRecver(this->mq, this->stream_recv);
+        }
+        this->helper->start();
+    }
+    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+}
+
+ZmqChannelHelper::ZmqChannelHelper(mq_t mq, cudaStream_t stream):
+    mq(mq), stream(stream), mutex(), queue(), cv(), end_flag(false) {}
+
+void ZmqChannelHelper::terminate() {
+    this->end_flag = true;
+    put(0, 0, 0);
+    this->t.join();
 }
 
-void _pipeline_recv(void* data, void* pin_buf, size_t num_tokens, size_t token_size, cudaStream_t stream, mq_t &mq) {
+void ZmqChannelHelper::put(void* buf, size_t size, void* buf_recv, void* event) {
+    std::lock_guard<std::mutex> lock(mutex);
+    queue.push({buf, size, buf_recv, event});
+    cv.notify_one();
+}
+
+using Item = ZmqChannelHelper::Item;
+
+Item ZmqChannelHelper::get() {
+    std::unique_lock<std::mutex> lock(mutex);
+    cv.wait(lock, [&] { return !queue.empty(); });
+    auto item = queue.front();
+    queue.pop();
+    return item;
+}
+
+void ZmqChannelHelper::start() {
+    this->t = std::thread([&] {
+        Recorder::create();
+        while (!end_flag) {
+            auto item = get();
+            if (item.size == 0)
+                break;
+            run(item);
+        }
+        DMOE_LOG(WARNING) << "Quitting ZmqChannelHelper thread" << LEND;
+    });
+}
+
+void ZmqChannelHelper::sync() {
+    std::unique_lock<std::mutex> lock(mtx_end);
+    cv_end.wait(lock);
+}
+
+void ZmqChannelSender::run(Item &item) {
+    tx_range _{"ZmqChannel::_pipeline_comm_async_send"};
+    DMOE_LOG(DEBUG) << "Sending " << item.size << LEND;
+    mq->send(zmq::buffer(item.dst, item.size), zmq::send_flags::dontwait);
+    if (item.event != 0) {
+        std::lock_guard<std::mutex> lock(this->mtx_end);
+        this->cv_end.notify_one();
+    }
+    DMOE_LOG(DEBUG) << "Sent " << item.size << LEND;
+}
+
+void ZmqChannelRecver::run(Item &item) {
+    tx_range _{"ZmqChannel::_pipeline_comm_async_copy"};
+    CUDACHECK(cudaMemcpyAsync(
+        item.dst,
+        item.src,
+        item.size,
+        cudaMemcpyHostToDevice,
+        stream
+    ));
+    if (item.event != 0) {
+        std::lock_guard<std::mutex> lock(this->mtx_end);
+        this->cv_end.notify_one();
+    }
+}
+
+void _pipeline_recv(void* data, ZmqChannelHelper* helper, size_t num_tokens, size_t token_size, cudaStream_t stream, mq_t &mq) {
     tx_range _{"ZmqChannel::_pipeline_comm"};
 
     const size_t step = PIPE_COMM_STEP;
+    cudaEvent_t event;
+    CUDACHECK(cudaEventCreate(&event));
 
     zmq::message_t msg[(num_tokens + step - 1) / step];
     for (size_t i = 0; i < num_tokens; i += step) {
+        DMOE_LOG(DEBUG) << "Receiving " << i << " " << num_tokens << LEND;
         size_t cur_step = std::min(step, num_tokens - i);
 
         void* dst_buf = data + i * token_size;
         void* src_buf;
         {
             tx_range __{"ZmqChannel::_pipeline_comm_recv"};
-            mq->recv(msg[i / step], zmq::recv_flags::none);
+            auto err = mq->recv(msg[i / step], zmq::recv_flags::none);
             src_buf = msg[i / step].data();
         }
-
-        CUDACHECK(cudaMemcpyAsync(
-            pin_buf + i * token_size,
-            src_buf,
-            cur_step * token_size,
-            cudaMemcpyHostToHost,
-            stream
-        ));
-        CUDACHECK(cudaMemcpyAsync(
-            dst_buf,
-            pin_buf + i * token_size,
-            cur_step * token_size,
-            cudaMemcpyHostToDevice,
-            stream
-        ));
+
+        DMOE_LOG(DEBUG) << "Putting " << i << " " << num_tokens << LEND;
+        helper->put(dst_buf, cur_step * token_size, src_buf,
+            i + step < num_tokens ? 0 : event);
     }
 
     tx_range __{"ZmqChannel::_pipeline_comm_sync"};
     cudaStreamSynchronize(stream);
+    helper->sync();
 }
 
-void _pipeline_send(void* data, void* pin_buf, size_t num_tokens, size_t token_size, cudaStream_t stream, mq_t &mq) {
+void _pipeline_send(void* data, ZmqChannelHelper* helper, size_t num_tokens, size_t token_size, cudaStream_t stream, mq_t &mq) {
     tx_range _{"ZmqChannel::_pipeline_comm"};
 
     const size_t step = PIPE_COMM_STEP;
     void* cpu_buf = std::malloc(num_tokens * token_size);
-
-    struct SendItem {
-        void* buf;
-        size_t size;
-    };
-
-    std::queue<SendItem> send_queue;
     std::mutex mutex;
-    std::condition_variable cv;
-    std::thread t_copy([&] {
-        for (size_t i = 0; i < num_tokens; i += step) {
-            tx_range __{"ZmqChannel::_pipeline_comm_async_copy"};
-            size_t cur_step = std::min(step, num_tokens - i);
-
-            void* src_buf = data + i * token_size;
-            void* dst_buf = cpu_buf + i * token_size;
+    for (size_t i = 0; i < num_tokens; i += step) {
+        size_t cur_step = std::min(step, num_tokens - i);
+
+        void* src_buf = data + i * token_size;
+        void* dst_buf = cpu_buf + i * token_size;
 
+        {
+            tx_range __{"ZmqChannel::_pipeline_comm_async_copy"};
             CUDACHECK(cudaMemcpyAsync(
                 dst_buf,
                 src_buf,
@@ -199,29 +269,14 @@ void _pipeline_send(void* data, void* pin_buf, size_t num_tokens, size_t token_s
                 cudaMemcpyDeviceToHost,
                 stream
             ));
+        }
 
-            {
-                std::lock_guard<std::mutex> lock(mutex);
-                send_queue.push({dst_buf, cur_step * token_size});
-                cv.notify_one();
-            }
         }
-    });
-
-    SendItem item;
-    for (size_t i = 0; i < num_tokens; i += step) {
-        {
-            std::unique_lock<std::mutex> lock(mutex);
-            cv.wait(lock, [&] { return !send_queue.empty(); });
-            item = send_queue.front();
-            send_queue.pop();
-        }
-        tx_range __{"ZmqChannel::_pipeline_comm_async_send"};
-        mq->send(zmq::buffer(item.buf, item.size));
+        helper->put(dst_buf, cur_step * token_size, 0,
+            (void*) (i + step < num_tokens ? 0x0 : 0x1));
     }
 
+    helper->sync();
     std::free(cpu_buf);
-    t_copy.join();
 }
@@ -235,12 +290,12 @@ void ZmqChannel::send(Tensor tensor, const Metadata& metadata) {
         size_t size = metadata.num_element() * metadata.get_datatype_size();
         size_t step_size = step * metadata.token_hidden_dim() * metadata.get_datatype_size();
         for (size_t i = 0; i < size; i += step_size) {
-            this->mq->send(zmq::buffer(buf + i, std::min(step_size, size - i)));
+            this->mq->send(zmq::buffer(buf + i, std::min(step_size, size - i)), zmq::send_flags::dontwait);
         }
     } else {
         _pipeline_send(
             (void*) tensor.data_ptr(),
-            this->pin_buffer,
+            this->helper,
             metadata.num_tokens(),
             metadata.token_hidden_dim() * metadata.get_datatype_size(),
             this->stream_send,
@@ -259,16 +314,21 @@ void ZmqChannel::recv(Tensor tensor, const Metadata &metadata) {
     if (is_embedding_node(this->local)) {
         size_t size = metadata.num_element() * metadata.get_datatype_size();
         size_t step_size = PIPE_COMM_STEP * metadata.token_hidden_dim() * metadata.get_datatype_size();
-        zmq::message_t msg(size);
+        zmq::message_t msg(step_size);
         void* buf = (void*) tensor.data_ptr();
+        DMOE_LOG(DEBUG) << "launching recv loop" << LEND;
         for (size_t i = 0; i < size; i += step_size) {
+            DMOE_LOG(DEBUG) << "Receiving " << i << "/" << size << LEND;
+            // std::this_thread::sleep_for(std::chrono::microseconds(1));
             auto err = this->mq->recv(msg, zmq::recv_flags::none);
+            ASSERT(msg.size() == std::min(step_size, size - i));
             memcpy(buf + i, msg.data(), std::min(step_size, size - i));
         }
+        DMOE_LOG(DEBUG) << "recv loop finished" << LEND;
     } else {
         _pipeline_recv(
             (void*) tensor.data_ptr(),
-            this->pin_buffer,
+            this->helper,
             metadata.num_tokens(),
             metadata.token_hidden_dim() * metadata.get_datatype_size(),
             this->stream_recv,
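Two details in this diff are worth flagging against the "random blocking" the subject line mentions. First, ZmqChannelHelper::sync() waits on cv_end with no predicate, so if run() posts its notify before the caller reaches the wait, the wakeup is lost and sync() blocks forever. Second, both send sites now pass zmq::send_flags::dontwait and discard the result: with cppzmq under C++17, a dontwait send returns an empty optional instead of blocking when the socket's high-water mark is hit, so ignoring it can silently drop whole chunks. A hedged retry sketch for the latter (the function name is mine, not the repository's):

    #include <chrono>
    #include <cstddef>
    #include <thread>
    #include <zmq.hpp>

    // zmq::send_flags::dontwait yields std::nullopt on EAGAIN instead of
    // blocking; dropping that return value silently loses whole chunks.
    // Retrying with a short back-off keeps the send non-blocking but lossless.
    bool send_with_retry(zmq::socket_t& sock, const void* buf, size_t size) {
        for (int attempt = 0; attempt < 10000; ++attempt) {
            zmq::send_result_t res =
                sock.send(zmq::buffer(buf, size), zmq::send_flags::dontwait);
            if (res.has_value())
                return true;             // ZMQ messages are atomic: fully queued
            std::this_thread::sleep_for(std::chrono::microseconds(50));
        }
        return false;                    // still EAGAIN; surface the failure
    }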
diff --git a/csrc/tests/test_zmq_overlap.cpp b/csrc/tests/test_zmq_overlap.cpp
index e28e7b2..fa3974c 100644
--- a/csrc/tests/test_zmq_overlap.cpp
+++ b/csrc/tests/test_zmq_overlap.cpp
@@ -23,8 +23,8 @@ void test_zmq_overlap(int rank) {
     );
     auto c_send = create_zmq_channel(rank, 0, true);
     auto c_recv = create_zmq_channel(rank, 1, false);
-    c_send->instantiate();
     c_recv->instantiate();
+    c_send->instantiate();
     for (int i = 0; i < 5; i ++) {
         c_recv->recv(a, meta);
         DMOE_LOG(INFO) << "Sampler Received from <1>: " << a.mean().item<float>() << LEND;
diff --git a/tests/comm/test_zmq_overlap.py b/tests/comm/test_zmq_overlap.py
index b9e33c5..f44f426 100644
--- a/tests/comm/test_zmq_overlap.py
+++ b/tests/comm/test_zmq_overlap.py
@@ -33,8 +33,9 @@ def main():
     ray.init("auto")
     env = {
-        "runtime_env": {
+        "env_vars": {
             # "CUDA_LAUNCH_BLOCKING": "1",
+            "ENABLE_NVTX": "1",
         }
     }
     tasks = [
@@ -50,8 +51,6 @@ def main():
             print("outputting thread", tid)
             tid = tid % (1 << 32)
             for trace in t_traces:
-                if "schedule" in trace.msg and ms_to_us(trace.t_dur) < 10:
-                    continue
                 events.append({
                     "name": trace.msg,
                     "cat": "trace",