Skip to content

client: enhance waiting methods #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/Client/Connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ class Connector
const std::string& addr, unsigned port);

int wait(Connection<BUFFER, NetProvider> &conn, rid_t future,
int timeout = 0, Response<BUFFER> *result = nullptr);
int timeout = -1, Response<BUFFER> *result = nullptr);
int waitAll(Connection<BUFFER, NetProvider> &conn,
const std::vector<rid_t > &futures, int timeout = 0);
const std::vector<rid_t > &futures, int timeout = -1);
int waitCount(Connection<BUFFER, NetProvider> &conn,
size_t feature_count, int timeout = 0);
size_t feature_count, int timeout = -1);
std::optional<Connection<BUFFER, NetProvider>> waitAny(int timeout = -1);
////////////////////////////Service interfaces//////////////////////////
std::optional<Connection<BUFFER, NetProvider>> waitAny(int timeout = 0);
void readyToDecode(const Connection<BUFFER, NetProvider> &conn);
void readyToSend(const Connection<BUFFER, NetProvider> &conn);
void finishSend(const Connection<BUFFER, NetProvider> &conn);
Expand Down
4 changes: 2 additions & 2 deletions src/Client/EpollNetProvider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ template<class BUFFER, class Stream>
int
EpollNetProvider<BUFFER, Stream>::wait(int timeout)
{
assert(timeout >= 0);
if (timeout == 0)
assert(timeout >= -1);
if (timeout == -1)
timeout = TIMEOUT_INFINITY;
LOG_DEBUG("Network engine wait for ", timeout, " milliseconds");
/* Send pending requests. */
Expand Down
2 changes: 1 addition & 1 deletion src/Client/LibevNetProvider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ template<class BUFFER, class Stream>
int
LibevNetProvider<BUFFER, Stream>::wait(int timeout)
{
assert(timeout >= 0);
assert(timeout >= -1);
if (timeout > 0) {
ev_timer_init(&m_TimeoutWatcher, &timeout_cb, timeout / MILLISECONDS, 0 /* repeat */);
ev_timer_start(m_Loop, &m_TimeoutWatcher);
Expand Down
4 changes: 2 additions & 2 deletions src/Utils/Timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class Timer {
}
bool isExpired() const
{
if (m_Timeout == std::chrono::milliseconds{0})
if (m_Timeout == std::chrono::milliseconds{-1})
return false;
std::chrono::time_point<std::chrono::steady_clock> end =
std::chrono::steady_clock::now();
Expand All @@ -50,7 +50,7 @@ class Timer {
}
int elapsed() const
{
if (m_Timeout == std::chrono::milliseconds{0})
if (m_Timeout == std::chrono::milliseconds{-1})
return 0;
std::chrono::time_point<std::chrono::steady_clock> end =
std::chrono::steady_clock::now();
Expand Down
75 changes: 67 additions & 8 deletions test/ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,11 @@ auto_close(Connector<BUFFER, NetProvider> &client)
/** Several connection, separate/sequence pings, no errors */
template <class BUFFER, class NetProvider>
void
many_conn_ping(Connector<BUFFER, NetProvider> &client)
many_conn_ping(void)
{
TEST_INIT(0);
/* FIXME(gh-123,gh-124): use own client not to leave hanging connection. */
Connector<Buf_t, NetProvider> client;
Connection<Buf_t, NetProvider> conn1(client);
Connection<Buf_t, NetProvider> conn2(client);
Connection<Buf_t, NetProvider> conn3(client);
Expand Down Expand Up @@ -1033,15 +1035,16 @@ test_auth(Connector<BUFFER, NetProvider> &client)
}

/** Single connection, write to closed connection. */
template <class BUFFER, class NetProvider>
void
test_sigpipe(Connector<BUFFER, NetProvider> &client)
test_sigpipe(void)
{
TEST_INIT(0);

int rc = ::launchDummyServer(localhost, dummy_server_port);
fail_unless(rc == 0);

/* FIXME(gh-122): use own client not to leave hanging dead connection. */
Connector<Buf_t, NetProvider> client;
Connection<Buf_t, NetProvider> conn(client);
rc = ::test_connect(client, conn, localhost, dummy_server_port);
fail_unless(rc == 0);
Expand All @@ -1063,15 +1066,16 @@ test_sigpipe(Connector<BUFFER, NetProvider> &client)
}

/** Single connection, wait response from closed connection. */
template <class BUFFER, class NetProvider>
void
test_dead_connection_wait(Connector<BUFFER, NetProvider> &client)
test_dead_connection_wait(void)
{
TEST_INIT(0);

int rc = ::launchDummyServer(localhost, dummy_server_port);
fail_unless(rc == 0);

/* FIXME(gh-122): use own client not to leave hanging dead connection. */
Connector<Buf_t, NetProvider> client;
Connection<Buf_t, NetProvider> conn(client);
rc = ::test_connect(client, conn, localhost, dummy_server_port);
fail_unless(rc == 0);
Expand Down Expand Up @@ -1140,6 +1144,60 @@ response_decoding(Connector<BUFFER, NetProvider> &client)
client.close(conn);
}

/** Checks all available `wait` methods of connector. */
template <class BUFFER, class NetProvider>
void
test_wait(Connector<BUFFER, NetProvider> &client)
{
TEST_INIT(0);
static constexpr double SLEEP_TIME = 0.1;

Connection<Buf_t, NetProvider> conn(client);
int rc = test_connect(client, conn, localhost, port);
fail_unless(rc == 0);

TEST_CASE("wait(0) and wait(-1)");
rid_t f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
fail_unless(!conn.futureIsReady(f));
client.wait(conn, f, 0);
fail_unless(!conn.futureIsReady(f));
client.wait(conn, f, -1);
fail_unless(conn.futureIsReady(f));
std::optional<Response<Buf_t>> response = conn.getResponse(f);
fail_unless(response.has_value());

TEST_CASE("waitAny(0) and waitAny(-1)");
f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
fail_unless(!client.waitAny(0).has_value());
fail_unless(client.waitAny(-1).has_value());
response = conn.getResponse(f);
fail_unless(response.has_value());

TEST_CASE("waitAll(0) and waitAll(-1)");
std::vector<rid_t> fs;
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
fail_unless(client.waitAll(conn, fs, 0) == -1);
fail_unless(client.waitAll(conn, fs, -1) == 0);
response = conn.getResponse(fs[0]);
fail_unless(response.has_value());
response = conn.getResponse(fs[1]);
fail_unless(response.has_value());

TEST_CASE("waitCount(0) and waitCount(-1)");
fs.clear();
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
fail_unless(client.waitCount(conn, 2, 0) == -1);
fail_unless(client.waitCount(conn, 2, -1) == 0);
response = conn.getResponse(fs[0]);
fail_unless(response.has_value());
response = conn.getResponse(fs[1]);
fail_unless(response.has_value());

client.close(conn);
}

int main()
{
#ifdef TNTCXX_ENABLE_SSL
Expand Down Expand Up @@ -1167,7 +1225,7 @@ int main()
trivial<Buf_t, NetProvider>(client);
single_conn_ping<Buf_t, NetProvider>(client);
auto_close<Buf_t, NetProvider>(client);
many_conn_ping<Buf_t, NetProvider>(client);
many_conn_ping<Buf_t, NetProvider>();
single_conn_error<Buf_t, NetProvider>(client);
single_conn_replace<Buf_t, NetProvider>(client);
single_conn_insert<Buf_t, NetProvider>(client);
Expand All @@ -1185,9 +1243,10 @@ int main()
* an a lot more complex state machine.
*/
#ifndef TNTCXX_ENABLE_SSL
::test_sigpipe(client);
::test_sigpipe();
#endif
::test_dead_connection_wait(client);
::test_dead_connection_wait();
response_decoding(client);
test_wait(client);
return 0;
}
6 changes: 6 additions & 0 deletions test/cfg.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ function remote_echo(...)
return {...}
end

function remote_sleep(timeout)
local fiber = require('fiber')
fiber.sleep(timeout)
return nil
end

function get_rps()
return box.stat.net().REQUESTS.rps
end
Expand Down
6 changes: 6 additions & 0 deletions test/cfg_ssl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ function remote_echo(...)
return {...}
end

function remote_sleep(timeout)
local fiber = require('fiber')
fiber.sleep(timeout)
return nil
end

function get_rps()
return box.stat.net().REQUESTS.rps
end
Expand Down