Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions hazelcast/include/hazelcast/client/client_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class HAZELCAST_API client_properties

const client_property& get_internal_executor_pool_size() const;

const client_property& get_io_thread_count() const;

const client_property& get_shuffle_member_list() const;

const client_property& get_max_concurrent_invocations() const;
Expand Down Expand Up @@ -181,6 +183,17 @@ class HAZELCAST_API client_properties
static const std::string INTERNAL_EXECUTOR_POOL_SIZE;
static const std::string INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT;

/**
* Number of IO threads for the networking layer.
* Each IO thread runs its own Boost.Asio io_context event loop.
* Connections are distributed across IO threads in round-robin fashion.
* This is the C++ equivalent of Java's IO_INPUT_THREAD_COUNT +
* IO_OUTPUT_THREAD_COUNT (unified because Asio's proactor pattern handles
* both read and write directions per thread without blocking).
*/
static const std::string IO_THREAD_COUNT;
static const std::string IO_THREAD_COUNT_DEFAULT;

/**
* Client shuffles the given member list to prevent all clients from connecting
* to the same node when this property is set to true. When it is set to
Expand Down Expand Up @@ -315,6 +328,7 @@ class HAZELCAST_API client_properties
client_property invocation_timeout_seconds_;
client_property event_thread_count_;
client_property internal_executor_pool_size_;
client_property io_thread_count_;
client_property shuffle_member_list_;
client_property max_concurrent_invocations_;
client_property backpressure_backoff_timeout_millis_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,14 @@ class HAZELCAST_API ClientConnectionManagerImpl
logger& logger_;
std::chrono::milliseconds connection_timeout_millis_;
spi::ClientContext& client_;
std::unique_ptr<boost::asio::io_context> io_context_;
std::vector<std::unique_ptr<boost::asio::io_context>> io_contexts_;
std::vector<std::unique_ptr<boost::asio::ip::tcp::resolver>> io_resolvers_;
std::unique_ptr<internal::socket::SocketFactory> socket_factory_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has moved (wasn't updated). Any good reason?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no reason except that socket factory is using io_context and resolver in its methods, they are related but move is a random choice, I can revert it, no problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a massive problem, just wanted to check my understanding.

std::vector<std::thread> io_threads_;
std::vector<std::unique_ptr<
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>>
io_guards_;
std::atomic<size_t> next_io_index_{ 0 };
socket_interceptor socket_interceptor_;
util::SynchronizedMap<member, bool> connecting_members_;
// TODO: change with CopyOnWriteArraySet<ConnectionListener> as in Java
Expand All @@ -291,13 +298,7 @@ class HAZELCAST_API ClientConnectionManagerImpl
bool shuffle_member_list_;
std::unique_ptr<AddressProvider> address_provider_;
std::atomic<int32_t> connection_id_gen_;
std::unique_ptr<boost::asio::ip::tcp::resolver> io_resolver_;
std::unique_ptr<internal::socket::SocketFactory> socket_factory_;
HeartbeatManager heartbeat_;
std::thread io_thread_;
std::unique_ptr<
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>
io_guard_;
const bool async_start_;
const config::client_connection_strategy_config::reconnect_mode
reconnect_mode_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ class HAZELCAST_API Connection
int32_t connection_id,
internal::socket::SocketFactory& socket_factory,
ClientConnectionManagerImpl& client_connection_manager,
std::chrono::milliseconds& connect_timeout_in_millis);
std::chrono::milliseconds& connect_timeout_in_millis,
boost::asio::io_context& io,
boost::asio::ip::tcp::resolver& resolver);

~Connection() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,18 @@ namespace socket {
class HAZELCAST_API SocketFactory
{
public:
SocketFactory(spi::ClientContext& client_context,
boost::asio::io_context& io,
boost::asio::ip::tcp::resolver& resolver);
SocketFactory(spi::ClientContext& client_context);

bool start();

std::unique_ptr<hazelcast::client::socket> create(
const address& address,
std::chrono::milliseconds& connect_timeout_in_millis);
std::chrono::milliseconds& connect_timeout_in_millis,
boost::asio::io_context& io,
boost::asio::ip::tcp::resolver& resolver);

private:
spi::ClientContext& client_context_;
boost::asio::io_context& io_;
boost::asio::ip::tcp::resolver& io_resolver_;
#ifdef HZ_BUILD_WITH_SSL
std::shared_ptr<boost::asio::ssl::context> ssl_context_;
#endif
Expand Down
11 changes: 11 additions & 0 deletions hazelcast/src/hazelcast/client/client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,10 @@ const std::string client_properties::INTERNAL_EXECUTOR_POOL_SIZE =
"hazelcast.client.internal.executor.pool.size";
const std::string client_properties::INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT = "3";

const std::string client_properties::IO_THREAD_COUNT =
"hazelcast.client.io.thread.count";
const std::string client_properties::IO_THREAD_COUNT_DEFAULT = "3";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if there was documentation to explain why 3 was chosen. Even if it just links to the config in the Java client or something.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a study conducted a while ago, reported here, which gives details about the choice of 3.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we link to that in the code to help a future maintainer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is internal.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, doesn't cpp client verify the public configuration? What do we expect when count is meaningless?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is internal.

I understand that it won't help anyone outside of Hazelcast.


const std::string client_properties::SHUFFLE_MEMBER_LIST =
"hazelcast.client.shuffle.member.list";
const std::string client_properties::SHUFFLE_MEMBER_LIST_DEFAULT = "true";
Expand Down Expand Up @@ -1060,6 +1064,7 @@ client_properties::client_properties(
, event_thread_count_(EVENT_THREAD_COUNT, EVENT_THREAD_COUNT_DEFAULT)
, internal_executor_pool_size_(INTERNAL_EXECUTOR_POOL_SIZE,
INTERNAL_EXECUTOR_POOL_SIZE_DEFAULT)
, io_thread_count_(IO_THREAD_COUNT, IO_THREAD_COUNT_DEFAULT)
, shuffle_member_list_(SHUFFLE_MEMBER_LIST, SHUFFLE_MEMBER_LIST_DEFAULT)
, max_concurrent_invocations_(MAX_CONCURRENT_INVOCATIONS,
MAX_CONCURRENT_INVOCATIONS_DEFAULT)
Expand Down Expand Up @@ -1122,6 +1127,12 @@ client_properties::get_internal_executor_pool_size() const
return internal_executor_pool_size_;
}

// Accessor for the IO_THREAD_COUNT property (number of networking IO
// threads; see the declaration in client_properties.h). Returns the
// property descriptor, not the resolved integer value — callers resolve
// it via client_properties::get_integer().
const client_property&
client_properties::get_io_thread_count() const
{
    return io_thread_count_;
}

const client_property&
client_properties::get_shuffle_member_list() const
{
Expand Down
74 changes: 47 additions & 27 deletions hazelcast/src/hazelcast/client/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,24 +114,31 @@ ClientConnectionManagerImpl::start()
return false;
}

io_context_.reset(new boost::asio::io_context);
io_resolver_.reset(
new boost::asio::ip::tcp::resolver(io_context_->get_executor()));
socket_factory_.reset(new internal::socket::SocketFactory(
client_, *io_context_, *io_resolver_));
auto guard = boost::asio::make_work_guard(*io_context_);
io_guard_ = std::unique_ptr<
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>(
new boost::asio::executor_work_guard<
boost::asio::io_context::executor_type>(std::move(guard)));
auto& props = client_.get_client_properties();
int io_thread_count = props.get_integer(props.get_io_thread_count());

socket_factory_.reset(new internal::socket::SocketFactory(client_));

if (!socket_factory_->start()) {
return false;
}

socket_interceptor_ = client_.get_client_config().get_socket_interceptor();

io_thread_ = std::thread([=]() { io_context_->run(); });
for (int i = 0; i < io_thread_count; ++i) {
auto ctx =
std::unique_ptr<boost::asio::io_context>(new boost::asio::io_context);
io_guards_.push_back(std::unique_ptr<boost::asio::executor_work_guard<
boost::asio::io_context::executor_type>>(
new boost::asio::executor_work_guard<
boost::asio::io_context::executor_type>(
boost::asio::make_work_guard(*ctx))));
io_resolvers_.push_back(std::unique_ptr<boost::asio::ip::tcp::resolver>(
new boost::asio::ip::tcp::resolver(ctx->get_executor())));
auto raw_ctx = ctx.get();
io_contexts_.push_back(std::move(ctx));
io_threads_.emplace_back([raw_ctx]() { raw_ctx->run(); });
}
Comment on lines +128 to +141
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely my inexperience, but this looks... intense.
Could it be more readable / commented?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, that is how C++ code looks. Basically we are initializing the vectors and starting the io threads. Work guards are used to control when the io threads finish running. I will put more comments.


executor_.reset(
new hazelcast::util::hz_thread_pool(EXECUTOR_CORE_POOL_SIZE));
Expand Down Expand Up @@ -192,9 +199,18 @@ ClientConnectionManagerImpl::shutdown()
spi::impl::ClientExecutionServiceImpl::shutdown_thread_pool(
executor_.get());

// release the guard so that the io thread can stop gracefully
io_guard_.reset();
io_thread_.join();
// release the guards so that the io threads can stop gracefully
for (auto& guard : io_guards_) {
guard.reset();
}
for (auto& thread : io_threads_) {
if (thread.joinable()) {
thread.join();
}
}
io_contexts_.clear();
io_guards_.clear();
io_resolvers_.clear();

connection_listeners_.clear();
active_connections_.clear();
Expand Down Expand Up @@ -1062,12 +1078,15 @@ ClientConnectionManagerImpl::connect(const address& addr)
info,
boost::str(boost::format("Trying to connect to %1%.") % addr));

auto idx = next_io_index_.fetch_add(1) % io_contexts_.size();
auto connection = std::make_shared<Connection>(addr,
client_,
++connection_id_gen_,
*socket_factory_,
*this,
connection_timeout_millis_);
connection_timeout_millis_,
*io_contexts_[idx],
*io_resolvers_[idx]);
connection->connect();

// call the interceptor from user thread
Expand Down Expand Up @@ -1204,7 +1223,9 @@ Connection::Connection(
int connection_id, // NOLINT(cppcoreguidelines-pro-type-member-init)
internal::socket::SocketFactory& socket_factory,
ClientConnectionManagerImpl& client_connection_manager,
std::chrono::milliseconds& connect_timeout_in_millis)
std::chrono::milliseconds& connect_timeout_in_millis,
boost::asio::io_context& io,
boost::asio::ip::tcp::resolver& resolver)
: read_handler(*this, 16 << 10)
, start_time_(std::chrono::system_clock::now())
, closed_time_duration_()
Expand All @@ -1217,7 +1238,8 @@ Connection::Connection(
, last_write_time_(std::chrono::steady_clock::now().time_since_epoch())
{
(void)client_connection_manager;
socket_ = socket_factory.create(address, connect_timeout_in_millis);
socket_ =
socket_factory.create(address, connect_timeout_in_millis, io, resolver);
}

// Defaulted: socket_ is a unique_ptr (see SocketFactory::create), so the
// underlying socket is released via RAII when the Connection is destroyed.
Connection::~Connection() = default;
Expand Down Expand Up @@ -1729,12 +1751,8 @@ wait_strategy::sleep()

namespace internal {
namespace socket {
SocketFactory::SocketFactory(spi::ClientContext& client_context,
boost::asio::io_context& io,
boost::asio::ip::tcp::resolver& resolver)
SocketFactory::SocketFactory(spi::ClientContext& client_context)
: client_context_(client_context)
, io_(io)
, io_resolver_(resolver)
{
}

Expand Down Expand Up @@ -1808,30 +1826,32 @@ SocketFactory::start()

std::unique_ptr<hazelcast::client::socket>
SocketFactory::create(const address& address,
std::chrono::milliseconds& connect_timeout_in_millis)
std::chrono::milliseconds& connect_timeout_in_millis,
boost::asio::io_context& io,
boost::asio::ip::tcp::resolver& resolver)
{
#ifdef HZ_BUILD_WITH_SSL
if (ssl_context_.get()) {
return std::unique_ptr<hazelcast::client::socket>(
new internal::socket::SSLSocket(io_,
new internal::socket::SSLSocket(io,
*ssl_context_,
address,
client_context_.get_client_config()
.get_network_config()
.get_socket_options(),
connect_timeout_in_millis,
io_resolver_));
resolver));
}
#endif

return std::unique_ptr<hazelcast::client::socket>(
new internal::socket::TcpSocket(io_,
new internal::socket::TcpSocket(io,
address,
client_context_.get_client_config()
.get_network_config()
.get_socket_options(),
connect_timeout_in_millis,
io_resolver_));
resolver));
}

#ifdef HZ_BUILD_WITH_SSL
Expand Down
Loading