diff --git a/csrc/bindings.cpp b/csrc/bindings.cpp
index 5bfcc54..aa851ab 100644
--- a/csrc/bindings.cpp
+++ b/csrc/bindings.cpp
@@ -10,7 +10,6 @@
 #include "datatypes.hpp"
 #include "block_manager.h"
 #include "permute.h"
-#include "binding_helper.h"
 #include "binding_tests.hpp"
 #include "profiler.hpp"
 
@@ -179,12 +178,13 @@ PYBIND11_MODULE(disagmoe_c, m) {
 
     /******** Test functions ********/
 
-    m.def("test_nccl_p2p", &test_nccl_p2p);
-    m.def("test_nccl_group", &test_nccl_group);
-    m.def("test_parallel_attn_scheduler", &test_parallel_attn_scheduler);
-    m.def("test_multi_launch", &test_multi_launch);
+    // m.def("test_nccl_p2p", &test_nccl_p2p);
+    // m.def("test_nccl_group", &test_nccl_group);
+    // m.def("test_parallel_attn_scheduler", &test_parallel_attn_scheduler);
+    // m.def("test_multi_launch", &test_multi_launch);
 
-    REGISTER_FUNC(test_op_overlap);
+    REGISTER_FUNC(test_zmq_overlap);
+    // REGISTER_FUNC(test_op_overlap);
     // m.def("test_zmq_sub_pub", &test_zmq_sub_pub);
     // m.def("test_attn_dispatcher", &test_attn_dispatcher);
     // m.def("test_expert_dispatcher", &test_expert_dispatcher);
diff --git a/csrc/include/binding_helper.h b/csrc/include/binding_helper.h
deleted file mode 100644
index b07f380..0000000
--- a/csrc/include/binding_helper.h
+++ /dev/null
@@ -1,35 +0,0 @@
-#pragma once
-
-#include "muhelper.h"
-#include "comm.h"
-
-#include
-#include
-
-class PyMuHelper: MuHelper {
-public:
-    using MuHelper::MuHelper;
-    using MuHelper::start;
-    using MuHelper::terminate;
-
-    void run() override {
-        PYBIND11_OVERRIDE_PURE(void, MuHelper, run);
-    }
-};
-
-class PyChannel: Channel {
-public:
-    using Channel::Channel;
-
-    void instantiate() override {
-        PYBIND11_OVERRIDE_PURE(void, Channel, instantiate);
-    }
-
-    void send(uintptr_t data, const Metadata& metadata) override {
-        PYBIND11_OVERRIDE_PURE(void, Channel, send);
-    }
-
-    void recv(uintptr_t data, const Metadata& metadata) override {
-        PYBIND11_OVERRIDE_PURE(void, Channel, recv);
-    }
-};
\ No newline at end of file
diff --git a/csrc/include/binding_tests.hpp b/csrc/include/binding_tests.hpp
index c59e0e5..6c9b579 100644
--- a/csrc/include/binding_tests.hpp
+++ b/csrc/include/binding_tests.hpp
@@ -12,22 +12,22 @@
 #include
 #include
 
-void test_nccl_p2p(Channel_t c1, uintptr_t p1, Channel_t c2, uintptr_t p2, const Metadata& metadata) {
-    auto t1 = std::thread([&]{c1->send(p1, metadata);});
-    auto t2 = std::thread([&]{c2->recv(p2, metadata);});
-    t1.join();
-    t2.join();
-}
-
-std::pair<Channel_t, Channel_t> _init_channel(int s = 0, int r = 1) {
-    auto uid = get_nccl_unique_id();
-    auto c1 = create_channel(s, r, uid);
-    auto c2 = create_channel(r, s, uid);
-
-    auto t1 = std::thread([&]{ c1->instantiate(); }), t2 = std::thread([&]{ c2->instantiate(); });
-    t1.join(); t2.join();
-    return std::make_pair(c1, c2);
-}
+// void test_nccl_p2p(Channel_t c1, uintptr_t p1, Channel_t c2, uintptr_t p2, const Metadata& metadata) {
+//     auto t1 = std::thread([&]{c1->send(p1, metadata);});
+//     auto t2 = std::thread([&]{c2->recv(p2, metadata);});
+//     t1.join();
+//     t2.join();
+// }
+
+// std::pair<Channel_t, Channel_t> _init_channel(int s = 0, int r = 1) {
+//     auto uid = get_nccl_unique_id();
+//     auto c1 = create_channel(s, r, uid);
+//     auto c2 = create_channel(r, s, uid);
+
+//     auto t1 = std::thread([&]{ c1->instantiate(); }), t2 = std::thread([&]{ c2->instantiate(); });
+//     t1.join(); t2.join();
+//     return std::make_pair(c1, c2);
+// }
 
 // void test_zmq_sub_pub() {
 //     auto pr = _init_channel();
@@ -347,150 +347,150 @@ std::pair<Channel_t, Channel_t> _init_channel(int s = 0, int r = 1) {
 //     exit(0);
 // }
 
-void test_nccl_group(int rank, std::vector<int> ranks, std::string uid) {
-    auto c_raw = create_nccl_group_channel(rank, ranks, (void*) uid.c_str());
-    auto c = static_cast<NcclGroupChannel*>(c_raw.get());
-    c->instantiate();
-    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-    DMOE_LOG(INFO) << "rank " << rank << " instantiated" << LEND;
+// void test_nccl_group(int rank, std::vector<int> ranks, std::string uid) {
+//     auto c_raw = create_nccl_group_channel(rank, ranks, (void*) uid.c_str());
+//     auto c = static_cast<NcclGroupChannel*>(c_raw.get());
+//     c->instantiate();
+//     std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+//     DMOE_LOG(INFO) << "rank " << rank << " instantiated" << LEND;
 
-    if (rank == 0) {
-        Metadata meta = Metadata {
-            /*shape=*/ std::vector<size_t>({1, 4}),
-            /*dtype=*/ "fp16",
-            /*layer_id=*/ 1,
-            /*req_ids=*/ std::vector<int>({2}),
-            /*exp_ids=*/ std::vector<int>({3}),
-            /*prefill_poss=*/ std::vector<int>({4}),
-        };
-        uintptr_t buf = alloc_cuda_tensor(4, 0);
-        c->send_metadata(meta);
-        DMOE_LOG(INFO) << "Send metadata." << LEND;
-        c->send(buf, meta);
-    } else {
-        Metadata meta;
-        c->recv_metadata(meta);
-        DMOE_LOG(INFO) << "Got metadata: " << meta << LEND;
-        ASSERT(meta.num_element() == 4);
-        uintptr_t buf = alloc_cuda_tensor(meta.num_element(), 0);
-        c->recv(buf, meta);
-        ASSERT(meta.req_ids[0] == 2);
-        ASSERT(meta.exp_ids[0] == 3);
-        ASSERT(meta.prefill_poss[0] == 4);
-    }
-
-    DMOE_LOG(INFO) << "rank " << rank << " passed" << LEND;
-}
-
-void test_parallel_attn_scheduler(int rank, std::vector<int> ranks, std::string uid) {
-    auto c_raw = create_nccl_group_channel(rank, ranks, (void*) uid.c_str());
-    auto c = static_cast<NcclGroupChannel*>(c_raw.get());
-    c->instantiate();
-    DMOE_LOG(INFO) << "rank " << rank << " instantiated" << LEND;
-
-    std::vector<int> layer_ids{0, 1};
-    std::vector<Channel_t> channels{};
-    mu_attn_pool_t pool = std::make_shared<MuAttentionPool>(
-        layer_ids,
-        rank,
-        channels
-    );
-
-    torch::Tensor dummy_tensor = torch::empty({0}, torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCUDA, 0));
-
-    std::vector<std::vector<AttentionBatch>> data_queue(2);
-    data_queue[0] = std::vector<AttentionBatch>{
-        AttentionBatch{dummy_tensor, std::make_shared<AttentionBatchMetadata>(
-            AttentionBatchMetadata{0, {1, 4}, "fp16", 1, 1, 0, /*seq_ids=*/ {0}, {1}, {1}, {}}
-        )},
-        AttentionBatch{dummy_tensor, std::make_shared<AttentionBatchMetadata>(
-            AttentionBatchMetadata{0, {1, 4}, "fp16", 1, 1, 0, /*seq_ids=*/ {1}, {1}, {1}, {}}
-        )},
-    };
-    data_queue[1] = std::vector<AttentionBatch>{
-        AttentionBatch{dummy_tensor, std::make_shared<AttentionBatchMetadata>(
-            AttentionBatchMetadata{0, {1, 4}, "fp16", 1, 1, 0, /*seq_ids=*/ {2}, {1}, {1}, {}}
-        )}
-    };
-    std::vector<int> token_per_layer {2, 1};
-    pool->__set_attn_data_queue(data_queue, token_per_layer, 0);
-
-    AttentionBatch result;
-
-    if (rank == 0) {
-        // driver scheduler
-        AttentionDriverScheduler scheduler(pool, layer_ids, c_raw, c_raw);
-        result = scheduler.schedule();
-    } else {
-        // worker scheduler
-        AttentionWorkerScheduler scheduler(pool, layer_ids, c_raw, c_raw);
-        result = scheduler.schedule();
-    }
-
-    ASSERT(result.metadata.get() != nullptr);
-    auto &seq_ids = result.metadata->seq_ids;
-
-    for (int i: seq_ids)
-        DMOE_LOG(DEBUG) << "seq_id: " << i << LEND;
-
-    ASSERT(seq_ids.size() == 2);
-    ASSERT(seq_ids[0] == 0 && seq_ids[1] == 1);
-
-    DMOE_LOG(INFO) << "rank " << rank << " passed" << LEND;
-}
-
-void test_multi_launch(int rank, std::vector<int> ranks, std::vector<std::string> uids) {
-    std::vector<std::thread> threads;
-    for (int i = 0; i < uids.size(); i ++) {
-        threads.push_back(std::thread(
-            [&](std::string uid, int i) {
-                auto c_raw = create_nccl_group_channel(rank, ranks, (void*) uid.c_str());
-                auto c = std::dynamic_pointer_cast<NcclGroupChannel>(c_raw);
-                c->instantiate();
-                cudaStream_t stream;
-                cudaStreamCreateWithPriority(&stream, cudaStreamNonBlocking, -2);
-                if (i == 0) {
-                    if (rank == 0) {
-                        DMOE_LOG(DEBUG) << "sending metadata" << LEND;
-                        c->send_metadata(Metadata {
-                            /*shape=*/ std::vector<size_t>({1, 4}),
-                            /*dtype=*/ "fp16",
-                            /*layer_id=*/ 1,
-                            /*req_ids=*/ std::vector<int>({rank * 10 + 0}),
-                            /*exp_ids=*/ std::vector<int>({3}),
-                            /*prefill_poss=*/ std::vector<int>({4}),
-                        });
-                        DMOE_LOG(DEBUG) << "thread " << i << "sleeping" << LEND;
-                        std::this_thread::sleep_for(std::chrono::milliseconds(10000));
-                        // c->send_metadata(Metadata {
-                        //     /*shape=*/ std::vector<size_t>({1, 4}),
-                        //     /*dtype=*/ "fp16",
-                        //     /*layer_id=*/ 1,
-                        //     /*req_ids=*/ std::vector<int>({rank * 10 + 1}),
-                        //     /*exp_ids=*/ std::vector<int>({3}),
-                        //     /*prefill_poss=*/ std::vector<int>({4}),
-                        // });
-                    } else {
-                        Metadata meta;
-                        DMOE_LOG(DEBUG) << "receiving metadata" << LEND;
-                        c->recv_metadata(meta);
-                        DMOE_LOG(DEBUG) << "get " << meta << LEND;
-                        c->recv_metadata(meta);
-                        DMOE_LOG(DEBUG) << "get " << meta << LEND;
-                    }
-                } else {
-                    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
-                    DMOE_LOG(WARNING) << "thread " << i << " trying to allocate tensor" << LEND;
-                    void* data;
-                    CUDACHECK(cudaMallocAsync(&data, 4096, stream));
-                    CUDACHECK(cudaStreamSynchronize(stream));
-                    // c->all_reduce(data, {1, 4096});
-                    DMOE_LOG(WARNING) << "allocated tensor" << LEND;
-                }
-            }, uids[i], i
-        ));
-    }
-    for (auto &t: threads)
-        t.join();
-    DMOE_LOG(INFO) << "rank " << rank << " passed" << LEND;
-}
\ No newline at end of file
+//     if (rank == 0) {
+//         Metadata meta = Metadata {
+//             /*shape=*/ std::vector<size_t>({1, 4}),
+//             /*dtype=*/ "fp16",
+//             /*layer_id=*/ 1,
+//             /*req_ids=*/ std::vector<int>({2}),
+//             /*exp_ids=*/ std::vector<int>({3}),
+//             /*prefill_poss=*/ std::vector<int>({4}),
+//         };
+//         uintptr_t buf = alloc_cuda_tensor(4, 0);
+//         c->send_metadata(meta);
+//         DMOE_LOG(INFO) << "Send metadata." << LEND;
+//         c->send(buf, meta);
+//     } else {
+//         Metadata meta;
+//         c->recv_metadata(meta);
+//         DMOE_LOG(INFO) << "Got metadata: " << meta << LEND;
+//         ASSERT(meta.num_element() == 4);
+//         uintptr_t buf = alloc_cuda_tensor(meta.num_element(), 0);
+//         c->recv(buf, meta);
+//         ASSERT(meta.req_ids[0] == 2);
+//         ASSERT(meta.exp_ids[0] == 3);
+//         ASSERT(meta.prefill_poss[0] == 4);
+//     }
+
+//     DMOE_LOG(INFO) << "rank " << rank << " passed" << LEND;
+// }
+
+// void test_parallel_attn_scheduler(int rank, std::vector<int> ranks, std::string uid) {
+//     auto c_raw = create_nccl_group_channel(rank, ranks, (void*) uid.c_str());
+//     auto c = static_cast<NcclGroupChannel*>(c_raw.get());
+//     c->instantiate();
+//     DMOE_LOG(INFO) << "rank " << rank << " instantiated" << LEND;
+
+//     std::vector<int> layer_ids{0, 1};
+//     std::vector<Channel_t> channels{};
+//     mu_attn_pool_t pool = std::make_shared<MuAttentionPool>(
+//         layer_ids,
+//         rank,
+//         channels
+//     );
+
+//     torch::Tensor dummy_tensor = torch::empty({0}, torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCUDA, 0));
+
+//     std::vector<std::vector<AttentionBatch>> data_queue(2);
+//     data_queue[0] = std::vector<AttentionBatch>{
+//         AttentionBatch{dummy_tensor, std::make_shared<AttentionBatchMetadata>(
+//             AttentionBatchMetadata{0, {1, 4}, "fp16", 1, 1, 0, /*seq_ids=*/ {0}, {1}, {1}, {}}
+//         )},
+//         AttentionBatch{dummy_tensor, std::make_shared<AttentionBatchMetadata>(
+//             AttentionBatchMetadata{0, {1, 4}, "fp16", 1, 1, 0, /*seq_ids=*/ {1}, {1}, {1}, {}}
+//         )},
+//     };
+//     data_queue[1] = std::vector<AttentionBatch>{
+//         AttentionBatch{dummy_tensor, std::make_shared<AttentionBatchMetadata>(
+//             AttentionBatchMetadata{0, {1, 4}, "fp16", 1, 1, 0, /*seq_ids=*/ {2}, {1}, {1}, {}}
+//         )}
+//     };
+//     std::vector<int> token_per_layer {2, 1};
+//     pool->__set_attn_data_queue(data_queue, token_per_layer, 0);
+
+//     AttentionBatch result;
+
+//     if (rank == 0) {
+//         // driver scheduler
+//         AttentionDriverScheduler scheduler(pool, layer_ids, c_raw, c_raw);
+//         result = scheduler.schedule();
+//     } else {
+//         // worker scheduler
+//         AttentionWorkerScheduler scheduler(pool, layer_ids, c_raw, c_raw);
+//         result = scheduler.schedule();
+//     }
+
+//     ASSERT(result.metadata.get() != nullptr);
+//     auto &seq_ids = result.metadata->seq_ids;
+
+//     for (int i: seq_ids)
+//         DMOE_LOG(DEBUG) << "seq_id: " << i << LEND;
+
+//     ASSERT(seq_ids.size() == 2);
+//     ASSERT(seq_ids[0] == 0 && seq_ids[1] == 1);
+
+//     DMOE_LOG(INFO) << "rank " << rank << " passed" << LEND;
+// }
+
+// void test_multi_launch(int rank, std::vector<int> ranks, std::vector<std::string> uids) {
+//     std::vector<std::thread> threads;
+//     for (int i = 0; i < uids.size(); i ++) {
+//         threads.push_back(std::thread(
+//             [&](std::string uid, int i) {
+//                 auto c_raw = create_nccl_group_channel(rank, ranks, (void*) uid.c_str());
+//                 auto c = std::dynamic_pointer_cast<NcclGroupChannel>(c_raw);
+//                 c->instantiate();
+//                 cudaStream_t stream;
+//                 cudaStreamCreateWithPriority(&stream, cudaStreamNonBlocking, -2);
+//                 if (i == 0) {
+//                     if (rank == 0) {
+//                         DMOE_LOG(DEBUG) << "sending metadata" << LEND;
+//                         c->send_metadata(Metadata {
+//                             /*shape=*/ std::vector<size_t>({1, 4}),
+//                             /*dtype=*/ "fp16",
+//                             /*layer_id=*/ 1,
+//                             /*req_ids=*/ std::vector<int>({rank * 10 + 0}),
+//                             /*exp_ids=*/ std::vector<int>({3}),
+//                             /*prefill_poss=*/ std::vector<int>({4}),
+//                         });
+//                         DMOE_LOG(DEBUG) << "thread " << i << "sleeping" << LEND;
+//                         std::this_thread::sleep_for(std::chrono::milliseconds(10000));
+//                         // c->send_metadata(Metadata {
+//                         //     /*shape=*/ std::vector<size_t>({1, 4}),
+//                         //     /*dtype=*/ "fp16",
+//                         //     /*layer_id=*/ 1,
+//                         //     /*req_ids=*/ std::vector<int>({rank * 10 + 1}),
+//                         //     /*exp_ids=*/ std::vector<int>({3}),
+//                         //     /*prefill_poss=*/ std::vector<int>({4}),
+//                         //     });
+//                     } else {
+//                         Metadata meta;
+//                         DMOE_LOG(DEBUG) << "receiving metadata" << LEND;
DMOE_LOG(DEBUG) << "receiving metadata" << LEND; +// c->recv_metadata(meta); +// DMOE_LOG(DEBUG) << "get " << meta << LEND; +// c->recv_metadata(meta); +// DMOE_LOG(DEBUG) << "get " << meta << LEND; +// } +// } else { +// std::this_thread::sleep_for(std::chrono::milliseconds(2000)); +// DMOE_LOG(WARNING) << "thread " << i << " trying to allocate tensor" << LEND; +// void* data; +// CUDACHECK(cudaMallocAsync(&data, 4096, stream)); +// CUDACHECK(cudaStreamSynchronize(stream)); +// // c->all_reduce(data, {1, 4096}); +// DMOE_LOG(WARNING) << "allocated tensor" << LEND; +// } +// }, uids[i], i +// )); +// } +// for (auto &t: threads) +// t.join(); +// DMOE_LOG(INFO) << "rank " << rank << " passed" << LEND; +// } \ No newline at end of file diff --git a/csrc/include/comm.h b/csrc/include/comm.h index a50ce54..a73eb72 100644 --- a/csrc/include/comm.h +++ b/csrc/include/comm.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include "cuda_runtime.h" #include "zmq.hpp" @@ -29,10 +31,11 @@ class Channel { public: Channel(int party_local, int party_other): local(party_local), other(party_other) {} + ~Channel() {} virtual void instantiate() = 0; - virtual void send(uintptr_t data, const Metadata& metadata) = 0; - virtual void recv(uintptr_t data, const Metadata& metadata) = 0; + virtual void send(torch::Tensor tensor, const Metadata& metadata) = 0; + virtual void recv(torch::Tensor tensor, const Metadata& metadata) = 0; void _debug_print() { printf("%d %d\n", local, other); @@ -59,6 +62,8 @@ class NcclChannel: public Channel { ncclComm_t comm; cudaStream_t stream; + void _delay_release_tensor(torch::Tensor tensor, cudaStream_t stream); + public: NcclChannel(int party_local, int party_other, ncclUniqueId comm_id, cudaStream_t stream = nullptr); @@ -66,21 +71,76 @@ class NcclChannel: public Channel { void instantiate() override; - void send(uintptr_t data, const Metadata& metadata) override; + void send(torch::Tensor tensor, const Metadata& metadata) override; - void recv(uintptr_t data, const Metadata& metadata) override; + void recv(torch::Tensor tensor, const Metadata& metadata) override; void sync() override; }; typedef std::shared_ptr mq_t; +class ZmqChannelHelper { +public: + struct Item; + +protected: + std::thread t; + bool end_flag; + mq_t mq; + cudaStream_t stream; + + std::queue queue; + std::mutex mutex, mtx_end; + std::condition_variable cv, cv_end; + + virtual void run(Item &item) = 0; + +public: + struct Item { + void* dst {0}; + size_t size {0}; + void* src {0}; + void* event {0}; + }; + + ZmqChannelHelper(mq_t mq, cudaStream_t stream); + + void terminate(); + + void put(void* dst, size_t size, void* src = nullptr, void* event = 0); + + Item get(); + + void start(); + + void sync(); +}; + +class ZmqChannelSender: public ZmqChannelHelper { +protected: + void run(Item &item) override; + +public: + ZmqChannelSender(mq_t mq, cudaStream_t stream): ZmqChannelHelper(mq, stream) {} +}; + +class ZmqChannelRecver: public ZmqChannelHelper { +protected: + void run(Item &item) override; + +public: + ZmqChannelRecver(mq_t mq, cudaStream_t stream): ZmqChannelHelper(mq, stream) {} +}; + class ZmqChannel: public Channel { + protected: static std::map global_mq; zmq::context_t ctx; mq_t mq; - cudaStream_t stream; + cudaStream_t stream_send, stream_recv; + ZmqChannelHelper* helper; std::string other_ip; bool is_sender; @@ -88,16 +148,17 @@ class ZmqChannel: public Channel { int rank_offset; - void* _tensor_copy(uintptr_t src, const Metadata& metadata, bool to_gpu, uintptr_t dst = 0); + void* 
 
 public:
     ZmqChannel(int party_local, int party_other, bool is_sender, int rank = 0);
+    ~ZmqChannel();
 
     void instantiate() override;
 
-    void send(uintptr_t data, const Metadata& metadata) override;
+    void send(torch::Tensor tensor, const Metadata& metadata) override;
 
-    void recv(uintptr_t data, const Metadata &metadata) override;
+    void recv(torch::Tensor tensor, const Metadata &metadata) override;
 };
 
 class NcclGroupChannel: public NcclChannel {
@@ -117,20 +178,20 @@ class NcclGroupChannel: public NcclChannel {
 
     int root() const;
 
-    void broadcast(void* send_buf, void* recv_buf, size_t count, ncclDataType_t type, cudaStream_t stream=nullptr);
+    void broadcast(torch::Tensor send_tensor, torch::Tensor recv_tensor, size_t count, ncclDataType_t type, cudaStream_t stream=nullptr);
 
 public:
     NcclGroupChannel(int party_local, const std::vector<int> &party_all, ncclUniqueId comm_id, cudaStream_t stream = nullptr);
 
     void instantiate() override;
 
-    void send(uintptr_t data, const Metadata& metadata) override;
+    void send(torch::Tensor tensor, const Metadata& metadata) override;
 
-    void recv(uintptr_t data, const Metadata& metadata) override;
+    void recv(torch::Tensor tensor, const Metadata& metadata) override;
 
     void synchronize();
 
-    void send_recv(uintptr_t data, const Metadata& metadata);
+    void send_recv(torch::Tensor tensor, const Metadata& metadata);
 
     void bcast_obj(void* &buf, size_t &size);
 
@@ -138,7 +199,7 @@ class NcclGroupChannel: public NcclChannel {
 
     void recv_metadata(Metadata& metadata);
 
-    void all_reduce(uintptr_t data, const std::vector<int> &shape);
+    void all_reduce(torch::Tensor tensor, const std::vector<int> &shape);
 };
 
 Channel_t create_channel(int party_local, int party_other, void *nccl_id_raw);
diff --git a/csrc/include/constants.h b/csrc/include/constants.h
index a178fe7..f582542 100644
--- a/csrc/include/constants.h
+++ b/csrc/include/constants.h
@@ -40,7 +40,14 @@ const int ZMQ_OFFSET_BASE = 16;
 
 #ifndef MAX_N_EXPERTS
 #define MAX_N_EXPERTS 8
+#endif
+
+#ifndef PIPE_COMM_STEP
+#define PIPE_COMM_STEP 256
+#endif
 
+#ifndef TRACER_GAP_US
+#define TRACER_GAP_US 0
 #endif
 
 #define ASSERT(condition) do {if (!(condition)) { \
diff --git a/csrc/include/distributed.hpp b/csrc/include/distributed.hpp
index 810c0f2..2a979bd 100644
--- a/csrc/include/distributed.hpp
+++ b/csrc/include/distributed.hpp
@@ -46,7 +46,12 @@ static std::string get_ip_of_device(int device_id) {
         }
         ifs.close();
     }
-    return device_id_2_ip.at(device_id);
+    if (device_id_2_ip.find(device_id) == device_id_2_ip.end()) {
+        DMOE_LOG(ERROR) << "device_id " << device_id << " not found" << LEND;
+        return "0.0.0.0";
+    } else {
+        return device_id_2_ip.at(device_id);
+    }
 }
 
 inline std::string get_zmq_addr(int device_id, bool is_gpu = true, int manual_port = -1, int offset = 0) {
diff --git a/csrc/include/muhelper.h b/csrc/include/muhelper.h
index cdb061d..65e8048 100644
--- a/csrc/include/muhelper.h
+++ b/csrc/include/muhelper.h
@@ -59,7 +59,7 @@ class MuDispatcher: public MuHelper {
 
     virtual void _send_once(TensorBatch batch) = 0;
 
-    void _send_batch(int cid, uintptr_t buf, const Metadata& meta);
+    void _send_batch(int cid, torch::Tensor tensor, const Metadata& meta);
 
     void run() override;
 
@@ -146,7 +146,7 @@ class MuPool: public MuHelper {
 
     void recv_metadata(int &peer_id, metadata_t &meta);
 
-    void recv_tensor(int peer_id, uintptr_t tensor_buf, metadata_t &meta);
+    void recv_tensor(int peer_id, torch::Tensor tensor, metadata_t &meta);
 
     virtual void process_batch(torch::Tensor tensor, metadata_t &meta, bool send_from_zmq=true);
 
diff --git a/csrc/include/profiler.hpp b/csrc/include/profiler.hpp
index 8ac6d46..2f1595f 100644
--- a/csrc/include/profiler.hpp
+++ b/csrc/include/profiler.hpp
@@ -46,6 +46,7 @@ class Recorder {
 
 public:
     Recorder(const char *enabled) {
+        DMOE_LOG(DEBUG) << "Recorder enable status: " << enabled << LEND;
         this->enabled = enabled && strcmp(enabled, "1") == 0;
     }
 
@@ -54,7 +55,10 @@ class Recorder {
             return;
         auto tid = std::this_thread::get_id();
         DMOE_LOG(DEBUG) << "Creating thread " << tid << LEND;
-        ASSERT(ctx.find(tid) == ctx.end());
+        if (ctx.find(tid) != ctx.end()) {
+            DMOE_LOG(WARNING) << "Thread " << tid << " already exists" << LEND;
+            return;
+        }
         ctx[tid] = std::vector<TraceContext>();
         stack[tid] = std::stack<std::pair<double, std::string>>();
     }
 
@@ -78,7 +82,7 @@ class Recorder {
         auto top = stack.at(tid).top();
         stack.at(tid).pop();
 
-        if ((ts - top.first) * 1000 > 10) // only > 10us is commited
+        if ((ts - top.first) * 1000 > TRACER_GAP_US) // only time > gap is committed
             ctx.at(tid).push_back(TraceContext{top.second, top.first, ts - top.first, (int) stack.at(tid).size()});
     }
 
diff --git a/csrc/include/tests.h b/csrc/include/tests.h
index 0bf612c..270d0f0 100644
--- a/csrc/include/tests.h
+++ b/csrc/include/tests.h
@@ -1,4 +1,6 @@
 #include
 #include
 
-void test_op_overlap(int rank, std::vector<int> ranks, std::string uid);
\ No newline at end of file
+void test_op_overlap(int rank, std::vector<int> ranks, std::string uid);
+
+void test_zmq_overlap(int rank);
\ No newline at end of file
diff --git a/csrc/muhelper/comm.cpp b/csrc/muhelper/comm.cpp
index 23562fd..3fc1963 100644
--- a/csrc/muhelper/comm.cpp
+++ b/csrc/muhelper/comm.cpp
@@ -2,9 +2,16 @@
 #include "logging.h"
 #include "utils.hpp"
 #include "distributed.hpp"
+#include "profiler.hpp"
+#include
 #include
+#include
+#include
 #include
+#include
+
+using Tensor = torch::Tensor;
 
 NcclChannel::NcclChannel(int party_local, int party_other, ncclUniqueId comm_id, cudaStream_t stream):
     Channel::Channel(party_local, party_other), comm_id(comm_id)
@@ -48,25 +55,36 @@ void NcclChannel::instantiate() {
     ));
 }
 
-void NcclChannel::send(uintptr_t data_ptr, const Metadata& metadata) {
+void NcclChannel::_delay_release_tensor(Tensor tensor, cudaStream_t stream) {
+    struct TensorWarp {
+        Tensor tensor;
+    };
+    TensorWarp* warp = new TensorWarp{std::move(tensor)};
+    CUDACHECK(cudaStreamAddCallback(stream, [](cudaStream_t stream, cudaError_t status, void* data) {
+        TensorWarp* warp = (TensorWarp*) data;
+        delete warp;
+    }, (void*) warp, 0));
+}
+
+void NcclChannel::send(Tensor tensor, const Metadata& metadata) {
     // DMOE_LOG(INFO) << "NCCL sending: " << local << " " << other << LEND;
     tx_range _{"NcclChannel::send"};
-    void* data = reinterpret_cast<void*>(data_ptr);
+    void* data = reinterpret_cast<void*>(tensor.data_ptr());
     NCCLCHECK(ncclSend(
-        data,
+        data,
         /*count=*/ metadata.num_element(),
         /*datatype=*/ metadata.get_nccl_datatype(),
         /*peer=*/ this->m_other(),
         this->comm,
         this->stream
     ));
-    // CUDACHECK(cudaStreamSynchronize(this->stream));
+    _delay_release_tensor(tensor, this->stream);
     // DMOE_LOG(INFO) << "NCCL sent " << local << " " << other << LEND;
 }
 
-void NcclChannel::recv(uintptr_t data_ptr, const Metadata& metadata) {
+void NcclChannel::recv(Tensor tensor, const Metadata& metadata) {
     tx_range _{"NcclChannel::recv"};
-    void* data = reinterpret_cast<void*>(data_ptr);
+    void* data = reinterpret_cast<void*>(tensor.data_ptr());
     NCCLCHECK(ncclRecv(
         data,
         /*count=*/ metadata.num_element(),
@@ -86,12 +104,22 @@ ZmqChannel::ZmqChannel(int party_local, int party_other, bool is_sender, int rank):
     Channel(party_local, party_other), is_sender(is_sender), rank_offset(rank) {
         sprintf(device_id_str, "%d", party_local);
         if (!is_embedding_node(party_local)) {
-            CUDACHECK(cudaStreamCreateWithPriority(&this->stream, cudaStreamNonBlocking, 10));
+            CUDACHECK(cudaStreamCreateWithPriority(&this->stream_send, cudaStreamNonBlocking, 10));
+            CUDACHECK(cudaStreamCreateWithPriority(&this->stream_recv, cudaStreamNonBlocking, 10));
+            CUDACHECK(cudaMallocHost(&this->pin_buffer, 1024 * 4096 * 4));
         } else {
-            this->stream = 0;
+            this->stream_send = 0;
+            this->stream_recv = 0;
         }
 }
 
+ZmqChannel::~ZmqChannel() {
+    if (!is_embedding_node(local)) {
+        this->helper->terminate();
+        delete this->helper;
+    }
+}
+
 std::map ZmqChannel::global_mq = {};
 std::mutex global_mutex;
 
@@ -108,71 +136,205 @@ void ZmqChannel::instantiate() {
         this->mq->connect(get_zmq_addr(other, /*is_gpu=*/ false, /*manual_port=*/ -1, /*offset=*/ this->rank_offset));
     }
     DMOE_LOG(INFO) << "ZmqChannel instantiated " << this->local << LEND;
+
+    if (!is_embedding_node(local)) {
+        if (is_sender) {
+            this->helper = new ZmqChannelSender(this->mq, this->stream_send);
+        } else {
+            this->helper = new ZmqChannelRecver(this->mq, this->stream_recv);
+        }
+        this->helper->start();
+    }
+    std::this_thread::sleep_for(std::chrono::milliseconds(10));
 }
 
-void* ZmqChannel::_tensor_copy(uintptr_t data, const Metadata& metadata, bool to_gpu, uintptr_t dst) {
-    if (is_embedding_node(this->local))
-        return (void*) data;
-    tx_range _{"ZmqChannel::_tensor_copy"};
-    uintptr_t buf;
-    cudaMemcpyKind flag;
-    if (!to_gpu) {
-        size_t size = metadata.num_element() * metadata.get_datatype_size();
-        buf = !dst ? (uintptr_t) std::malloc(size) : dst;
-        flag = cudaMemcpyKind::cudaMemcpyDeviceToHost;
-    } else {
-        buf = dst;
-        flag = cudaMemcpyKind::cudaMemcpyHostToDevice;
+ZmqChannelHelper::ZmqChannelHelper(mq_t mq, cudaStream_t stream):
+    mq(mq), stream(stream), mutex(), queue(), cv(), end_flag(false) {}
+
+void ZmqChannelHelper::terminate() {
+    this->end_flag = true;
+    put(0, 0, 0);
+    this->t.join();
+}
+
+void ZmqChannelHelper::put(void* buf, size_t size, void* buf_recv, void* event) {
+    std::lock_guard lock(mutex);
+    queue.push({buf, size, buf_recv, event});
+    cv.notify_one();
+}
+
+using Item = ZmqChannelHelper::Item;
+
+Item ZmqChannelHelper::get() {
+    std::unique_lock lock(mutex);
+    cv.wait(lock, [&] { return !queue.empty(); });
+    auto item = queue.front();
+    queue.pop();
+    return item;
+}
+
+void ZmqChannelHelper::start() {
+    this->t = std::thread([&] {
+        Recorder::create();
+        while (!end_flag) {
+            auto item = get();
+            if (item.size == 0)
+                break;
+            run(item);
+        }
+        DMOE_LOG(WARNING) << "Quiting ZmqChannelHelper thread" << LEND;
+    });
+}
+
+void ZmqChannelHelper::sync() {
+    std::unique_lock lock(mtx_end);
+    cv_end.wait(lock);
+}
+
+void ZmqChannelSender::run(Item &item) {
+    tx_range _{"ZmqChannel::_pipeline_comm_async_send"};
+    DMOE_LOG(DEBUG) << "Sending " << item.size << LEND;
+    mq->send(zmq::buffer(item.dst, item.size), zmq::send_flags::dontwait);
+    if (item.event != 0) {
+        std::lock_guard lock(this->mtx_end);
+        this->cv_end.notify_one();
     }
+    DMOE_LOG(DEBUG) << "Sent " << item.size << LEND;
+}
 
-    {
-        tx_range __{"ZmqChannel::_tensor_copy_memcpy_submit"};
-        const size_t step = 4;
-        const size_t dim_stride = metadata.get_datatype_size() * metadata.token_hidden_dim();
-        size_t num_tokens = metadata.shape[0];
-        for (size_t i = 0; i < metadata.shape[0]; i += step) {
-            size_t cur_step = std::min(step, metadata.shape[0] - i);
+void ZmqChannelRecver::run(Item &item) {
+    tx_range _{"ZmqChannel::_pipeline_comm_async_copy"};
+    CUDACHECK(cudaMemcpyAsync(
+        item.dst,
+        item.src,
+        item.size,
+        cudaMemcpyHostToDevice,
+        stream
+    ));
+    if (item.event != 0) {
+        std::lock_guard lock(this->mtx_end);
+        this->cv_end.notify_one();
+    }
+}
+
+void _pipeline_recv(void* data, ZmqChannelHelper* helper, size_t num_tokens, size_t token_size, cudaStream_t stream, mq_t &mq) {
+    tx_range _{"ZmqChannel::_pipeline_comm"};
+
+    const size_t step = PIPE_COMM_STEP;
+
+    cudaEvent_t event;
+    CUDACHECK(cudaEventCreate(&event));
+    zmq::message_t msg[(num_tokens + step - 1) / step];
+    for (size_t i = 0; i < num_tokens; i += step) {
+        DMOE_LOG(DEBUG) << "Receiving " << i << " " << num_tokens << LEND;
+        size_t cur_step = std::min(step, num_tokens - i);
+
+        void* dst_buf = data + i * token_size;
+        void* src_buf;
+        {
+            tx_range __{"ZmqChannel::_pipeline_comm_recv"};
+            auto err = mq->recv(msg[i / step], zmq::recv_flags::none);
+            src_buf = msg[i / step].data();
+        }
+
+        DMOE_LOG(DEBUG) << "Putting " << i << " " << num_tokens << LEND;
+        helper->put(dst_buf, cur_step * token_size, src_buf,
+            i + step < num_tokens ? 0 : event);
+    }
+
+    tx_range __{"ZmqChannel::_pipeline_comm_sync"};
+    cudaStreamSynchronize(stream);
+    helper->sync();
+}
+
+void _pipeline_send(void* data, ZmqChannelHelper* helper, size_t num_tokens, size_t token_size, cudaStream_t stream, mq_t &mq) {
+    tx_range _{"ZmqChannel::_pipeline_comm"};
+
+    const size_t step = PIPE_COMM_STEP;
+    void* cpu_buf = std::malloc(num_tokens * token_size);
+    std::mutex mutex;
+
+    for (size_t i = 0; i < num_tokens; i += step) {
+        size_t cur_step = std::min(step, num_tokens - i);
+
+        void* src_buf = data + i * token_size;
+        void* dst_buf = cpu_buf + i * token_size;
+
+        {
+            tx_range __{"ZmqChannel::_pipeline_comm_async_copy"};
             CUDACHECK(cudaMemcpyAsync(
-                (void*) (buf + i * dim_stride),
-                (void*) (data + i * dim_stride),
-                cur_step * dim_stride,
-                flag,
-                this->stream
+                dst_buf,
+                src_buf,
+                cur_step * token_size,
+                cudaMemcpyDeviceToHost,
+                stream
             ));
         }
+
+        helper->put(dst_buf, cur_step * token_size, 0,
+            (void*) (i + step < num_tokens ? 0x0 : 0x1));
     }
-    CUDACHECK(cudaStreamSynchronize(this->stream));
-    return (void*) buf;
+    helper->sync();
+    std::free(cpu_buf);
 }
 
-void ZmqChannel::send(uintptr_t data, const Metadata& metadata) {
+void ZmqChannel::send(Tensor tensor, const Metadata& metadata) {
     tx_range _{"ZmqChannel::send"};
 
     // DMOE_LOG(DEBUG) << "ZmqChannel Sending to " << get_peer_id() << LEND;
-    void* buf = this->_tensor_copy(data, metadata, /*to_gpu=*/ false);
-    size_t size = metadata.num_element() * metadata.get_datatype_size();
-    // DMOE_LOG(DEBUG) << "send size: " << size << " rank: " << this->rank_offset << LEND;
-    this->mq->send(zmq::buffer(buf, size));
-
-    // if (data != (uintptr_t) buf)
-    //     std::free(buf);
+    if (is_embedding_node(this->local)) {
+        const int step = PIPE_COMM_STEP;
+        void* buf = (void*) tensor.data_ptr();
+        size_t size = metadata.num_element() * metadata.get_datatype_size();
+        size_t step_size = step * metadata.token_hidden_dim() * metadata.get_datatype_size();
+        for (size_t i = 0; i < size; i += step_size) {
+            this->mq->send(zmq::buffer(buf + i, std::min(step_size, size - i)), zmq::send_flags::dontwait);
+        }
+    } else {
+        _pipeline_send(
+            (void*) tensor.data_ptr(),
+            this->helper,
+            metadata.num_tokens(),
+            metadata.token_hidden_dim() * metadata.get_datatype_size(),
+            this->stream_send,
+            this->mq
+        );
+    }
 
     // DMOE_LOG(DEBUG) << "ZMQ Sent." << LEND;
 }
 
-void ZmqChannel::recv(uintptr_t data, const Metadata &metadata) {
+void ZmqChannel::recv(Tensor tensor, const Metadata &metadata) {
     tx_range _{"ZmqChannel::recv"};
     // DMOE_LOG(DEBUG) << "ZMQ Recving from " << get_peer_id() << LEND;
 
-    size_t size = metadata.num_element() * metadata.get_datatype_size();
-    zmq::message_t msg(size);
-    // DMOE_LOG(DEBUG) << "recv size: " << size << " rank: " << this->rank_offset << LEND;
-    auto err = this->mq->recv(msg, zmq::recv_flags::none);
-    this->_tensor_copy((uintptr_t) msg.data(), metadata,
-        /*to_gpu=*/ !is_embedding_node(local), data);
+    if (is_embedding_node(this->local)) {
+        size_t size = metadata.num_element() * metadata.get_datatype_size();
+        size_t step_size = PIPE_COMM_STEP * metadata.token_hidden_dim() * metadata.get_datatype_size();
+        zmq::message_t msg(step_size);
+        void* buf = (void*) tensor.data_ptr();
+        DMOE_LOG(DEBUG) << "launching recv loop" << LEND;
+        for (size_t i = 0; i < size; i += step_size) {
+            DMOE_LOG(DEBUG) << "Receiving " << i << "/" << size << LEND;
+            // std::this_thread::sleep_for(std::chrono::microseconds(1));
+            auto err = this->mq->recv(msg, zmq::recv_flags::none);
+            ASSERT(msg.size() == std::min(step_size, size - i));
+            memcpy(buf + i, msg.data(), std::min(step_size, size - i));
+        }
+        DMOE_LOG(DEBUG) << "recv loop finished" << LEND;
+    } else {
+        _pipeline_recv(
+            (void*) tensor.data_ptr(),
+            this->helper,
+            metadata.num_tokens(),
+            metadata.token_hidden_dim() * metadata.get_datatype_size(),
+            this->stream_recv,
+            this->mq
+        );
+    }
 
     // DMOE_LOG(DEBUG) << "ZMQ Recved" << LEND;
 }
 
@@ -251,12 +413,12 @@ int NcclGroupChannel::root() const {
     return 0;
 }
 
-void NcclGroupChannel::broadcast(void* send_buf, void* recv_buf, size_t count, ncclDataType_t type, cudaStream_t stream) {
+void NcclGroupChannel::broadcast(Tensor send_tensor, Tensor recv_tensor, size_t count, ncclDataType_t type, cudaStream_t stream) {
     tx_range _{"NcclGroupChannel::broadcast"};
     // DMOE_LOG(DEBUG) << "broadcasting " << root() << " " << local_rank << " " << count << " on the stream " << this->stream << LEND;
     NCCLCHECK(ncclBroadcast(
-        send_buf,
-        recv_buf,
+        (void*) send_tensor.data_ptr(),
+        (void*) recv_tensor.data_ptr(),
         count,
         type,
         root(),
@@ -267,25 +429,24 @@ void NcclGroupChannel::broadcast(void* send_buf, void* recv_buf, size_t count, ncclDataType_t type, cudaStream_t stream) {
     // DMOE_LOG(DEBUG) << "finished broadcast " << root() << " " << local_rank << " " << count << " on the stream " << this->stream << LEND;
 }
 
-void NcclGroupChannel::send(uintptr_t data_ptr, const Metadata& metadata) {
+void NcclGroupChannel::send(Tensor tensor, const Metadata& metadata) {
     tx_range _{"NcclGroupChannel::send"};
     ASSERT(is_root());
     // DMOE_LOG(DEBUG) << "NcclGroupChannel Sending from " << this->local << LEND;
-    send_recv(data_ptr, metadata);
+    send_recv(std::move(tensor), metadata);
     // DMOE_LOG(DEBUG) << "NcclGroupChannel Sent." << LEND;
 }
 
-void NcclGroupChannel::recv(uintptr_t data_ptr, const Metadata& metadata) {
+void NcclGroupChannel::recv(Tensor tensor, const Metadata& metadata) {
     tx_range _{"NcclGroupChannel::recv"};
     ASSERT(!is_root());
     // DMOE_LOG(DEBUG) << "NcclGroupChannel Recving from " << this->local << LEND;
-    send_recv(data_ptr, metadata);
+    send_recv(std::move(tensor), metadata);
     // DMOE_LOG(DEBUG) << "NcclGroupChannel Recved." << LEND;
 }
 
-void NcclGroupChannel::send_recv(uintptr_t data_ptr, const Metadata& metadata) {
-    broadcast(reinterpret_cast<void*>(data_ptr), reinterpret_cast<void*>(data_ptr),
-        metadata.num_element(), metadata.get_nccl_datatype());
+void NcclGroupChannel::send_recv(Tensor tensor, const Metadata& metadata) {
+    broadcast(tensor, tensor, metadata.num_element(), metadata.get_nccl_datatype());
 }
 
 void NcclGroupChannel::bcast_obj(void* &buf, size_t &size) {
@@ -311,7 +472,7 @@ void NcclGroupChannel::bcast_obj(void* &buf, size_t &size) {
 
         void* data_buf = (void*) this->buffer_gpu.data_ptr();
         CUDACHECK(cudaMemcpy(data_buf, buf, size, cudaMemcpyKind::cudaMemcpyHostToDevice));
-        broadcast(data_buf, data_buf, size, ncclInt8);
+        broadcast(this->buffer_gpu, this->buffer_gpu, size, ncclInt8);
     } else {
         // first recv size
         // [option 1] use zmq to recv
@@ -332,7 +493,7 @@ void NcclGroupChannel::bcast_obj(void* &buf, size_t &size) {
 
         // then recv data
        void* data_buf = (void*) this->buffer_gpu.data_ptr();
-        broadcast(data_buf, data_buf, size, ncclInt8);
+        broadcast(this->buffer_gpu, this->buffer_gpu, size, ncclInt8);
         buf = std::malloc(size);
         CUDACHECK(cudaMemcpy(buf, data_buf, size, cudaMemcpyKind::cudaMemcpyDeviceToHost));
         // DMOE_LOG(DEBUG) << "received metadata " << *decerealize((char*) buf, size) << LEND;
@@ -363,9 +524,9 @@ void NcclGroupChannel::recv_metadata(Metadata& metadata) {
     // DMOE_LOG(DEBUG) << "NcclGroupChannel Recved metadata." << LEND;
 }
 
-void NcclGroupChannel::all_reduce(uintptr_t data, const std::vector<int> &shape) {
+void NcclGroupChannel::all_reduce(Tensor tensor, const std::vector<int> &shape) {
     tx_range _{"NcclGroupChannel::all_reduce"};
-    void* buf = reinterpret_cast<void*>(data);
+    void* buf = reinterpret_cast<void*>(tensor.data_ptr());
     int count = 1;
     for (int i: shape)
         count *= i;
@@ -374,7 +535,7 @@ void NcclGroupChannel::all_reduce(uintptr_t data, const std::vector<int> &shape) {
         buf,
         buf,
         count,
-        ncclBfloat16, // !FIXME(hogura|20241106): remove this hardcode
+        ncclBfloat16,
         ncclSum,
         this->comm,
         this->stream
diff --git a/csrc/muhelper/embedding.cpp b/csrc/muhelper/embedding.cpp
index a8d3c31..fc6c074 100644
--- a/csrc/muhelper/embedding.cpp
+++ b/csrc/muhelper/embedding.cpp
@@ -62,7 +62,7 @@ void Sampler::run() {
         );
 
         // auto tensor_buf = (uintptr_t) std::malloc(metadata->num_element() * metadata->get_datatype_size());
-        this->peer_channels[peer_id]->recv((uintptr_t)tensor.data_ptr(), *metadata);
+        this->peer_channels[peer_id]->recv(tensor, *metadata);
 
         this->process_batch(tensor, metadata);
     }
diff --git a/csrc/muhelper/muhelper.cpp b/csrc/muhelper/muhelper.cpp
index b1d392c..ce42aee 100644
--- a/csrc/muhelper/muhelper.cpp
+++ b/csrc/muhelper/muhelper.cpp
@@ -21,6 +21,8 @@
 #include
 
+using Tensor = torch::Tensor;
+
 // MuHelper
 
 MuHelper::MuHelper(std::vector<int> layer_ids, int device_id, std::vector<Channel_t> channels):
@@ -83,7 +85,7 @@ bool MuDispatcher::_is_group_channel(int cid) const {
     return is_group_channels[cid];
 }
 
-void MuDispatcher::_send_batch(int cid, uintptr_t buf, const Metadata& meta) {
+void MuDispatcher::_send_batch(int cid, Tensor tensor, const Metadata& meta) {
     tx_range _{"MuDispatcher::_send_batch"};
     // DMOE_LOG(DEBUG) << "sending batch to channel " << cid << " current device: " << this->device_id_str << LEND;
 
@@ -91,10 +93,10 @@ void MuDispatcher::_send_batch(int cid, uintptr_t buf, const Metadata& meta) {
         auto data = cerealize(std::make_shared<Metadata>(meta));
         this->peer_mq[cid].send(zmq::str_buffer(this->device_id_str), zmq::send_flags::sndmore);
         this->peer_mq[cid].send(zmq::buffer(data.c_str(), data.size()));
-        this->channels[cid]->send(buf, meta);
+        this->channels[cid]->send(std::move(tensor), meta);
     } else {
         this->group_channels[cid]->send_metadata(meta);
-        this->group_channels[cid]->send(buf, meta);
+        this->group_channels[cid]->send(std::move(tensor), meta);
     }
 
     // DMOE_LOG(DEBUG) << "sent batch to channel " << cid << LEND;
@@ -202,16 +204,15 @@ void MuAttnDispatcher::_send_once(TensorBatch batch) {
             // a faster path
             this->_send_batch(
                 this->exp_channels[cid],
-                (uintptr_t)batch.data.data_ptr(),
+                batch.data,
                 *batch.metadata
             );
             break;
         }
 
-        auto buf = tensor_at((uintptr_t)batch.data.data_ptr(), *batch.metadata, i);
         this->_send_batch(
             this->exp_channels[cid],
-            buf,
+            batch.data.index({torch::indexing::Slice(i, j)}),
             batch.metadata->slice(i, j)
         );
         i = j;
@@ -282,7 +283,7 @@ void MuExpertDispatcher::_send_once(TensorBatch batch) {
     if (this->attn_channel[0].size() == 1 || layer_id >= this->attn_channel.size()) {
         this->_send_batch(
             _get_attn_channel(layer_id, 0),
-            (uintptr_t) batch.data.data_ptr(),
+            batch.data,
             *meta
         );
     } else {
@@ -297,14 +298,13 @@ void MuExpertDispatcher::_send_once(TensorBatch batch) {
             if (i == 0 && j == n) {
                 this->_send_batch(
                     channels[rank],
-                    (uintptr_t) batch.data.data_ptr(),
+                    batch.data,
                     *meta
                 );
             } else {
-                auto buf = tensor_at((uintptr_t) batch.data.data_ptr(), *batch.metadata, i);
                 this->_send_batch(
                     channels[rank],
-                    buf,
+                    batch.data.index({torch::indexing::Slice(i, j)}),
                     batch.metadata->slice(i, j)
                 );
             }
@@ -367,16 +367,15 @@ void MuPool::recv_metadata(int &peer_id, metadata_t &meta) {
     meta = decerealize((char*) recv_msgs[1].data(), recv_msgs[1].size());
 }
 
-void MuPool::recv_tensor(int peer_id, uintptr_t tensor_buf, metadata_t &meta) {
+void MuPool::recv_tensor(int peer_id, Tensor tensor, metadata_t &meta) {
     // DMOE_LOG(DEBUG) << "peer_id " << peer_id << " channelsize " << this->peer_channels.size() << LEND;
     ASSERT(0 <= peer_id && peer_id < this->peer_channels.size());
     ASSERT(this->peer_channels[peer_id].get() != nullptr);
     ASSERT(meta.get() != nullptr);
-    ASSERT(tensor_buf != 0);
-    this->peer_channels[peer_id]->recv(tensor_buf, *meta);
+    this->peer_channels[peer_id]->recv(std::move(tensor), *meta);
 }
 
-void MuPool::process_batch(torch::Tensor tensor, metadata_t &meta, bool send_from_zmq) {
+void MuPool::process_batch(Tensor tensor, metadata_t &meta, bool send_from_zmq) {
     // TODO(hogura|20241014): sync prefill sequences
     /*
     TODO(shaoyuw|20241011): separate sequences into waiting queue and running queue
@@ -433,12 +432,12 @@ void MuPool::run() {
 
         MuPool::recv_metadata(peer_id, meta);
 
-        torch::Tensor tensor = torch::empty(
+        Tensor tensor = torch::empty(
            {meta->num_tokens(), meta->token_hidden_dim()},
            torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCUDA, 0)
        );
 
-        MuPool::recv_tensor(peer_id, (uintptr_t)tensor.data_ptr(), meta);
+        MuPool::recv_tensor(peer_id, tensor, meta);
 
         this->process_batch(tensor, meta, /*send_from_zmq=*/ true);
     }
@@ -675,16 +674,16 @@ void MuAttentionPool::run() {
                 Metadata meta;
                 group_comm->recv_metadata(meta);
 
-                torch::Tensor tensor = torch::empty(
+                Tensor tensor = torch::empty(
                     {meta.num_tokens(), meta.token_hidden_dim()},
                     torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCUDA, 0)
                 );
 
                 // DMOE_LOG(DEBUG) << "Worker AttnPool fetched result:" << meta << LEND;
-                group_comm->recv((uintptr_t) tensor.data_ptr(), meta);
+                group_comm->recv(tensor, meta);
                 // DMOE_LOG(DEBUG) << "Worker AttnPool broadcast finished" << LEND;
 
                 auto t_meta = std::make_shared<Metadata>(meta);
-                process_batch(tensor, t_meta, /*send_from_zmq=*/ false);
+                process_batch(std::move(tensor), t_meta, /*send_from_zmq=*/ false);
             }
         }));
     }
@@ -710,13 +709,13 @@ void MuAttentionPool::run() {
             c->recv_metadata(meta);
             // DMOE_LOG(DEBUG) << "AttnPool fetched in stream " << c10_stream.stream() << " " << meta << LEND;
 
-            torch::Tensor tensor = torch::empty(
+            Tensor tensor = torch::empty(
                 {meta.num_tokens(), meta.token_hidden_dim()},
                 torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCUDA, 0)
             );
 
             // DMOE_LOG(DEBUG) << "AttnPool created tensor" << LEND;
-            c->recv((uintptr_t)tensor.data_ptr(), meta);
+            c->recv(tensor, meta);
             // DMOE_LOG(DEBUG) << "AttnPool broadcast finished" << LEND;
 
             auto meta_t = std::make_shared<Metadata>(meta);
@@ -736,7 +735,7 @@ void MuAttentionPool::terminate() {
     pool_thread.join();
 }
 
-AttentionBatch MuAttentionPool::pack_attn_batch(torch::Tensor tensor, metadata_t meta) {
+AttentionBatch MuAttentionPool::pack_attn_batch(Tensor tensor, metadata_t meta) {
     // for a simple case we consider prefill sequences can only have 1 token,
     // so all sequences in tensor are complete and can be scheduled immediately
     ASSERT(meta.get() != nullptr);
@@ -787,14 +786,14 @@ AttentionBatch MuAttentionPool::pack_attn_batch(torch::Tensor tensor, metadata_t meta) {
     return AttentionBatch {tensor, attn_meta};
 }
 
-void MuAttentionPool::process_batch(torch::Tensor tensor, metadata_t &meta, bool send_from_zmq) {
+void MuAttentionPool::process_batch(Tensor tensor, metadata_t &meta, bool send_from_zmq) {
     // DMOE_LOG(DEBUG) << "AttnPool processing batch: " << meta << LEND;
     if (send_from_zmq && meta->layer_id == 0 && group_comm.get() != nullptr) {
         // since only driver can have the pool, we can send the data from layer 0 to other workers here.
         // NOTE(hogura|20241110): group_comm is only used when send_from_zmq, so it should be thread-safe
         // DMOE_LOG(DEBUG) << "Broadcasting attn batch to workers" << LEND;
         group_comm->send_metadata(*meta);
-        group_comm->send((uintptr_t) tensor.data_ptr(), *meta);
+        group_comm->send(tensor, *meta);
         // DMOE_LOG(DEBUG) << "Broadcast finished." << LEND;
     }
 
diff --git a/csrc/tests/test_zmq_overlap.cpp b/csrc/tests/test_zmq_overlap.cpp
new file mode 100644
index 0000000..fa3974c
--- /dev/null
+++ b/csrc/tests/test_zmq_overlap.cpp
@@ -0,0 +1,58 @@
+#include "tests.h"
+#include "comm.h"
+
+using Tensor = torch::Tensor;
+
+void test_zmq_overlap(int rank) {
+    const int bs = 256;
+    const int hs = 4096;
+
+    Metadata meta = Metadata {
+        {bs, hs},
+        "bf16",
+        0,
+        std::vector<int>(bs, 0),
+        std::vector<int>(bs, 0),
+        std::vector<int>(bs, 0),
+        std::vector<int>(bs, 0)
+    };
+    if (rank == 82) {
+        Tensor a = torch::empty(
+            {bs, hs},
+            torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCPU)
+        );
+        auto c_send = create_zmq_channel(rank, 0, true);
+        auto c_recv = create_zmq_channel(rank, 1, false);
+        c_recv->instantiate();
+        c_send->instantiate();
+        for (int i = 0; i < 5; i ++) {
+            c_recv->recv(a, meta);
+            DMOE_LOG(INFO) << "Sampler Received from <1>: " << a.mean().item() << LEND;
+            c_send->send(a, meta);
+            DMOE_LOG(INFO) << "Sampler Sent to <0>" << LEND;
+        }
+    } else {
+        Tensor a = torch::ones(
+            {bs, hs},
+            torch::TensorOptions().dtype(torch::kBFloat16).device(torch::kCUDA, 0)
+        ) * (rank + 1);
+        if (rank == 0) {
+            // receive from zmq
+            auto c = create_zmq_channel(0, SAMPLER_DEV_ID, false);
+            c->instantiate();
+            for (int i = 0; i < 5; i ++) {
+                c->recv(a, meta);
+                DMOE_LOG(INFO) << "<0> Received from sampler: " << a.mean().item() << LEND;
+            }
+        } else if (rank == 1) {
+            // send to zmq
+            auto c = create_zmq_channel(1, SAMPLER_DEV_ID, true);
+            c->instantiate();
+            for (int i = 0; i < 5; i ++) {
+                c->send(a, meta);
+                DMOE_LOG(INFO) << "<1> Sent to sampler: " << a.mean().item() << LEND;
+            }
+        }
+    }
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+}
\ No newline at end of file
diff --git a/tests/comm/test_zmq_overlap.py b/tests/comm/test_zmq_overlap.py
new file mode 100644
index 0000000..f44f426
--- /dev/null
+++ b/tests/comm/test_zmq_overlap.py
@@ -0,0 +1,70 @@
+import ray
+import os
+
+def func(rank: int):
+    import torch
+    from disagmoe_c import test_zmq_overlap, set_hosts, recorder_create, recorder_output
+    from disagmoe.frontend.datatypes import TraceContext
+    set_hosts(os.getpid(), {
+        0: "0.0.0.0",
+        1: "0.0.0.0",
+        82: "0.0.0.0"
+    })
+    profiler = torch.profiler.profile(
+        activities=[
+            torch.profiler.ProfilerActivity.CPU,
+            torch.profiler.ProfilerActivity.CUDA,
+        ],
+        # with_stack=True,
+        on_trace_ready=torch.profiler.tensorboard_trace_handler(
+            dir_name="/home/hogura1999/DisagMoE/reports/zmq_overlap/",
+            worker_name=f"worker-{rank}",
+            use_gzip=True,))
+    recorder_create()
+    # profiler.start()
+    test_zmq_overlap(rank)
+    # profiler.stop()
+    output = recorder_output()
+    result = {}
+    for key in output:
+        result[key] = [TraceContext.from_c(c) for c in output[key]]
+    return result
+
+def main():
+    ray.init("auto")
+    env = {
+        "env_vars": {
+            # "CUDA_LAUNCH_BLOCKING": "1",
+            "ENABLE_NVTX": "1",
+        }
+    }
+    tasks = [
+        ray.remote(func).options(num_gpus=1, runtime_env=env).remote(0),
+        ray.remote(func).options(num_gpus=1, runtime_env=env).remote(1),
+        ray.remote(func).options(num_gpus=0, runtime_env=env).remote(82)
+    ]
+    outputs = ray.get(tasks)
+    events = []
+    ms_to_us = lambda ms: ms * 1000
+    for pid, p_traces in enumerate(outputs):
+        for tid, t_traces in p_traces.items():
+            print("outputing thread", tid)
+            tid = tid % (1 << 32)
+            for trace in t_traces:
+                events.append({
+                    "name": trace.msg,
+                    "cat": "trace",
+                    "ph": "X",
+                    "ts": ms_to_us(trace.t_start),
+                    "dur": ms_to_us(trace.t_dur),
+                    "pid": pid,
+                    "tid": (tid * 10 + trace.track_id) % (1 << 31),
+                })
+
+    import gzip
+    import json
+    with gzip.open(f"trace_zmq_overlap.json.gz", "w") as f:
+        f.write(json.dumps(events).encode("utf-8"))
+
+
+main()
\ No newline at end of file