Skip to content

client: enhance waiting methods #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ if (rc != 0) {
}
```

Note that some methods can return `rc < 0` without any error. For example, `wait`
can return `-1` when timeout is exceeded. In order to differentiate between such
situations, one can use method `Connection::hasError()`.

To reset connection after errors (clean up error message and connection status),
one can use `Connection::reset()`.

Expand All @@ -182,6 +186,51 @@ request is ready, `wait()` terminates. It also provides negative return code in
case of system related fails (e.g. broken or time outed connection). If `wait()`
returns 0, then response is received and expected to be parsed.

### Waiting for Responses

The connector provides several wait methods. All methods accept an integer `timeout`
argument with the following semantics:
* If `timeout > 0`, the connector blocks for `timeout` **milliseconds** or until
all required responses are received. Time is measured against the monotonic clock.
* If `timeout == 0`, the connector decodes all available responses and returns
immediately.
* If `timeout == -1`, the connector blocks until required responses are received
(basically, no timeout).

All the waiting functions (except for `waitAny`, its description will be later)
return `0` on success and `-1` in the case of any internal error (for example,
when the underlying connection is closed) or when timeout is exceeded.
See [this section](#error-handling) for error handling details.

Method `wait` waits for one request:
```c++
int rc = client.wait(conn, ping, WAIT_TIMEOUT);
```
An optional argument allows to obtain response right away in the case of success:
```c++
Response<Buf_t> response;
int rc = client.wait(conn, ping, WAIT_TIMEOUT, &response);
```

Method `waitAll` waits for completion of all the given requests of a connection:
```c++
std::vector<rid_t> futures{ping1, ping2, call, replace};
int rc = client.waitAll(conn, futures, WAIT_TIMEOUT);
```

Method `waitCount` waits until the given connection will complete any `future_count` requests:
```c++
int rc = client.waitCount(conn, future_count, WAIT_TIMEOUT);
```

Method `waitAny` is different - it allows to poll all the connections simultaneously.
In the case of success, the function returns a connection that received a response.
In the case of internal error or when timeout is exceeded, returns `std::nullopt`.
See [this section](#error-handling) for error handling details.
```c++
std::optional<Connection<Buf, NetProvider>> conn_ready = client.waitAny(WAIT_TIMEOUT);
```

### Receiving Responses

To get the response when it is ready, we can use `Connection::getResponse()`.
Expand Down
44 changes: 26 additions & 18 deletions src/Client/Connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ class Connector
const std::string& addr, unsigned port);

int wait(Connection<BUFFER, NetProvider> &conn, rid_t future,
int timeout = 0, Response<BUFFER> *result = nullptr);
int timeout = -1, Response<BUFFER> *result = nullptr);
int waitAll(Connection<BUFFER, NetProvider> &conn,
const std::vector<rid_t > &futures, int timeout = 0);
const std::vector<rid_t > &futures, int timeout = -1);
int waitCount(Connection<BUFFER, NetProvider> &conn,
size_t feature_count, int timeout = 0);
size_t feature_count, int timeout = -1);
std::optional<Connection<BUFFER, NetProvider>> waitAny(int timeout = -1);
////////////////////////////Service interfaces//////////////////////////
std::optional<Connection<BUFFER, NetProvider>> waitAny(int timeout = 0);
void readyToDecode(const Connection<BUFFER, NetProvider> &conn);
void readyToSend(const Connection<BUFFER, NetProvider> &conn);
void finishSend(const Connection<BUFFER, NetProvider> &conn);
Expand Down Expand Up @@ -185,9 +185,8 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
timer.start();
if (connectionDecodeResponses(conn, result) != 0)
return -1;
while (!conn.hasError() && !conn.futureIsReady(future) &&
!timer.isExpired()) {
if (m_NetProvider.wait(timeout - timer.elapsed()) != 0) {
while (!conn.hasError() && !conn.futureIsReady(future)) {
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
conn.setError(std::string("Failed to poll: ") +
strerror(errno), errno);
return -1;
Expand All @@ -203,13 +202,15 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
if (!hasDataToDecode(conn))
m_ReadyToDecode.erase(conn);
}
if (timer.isExpired())
break;
}
if (conn.hasError()) {
LOG_ERROR("Connection got an error: ", conn.getError().msg);
return -1;
}
if (! conn.futureIsReady(future)) {
LOG_ERROR("Connection has been timed out: future ", future,
LOG_DEBUG("Connection has been timed out: future ", future,
" is not ready");
return -1;
}
Expand All @@ -226,8 +227,8 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
Timer timer{timeout};
timer.start();
size_t last_not_ready = 0;
while (!conn.hasError() && !timer.isExpired()) {
if (m_NetProvider.wait(timeout - timer.elapsed()) != 0) {
while (!conn.hasError()) {
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
conn.setError(std::string("Failed to poll: ") +
strerror(errno), errno);
return -1;
Expand All @@ -249,12 +250,14 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
}
if (finish)
return 0;
if (timer.isExpired())
break;
}
if (conn.hasError()) {
LOG_ERROR("Connection got an error: ", conn.getError().msg);
return -1;
}
LOG_ERROR("Connection has been timed out: not all futures are ready");
LOG_DEBUG("Connection has been timed out: not all futures are ready");
return -1;
}

Expand All @@ -264,10 +267,13 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
{
Timer timer{timeout};
timer.start();
while (m_ReadyToDecode.empty() && !timer.isExpired())
m_NetProvider.wait(timeout - timer.elapsed());
while (m_ReadyToDecode.empty()) {
m_NetProvider.wait(timer.timeLeft());
if (timer.isExpired())
break;
}
if (m_ReadyToDecode.empty()) {
LOG_ERROR("wait() has been timed out! No responses are received");
LOG_DEBUG("wait() has been timed out! No responses are received");
return std::nullopt;
}
Connection<BUFFER, NetProvider> conn = *m_ReadyToDecode.begin();
Expand All @@ -287,8 +293,8 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
Timer timer{timeout};
timer.start();
size_t ready_futures = conn.getFutureCount();
while (!conn.hasError() && !timer.isExpired()) {
if (m_NetProvider.wait(timeout - timer.elapsed()) != 0) {
while (!conn.hasError()) {
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
conn.setError(std::string("Failed to poll: ") +
strerror(errno), errno);
return -1;
Expand All @@ -302,13 +308,15 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
}
if ((conn.getFutureCount() - ready_futures) >= future_count)
return 0;
if (timer.isExpired())
break;
}
if (conn.hasError()) {
LOG_ERROR("Connection got an error: ", conn.getError().msg);
return -1;
}
LOG_ERROR("Connection has been timed out: only ",
conn.getFutureCount() - ready_futures, " are ready");
LOG_DEBUG("Connection has been timed out: only ",
conn.getFutureCount() - ready_futures, " are ready");
return -1;
}

Expand Down
4 changes: 2 additions & 2 deletions src/Client/EpollNetProvider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ template<class BUFFER, class Stream>
int
EpollNetProvider<BUFFER, Stream>::wait(int timeout)
{
assert(timeout >= 0);
if (timeout == 0)
assert(timeout >= -1);
if (timeout == -1)
timeout = TIMEOUT_INFINITY;
LOG_DEBUG("Network engine wait for ", timeout, " milliseconds");
/* Send pending requests. */
Expand Down
8 changes: 5 additions & 3 deletions src/Client/LibevNetProvider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ void
LibevNetProvider<BUFFER, Stream>::timeout_cb(EV_P_ ev_timer *w, int /* revents */)
{
(void) w;
LOG_ERROR("Libev timed out!");
LOG_DEBUG("Libev timed out!");
/* Stop external loop */
ev_break(EV_A_ EVBREAK_ONE);
}
Expand All @@ -363,7 +363,7 @@ template<class BUFFER, class Stream>
int
LibevNetProvider<BUFFER, Stream>::wait(int timeout)
{
assert(timeout >= 0);
assert(timeout >= -1);
if (timeout > 0) {
ev_timer_init(&m_TimeoutWatcher, &timeout_cb, timeout / MILLISECONDS, 0 /* repeat */);
ev_timer_start(m_Loop, &m_TimeoutWatcher);
Expand All @@ -381,6 +381,8 @@ LibevNetProvider<BUFFER, Stream>::wait(int timeout)

}
}
ev_run(m_Loop, EVRUN_ONCE);
/* Work in non-blocking mode when the timeout is zero. */
int flags = timeout == 0 ? EVRUN_NOWAIT : EVRUN_ONCE;
ev_run(m_Loop, flags);
return 0;
}
20 changes: 15 additions & 5 deletions src/Utils/Timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <algorithm>
#include <chrono>

class Timer {
Expand All @@ -40,21 +41,30 @@ class Timer {
}
bool isExpired() const
{
if (m_Timeout == std::chrono::milliseconds{0})
if (m_Timeout == std::chrono::milliseconds{-1})
return false;
std::chrono::time_point<std::chrono::steady_clock> end =
std::chrono::steady_clock::now();
std::chrono::milliseconds elapsed =
std::chrono::duration_cast<std::chrono::milliseconds>(end - m_Start);
return elapsed >= m_Timeout;
}
int elapsed() const
/**
* The function to obtain amount of time left. Returns:
* 1. `-1` if the initial timeout was `-1`.
* 2. `0` if the timer has expired.
* 3. Otherwise, amount of milliseconds left is returned.
* NB: the function should not be used for expiration check - use `isExpired` instead.
*/
int timeLeft() const
{
if (m_Timeout == std::chrono::milliseconds{0})
return 0;
if (m_Timeout == std::chrono::milliseconds{-1})
return -1;
std::chrono::time_point<std::chrono::steady_clock> end =
std::chrono::steady_clock::now();
return std::chrono::duration_cast<std::chrono::milliseconds>(end - m_Start).count();
int timeLeft = m_Timeout.count() -
std::chrono::duration_cast<std::chrono::milliseconds>(end - m_Start).count();
return std::max(0, timeLeft);
}
private:
std::chrono::milliseconds m_Timeout;
Expand Down
Loading
Loading