Skip to content

Commit 0ce9429

Browse files
committed
Use preferred_next_frame_id rather than the last read id.
1 parent 54ba831 commit 0ce9429

6 files changed

Lines changed: 44 additions & 40 deletions

File tree

catkit2/bindings.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -965,6 +965,8 @@ PYBIND11_MODULE(catkit_bindings, m)
965965
py::class_<Message>(m, "Message")
966966
.def_property_readonly("topic", &Message::GetTopic)
967967
.def_property_readonly("trace_id", &Message::GetTraceId)
968+
.def_property_readonly("frame_id", &Message::GetFrameId)
969+
.def_property_readonly("partial_frame_id", &Message::GetPartialFrameId)
968970
.def_property_readonly("payload_id", &Message::GetPayloadId)
969971
.def_property_readonly("producer_hostname", &Message::GetProducerHostname)
970972
.def_property_readonly("producer_pid", &Message::GetProducerPid)
@@ -1164,18 +1166,18 @@ PYBIND11_MODULE(catkit_bindings, m)
11641166
.def("get_oldest_message_id", &LocalMessageBroker::GetOldestMessageId)
11651167
.def("get_message_rate", &LocalMessageBroker::GetMessageRate)
11661168
.def("get_all_message_topics", &LocalMessageBroker::GetAllMessageTopics)
1167-
.def("subscribe", [](std::shared_ptr<LocalMessageBroker> broker, std::string topic, py::object starting_frame_id, MessageSubscriptionMode mode)
1169+
.def("subscribe", [](std::shared_ptr<LocalMessageBroker> broker, std::string topic, py::object preferred_next_frame_id, MessageSubscriptionMode mode)
11681170
{
11691171
// Check if the starting frame ID is a number or None.
1170-
if (starting_frame_id.is_none())
1172+
if (preferred_next_frame_id.is_none())
11711173
{
11721174
return broker->Subscribe(topic, mode);
11731175
}
11741176
else
11751177
{
1176-
return broker->Subscribe(topic, py::cast<std::uint64_t>(starting_frame_id), mode);
1178+
return broker->Subscribe(topic, py::cast<std::uint64_t>(preferred_next_frame_id), mode);
11771179
}
1178-
}, py::arg("topic"), py::arg("starting_frame_id") = py::none(), py::arg("mode") = MessageSubscriptionMode::NewestOnly);
1180+
}, py::arg("topic"), py::arg("preferred_next_frame_id") = py::none(), py::arg("mode") = MessageSubscriptionMode::NewestOnly);
11791181

11801182
py::class_<PoolAllocator, std::shared_ptr<PoolAllocator>>(m, "PoolAllocator")
11811183
.def_static("create", [](std::shared_ptr<Memory> memory, std::uint32_t capacity)

catkit_core/LocalMessageBroker.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -504,9 +504,9 @@ Message LocalMessageBroker::FetchMessage(TopicHeader* topic_header, size_t frame
504504
return Message(header, payload, frame_id, true);
505505
}
506506

507-
std::uint64_t LocalMessageBroker::GetNextMessageId(TopicHeader *topic_header, size_t last_read_frame_id, MessageSubscriptionMode mode)
507+
std::uint64_t LocalMessageBroker::GetNextMessageId(TopicHeader *topic_header, size_t preferred_next_frame_id, MessageSubscriptionMode mode)
508508
{
509-
size_t frame_id = last_read_frame_id + 1;
509+
size_t frame_id = preferred_next_frame_id;
510510
size_t newest_frame_id = topic_header->last_frame_id.load(std::memory_order_relaxed);
511511
size_t oldest_frame_id = topic_header->first_frame_id.load(std::memory_order_relaxed);
512512

@@ -549,11 +549,11 @@ std::optional<Message> LocalMessageBroker::GetCurrentMessage(std::string_view to
549549
return FetchMessage(topic_header, frame_id);
550550
}
551551

552-
std::optional<Message> LocalMessageBroker::GetNextMessage(std::string_view topic, size_t last_read_frame_id, MessageSubscriptionMode mode, double timeout_in_seconds, EventWaitMethod wait_type, void (*error_check)())
552+
std::optional<Message> LocalMessageBroker::GetNextMessage(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode, double timeout_in_seconds, EventWaitMethod wait_type, void (*error_check)())
553553
{
554554
auto topic_header = GetTopicHeader(topic);
555555

556-
std::uint64_t new_frame_id = GetNextMessageId(topic_header, last_read_frame_id, mode);
556+
std::uint64_t new_frame_id = GetNextMessageId(topic_header, preferred_next_frame_id, mode);
557557

558558
// Check if the frame is available.
559559
if (topic_header->IsMessageAvailable(new_frame_id))
@@ -566,11 +566,11 @@ std::optional<Message> LocalMessageBroker::GetNextMessage(std::string_view topic
566566
return FetchMessage(topic_header, new_frame_id);
567567
}
568568

569-
std::optional<Message> LocalMessageBroker::TryGetNextMessage(std::string_view topic, size_t last_read_frame_id, MessageSubscriptionMode mode)
569+
std::optional<Message> LocalMessageBroker::TryGetNextMessage(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode)
570570
{
571571
auto topic_header = GetTopicHeader(topic);
572572

573-
std::uint64_t frame_id = GetNextMessageId(topic_header, last_read_frame_id, mode);
573+
std::uint64_t frame_id = GetNextMessageId(topic_header, preferred_next_frame_id, mode);
574574

575575
// Check if the frame is available.
576576
if (!topic_header->IsMessageAvailable(frame_id))

catkit_core/LocalMessageBroker.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,10 @@ class LocalMessageBroker : public Shareable, public MessageBroker
8686
virtual std::optional<Message> GetCurrentMessage(std::string_view topic) override;
8787

8888
// Get the next message for a topic.
89-
virtual std::optional<Message> GetNextMessage(std::string_view topic, size_t last_read_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly, double timeout_in_seconds = -1, EventWaitMethod wait_type = EventWaitMethod::Default, void (*error_check)() = nullptr) override;
89+
virtual std::optional<Message> GetNextMessage(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly, double timeout_in_seconds = -1, EventWaitMethod wait_type = EventWaitMethod::Default, void (*error_check)() = nullptr) override;
9090

9191
// Try to get the next message for a topic.
92-
virtual std::optional<Message> TryGetNextMessage(std::string_view topic, size_t last_read_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly) override;
92+
virtual std::optional<Message> TryGetNextMessage(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly) override;
9393

9494
// Get the message rate for a topic.
9595
virtual double GetMessageRate(std::string_view topic) override;
@@ -116,7 +116,7 @@ class LocalMessageBroker : public Shareable, public MessageBroker
116116

117117
private:
118118
Message FetchMessage(TopicHeader *topic_header, size_t frame_id);
119-
std::uint64_t GetNextMessageId(TopicHeader *topic_header, size_t last_read_frame_id, MessageSubscriptionMode mode);
119+
std::uint64_t GetNextMessageId(TopicHeader *topic_header, size_t preferred_next_frame_id, MessageSubscriptionMode mode);
120120

121121
std::shared_ptr<HybridPoolAllocator> GetAllocator(uint8_t memory_block_id);
122122
std::shared_ptr<Memory> GetMemory(uint8_t memory_block_id);

catkit_core/MessageBroker.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -141,33 +141,33 @@ void Message::SetEndByte(std::uint64_t end_byte)
141141
m_Header->end_byte = end_byte;
142142
}
143143

144-
MessageSubscription::MessageSubscription(std::shared_ptr<MessageBroker> message_broker, std::string_view topic, std::uint64_t last_read_frame_id, MessageSubscriptionMode mode)
145-
: m_MessageBroker(message_broker), m_Topic(topic), m_LastReadFrameId(last_read_frame_id), m_SubscriptionMode(mode)
144+
MessageSubscription::MessageSubscription(std::shared_ptr<MessageBroker> message_broker, std::string_view topic, std::uint64_t preferred_next_frame_id, MessageSubscriptionMode mode)
145+
: m_MessageBroker(message_broker), m_Topic(topic), m_PreferredNextFrameId(preferred_next_frame_id), m_SubscriptionMode(mode)
146146
{
147147
}
148148

149149
std::optional<Message> MessageSubscription::GetNextMessage(double timeout_in_seconds, EventWaitMethod wait_type, void (*error_check)())
150150
{
151-
auto message = m_MessageBroker->GetNextMessage(m_Topic, m_LastReadFrameId, m_SubscriptionMode, timeout_in_seconds, wait_type, error_check);
151+
auto message = m_MessageBroker->GetNextMessage(m_Topic, m_PreferredNextFrameId, m_SubscriptionMode, timeout_in_seconds, wait_type, error_check);
152152

153153
if (!message.has_value())
154154
return message;
155155

156156
// We are going to return a message. Update our frame id for the next call.
157-
m_LastReadFrameId = message->GetFrameId() + 1;
157+
m_PreferredNextFrameId = message->GetFrameId() + 1;
158158

159159
return message;
160160
}
161161

162162
std::optional<Message> MessageSubscription::TryGetNextMessage()
163163
{
164-
auto message = m_MessageBroker->TryGetNextMessage(m_Topic, m_LastReadFrameId, m_SubscriptionMode);
164+
auto message = m_MessageBroker->TryGetNextMessage(m_Topic, m_PreferredNextFrameId, m_SubscriptionMode);
165165

166166
if (!message.has_value())
167167
return message;
168168

169169
// We are going to return a message. Update our frame id for the next call.
170-
m_LastReadFrameId = message->GetFrameId() + 1;
170+
m_PreferredNextFrameId = message->GetFrameId() + 1;
171171

172172
return message;
173173
}
@@ -229,7 +229,7 @@ MessageSubscription MessageBroker::Subscribe(std::string_view topic, MessageSubs
229229
return Subscribe(topic, starting_frame_id, mode);
230230
}
231231

232-
MessageSubscription MessageBroker::Subscribe(std::string_view topic, size_t starting_frame_id, MessageSubscriptionMode mode)
232+
MessageSubscription MessageBroker::Subscribe(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode)
233233
{
234-
return MessageSubscription(shared_from_this(), topic, starting_frame_id, mode);
234+
return MessageSubscription(shared_from_this(), topic, preferred_next_frame_id, mode);
235235
}

catkit_core/MessageBroker.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,12 @@ class MessageSubscription
141141
std::optional<Message> TryGetNextMessage();
142142

143143
private:
144-
MessageSubscription(std::shared_ptr<MessageBroker> broker, std::string_view topic, std::uint64_t last_read_frame_id, MessageSubscriptionMode mode);
144+
MessageSubscription(std::shared_ptr<MessageBroker> broker, std::string_view topic, std::uint64_t preferred_next_frame_id, MessageSubscriptionMode mode);
145145

146146
std::shared_ptr<MessageBroker> m_MessageBroker;
147147

148148
std::string m_Topic;
149-
std::uint64_t m_LastReadFrameId;
149+
std::uint64_t m_PreferredNextFrameId;
150150
MessageSubscriptionMode m_SubscriptionMode;
151151
};
152152

@@ -157,8 +157,8 @@ class MessageBroker : public std::enable_shared_from_this<MessageBroker>
157157
virtual void PublishMessage(Message &message, bool is_final = true) = 0;
158158

159159
virtual std::optional<Message> GetCurrentMessage(std::string_view topic) = 0;
160-
virtual std::optional<Message> GetNextMessage(std::string_view topic, size_t last_read_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly, double timeout_in_seconds = -1, EventWaitMethod wait_type = EventWaitMethod::Default, void (*error_check)() = nullptr) = 0;
161-
virtual std::optional<Message> TryGetNextMessage(std::string_view topic, size_t last_read_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly) = 0;
160+
virtual std::optional<Message> GetNextMessage(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly, double timeout_in_seconds = -1, EventWaitMethod wait_type = EventWaitMethod::Default, void (*error_check)() = nullptr) = 0;
161+
virtual std::optional<Message> TryGetNextMessage(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly) = 0;
162162

163163
virtual std::vector<std::string> GetAllMessageTopics() = 0;
164164
virtual double GetMessageRate(std::string_view topic) = 0;
@@ -173,7 +173,7 @@ class MessageBroker : public std::enable_shared_from_this<MessageBroker>
173173
void PublishArray(std::string_view topic, ArrayView array, Uuid trace_id, uint8_t memory_block_id = 0);
174174

175175
MessageSubscription Subscribe(std::string_view topic, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly);
176-
MessageSubscription Subscribe(std::string_view topic, size_t last_read_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly);
176+
MessageSubscription Subscribe(std::string_view topic, size_t preferred_next_frame_id, MessageSubscriptionMode mode = MessageSubscriptionMode::NewestOnly);
177177
};
178178

179179
#endif // MESSAGE_BROKER_H

tests/test_message_broker.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from catkit2.catkit_bindings import LocalMemory, MessageBroker, MessageSubscriptionMode
1+
from catkit2.catkit_bindings import LocalMemory, LocalMessageBroker, MessageSubscriptionMode
22
import numpy as np
33
import pytest
44

@@ -7,7 +7,7 @@ def broker():
77
header = LocalMemory.create(1024 * 1024 * 512)
88
block = LocalMemory.create(1024 * 1024 * 1024)
99

10-
broker = MessageBroker.create(header, [block])
10+
broker = LocalMessageBroker.create(header, [block])
1111
yield broker
1212

1313
def test_message_subscription(broker):
@@ -22,9 +22,6 @@ def test_message_subscription(broker):
2222
message.payload = arr
2323
broker.publish_message(message)
2424

25-
assert subscription_newest.next_message_id == 2
26-
assert subscription_sequential.next_message_id == 0
27-
2825
message_1 = subscription_newest.get_next_message(0.01)
2926
message_2 = subscription_sequential.get_next_message(0.01)
3027
message_3 = subscription_sequential.get_next_message(0.01)
@@ -33,6 +30,9 @@ def test_message_subscription(broker):
3330
assert message_2 is not None
3431
assert message_3 is not None
3532

33+
assert message_1.frame_id == 2
34+
assert message_2.frame_id == 0
35+
3636
assert message_1.payload[0] == 12
3737
assert message_2.payload[0] == 10
3838
assert message_3.payload[0] == 11
@@ -45,11 +45,13 @@ def test_message_subscription(broker):
4545

4646
assert subscription_sequential.try_get_next_message() is None
4747

48-
subscription_newest2 = broker.subscribe(topic, starting_frame_id=1, mode=MessageSubscriptionMode.NewestOnly)
49-
assert subscription_newest2.next_message_id == 2
48+
subscription_newest2 = broker.subscribe(topic, preferred_next_frame_id=1, mode=MessageSubscriptionMode.NewestOnly)
49+
m = subscription_newest2.get_next_message(0.01)
50+
assert m.frame_id == 2
5051

51-
subscription_sequential2 = broker.subscribe(topic, starting_frame_id=1, mode=MessageSubscriptionMode.Sequential)
52-
assert subscription_sequential2.next_message_id == 1
52+
subscription_sequential2 = broker.subscribe(topic, preferred_next_frame_id=1, mode=MessageSubscriptionMode.Sequential)
53+
m = subscription_sequential2.get_next_message(0.01)
54+
assert m.frame_id == 1
5355

5456
dtypes = ['int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'int64', 'uint64', 'float32', 'float64', 'complex64', 'complex128']
5557
shapes = [[10], [10, 10], [10, 10, 10], [10, 10, 10, 10]]
@@ -65,7 +67,7 @@ def test_message_dtype_and_shape(broker, shape, dtype):
6567
message.payload = arr
6668
broker.publish_message(message)
6769

68-
retrieved_message = broker.get_newest_message(topic)
70+
retrieved_message = broker.get_current_message(topic)
6971

7072
assert np.all(arr == retrieved_message.payload)
7173
assert retrieved_message.payload.dtype == dtype
@@ -98,7 +100,7 @@ def test_message_broker_publish(broker):
98100

99101
broker.publish_data(topic, data)
100102

101-
retrieved_message = broker.get_newest_message(topic)
103+
retrieved_message = broker.get_current_message(topic)
102104

103105
assert (retrieved_message.payload == arr).all()
104106

@@ -107,14 +109,14 @@ def test_message_broker_trace_id(broker):
107109
data = b'hello world'
108110

109111
broker.publish_data(topic, data)
110-
message_1 = broker.get_newest_message(topic)
112+
message_1 = broker.get_current_message(topic)
111113

112114
broker.publish_data(topic, data * 2)
113-
message_2 = broker.get_newest_message(topic)
115+
message_2 = broker.get_current_message(topic)
114116

115117
assert str(message_2.trace_id) != str(message_1.trace_id)
116118

117119
broker.publish_data(topic, data * 3, trace_id=message_1.trace_id)
118-
message_3 = broker.get_newest_message(topic)
120+
message_3 = broker.get_current_message(topic)
119121

120122
assert str(message_3.trace_id) == str(message_1.trace_id)

0 commit comments

Comments
 (0)