Skip to content
This repository was archived by the owner on Jan 9, 2024. It is now read-only.

Commit 2273296

Browse files
authored
Associate worker thread pool creation to HTTP server (#99)
Fix eth_syncing integration test Update README
1 parent 95d3396 commit 2273296

File tree

10 files changed

+38
-24
lines changed

10 files changed

+38
-24
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,11 @@ silkrpcdaemon: C++ implementation of ETH JSON Remote Procedure Call (RPC) daemon
139139
140140
Flags from main.cpp:
141141
--chaindata (chain data path as string); default: "";
142-
--local (HTTP JSON local binding as string <address>:<port>);
143-
default: "localhost:8545";
142+
--local (HTTP JSON local binding as string <address>:<port>); default: "localhost:8545";
144143
--logLevel (logging level); default: c;
145-
--target (Erigon Core gRPC service location as string <address>:<port>);
146-
default: "localhost:9090";
144+
--numContexts (number of running I/O contexts as 32-bit integer); default: number of hardware thread contexts / 2;
145+
--numWorkers (number of worker threads as 32-bit integer); default: number of hardware thread contexts;
146+
--target (Erigon Core gRPC service location as string <address>:<port>); default: "localhost:9090";
147147
--timeout (gRPC call timeout as 32-bit integer); default: 10000;
148148
```
149149

silkrpc/commands/eth_api.hpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717
#ifndef SILKRPC_COMMANDS_ETH_API_HPP_
1818
#define SILKRPC_COMMANDS_ETH_API_HPP_
1919

20-
#include <cstddef>
2120
#include <memory>
22-
#include <thread>
2321
#include <vector>
2422

2523
#include <silkrpc/config.hpp> // NOLINT(build/include_order)
@@ -46,8 +44,8 @@ namespace silkrpc::commands {
4644

4745
class EthereumRpcApi {
4846
public:
49-
explicit EthereumRpcApi(Context& context, std::size_t workers_count = std::thread::hardware_concurrency())
50-
: context_(context), database_(context.database), backend_(context.backend), workers_{workers_count} {}
47+
explicit EthereumRpcApi(Context& context, asio::thread_pool& workers)
48+
: context_(context), database_(context.database), backend_(context.backend), workers_{workers} {}
5149
virtual ~EthereumRpcApi() {}
5250

5351
EthereumRpcApi(const EthereumRpcApi&) = delete;
@@ -103,7 +101,7 @@ class EthereumRpcApi {
103101
Context& context_;
104102
std::unique_ptr<ethdb::Database>& database_;
105103
std::unique_ptr<ethbackend::BackEnd>& backend_;
106-
asio::thread_pool workers_;
104+
asio::thread_pool& workers_;
107105

108106
friend class silkrpc::http::RequestHandler;
109107
};

silkrpc/commands/rpc_api.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
#include <memory>
2121

22+
#include <asio/thread_pool.hpp>
23+
2224
#include <silkrpc/commands/debug_api.hpp>
2325
#include <silkrpc/commands/eth_api.hpp>
2426
#include <silkrpc/commands/net_api.hpp>
@@ -34,8 +36,8 @@ namespace silkrpc::commands {
3436

3537
class RpcApi : protected EthereumRpcApi, NetRpcApi, Web3RpcApi, DebugRpcApi, ParityRpcApi, TurboGethRpcApi, TraceRpcApi {
3638
public:
37-
explicit RpcApi(Context& context) :
38-
EthereumRpcApi{context}, NetRpcApi{context.backend}, Web3RpcApi{context}, DebugRpcApi{context.database},
39+
explicit RpcApi(Context& context, asio::thread_pool& workers) :
40+
EthereumRpcApi{context, workers}, NetRpcApi{context.backend}, Web3RpcApi{context}, DebugRpcApi{context.database},
3941
ParityRpcApi{context.database}, TurboGethRpcApi{context.database}, TraceRpcApi{context.database} {}
4042
virtual ~RpcApi() {}
4143

silkrpc/http/connection.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838

3939
namespace silkrpc::http {
4040

41-
Connection::Connection(Context& context) : socket_{*context.io_context}, request_handler_{context} {
41+
Connection::Connection(Context& context, asio::thread_pool& workers) : socket_{*context.io_context}, request_handler_{context, workers} {
4242
request_.content.reserve(1024);
4343
request_.headers.reserve(8);
4444
request_.method.reserve(64);

silkrpc/http/connection.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
#include <asio/awaitable.hpp>
3232
#include <asio/ip/tcp.hpp>
33+
#include <asio/thread_pool.hpp>
3334

3435
#include <silkrpc/context_pool.hpp>
3536
#include "reply.hpp"
@@ -48,7 +49,7 @@ class Connection : public std::enable_shared_from_this<Connection> {
4849
Connection& operator=(const Connection&) = delete;
4950

5051
/// Construct a connection running within the given execution context.
51-
explicit Connection(Context& context);
52+
explicit Connection(Context& context, asio::thread_pool& workers);
5253

5354
~Connection();
5455

silkrpc/http/request_handler.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <silkrpc/config.hpp>
3131

3232
#include <asio/awaitable.hpp>
33+
#include <asio/thread_pool.hpp>
3334

3435
#include <silkrpc/commands/rpc_api.hpp>
3536
#include <silkrpc/context_pool.hpp>
@@ -44,7 +45,7 @@ class RequestHandler {
4445
RequestHandler(const RequestHandler&) = delete;
4546
RequestHandler& operator=(const RequestHandler&) = delete;
4647

47-
explicit RequestHandler(Context& context) : rpc_api_{context} {}
48+
explicit RequestHandler(Context& context, asio::thread_pool& workers) : rpc_api_{context, workers} {}
4849

4950
virtual ~RequestHandler() {}
5051

silkrpc/http/server.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535

3636
namespace silkrpc::http {
3737

38-
Server::Server(const std::string& address, const std::string& port, ContextPool& context_pool)
39-
: context_pool_(context_pool), acceptor_{context_pool.get_io_context()} {
38+
Server::Server(const std::string& address, const std::string& port, ContextPool& context_pool, std::size_t num_workers)
39+
: context_pool_(context_pool), acceptor_{context_pool.get_io_context()}, workers_{num_workers} {
4040
// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
4141
asio::ip::tcp::resolver resolver{acceptor_.get_executor()};
4242
asio::ip::tcp::endpoint endpoint = *resolver.resolve(address, port).begin();
@@ -62,7 +62,7 @@ asio::awaitable<void> Server::run() {
6262

6363
SILKRPC_DEBUG << "Server::start accepting using io_context " << io_context << "...\n" << std::flush;
6464

65-
auto new_connection = std::make_shared<Connection>(context);
65+
auto new_connection = std::make_shared<Connection>(context, workers_);
6666
co_await acceptor_.async_accept(new_connection->socket(), asio::use_awaitable);
6767
if (!acceptor_.is_open()) {
6868
SILKRPC_TRACE << "Server::start returning...\n";

silkrpc/http/server.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@
2323
#ifndef SILKRPC_HTTP_SERVER_HPP_
2424
#define SILKRPC_HTTP_SERVER_HPP_
2525

26+
#include <cstddef>
2627
#include <string>
2728

2829
#include <silkrpc/config.hpp>
2930

3031
#include <asio/awaitable.hpp>
3132
#include <asio/ip/tcp.hpp>
33+
#include <asio/thread_pool.hpp>
3234

3335
#include <silkrpc/context_pool.hpp>
3436

@@ -41,7 +43,7 @@ class Server {
4143
Server& operator=(const Server&) = delete;
4244

4345
// Construct the server to listen on the specified TCP address and port
44-
explicit Server(const std::string& address, const std::string& port, ContextPool& context_pool);
46+
explicit Server(const std::string& address, const std::string& port, ContextPool& context_pool, std::size_t num_workers);
4547

4648
void start();
4749

@@ -55,6 +57,8 @@ class Server {
5557

5658
// The acceptor used to listen for incoming TCP connections
5759
asio::ip::tcp::acceptor acceptor_;
60+
61+
asio::thread_pool workers_;
5862
};
5963

6064
} // namespace silkrpc::http

silkrpc/main.cpp

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ ABSL_FLAG(std::string, chaindata, silkrpc::common::kEmptyChainData, "chain data
4545
ABSL_FLAG(std::string, local, silkrpc::common::kDefaultLocal, "HTTP JSON local binding as string <address>:<port>");
4646
ABSL_FLAG(std::string, target, silkrpc::common::kDefaultTarget, "TG Core gRPC service location as string <address>:<port>");
4747
ABSL_FLAG(uint32_t, numContexts, std::thread::hardware_concurrency() / 2, "number of running I/O contexts as 32-bit integer");
48+
ABSL_FLAG(uint32_t, numWorkers, std::thread::hardware_concurrency(), "number of worker threads as 32-bit integer");
4849
ABSL_FLAG(uint32_t, timeout, silkrpc::common::kDefaultTimeout.count(), "gRPC call timeout as 32-bit integer");
4950
ABSL_FLAG(silkrpc::LogLevel, logLevel, silkrpc::LogLevel::Critical, "logging level");
5051

@@ -74,7 +75,7 @@ int main(int argc, char* argv[]) {
7475
auto chaindata{absl::GetFlag(FLAGS_chaindata)};
7576
if (!chaindata.empty() && !std::filesystem::exists(chaindata)) {
7677
SILKRPC_ERROR << "Parameter chaindata is invalid: [" << chaindata << "]\n";
77-
SILKRPC_ERROR << "Use --chaindata flag to specify the path of Turbo-Geth database\n";
78+
SILKRPC_ERROR << "Use --chaindata flag to specify the path of Erigon database\n";
7879
return -1;
7980
}
8081

@@ -88,20 +89,20 @@ int main(int argc, char* argv[]) {
8889
auto target{absl::GetFlag(FLAGS_target)};
8990
if (!target.empty() && target.find(":") == std::string::npos) {
9091
SILKRPC_ERROR << "Parameter target is invalid: [" << target << "]\n";
91-
SILKRPC_ERROR << "Use --target flag to specify the location of Turbo-Geth running instance\n";
92+
SILKRPC_ERROR << "Use --target flag to specify the location of Erigon running instance\n";
9293
return -1;
9394
}
9495

9596
if (chaindata.empty() && target.empty()) {
9697
SILKRPC_ERROR << "Parameters chaindata and target cannot be both empty, specify one of them\n";
97-
SILKRPC_ERROR << "Use --chaindata or --target flag to specify the path or the location of Turbo-Geth instance\n";
98+
SILKRPC_ERROR << "Use --chaindata or --target flag to specify the path or the location of Erigon instance\n";
9899
return -1;
99100
}
100101

101102
auto timeout{absl::GetFlag(FLAGS_timeout)};
102103
if (timeout < 0) {
103104
SILKRPC_ERROR << "Parameter timeout is invalid: [" << timeout << "]\n";
104-
SILKRPC_ERROR << "Use --timeout flag to specify the timeout in msecs for Turbo-Geth KV gRPC I/F\n";
105+
SILKRPC_ERROR << "Use --timeout flag to specify the timeout in msecs for Erigon KV gRPC I/F\n";
105106
return -1;
106107
}
107108

@@ -112,6 +113,13 @@ int main(int argc, char* argv[]) {
112113
return -1;
113114
}
114115

116+
auto numWorkers{absl::GetFlag(FLAGS_numWorkers)};
117+
if (numWorkers < 0) {
118+
SILKRPC_ERROR << "Parameter numWorkers is invalid: [" << numWorkers << "]\n";
119+
SILKRPC_ERROR << "Use --numWorkers flag to specify the number of worker threads executing long-run operations\n";
120+
return -1;
121+
}
122+
115123
if (chaindata.empty()) {
116124
SILKRPC_LOG << "Silkrpc launched with target " << target << " using " << numContexts << " contexts\n";
117125
} else {
@@ -139,7 +147,7 @@ int main(int argc, char* argv[]) {
139147

140148
const auto http_host = local.substr(0, local.find(kAddressPortSeparator));
141149
const auto http_port = local.substr(local.find(kAddressPortSeparator) + 1, std::string::npos);
142-
silkrpc::http::Server http_server{http_host, http_port, context_pool};
150+
silkrpc::http::Server http_server{http_host, http_port, context_pool, numWorkers};
143151

144152
auto& io_context = context_pool.get_io_context();
145153
asio::signal_set signals{io_context, SIGINT, SIGTERM};

tests/integration/jsonrpc_commands_goerli.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@
113113
"response":{
114114
"jsonrpc":"2.0",
115115
"id":1,
116-
"result":false
116+
"result":null
117117
}
118118
},
119119
{

0 commit comments

Comments
 (0)