Skip to content

Commit 2af6952

Browse files
committed
client: return requested future with result argument of wait
Currently, the argument returns any decoded future - that is inconvenient and completely unusable. Let's return only the requested future, or nothing, if it's not ready. Along the way, reformat argument list of modified functions to make them conform clang-format. Closes #112
1 parent 206d6ee commit 2af6952

File tree

3 files changed

+34
-10
lines changed

3 files changed

+34
-10
lines changed

src/Client/Connection.hpp

+3-5
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,7 @@ class Connection
230230

231231
template<class B, class N>
232232
friend
233-
enum DecodeStatus processResponse(Connection<B, N> &conn,
234-
Response<B> *result);
233+
enum DecodeStatus processResponse(Connection<B, N> &conn, int req_sync, Response<B> *result);
235234

236235
template<class B, class N>
237236
friend
@@ -530,8 +529,7 @@ inputBufGC(Connection<BUFFER, NetProvider> &conn)
530529

531530
template<class BUFFER, class NetProvider>
532531
DecodeStatus
533-
processResponse(Connection<BUFFER, NetProvider> &conn,
534-
Response<BUFFER> *result)
532+
processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result)
535533
{
536534
//Decode response. In case of success - fill in feature map
537535
//and adjust end-of-decoded data pointer. Call GC if needed.
@@ -563,7 +561,7 @@ processResponse(Connection<BUFFER, NetProvider> &conn,
563561
}
564562
LOG_DEBUG("Header: sync=", response.header.sync, ", code=",
565563
response.header.code, ", schema=", response.header.schema_id);
566-
if (result != nullptr) {
564+
if (result != nullptr && response.header.sync == req_sync) {
567565
*result = std::move(response);
568566
} else {
569567
conn.impl->futures.insert({response.header.sync,

src/Client/Connector.hpp

+10-5
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,11 @@ class Connector
9494
/**
9595
* A helper to decode responses of a connection.
9696
* Can be called when the connection is not ready to decode - it's just no-op.
97+
* If `result` is not `nullptr`, it is used to return response for a request with
98+
* `req_sync` sync. If `result` is `nullptr` - `req_sync` is ignored.
9799
* Returns -1 in the case of any error, 0 on success.
98100
*/
99-
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
101+
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn, int req_sync = -1,
100102
Response<BUFFER> *result = nullptr);
101103

102104
private:
@@ -172,7 +174,7 @@ Connector<BUFFER, NetProvider>::close(ConnectionImpl<BUFFER, NetProvider> &conn)
172174

173175
template<class BUFFER, class NetProvider>
174176
int
175-
Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
177+
Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn, int req_sync,
176178
Response<BUFFER> *result)
177179
{
178180
if (!hasDataToDecode(conn))
@@ -183,7 +185,7 @@ Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, Net
183185

184186
int rc = 0;
185187
while (hasDataToDecode(conn)) {
186-
DecodeStatus status = processResponse(conn, result);
188+
DecodeStatus status = processResponse(conn, req_sync, result);
187189
if (status == DECODE_ERR) {
188190
rc = -1;
189191
break;
@@ -213,11 +215,13 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
213215
Timer timer{timeout};
214216
timer.start();
215217
static constexpr int INVALID_SYNC = -1;
218+
int req_sync = static_cast<int>(future);
216219
if (result != NULL)
217220
result->header.sync = INVALID_SYNC;
218-
if (connectionDecodeResponses(conn, result) != 0)
221+
if (connectionDecodeResponses(conn, req_sync, result) != 0)
219222
return -1;
220223
if (result != NULL && result->header.sync != INVALID_SYNC) {
224+
assert(result->header.sync == req_sync);
221225
LOG_DEBUG("Future ", future, " is ready and decoded");
222226
return 0;
223227
}
@@ -227,9 +231,10 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
227231
strerror(errno), errno);
228232
return -1;
229233
}
230-
if (connectionDecodeResponses(conn, result) != 0)
234+
if (connectionDecodeResponses(conn, req_sync, result) != 0)
231235
return -1;
232236
if (result != NULL && result->header.sync != INVALID_SYNC) {
237+
assert(result->header.sync == req_sync);
233238
LOG_DEBUG("Future ", future, " is ready and decoded");
234239
return 0;
235240
}

test/ClientTest.cpp

+21
Original file line numberDiff line numberDiff line change
@@ -1303,6 +1303,27 @@ test_wait(Connector<BUFFER, NetProvider> &client)
13031303
fail_unless(result.header.sync == static_cast<int>(f));
13041304
fail_unless(result.header.code == 0);
13051305

1306+
TEST_CASE("wait with argument result - several requests");
1307+
/* Obtain in direct order. */
1308+
f1 = conn.ping();
1309+
f2 = conn.ping();
1310+
fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0);
1311+
fail_unless(result.header.sync == static_cast<int>(f1));
1312+
fail_unless(result.header.code == 0);
1313+
fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0);
1314+
fail_unless(result.header.sync == static_cast<int>(f2));
1315+
fail_unless(result.header.code == 0);
1316+
1317+
/* Obtain in reversed order. */
1318+
f1 = conn.ping();
1319+
f2 = conn.ping();
1320+
fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0);
1321+
fail_unless(result.header.sync == static_cast<int>(f2));
1322+
fail_unless(result.header.code == 0);
1323+
fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0);
1324+
fail_unless(result.header.sync == static_cast<int>(f1));
1325+
fail_unless(result.header.code == 0);
1326+
13061327
client.close(conn);
13071328
}
13081329

0 commit comments

Comments
 (0)