Skip to content

Commit a621ceb

Browse files
authored
[ntcore] Server round robin message processing (#7191)
Each client has an incoming queue of ClientMessage. In the read callback: - Parse and process only ping messages and a limited number of messages; anything else will get put into the queue and not processed - If we queued some messages, we tell the network we stopped reading; this will result in back-pressure if we are reading too slowly. We also start an idle handle to process the queued messages. In the idle handle callback: - For each client, process just a few pending messages. This is performed in round-robin fashion across all clients with pending messages - When a client's queue becomes empty, we re-enable the network read - When all client queues are empty, we stop the idle handle (so we don't spin) For local client processing, we use round-robin processing for most cases (including FlushLocal), but still do batch processing of all local changes for explicit network Flush() calls.
1 parent 8870d98 commit a621ceb

15 files changed

+438
-246
lines changed

ntcore/src/main/native/cpp/NetworkClient.cpp

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ NetworkClientBase::NetworkClientBase(int inst, std::string_view id,
4343
m_id{id},
4444
m_localQueue{logger},
4545
m_loop{*m_loopRunner.GetLoop()} {
46-
m_localMsgs.reserve(net::NetworkLoopQueue::kInitialQueueSize);
47-
4846
INFO("starting network client");
4947
}
5048

@@ -194,9 +192,14 @@ NetworkClient3::~NetworkClient3() {
194192
}
195193

196194
void NetworkClient3::HandleLocal() {
197-
m_localQueue.ReadQueue(&m_localMsgs);
198-
if (m_clientImpl) {
199-
m_clientImpl->HandleLocal(m_localMsgs);
195+
for (;;) {
196+
auto msgs = m_localQueue.ReadQueue(m_localMsgs);
197+
if (msgs.empty()) {
198+
return;
199+
}
200+
if (m_clientImpl) {
201+
m_clientImpl->HandleLocal(msgs);
202+
}
200203
}
201204
}
202205

@@ -358,9 +361,14 @@ NetworkClient::~NetworkClient() {
358361
}
359362

360363
void NetworkClient::HandleLocal() {
361-
m_localQueue.ReadQueue(&m_localMsgs);
362-
if (m_clientImpl) {
363-
m_clientImpl->HandleLocal(std::move(m_localMsgs));
364+
for (;;) {
365+
auto msgs = m_localQueue.ReadQueue(m_localMsgs);
366+
if (msgs.empty()) {
367+
return;
368+
}
369+
if (m_clientImpl) {
370+
m_clientImpl->HandleLocal(msgs);
371+
}
364372
}
365373
}
366374

ntcore/src/main/native/cpp/NetworkClient.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
#include <atomic>
88
#include <functional>
99
#include <memory>
10-
#include <optional>
1110
#include <span>
1211
#include <string>
1312
#include <string_view>
@@ -23,12 +22,11 @@
2322

2423
#include "INetworkClient.h"
2524
#include "net/ClientImpl.h"
25+
#include "net/ClientMessageQueue.h"
2626
#include "net/Message.h"
27-
#include "net/NetworkLoopQueue.h"
2827
#include "net/WebSocketConnection.h"
2928
#include "net3/ClientImpl3.h"
3029
#include "net3/UvStreamConnection3.h"
31-
#include "ntcore_cpp.h"
3230

3331
namespace wpi {
3432
class Logger;
@@ -80,7 +78,8 @@ class NetworkClientBase : public INetworkClient {
8078
std::shared_ptr<wpi::uv::Async<>> m_flushLocal;
8179
std::shared_ptr<wpi::uv::Async<>> m_flush;
8280

83-
std::vector<net::ClientMessage> m_localMsgs;
81+
using Queue = net::LocalClientMessageQueue;
82+
net::ClientMessage m_localMsgs[Queue::kBlockSize];
8483

8584
std::vector<std::pair<std::string, unsigned int>> m_servers;
8685

@@ -91,7 +90,7 @@ class NetworkClientBase : public INetworkClient {
9190
std::atomic<wpi::uv::Async<>*> m_flushLocalAtomic{nullptr};
9291
std::atomic<wpi::uv::Async<>*> m_flushAtomic{nullptr};
9392

94-
net::NetworkLoopQueue m_localQueue;
93+
Queue m_localQueue;
9594

9695
int m_connHandle = 0;
9796

ntcore/src/main/native/cpp/NetworkServer.cpp

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ namespace uv = wpi::uv;
4242
// use a larger max message size for websockets
4343
static constexpr size_t kMaxMessageSize = 2 * 1024 * 1024;
4444

45+
static constexpr size_t kClientProcessMessageCountMax = 16;
46+
4547
class NetworkServer::ServerConnection {
4648
public:
4749
ServerConnection(NetworkServer& server, std::string_view addr,
@@ -105,7 +107,6 @@ class NetworkServer::ServerConnection4 final
105107
void NetworkServer::ServerConnection::SetupOutgoingTimer() {
106108
m_outgoingTimer = uv::Timer::Create(m_server.m_loop);
107109
m_outgoingTimer->timeout.connect([this] {
108-
m_server.HandleLocal();
109110
m_server.m_serverImpl.SendOutgoing(m_clientId,
110111
m_server.m_loop.Now().count());
111112
});
@@ -172,8 +173,10 @@ NetworkServer::ServerConnection3::ServerConnection3(
172173
ConnectionClosed();
173174
});
174175
stream->data.connect([this](uv::Buffer& buf, size_t size) {
175-
m_server.m_serverImpl.ProcessIncomingBinary(
176-
m_clientId, {reinterpret_cast<const uint8_t*>(buf.base), size});
176+
if (m_server.m_serverImpl.ProcessIncomingBinary(
177+
m_clientId, {reinterpret_cast<const uint8_t*>(buf.base), size})) {
178+
m_server.m_idle->Start();
179+
}
177180
});
178181
stream->StartRead();
179182

@@ -293,10 +296,14 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() {
293296
ConnectionClosed();
294297
});
295298
m_websocket->text.connect([this](std::string_view data, bool) {
296-
m_server.m_serverImpl.ProcessIncomingText(m_clientId, data);
299+
if (m_server.m_serverImpl.ProcessIncomingText(m_clientId, data)) {
300+
m_server.m_idle->Start();
301+
}
297302
});
298303
m_websocket->binary.connect([this](std::span<const uint8_t> data, bool) {
299-
m_server.m_serverImpl.ProcessIncomingBinary(m_clientId, data);
304+
if (m_server.m_serverImpl.ProcessIncomingBinary(m_clientId, data)) {
305+
m_server.m_idle->Start();
306+
}
300307
});
301308

302309
SetupOutgoingTimer();
@@ -320,12 +327,11 @@ NetworkServer::NetworkServer(std::string_view persistentFilename,
320327
m_serverImpl{logger},
321328
m_localQueue{logger},
322329
m_loop(*m_loopRunner.GetLoop()) {
323-
m_localMsgs.reserve(net::NetworkLoopQueue::kInitialQueueSize);
324330
m_loopRunner.ExecAsync([=, this](uv::Loop& loop) {
325331
// connect local storage to server
326-
m_serverImpl.SetLocal(&m_localStorage);
332+
m_serverImpl.SetLocal(&m_localStorage, &m_localQueue);
327333
m_localStorage.StartNetwork(&m_localQueue);
328-
HandleLocal();
334+
ProcessAllLocal();
329335

330336
// load persistent file first, then initialize
331337
uv::QueueWork(m_loop, [this] { LoadPersistent(); }, [this] { Init(); });
@@ -350,9 +356,9 @@ void NetworkServer::Flush() {
350356
}
351357
}
352358

353-
void NetworkServer::HandleLocal() {
354-
m_localQueue.ReadQueue(&m_localMsgs);
355-
m_serverImpl.HandleLocal(m_localMsgs);
359+
void NetworkServer::ProcessAllLocal() {
360+
while (m_serverImpl.ProcessLocalMessages(128)) {
361+
}
356362
}
357363

358364
void NetworkServer::LoadPersistent() {
@@ -421,8 +427,10 @@ void NetworkServer::Init() {
421427
m_readLocalTimer = uv::Timer::Create(m_loop);
422428
if (m_readLocalTimer) {
423429
m_readLocalTimer->timeout.connect([this] {
424-
HandleLocal();
425-
m_serverImpl.SendAllOutgoing(m_loop.Now().count(), false);
430+
if (m_serverImpl.ProcessLocalMessages(kClientProcessMessageCountMax)) {
431+
DEBUG4("Starting idle processing");
432+
m_idle->Start(); // more to process
433+
}
426434
});
427435
m_readLocalTimer->Start(uv::Timer::Time{100}, uv::Timer::Time{100});
428436
}
@@ -447,18 +455,36 @@ void NetworkServer::Init() {
447455
m_flush = uv::Async<>::Create(m_loop);
448456
if (m_flush) {
449457
m_flush->wakeup.connect([this] {
450-
HandleLocal();
458+
ProcessAllLocal();
451459
m_serverImpl.SendAllOutgoing(m_loop.Now().count(), true);
452460
});
453461
}
454462
m_flushAtomic = m_flush.get();
455463

456464
m_flushLocal = uv::Async<>::Create(m_loop);
457465
if (m_flushLocal) {
458-
m_flushLocal->wakeup.connect([this] { HandleLocal(); });
466+
m_flushLocal->wakeup.connect([this] {
467+
if (m_serverImpl.ProcessLocalMessages(kClientProcessMessageCountMax)) {
468+
DEBUG4("Starting idle processing");
469+
m_idle->Start(); // more to process
470+
}
471+
});
459472
}
460473
m_flushLocalAtomic = m_flushLocal.get();
461474

475+
m_idle = uv::Idle::Create(m_loop);
476+
if (m_idle) {
477+
m_idle->idle.connect([this] {
478+
if (m_serverImpl.ProcessIncomingMessages(kClientProcessMessageCountMax)) {
479+
DEBUG4("Starting idle processing");
480+
m_idle->Start(); // more to process
481+
} else {
482+
DEBUG4("Stopping idle processing");
483+
m_idle->Stop(); // go back to sleep
484+
}
485+
});
486+
}
487+
462488
INFO("Listening on NT3 port {}, NT4 port {}", m_port3, m_port4);
463489

464490
if (m_port3 != 0) {

ntcore/src/main/native/cpp/NetworkServer.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313

1414
#include <wpinet/EventLoopRunner.h>
1515
#include <wpinet/uv/Async.h>
16+
#include <wpinet/uv/Idle.h>
1617
#include <wpinet/uv/Timer.h>
1718

19+
#include "net/ClientMessageQueue.h"
1820
#include "net/Message.h"
19-
#include "net/NetworkLoopQueue.h"
2021
#include "net/ServerImpl.h"
2122
#include "ntcore_cpp.h"
2223

@@ -49,7 +50,7 @@ class NetworkServer {
4950
class ServerConnection3;
5051
class ServerConnection4;
5152

52-
void HandleLocal();
53+
void ProcessAllLocal();
5354
void LoadPersistent();
5455
void SavePersistent(std::string_view filename, std::string_view data);
5556
void Init();
@@ -71,9 +72,11 @@ class NetworkServer {
7172
std::shared_ptr<wpi::uv::Timer> m_savePersistentTimer;
7273
std::shared_ptr<wpi::uv::Async<>> m_flushLocal;
7374
std::shared_ptr<wpi::uv::Async<>> m_flush;
75+
std::shared_ptr<wpi::uv::Idle> m_idle;
7476
bool m_shutdown = false;
7577

76-
std::vector<net::ClientMessage> m_localMsgs;
78+
using Queue = net::LocalClientMessageQueue;
79+
net::ClientMessage m_localMsgs[Queue::kBlockSize];
7780

7881
net::ServerImpl m_serverImpl;
7982

@@ -87,7 +90,7 @@ class NetworkServer {
8790
};
8891
std::vector<Connection> m_connections;
8992

90-
net::NetworkLoopQueue m_localQueue;
93+
Queue m_localQueue;
9194

9295
wpi::EventLoopRunner m_loopRunner;
9396
wpi::uv::Loop& m_loop;

ntcore/src/main/native/cpp/net/ClientImpl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ void ClientImpl::ProcessIncomingBinary(uint64_t curTimeMs,
9999
}
100100
}
101101

102-
void ClientImpl::HandleLocal(std::vector<ClientMessage>&& msgs) {
102+
void ClientImpl::HandleLocal(std::span<ClientMessage> msgs) {
103103
DEBUG4("HandleLocal()");
104104
for (auto&& elem : msgs) {
105105
// common case is value

ntcore/src/main/native/cpp/net/ClientImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class ClientImpl final : private ServerMessageHandler {
4545

4646
void ProcessIncomingText(std::string_view data);
4747
void ProcessIncomingBinary(uint64_t curTimeMs, std::span<const uint8_t> data);
48-
void HandleLocal(std::vector<ClientMessage>&& msgs);
48+
void HandleLocal(std::span<ClientMessage> msgs);
4949

5050
void SendOutgoing(uint64_t curTimeMs, bool flush);
5151

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright (c) FIRST and other WPILib contributors.
2+
// Open Source Software; you can modify and/or share it under the terms of
3+
// the WPILib BSD license file in the root directory of this project.
4+
5+
#pragma once
6+
7+
#include <span>
8+
#include <string>
9+
10+
#include <wpi/FastQueue.h>
11+
#include <wpi/mutex.h>
12+
13+
#include "Message.h"
14+
#include "MessageHandler.h"
15+
16+
namespace wpi {
17+
class Logger;
18+
} // namespace wpi
19+
20+
namespace nt::net {
21+
22+
class ClientMessageQueue {
23+
public:
24+
virtual ~ClientMessageQueue() = default;
25+
26+
virtual std::span<ClientMessage> ReadQueue(std::span<ClientMessage> out) = 0;
27+
virtual void ClearQueue() = 0;
28+
};
29+
30+
namespace detail {
31+
32+
template <size_t MaxValueSize, bool IsMutexed>
33+
class ClientMessageQueueImpl final : public ClientMessageHandler,
34+
public ClientMessageQueue {
35+
public:
36+
static constexpr size_t kBlockSize = 64;
37+
38+
explicit ClientMessageQueueImpl(wpi::Logger& logger) : m_logger{logger} {}
39+
40+
bool empty() const { return m_queue.empty(); }
41+
42+
// ClientMessageQueue - calls to these read the queue
43+
std::span<ClientMessage> ReadQueue(std::span<ClientMessage> out) final;
44+
void ClearQueue() final;
45+
46+
// ClientMessageHandler - calls to these append to the queue
47+
void ClientPublish(int pubuid, std::string_view name,
48+
std::string_view typeStr, const wpi::json& properties,
49+
const PubSubOptionsImpl& options) final;
50+
void ClientUnpublish(int pubuid) final;
51+
void ClientSetProperties(std::string_view name,
52+
const wpi::json& update) final;
53+
void ClientSubscribe(int subuid, std::span<const std::string> topicNames,
54+
const PubSubOptionsImpl& options) final;
55+
void ClientUnsubscribe(int subuid) final;
56+
void ClientSetValue(int pubuid, const Value& value) final;
57+
58+
private:
59+
wpi::FastQueue<ClientMessage, kBlockSize> m_queue{kBlockSize - 1};
60+
wpi::Logger& m_logger;
61+
62+
class NoMutex {
63+
public:
64+
void lock() {}
65+
void unlock() {}
66+
};
67+
[[no_unique_address]]
68+
std::conditional_t<IsMutexed, wpi::mutex, NoMutex> m_mutex;
69+
70+
struct ValueSize {
71+
size_t size{0};
72+
bool errored{false};
73+
};
74+
struct Empty {};
75+
[[no_unique_address]]
76+
std::conditional_t<MaxValueSize != 0, ValueSize, Empty> m_valueSize;
77+
};
78+
79+
} // namespace detail
80+
81+
using LocalClientMessageQueue =
82+
detail::ClientMessageQueueImpl<2 * 1024 * 1024, true>;
83+
using NetworkIncomingClientQueue = detail::ClientMessageQueueImpl<0, false>;
84+
85+
} // namespace nt::net
86+
87+
#include "ClientMessageQueue.inc"

0 commit comments

Comments
 (0)