Skip to content

client: fix result argument of wait #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/Client/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ class Connection

template<class B, class N>
friend
enum DecodeStatus processResponse(Connection<B, N> &conn,
Response<B> *result);
enum DecodeStatus processResponse(Connection<B, N> &conn, int req_sync, Response<B> *result);

template<class B, class N>
friend
Expand Down Expand Up @@ -530,8 +529,7 @@ inputBufGC(Connection<BUFFER, NetProvider> &conn)

template<class BUFFER, class NetProvider>
DecodeStatus
processResponse(Connection<BUFFER, NetProvider> &conn,
Response<BUFFER> *result)
processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result)
{
//Decode response. In case of success - fill in feature map
//and adjust end-of-decoded data pointer. Call GC if needed.
Expand Down Expand Up @@ -563,7 +561,7 @@ processResponse(Connection<BUFFER, NetProvider> &conn,
}
LOG_DEBUG("Header: sync=", response.header.sync, ", code=",
response.header.code, ", schema=", response.header.schema_id);
if (result != nullptr) {
if (result != nullptr && response.header.sync == req_sync) {
*result = std::move(response);
} else {
conn.impl->futures.insert({response.header.sync,
Expand Down
100 changes: 63 additions & 37 deletions src/Client/Connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,25 @@ class Connector
std::set<Connection<BUFFER, NetProvider>> m_ReadyToSend;
void close(Connection<BUFFER, NetProvider> &conn);
void close(ConnectionImpl<BUFFER, NetProvider> &conn);

private:
/**
* A helper to decode responses of a connection.
* Can be called when the connection is not ready to decode - it's just no-op.
* If `result` is not `nullptr`, it is used to return response for a request with
* `req_sync` sync. If `result` is `nullptr` - `req_sync` is ignored.
* Returns -1 in the case of any error, 0 on success.
*/
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn, int req_sync = -1,
Response<BUFFER> *result = nullptr);

private:
NetProvider m_NetProvider;
/**
* Set of connections that are ready to decode.
* Shouldn't be modified directly - is managed by methods `readyToDecode`
* and `connectionDecodeResponses`.
*/
std::set<Connection<BUFFER, NetProvider>> m_ReadyToDecode;
};

Expand Down Expand Up @@ -157,21 +174,35 @@ Connector<BUFFER, NetProvider>::close(ConnectionImpl<BUFFER, NetProvider> &conn)

template<class BUFFER, class NetProvider>
int
connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
Response<BUFFER> *result)
Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn, int req_sync,
Response<BUFFER> *result)
{
if (!hasDataToDecode(conn))
return 0;

/* Ready to decode connection must be in the corresponding set. */
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());

int rc = 0;
while (hasDataToDecode(conn)) {
DecodeStatus rc = processResponse(conn, result);
if (rc == DECODE_ERR)
return -1;
DecodeStatus status = processResponse(conn, req_sync, result);
if (status == DECODE_ERR) {
rc = -1;
break;
}
//In case we've received only a part of response
//we should wait until the rest arrives - otherwise
//we can't properly decode response. */
if (rc == DECODE_NEEDMORE)
return 0;
assert(rc == DECODE_SUCC);
if (status == DECODE_NEEDMORE) {
rc = 0;
break;
}
assert(status == DECODE_SUCC);
}
return 0;
/* A connection that has no data to decode must not be left in the set. */
if (!hasDataToDecode(conn))
m_ReadyToDecode.erase(conn);
return rc;
}

template<class BUFFER, class NetProvider>
Expand All @@ -183,24 +214,29 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
LOG_DEBUG("Waiting for the future ", future, " with timeout ", timeout);
Timer timer{timeout};
timer.start();
if (connectionDecodeResponses(conn, result) != 0)
static constexpr int INVALID_SYNC = -1;
int req_sync = static_cast<int>(future);
if (result != NULL)
result->header.sync = INVALID_SYNC;
if (connectionDecodeResponses(conn, req_sync, result) != 0)
return -1;
if (result != NULL && result->header.sync != INVALID_SYNC) {
assert(result->header.sync == req_sync);
LOG_DEBUG("Future ", future, " is ready and decoded");
return 0;
}
while (!conn.hasError() && !conn.futureIsReady(future)) {
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
conn.setError(std::string("Failed to poll: ") +
strerror(errno), errno);
return -1;
}
if (hasDataToDecode(conn)) {
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
if (connectionDecodeResponses(conn, result) != 0)
return -1;
/*
* In case we've handled whole data in input buffer -
* mark connection as completed.
*/
if (!hasDataToDecode(conn))
m_ReadyToDecode.erase(conn);
if (connectionDecodeResponses(conn, req_sync, result) != 0)
return -1;
if (result != NULL && result->header.sync != INVALID_SYNC) {
assert(result->header.sync == req_sync);
LOG_DEBUG("Future ", future, " is ready and decoded");
return 0;
}
if (timer.isExpired())
break;
Expand All @@ -213,6 +249,8 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
LOG_DEBUG("Connection has been timed out: future ", future,
" is not ready");
return -1;
} else if (result != NULL) {
*result = std::move(conn.getResponse(future));
}
LOG_DEBUG("Feature ", future, " is ready and decoded");
return 0;
Expand All @@ -233,13 +271,8 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
strerror(errno), errno);
return -1;
}
if (hasDataToDecode(conn)) {
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
return -1;
if (!hasDataToDecode(conn))
m_ReadyToDecode.erase(conn);
}
if (connectionDecodeResponses(conn) != 0)
return -1;
bool finish = true;
for (size_t i = last_not_ready; i < futures.size(); ++i) {
if (!conn.futureIsReady(futures[i])) {
Expand Down Expand Up @@ -278,10 +311,8 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
}
Connection<BUFFER, NetProvider> conn = *m_ReadyToDecode.begin();
assert(hasDataToDecode(conn));
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
if (connectionDecodeResponses(conn) != 0)
return std::nullopt;
if (!hasDataToDecode(conn))
m_ReadyToDecode.erase(conn);
return conn;
}

Expand All @@ -299,13 +330,8 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
strerror(errno), errno);
return -1;
}
if (hasDataToDecode(conn)) {
assert(m_ReadyToDecode.find(conn) != m_ReadyToDecode.end());
if (connectionDecodeResponses(conn, static_cast<Response<BUFFER>*>(nullptr)) != 0)
return -1;
if (!hasDataToDecode(conn))
m_ReadyToDecode.erase(conn);
}
if (connectionDecodeResponses(conn) != 0)
return -1;
if ((conn.getFutureCount() - ready_futures) >= future_count)
return 0;
if (timer.isExpired())
Expand Down
85 changes: 85 additions & 0 deletions test/ClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,91 @@ test_wait(Connector<BUFFER, NetProvider> &client)
response = conn.getResponse(f);
fail_unless(response.has_value());

TEST_CASE("waitAny after several waits (gh-124)");
Connection<Buf_t, NetProvider> conn1(client);
Connection<Buf_t, NetProvider> conn2(client);
Connection<Buf_t, NetProvider> conn3(client);
rc = test_connect(client, conn1, localhost, port);
fail_unless(rc == 0);
rc = test_connect(client, conn2, localhost, port);
fail_unless(rc == 0);
rc = test_connect(client, conn3, localhost, port);
fail_unless(rc == 0);
rid_t f1 = conn1.ping();
rid_t f2 = conn2.ping();
rid_t f3 = conn3.ping();

/* Wait for all connections. */
fail_unless(client.wait(conn1, f1, WAIT_TIMEOUT) == 0);
fail_unless(conn1.futureIsReady(f1));
fail_unless(conn1.getResponse(f1).header.code == 0);

fail_unless(client.wait(conn2, f2, WAIT_TIMEOUT) == 0);
fail_unless(conn2.futureIsReady(f2));
fail_unless(conn2.getResponse(f2).header.code == 0);

fail_unless(client.wait(conn3, f3, WAIT_TIMEOUT) == 0);
fail_unless(conn3.futureIsReady(f3));
fail_unless(conn3.getResponse(f3).header.code == 0);

/*
* Wait any - we shouldn't get any of the connections here since we've
* received all the responses.
* Note that the connector used to crash here (gh-124) because some of the
* connnections still could appear in `m_ReadyToDecode` set.
*/
std::optional<Connection<Buf_t, NetProvider>> conn_opt = client.waitAny(WAIT_TIMEOUT);
fail_if(conn_opt.has_value());

/* Close all connections used only by the case. */
client.close(conn1);
client.close(conn2);
client.close(conn3);

TEST_CASE("wait with argument result");
f = conn.ping();
fail_unless(!conn.futureIsReady(f));
Response<BUFFER> result;
fail_unless(client.wait(conn, f, WAIT_TIMEOUT, &result) == 0);
/* The result was consumed, so the future is not ready. */
fail_unless(!conn.futureIsReady(f));
/* The future is actually request sync - check if the result is valid. */
fail_unless(result.header.sync == static_cast<int>(f));
fail_unless(result.header.code == 0);

TEST_CASE("wait with argument result for decoded future");
f = conn.ping();
fail_unless(!conn.futureIsReady(f));
fail_unless(client.wait(conn, f, WAIT_TIMEOUT) == 0);
fail_unless(conn.futureIsReady(f));
fail_unless(client.wait(conn, f, WAIT_TIMEOUT, &result) == 0);
/* The result was consumed, so the future is not ready. */
fail_unless(!conn.futureIsReady(f));
/* The future is actually request sync - check if the result is valid. */
fail_unless(result.header.sync == static_cast<int>(f));
fail_unless(result.header.code == 0);

TEST_CASE("wait with argument result - several requests");
/* Obtain in direct order. */
f1 = conn.ping();
f2 = conn.ping();
fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0);
fail_unless(result.header.sync == static_cast<int>(f1));
fail_unless(result.header.code == 0);
fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0);
fail_unless(result.header.sync == static_cast<int>(f2));
fail_unless(result.header.code == 0);

/* Obtain in reversed order. */
f1 = conn.ping();
f2 = conn.ping();
fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0);
fail_unless(result.header.sync == static_cast<int>(f2));
fail_unless(result.header.code == 0);
fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0);
fail_unless(result.header.sync == static_cast<int>(f1));
fail_unless(result.header.code == 0);

client.close(conn);
}

Expand Down
Loading