Skip to content

Commit 96f0b24

Browse files
authored
[ntcore] Unify NetworkInterface and MessageHandler (#7190)
1 parent 8ca99c7 commit 96f0b24

21 files changed

+494
-479
lines changed

ntcore/src/main/native/cpp/LocalStorage.cpp

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "Log.h"
2121
#include "Types_internal.h"
2222
#include "Value_internal.h"
23+
#include "net/MessageHandler.h"
2324
#include "networktables/NetworkTableValue.h"
2425

2526
using namespace nt;
@@ -403,7 +404,7 @@ void LocalStorage::Impl::PropertiesUpdated(TopicData* topic,
403404
NotifyTopic(topic, eventFlags | NT_EVENT_PROPERTIES);
404405
// check local flag so we don't echo back received properties changes
405406
if (m_network && sendNetwork) {
406-
m_network->SetProperties(topic->name, update);
407+
m_network->ClientSetProperties(topic->name, update);
407408
}
408409
}
409410

@@ -503,8 +504,9 @@ void LocalStorage::Impl::RemoveNetworkPublisher(TopicData* topic) {
503504
// this may result in a duplicate publish warning on the server side,
504505
// but send one anyway in this case just to be sure
505506
if (nextPub->active && m_network) {
506-
m_network->Publish(Handle{nextPub->handle}.GetIndex(), topic->name,
507-
topic->typeStr, topic->properties, nextPub->config);
507+
m_network->ClientPublish(Handle{nextPub->handle}.GetIndex(),
508+
topic->name, topic->typeStr, topic->properties,
509+
nextPub->config);
508510
}
509511
}
510512
}
@@ -561,8 +563,8 @@ LocalStorage::PublisherData* LocalStorage::Impl::AddLocalPublisher(
561563
}
562564

563565
if (publisher->active && m_network) {
564-
m_network->Publish(Handle{publisher->handle}.GetIndex(), topic->name,
565-
topic->typeStr, topic->properties, config);
566+
m_network->ClientPublish(Handle{publisher->handle}.GetIndex(), topic->name,
567+
topic->typeStr, topic->properties, config);
566568
}
567569
return publisher;
568570
}
@@ -580,7 +582,7 @@ LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) {
580582
}
581583

582584
if (publisher->active && m_network) {
583-
m_network->Unpublish(Handle{publisher->handle}.GetIndex());
585+
m_network->ClientUnpublish(Handle{publisher->handle}.GetIndex());
584586
}
585587

586588
if (publisher->active && !topic->localPublishers.empty()) {
@@ -593,9 +595,9 @@ LocalStorage::Impl::RemoveLocalPublisher(NT_Publisher pubHandle) {
593595
topic->typeStr = nextPub->config.typeStr;
594596
RefreshPubSubActive(topic, false);
595597
if (nextPub->active && m_network) {
596-
m_network->Publish(Handle{nextPub->handle}.GetIndex(), topic->name,
597-
topic->typeStr, topic->properties,
598-
nextPub->config);
598+
m_network->ClientPublish(Handle{nextPub->handle}.GetIndex(),
599+
topic->name, topic->typeStr,
600+
topic->properties, nextPub->config);
599601
}
600602
}
601603
}
@@ -619,8 +621,8 @@ LocalStorage::SubscriberData* LocalStorage::Impl::AddLocalSubscriber(
619621
}
620622
if (m_network && !subscriber->config.hidden) {
621623
DEBUG4("-> NetworkSubscribe({})", topic->name);
622-
m_network->Subscribe(Handle{subscriber->handle}.GetIndex(), {{topic->name}},
623-
config);
624+
m_network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(),
625+
{{topic->name}}, config);
624626
}
625627

626628
// queue current value
@@ -648,7 +650,7 @@ LocalStorage::Impl::RemoveLocalSubscriber(NT_Subscriber subHandle) {
648650
}
649651
}
650652
if (m_network && !subscriber->config.hidden) {
651-
m_network->Unsubscribe(Handle{subscriber->handle}.GetIndex());
653+
m_network->ClientUnsubscribe(Handle{subscriber->handle}.GetIndex());
652654
}
653655
}
654656
return subscriber;
@@ -685,8 +687,8 @@ LocalStorage::MultiSubscriberData* LocalStorage::Impl::AddMultiSubscriber(
685687
}
686688
if (m_network && !subscriber->options.hidden) {
687689
DEBUG4("-> NetworkSubscribe");
688-
m_network->Subscribe(Handle{subscriber->handle}.GetIndex(),
689-
subscriber->prefixes, subscriber->options);
690+
m_network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(),
691+
subscriber->prefixes, subscriber->options);
690692
}
691693
return subscriber;
692694
}
@@ -704,7 +706,7 @@ LocalStorage::Impl::RemoveMultiSubscriber(NT_MultiSubscriber subHandle) {
704706
}
705707
}
706708
if (m_network && !subscriber->options.hidden) {
707-
m_network->Unsubscribe(Handle{subscriber->handle}.GetIndex());
709+
m_network->ClientUnsubscribe(Handle{subscriber->handle}.GetIndex());
708710
}
709711
}
710712
return subscriber;
@@ -978,7 +980,7 @@ bool LocalStorage::Impl::PublishLocalValue(PublisherData* publisher,
978980
if (publisher->topic->IsCached()) {
979981
publisher->topic->lastValueNetwork = value;
980982
}
981-
m_network->SetValue(Handle{publisher->handle}.GetIndex(), value);
983+
m_network->ClientSetValue(Handle{publisher->handle}.GetIndex(), value);
982984
}
983985
return SetValue(publisher->topic, value, NT_EVENT_VALUE_LOCAL,
984986
suppressDuplicates, publisher);
@@ -1074,34 +1076,35 @@ LocalStorage::Impl::Impl(int inst, IListenerStorage& listenerStorage,
10741076

10751077
LocalStorage::~LocalStorage() = default;
10761078

1077-
NT_Topic LocalStorage::NetworkAnnounce(std::string_view name,
1078-
std::string_view typeStr,
1079-
const wpi::json& properties,
1080-
std::optional<int> pubuid) {
1079+
int LocalStorage::ServerAnnounce(std::string_view name, int id,
1080+
std::string_view typeStr,
1081+
const wpi::json& properties,
1082+
std::optional<int> pubuid) {
10811083
std::scoped_lock lock{m_mutex};
10821084
auto topic = m_impl.GetOrCreateTopic(name);
10831085
m_impl.NetworkAnnounce(topic, typeStr, properties, pubuid);
1084-
return topic->handle;
1086+
return Handle{topic->handle}.GetIndex();
10851087
}
10861088

1087-
void LocalStorage::NetworkUnannounce(std::string_view name) {
1089+
void LocalStorage::ServerUnannounce(std::string_view name, int id) {
10881090
std::scoped_lock lock{m_mutex};
10891091
auto topic = m_impl.GetOrCreateTopic(name);
10901092
m_impl.RemoveNetworkPublisher(topic);
10911093
}
10921094

1093-
void LocalStorage::NetworkPropertiesUpdate(std::string_view name,
1094-
const wpi::json& update, bool ack) {
1095+
void LocalStorage::ServerPropertiesUpdate(std::string_view name,
1096+
const wpi::json& update, bool ack) {
10951097
std::scoped_lock lock{m_mutex};
10961098
auto it = m_impl.m_nameTopics.find(name);
10971099
if (it != m_impl.m_nameTopics.end()) {
10981100
m_impl.NetworkPropertiesUpdate(it->second, update, ack);
10991101
}
11001102
}
11011103

1102-
void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
1104+
void LocalStorage::ServerSetValue(int topicId, const Value& value) {
11031105
std::scoped_lock lock{m_mutex};
1104-
if (auto topic = m_impl.m_topics.Get(topicHandle)) {
1106+
if (auto topic =
1107+
m_impl.m_topics.Get(Handle{m_impl.m_inst, topicId, Handle::kTopic})) {
11051108
if (m_impl.SetValue(topic, value, NT_EVENT_VALUE_REMOTE, false, nullptr)) {
11061109
if (topic->IsCached()) {
11071110
topic->lastValueNetwork = value;
@@ -1111,12 +1114,12 @@ void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
11111114
}
11121115
}
11131116

1114-
void LocalStorage::StartNetwork(net::NetworkInterface* network) {
1117+
void LocalStorage::StartNetwork(net::ClientMessageHandler* network) {
11151118
std::scoped_lock lock{m_mutex};
11161119
m_impl.StartNetwork(network);
11171120
}
11181121

1119-
void LocalStorage::Impl::StartNetwork(net::NetworkInterface* network) {
1122+
void LocalStorage::Impl::StartNetwork(net::ClientMessageHandler* network) {
11201123
DEBUG4("StartNetwork()");
11211124
m_network = network;
11221125
// publish all active publishers to the network and send last values
@@ -1125,26 +1128,27 @@ void LocalStorage::Impl::StartNetwork(net::NetworkInterface* network) {
11251128
PublisherData* anyPublisher = nullptr;
11261129
for (auto&& publisher : topic->localPublishers) {
11271130
if (publisher->active) {
1128-
network->Publish(Handle{publisher->handle}.GetIndex(), topic->name,
1129-
topic->typeStr, topic->properties, publisher->config);
1131+
network->ClientPublish(Handle{publisher->handle}.GetIndex(),
1132+
topic->name, topic->typeStr, topic->properties,
1133+
publisher->config);
11301134
anyPublisher = publisher;
11311135
}
11321136
}
11331137
if (anyPublisher && topic->lastValue) {
1134-
network->SetValue(Handle{anyPublisher->handle}.GetIndex(),
1135-
topic->lastValue);
1138+
network->ClientSetValue(Handle{anyPublisher->handle}.GetIndex(),
1139+
topic->lastValue);
11361140
}
11371141
}
11381142
for (auto&& subscriber : m_subscribers) {
11391143
if (!subscriber->config.hidden) {
1140-
network->Subscribe(Handle{subscriber->handle}.GetIndex(),
1141-
{{subscriber->topic->name}}, subscriber->config);
1144+
network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(),
1145+
{{subscriber->topic->name}}, subscriber->config);
11421146
}
11431147
}
11441148
for (auto&& subscriber : m_multiSubscribers) {
11451149
if (!subscriber->options.hidden) {
1146-
network->Subscribe(Handle{subscriber->handle}.GetIndex(),
1147-
subscriber->prefixes, subscriber->options);
1150+
network->ClientSubscribe(Handle{subscriber->handle}.GetIndex(),
1151+
subscriber->prefixes, subscriber->options);
11481152
}
11491153
}
11501154
}

ntcore/src/main/native/cpp/LocalStorage.h

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
#include <stdint.h>
88

9-
#include <functional>
109
#include <memory>
1110
#include <span>
1211
#include <string>
@@ -26,6 +25,7 @@
2625
#include "Types_internal.h"
2726
#include "ValueCircularBuffer.h"
2827
#include "VectorSet.h"
28+
#include "net/MessageHandler.h"
2929
#include "net/NetworkInterface.h"
3030
#include "ntcore_cpp.h"
3131

@@ -46,15 +46,15 @@ class LocalStorage final : public net::ILocalStorage {
4646
~LocalStorage() final;
4747

4848
// network interface functions
49-
NT_Topic NetworkAnnounce(std::string_view name, std::string_view typeStr,
50-
const wpi::json& properties,
51-
std::optional<int> pubuid) final;
52-
void NetworkUnannounce(std::string_view name) final;
53-
void NetworkPropertiesUpdate(std::string_view name, const wpi::json& update,
54-
bool ack) final;
55-
void NetworkSetValue(NT_Topic topicHandle, const Value& value) final;
56-
57-
void StartNetwork(net::NetworkInterface* network) final;
49+
int ServerAnnounce(std::string_view name, int id, std::string_view typeStr,
50+
const wpi::json& properties,
51+
std::optional<int> pubuid) final;
52+
void ServerUnannounce(std::string_view name, int id) final;
53+
void ServerPropertiesUpdate(std::string_view name, const wpi::json& update,
54+
bool ack) final;
55+
void ServerSetValue(int topicId, const Value& value) final;
56+
57+
void StartNetwork(net::ClientMessageHandler* network) final;
5858
void ClearNetwork() final;
5959

6060
// User functions. These are the actual implementations of the corresponding
@@ -555,7 +555,7 @@ class LocalStorage final : public net::ILocalStorage {
555555
int m_inst;
556556
IListenerStorage& m_listenerStorage;
557557
wpi::Logger& m_logger;
558-
net::NetworkInterface* m_network{nullptr};
558+
net::ClientMessageHandler* m_network{nullptr};
559559

560560
// handle mappings
561561
HandleMap<TopicData, 16> m_topics;
@@ -606,7 +606,7 @@ class LocalStorage final : public net::ILocalStorage {
606606
void RemoveNetworkPublisher(TopicData* topic);
607607
void NetworkPropertiesUpdate(TopicData* topic, const wpi::json& update,
608608
bool ack);
609-
void StartNetwork(net::NetworkInterface* network);
609+
void StartNetwork(net::ClientMessageHandler* network);
610610

611611
PublisherData* AddLocalPublisher(TopicData* topic,
612612
const wpi::json& properties,

ntcore/src/main/native/cpp/NetworkClient.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include "IConnectionList.h"
2424
#include "Log.h"
25+
#include "net/NetworkInterface.h"
2526

2627
using namespace nt;
2728
namespace uv = wpi::uv;

ntcore/src/main/native/cpp/net/ClientImpl.cpp

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -94,17 +94,8 @@ void ClientImpl::ProcessIncomingBinary(uint64_t curTimeMs,
9494
continue;
9595
}
9696

97-
// otherwise it's a value message, get the local topic handle for it
98-
auto topicIt = m_topicMap.find(id);
99-
if (topicIt == m_topicMap.end()) {
100-
WARN("received unknown id {}", id);
101-
continue;
102-
}
103-
104-
// pass along to local handler
105-
if (m_local) {
106-
m_local->NetworkSetValue(topicIt->second, value);
107-
}
97+
// otherwise it's a value message
98+
ServerSetValue(id, value);
10899
}
109100
}
110101

@@ -229,27 +220,43 @@ void ClientImpl::SetValue(int32_t pubuid, const Value& value) {
229220
publisher.options.sendAll ? ValueSendMode::kAll : ValueSendMode::kNormal);
230221
}
231222

232-
void ClientImpl::ServerAnnounce(std::string_view name, int id,
233-
std::string_view typeStr,
234-
const wpi::json& properties,
235-
std::optional<int> pubuid) {
223+
int ClientImpl::ServerAnnounce(std::string_view name, int id,
224+
std::string_view typeStr,
225+
const wpi::json& properties,
226+
std::optional<int> pubuid) {
236227
DEBUG4("ServerAnnounce({}, {}, {})", name, id, typeStr);
237228
assert(m_local);
238-
m_topicMap[id] = m_local->NetworkAnnounce(name, typeStr, properties, pubuid);
229+
m_topicMap[id] =
230+
m_local->ServerAnnounce(name, 0, typeStr, properties, pubuid);
231+
return id;
239232
}
240233

241234
void ClientImpl::ServerUnannounce(std::string_view name, int id) {
242235
DEBUG4("ServerUnannounce({}, {})", name, id);
243236
assert(m_local);
244-
m_local->NetworkUnannounce(name);
237+
m_local->ServerUnannounce(name, m_topicMap[id]);
245238
m_topicMap.erase(id);
246239
}
247240

248241
void ClientImpl::ServerPropertiesUpdate(std::string_view name,
249242
const wpi::json& update, bool ack) {
250243
DEBUG4("ServerProperties({}, {}, {})", name, update.dump(), ack);
251244
assert(m_local);
252-
m_local->NetworkPropertiesUpdate(name, update, ack);
245+
m_local->ServerPropertiesUpdate(name, update, ack);
246+
}
247+
248+
void ClientImpl::ServerSetValue(int topicId, const Value& value) {
249+
// get the local topic handle for it
250+
auto topicIt = m_topicMap.find(topicId);
251+
if (topicIt == m_topicMap.end()) {
252+
WARN("received unknown id {}", topicId);
253+
return;
254+
}
255+
256+
// pass along to local handler
257+
if (m_local) {
258+
m_local->ServerSetValue(topicIt->second, value);
259+
}
253260
}
254261

255262
void ClientImpl::ProcessIncomingText(std::string_view data) {

ntcore/src/main/native/cpp/net/ClientImpl.h

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
#include <wpi/DenseMap.h>
1616

17-
#include "NetworkInterface.h"
17+
#include "MessageHandler.h"
1818
#include "NetworkOutgoingQueue.h"
1919
#include "NetworkPing.h"
2020
#include "PubSubOptions.h"
@@ -49,7 +49,7 @@ class ClientImpl final : private ServerMessageHandler {
4949

5050
void SendOutgoing(uint64_t curTimeMs, bool flush);
5151

52-
void SetLocal(LocalInterface* local) { m_local = local; }
52+
void SetLocal(ServerMessageHandler* local) { m_local = local; }
5353
void SendInitial();
5454

5555
private:
@@ -63,12 +63,13 @@ class ClientImpl final : private ServerMessageHandler {
6363
void UpdatePeriodic();
6464

6565
// ServerMessageHandler interface
66-
void ServerAnnounce(std::string_view name, int id, std::string_view typeStr,
67-
const wpi::json& properties,
68-
std::optional<int> pubuid) final;
66+
int ServerAnnounce(std::string_view name, int id, std::string_view typeStr,
67+
const wpi::json& properties,
68+
std::optional<int> pubuid) final;
6969
void ServerUnannounce(std::string_view name, int id) final;
7070
void ServerPropertiesUpdate(std::string_view name, const wpi::json& update,
7171
bool ack) final;
72+
void ServerSetValue(int topicId, const Value& value) final;
7273

7374
void Publish(int pubuid, std::string_view name, std::string_view typeStr,
7475
const wpi::json& properties, const PubSubOptionsImpl& options);
@@ -77,7 +78,7 @@ class ClientImpl final : private ServerMessageHandler {
7778

7879
WireConnection& m_wire;
7980
wpi::Logger& m_logger;
80-
LocalInterface* m_local{nullptr};
81+
ServerMessageHandler* m_local{nullptr};
8182
std::function<void(int64_t serverTimeOffset, int64_t rtt2, bool valid)>
8283
m_timeSyncUpdated;
8384
std::function<void(uint32_t repeatMs)> m_setPeriodic;
@@ -86,7 +87,7 @@ class ClientImpl final : private ServerMessageHandler {
8687
std::vector<std::unique_ptr<PublisherData>> m_publishers;
8788

8889
// indexed by server-provided topic id
89-
wpi::DenseMap<int, NT_Topic> m_topicMap;
90+
wpi::DenseMap<int, int> m_topicMap;
9091

9192
// ping
9293
NetworkPing m_ping;

0 commit comments

Comments
 (0)