Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3999d18
Use thread pool for request handlers.
ehpor Feb 11, 2026
98d2625
Make functions const.
ehpor Feb 11, 2026
bd55df5
Add networked message broker classes.
ehpor Feb 11, 2026
15bad80
Add Python bindings for remote message broker classes.
ehpor Feb 11, 2026
1c2e3e5
Add PeerConfig constructor.
ehpor Feb 11, 2026
d92e837
Add tests for remote message broker.
ehpor Feb 11, 2026
b3f5e6f
Add network message broker design file.
ehpor Feb 11, 2026
64c5a47
Return null message after final message has been published.
ehpor Feb 11, 2026
2e4a4e8
Set temporary frame id.
ehpor Feb 11, 2026
797f4b5
Make destructor of abstract class virtual.
ehpor Feb 11, 2026
1330801
Fix test layout.
ehpor Feb 11, 2026
a9167df
Temporarily add debug prints.
ehpor Feb 11, 2026
3e254fe
Consolidate RemoteMessageBroker functionality into a single file.
ehpor Feb 11, 2026
f4b2c97
Make message structs for consistent (de)serialization.
ehpor Feb 12, 2026
3bceb70
Add debug print statements.
ehpor Feb 12, 2026
74bb1ca
Have tests use fixtures to initialize their brokers.
ehpor Feb 12, 2026
ca79323
Let fixture create its own header memory.
ehpor Feb 12, 2026
73ab5ad
Explicitly use a LocalMessageBroker to back the RemoteBrokerServer.
ehpor Feb 14, 2026
9a1a0be
Refactor out unnecessary functions.
ehpor Feb 14, 2026
9827332
Make GetNextMessage and TryGetNextMessage non-public.
ehpor Feb 14, 2026
64b3e2c
Fix GetSize() functions for messages.
ehpor Feb 14, 2026
fb29c25
Add array info to messages.
ehpor Feb 14, 2026
9a1134a
Remove unnecessary sleeps.
ehpor Feb 14, 2026
0bc839d
Fix remote message broker tests.
ehpor Feb 14, 2026
a17e3ba
Don't log socket identity since it can contain non-ascii characters.
ehpor Feb 14, 2026
5a0f01e
Don't manually shut down the service.
ehpor Feb 14, 2026
718f0f3
Remove some debug print statements.
ehpor Feb 14, 2026
3c1087e
Update plan.
ehpor Feb 14, 2026
ec6e607
Wait for next message in chunks.
ehpor Feb 14, 2026
1050450
Publish received messages on LocalMessageBroker.
ehpor Feb 15, 2026
12b2f76
Add more bit operations.
ehpor Feb 16, 2026
b20e930
Refactor topic header to use bitmap and fully atomic operations.
ehpor Feb 16, 2026
1eb0c2a
Add message IDs to header.
ehpor Feb 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 126 additions & 16 deletions catkit2/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "Uuid.h"
#include "ArrayView.h"
#include "ProcessStats.h"
#include "RemoteMessageBroker.h"

#include "testbed.pb.h"

Expand Down Expand Up @@ -900,6 +901,8 @@ PYBIND11_MODULE(catkit_bindings, m)
})
.def_property_readonly("filename", &SharedMemory::GetFileName);

py::class_<MessageBroker, std::shared_ptr<MessageBroker>>(m, "MessageBroker");

py::class_<LocalMemory, Memory, std::shared_ptr<LocalMemory>>(m, "LocalMemory")
.def_static("create", [](size_t num_bytes)
{
Expand Down Expand Up @@ -1087,7 +1090,7 @@ PYBIND11_MODULE(catkit_bindings, m)
return py::none();
});

py::class_<LocalMessageBroker, std::shared_ptr<LocalMessageBroker>>(m, "LocalMessageBroker")
py::class_<LocalMessageBroker, MessageBroker, std::shared_ptr<LocalMessageBroker>>(m, "LocalMessageBroker")
.def_static("create", [](std::shared_ptr<Memory> header, std::vector<std::shared_ptr<Memory>> memory_blocks)
{
auto stream = StructStream(header);
Expand All @@ -1102,12 +1105,6 @@ PYBIND11_MODULE(catkit_bindings, m)

return std::shared_ptr<LocalMessageBroker>(std::move(broker));
})
.def("prepare_message", [](std::shared_ptr<LocalMessageBroker> broker, const std::string& topic, size_t payload_size, std::uint8_t memory_block_id)
{
auto message = broker->PrepareMessage(topic, payload_size, memory_block_id);

return message;
}, py::arg("topic"), py::arg("payload_size"), py::arg("memory_block_id") = 0)
.def("prepare_message", [](std::shared_ptr<LocalMessageBroker> broker, const std::string& topic, size_t payload_size, py::object trace_id, std::uint8_t memory_block_id)
{
if (trace_id.is_none())
Expand Down Expand Up @@ -1168,26 +1165,23 @@ PYBIND11_MODULE(catkit_bindings, m)
broker->PublishArray(topic, array_view, py::cast<Uuid>(trace_id), memory_block_id);
}
}, py::arg("topic"), py::arg("array"), py::arg("trace_id") = py::none(), py::arg("memory_block_id") = 0)
.def("try_get_message", [](std::shared_ptr<LocalMessageBroker> broker, std::string_view topic, size_t frame_id) -> py::object
.def("get_current_message", [](std::shared_ptr<LocalMessageBroker> broker, std::string_view topic) -> py::object
{
auto res = broker->TryGetMessage(topic, frame_id);
auto res = broker->GetCurrentMessage(topic);
if (res)
return py::cast(res.value());

return py::none();
}, py::arg("topic"), py::arg("frame_id"))
.def("get_current_message", [](std::shared_ptr<LocalMessageBroker> broker, std::string_view topic) -> py::object
}, py::arg("topic"))
.def("get_current_message_id", [](std::shared_ptr<LocalMessageBroker> broker, std::string_view topic) -> py::object
{
auto res = broker->GetCurrentMessage(topic);
auto res = broker->GetCurrentMessageId(topic);

if (res)
return py::cast(res.value());

return py::none();
}, py::arg("topic"))
.def("is_message_available", &LocalMessageBroker::IsMessageAvailable)
.def("will_message_be_available", &LocalMessageBroker::WillMessageBeAvailable)
.def("get_newest_message_id", &LocalMessageBroker::GetNewestMessageId)
.def("get_oldest_message_id", &LocalMessageBroker::GetOldestMessageId)
.def("get_message_rate", &LocalMessageBroker::GetMessageRate)
.def("get_all_message_topics", &LocalMessageBroker::GetAllMessageTopics)
.def("print_debug_info", &LocalMessageBroker::PrintDebugInfo)
Expand Down Expand Up @@ -1292,6 +1286,122 @@ PYBIND11_MODULE(catkit_bindings, m)
.def_property_readonly("memory_usage", &ProcessStats::GetMemoryUsage)
.def_property_readonly("cpu_usage", &ProcessStats::GetCpuUsage);

py::class_<PeerConfig>(m, "PeerConfig")
.def(py::init<>())
.def(py::init<std::string, std::string, int>(),
py::arg("name"),
py::arg("host"),
py::arg("port"))
.def_readwrite("name", &PeerConfig::name)
.def_readwrite("host", &PeerConfig::host)
.def_readwrite("port", &PeerConfig::port);

py::class_<RemoteBrokerServer>(m, "RemoteBrokerServer")
.def(py::init<std::shared_ptr<LocalMessageBroker>, uint16_t, int>(),
py::arg("broker"),
py::arg("port"),
py::arg("num_workers") = 4)
.def("start", &RemoteBrokerServer::Start)
.def("stop", &RemoteBrokerServer::Stop, py::call_guard<py::gil_scoped_release>())
.def_property_readonly("is_running", &RemoteBrokerServer::IsRunning);

py::class_<RemoteMessageBroker, MessageBroker, std::shared_ptr<RemoteMessageBroker>>(m, "RemoteMessageBroker")
.def(py::init<std::shared_ptr<LocalMessageBroker>, std::string, std::vector<PeerConfig>>(),
py::arg("local_broker"),
py::arg("local_machine_name"),
py::arg("peers"))
.def("prepare_message", [](std::shared_ptr<RemoteMessageBroker> broker, const std::string& topic, size_t payload_size, py::object trace_id, uint8_t memory_block_id)
{
if (trace_id.is_none())
{
return broker->PrepareMessage(topic, payload_size, memory_block_id);
}
else
{
return broker->PrepareMessage(topic, payload_size, py::cast<Uuid>(trace_id), memory_block_id);
}
}, py::arg("topic"), py::arg("payload_size"), py::arg("trace_id") = py::none(), py::arg("memory_block_id") = 0)
.def("publish_message", [](std::shared_ptr<RemoteMessageBroker> broker, Message& message, bool is_final)
{
broker->PublishMessage(message, is_final);
}, py::arg("message"), py::arg("is_final") = true)
.def("publish_data", [](std::shared_ptr<RemoteMessageBroker> broker, std::string topic, py::bytes data, py::object trace_id, uint8_t memory_block_id)
{
if (trace_id.is_none())
{
broker->PublishData(topic, PyBytes_AsString(data.ptr()), PyBytes_Size(data.ptr()), memory_block_id);
}
else
{
broker->PublishData(topic, PyBytes_AsString(data.ptr()), PyBytes_Size(data.ptr()), py::cast<Uuid>(trace_id), memory_block_id);
}
}, py::arg("topic"), py::arg("data"), py::arg("trace_id") = py::none(), py::arg("memory_block_id") = 0)
.def("publish_array", [](std::shared_ptr<RemoteMessageBroker> broker, std::string topic, py::array array, py::object trace_id, uint8_t memory_block_id)
{
ArrayInfo info;

auto dtype = array.dtype();
info.data_type = dtype.kind();
info.item_size = dtype.itemsize();
info.byte_order = dtype.byteorder();

if (array.ndim() > MAX_NUM_DIMENSIONS)
throw std::runtime_error("Array dimension is too large.");

info.ndim = array.ndim();

for (size_t i = 0; i < info.ndim; ++i)
{
info.shape[i] = array.shape()[i];
info.strides[i] = array.strides()[i];
}

if (!info.IsCContiguous() && !info.IsFContiguous())
throw std::runtime_error("Array has to be either C or F contiguous.");

// All checks are complete. Let's copy/submit the raw data.
const ArrayView array_view{info, array.mutable_data()};
if (trace_id.is_none())
{
broker->PublishArray(topic, array_view, memory_block_id);
}
else
{
broker->PublishArray(topic, array_view, py::cast<Uuid>(trace_id), memory_block_id);
}
}, py::arg("topic"), py::arg("array"), py::arg("trace_id") = py::none(), py::arg("memory_block_id") = 0)
.def("get_current_message", [](std::shared_ptr<RemoteMessageBroker> broker, std::string_view topic) -> py::object
{
auto res = broker->GetCurrentMessage(topic);
if (res)
return py::cast(res.value());

return py::none();
}, py::arg("topic"))
.def("get_current_message_id", [](std::shared_ptr<LocalMessageBroker>broker, std::string_view topic) -> py::object
{
auto res = broker->GetCurrentMessageId(topic);

if (res)
return py::cast(res.value());

return py::none();
}, py::arg("topic"))
.def("get_message_rate", &RemoteMessageBroker::GetMessageRate)
.def("get_all_message_topics", &RemoteMessageBroker::GetAllMessageTopics)
.def("subscribe", [](std::shared_ptr<RemoteMessageBroker> broker, std::string topic, py::object preferred_next_frame_id, MessageSubscriptionMode mode)
{
// Check if the starting frame ID is a number or None.
if (preferred_next_frame_id.is_none())
{
return broker->Subscribe(topic, mode);
}
else
{
return broker->Subscribe(topic, py::cast<std::uint64_t>(preferred_next_frame_id), mode);
}
}, py::arg("topic"), py::arg("preferred_next_frame_id") = py::none(), py::arg("mode") = MessageSubscriptionMode::NewestOnly);

#ifdef VERSION_INFO
m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
#else
Expand Down
9 changes: 7 additions & 2 deletions catkit_core/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
using namespace std;
using namespace zmq;

const int SOCKET_TIMEOUT = 60000; // milliseconds.
#define DEBUG_PRINT(msg) std::cerr << "[DEBUG] " << __func__ << ":" << __LINE__ << " - " << msg << std::endl
#define ERROR_PRINT(msg) std::cerr << "[ERROR] " << __func__ << ":" << __LINE__ << " - " << msg << std::endl

const int SOCKET_TIMEOUT = 60000;

Client::Client(std::string host, int port)
: m_Host(host), m_Port(port)
Expand All @@ -29,6 +32,7 @@ Client::~Client()

string Client::MakeRequest(const string &what, const string &request)
{
DEBUG_PRINT("what: " << what << ", request_size: " << request.size() << ", host: " << m_Host << ":" << m_Port);
auto socket = GetSocket();

zmq::multipart_t request_msg;
Expand Down Expand Up @@ -59,6 +63,7 @@ string Client::MakeRequest(const string &what, const string &request)

std::string reply_type = reply_msg.popstr();
std::string reply_data = reply_msg.popstr();
DEBUG_PRINT("reply_type: " << reply_type << ", reply_data: " << reply_data);

if (reply_type == "OK")
{
Expand Down Expand Up @@ -119,4 +124,4 @@ Client::socket_ptr Client::GetSocket()
{
this->m_Sockets.emplace(ptr);
});
}
}
Loading