diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp
index 889d4452c65..6b209df3e66 100644
--- a/lib/remote/jsonrpcconnection.cpp
+++ b/lib/remote/jsonrpcconnection.cpp
@@ -13,6 +13,7 @@
#include "base/logger.hpp"
#include "base/exception.hpp"
#include "base/convert.hpp"
+#include "base/shared.hpp"
#include "base/tlsstream.hpp"
#include
#include
@@ -66,13 +67,14 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
return ch::duration_cast(d).count();
});
+ auto& io (m_IoStrand.context());
+ auto msg (Shared>::Make(String(), io));
+
m_Stream->next_layer().SetSeen(&m_Seen);
while (!m_ShuttingDown) {
- String jsonString;
-
try {
- jsonString = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
+ msg->first = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
} catch (const std::exception& ex) {
Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
@@ -83,52 +85,64 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
m_Seen = Utility::GetTime();
if (m_Endpoint) {
- m_Endpoint->AddMessageReceived(jsonString.GetLength());
+ m_Endpoint->AddMessageReceived(msg->first.GetLength());
}
- String rpcMethod("UNKNOWN");
ch::steady_clock::duration cpuBoundDuration(0);
auto start (ch::steady_clock::now());
+ CpuBoundWork handleMessage (yc);
- try {
- CpuBoundWork handleMessage (yc);
+ // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
+ cpuBoundDuration = ch::steady_clock::now() - start;
- // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
- cpuBoundDuration = ch::steady_clock::now() - start;
+ io.post([this, keepAlive = Ptr(this), msg, start, toMilliseconds, cpuBoundDuration] {
+ Defer wakeUp ([this, msg] {
+ m_IoStrand.post([msg] { msg->second.Set(); });
+ });
- Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
- if (String method = message->Get("method"); !method.IsEmpty()) {
- rpcMethod = std::move(method);
- }
+ String rpcMethod ("UNKNOWN");
+
+ try {
+ Dictionary::Ptr message = JsonRpc::DecodeMessage(msg->first);
+ if (String method = message->Get("method"); !method.IsEmpty()) {
+ rpcMethod = std::move(method);
+ }
- MessageHandler(message);
+ MessageHandler(message);
- l_TaskStats.InsertValue(Utility::GetTime(), 1);
+ l_TaskStats.InsertValue(Utility::GetTime(), 1);
- auto total = ch::steady_clock::now() - start;
+ auto total = ch::steady_clock::now() - start;
+ Log msg (total >= ch::seconds(5) ? LogWarning : LogDebug, "JsonRpcConnection");
- Log msg(total >= ch::seconds(5) ? LogWarning : LogDebug, "JsonRpcConnection");
- msg << "Processed JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity
- << "' (took total " << toMilliseconds(total) << "ms";
+ msg << "Processed JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity
+ << "' (took total " << toMilliseconds(total) << "ms";
- if (cpuBoundDuration >= ch::seconds(1)) {
- msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
- }
- msg << ").";
- } catch (const std::exception& ex) {
- auto total = ch::steady_clock::now() - start;
+ if (cpuBoundDuration >= ch::seconds(1)) {
+ msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
+ }
- Log msg(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection");
- msg << "Error while processing JSON-RPC '" << rpcMethod << "' message for identity '"
- << m_Identity << "' (took total " << toMilliseconds(total) << "ms";
+ msg << ").";
+ } catch (const std::exception& ex) {
+ auto total = ch::steady_clock::now() - start;
+
+ Log msg (m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection");
+
+ msg << "Error while processing JSON-RPC '" << rpcMethod << "' message for identity '"
+ << m_Identity << "' (took total " << toMilliseconds(total) << "ms";
+
+ if (cpuBoundDuration >= ch::seconds(1)) {
+ msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
+ }
- if (cpuBoundDuration >= ch::seconds(1)) {
- msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
+ msg << "): " << DiagnosticInformation(ex);
+
+ Disconnect();
}
- msg << "): " << DiagnosticInformation(ex);
+ });
- break;
- }
+ msg->second.Wait(yc);
+ msg->second.Clear();
}
Disconnect();
@@ -369,7 +383,7 @@ void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
resultMessage->Set("jsonrpc", "2.0");
resultMessage->Set("id", message->Get("id"));
- SendMessageInternal(resultMessage);
+ SendMessage(resultMessage);
}
}