Hi! I've recently been using Thallium to build a low-overhead client-server RPC architecture. I noticed that making Thallium RPC calls from a separate thread to transfer data from client to server is extremely slow compared to making them from the main thread. I wrote some experiment scripts to test this phenomenon:
Server script (server.cpp):
Both client scripts use the same server script.
// server.cpp
#include <thallium.hpp>
#include <torch/torch.h>
#include <iostream>
#include "tensor_packet.hpp"

using namespace thallium;

// RPC handler
void receive_tensor(const request& req, const TensorPacket& pkt) {
    // rebuild tensor
    torch::Tensor t = pkt.unpack();
    std::cout << "[Server] Received tensor of shape " << t.sizes() << ":\n"
              << t << std::endl;
}

int main(int argc, char** argv) {
    // create Thallium engine
    engine svr("tcp://128.4.120.67:1234", THALLIUM_SERVER_MODE);
    svr.define("receive_tensor", receive_tensor).disable_response();
    std::cout << "[Server] Running at " << svr.self() << std::endl;
    svr.wait_for_finalize();
    return 0;
}
Client making the RPC calls from the main thread (not_threaded.cpp):
(This script finishes in 230-250 ms.)
// not_threaded.cpp
#include <thallium.hpp>
#include <torch/torch.h>
#include <iostream>
#include <chrono>
#include "tensor_packet.hpp"

namespace tl = thallium;

int main(int argc, char** argv) {
    tl::engine eng("tcp", THALLIUM_CLIENT_MODE);
    tl::endpoint srv = eng.lookup("ofi+tcp://128.4.120.67:1234");
    auto rpc = eng.define("receive_tensor").disable_response();

    auto start = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < 10000; i++) {
        TensorPacket pkt = TensorPacket::pack(torch::randn({3, 4}));
        rpc.on(srv)(pkt);
    }
    auto end = std::chrono::high_resolution_clock::now();

    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    std::cout << "Time taken: " << duration.count() << "ms" << std::endl;
    return 0;
}
Client making the RPC calls from a separate thread (threaded.cpp):
(This script takes more than a minute.)
// threaded.cpp
#include <thallium.hpp>
#include <torch/torch.h>
#include <iostream>
#include <chrono>
#include <thread>
#include "tensor_packet.hpp"

namespace tl = thallium;

void thread_func(tl::engine& eng, tl::endpoint srv, tl::remote_procedure rpc) {
    for (int i = 0; i < 10000; i++) {
        TensorPacket pkt = TensorPacket::pack(torch::randn({3, 4}));
        rpc.on(srv)(pkt); // same synchronous call as in not_threaded.cpp
    }
}

int main(int argc, char** argv) {
    // third argument enables a dedicated progress thread
    tl::engine eng("tcp", THALLIUM_CLIENT_MODE, true);
    tl::endpoint srv = eng.lookup("ofi+tcp://128.4.120.67:1234");
    tl::remote_procedure rpc = eng.define("receive_tensor").disable_response();

    auto start = std::chrono::high_resolution_clock::now();
    std::thread t1(thread_func, std::ref(eng), srv, rpc);
    t1.join();
    auto end = std::chrono::high_resolution_clock::now();

    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    std::cout << "Time taken: " << duration.count() << "ms" << std::endl;
    return 0;
}
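I also wondered whether the raw std::thread itself is the problem, since Thallium runs on Argobots. An untested variant I'm considering (just a sketch, with eng, srv, and rpc set up exactly as above) replaces std::thread with an Argobots execution stream and ULT:

// Untested sketch: run the sender as an Argobots ULT on a dedicated
// execution stream instead of a raw std::thread, so the sender
// cooperates with the Argobots scheduler that Thallium runs on.
tl::managed<tl::xstream> es = tl::xstream::create();
tl::managed<tl::thread> ult = es->make_thread([&]() {
    for (int i = 0; i < 10000; i++) {
        TensorPacket pkt = TensorPacket::pack(torch::randn({3, 4}));
        rpc.on(srv)(pkt);
    }
});
ult->join();
es->join();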
I wonder if I'm using it the wrong way or if it's supposed to be like this. Or is there some conflict with Argobots that I'm not aware of? I'm new to HPC software like Mercury, Argobots, and even C++. May I know what causes this phenomenon and how to fix it?
Any clarification would be much appreciated! Thanks!
The platform I'm using is macOS Sonoma 14.7.4.
I downloaded and installed Thallium as described in the Mochi Documentation.
During installation I removed the Redpandas package because macOS doesn't support it; other than that, everything is as the docs state.
For context, here is the tensor_packet.hpp I'm using:
#pragma once
#include <torch/torch.h>
#include <thallium.hpp>
#include <cstdint>
#include <cstring>
#include <vector>
#include <cereal/types/vector.hpp>
#include <cereal/types/string.hpp>

struct TensorPacket {
    std::vector<int64_t> shape;
    int32_t dtype; // torch::ScalarType
    std::vector<uint8_t> bytes;

    /* ---- pack ---- */
    static TensorPacket pack(const torch::Tensor& t) {
        TensorPacket p;
        torch::Tensor c = t.contiguous().cpu();
        p.shape.assign(c.sizes().begin(), c.sizes().end());
        p.dtype = static_cast<int32_t>(c.scalar_type());
        std::size_t nbytes = c.nbytes();
        p.bytes.resize(nbytes);
        std::memcpy(p.bytes.data(), c.data_ptr(), nbytes);
        return p;
    }

    /* ---- unpack ---- */
    torch::Tensor unpack() const {
        auto opt = torch::dtype(static_cast<torch::ScalarType>(dtype)).device(torch::kCPU);
        void* raw = const_cast<uint8_t*>(bytes.data());
        return torch::from_blob(raw, shape, opt).clone(); // clone -> managed tensor
    }

    template<class A>
    void serialize(A& ar) { ar(shape, dtype, bytes); } // cereal hook
};
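For completeness, a minimal local round-trip check of pack/unpack (no RPC involved) would look like this, so I don't think serialization correctness itself is the issue:

// round_trip_check.cpp -- local sanity check for TensorPacket
#include <torch/torch.h>
#include <cassert>
#include "tensor_packet.hpp"

int main() {
    torch::Tensor a = torch::randn({3, 4});
    TensorPacket pkt = TensorPacket::pack(a);
    torch::Tensor b = pkt.unpack();
    assert(b.sizes() == a.sizes());    // shape survives
    assert(torch::allclose(a, b));     // payload survives
    return 0;
}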