Skip to content

Commit d7df387

Browse files
committed
client: return requested future with result argument of wait
Currently, the argument returns any decoded future - that is inconvenient and completely unusable. Let's return only the requested future, or nothing, if it's not ready. Closes #112
1 parent e646501 commit d7df387

File tree

3 files changed

+37
-14
lines changed

3 files changed

+37
-14
lines changed

src/Client/Connection.hpp

+3-5
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,7 @@ class Connection
230230

231231
template<class B, class N>
232232
friend
233-
enum DecodeStatus processResponse(Connection<B, N> &conn,
234-
Response<B> *result);
233+
enum DecodeStatus processResponse(Connection<B, N> &conn, int req_sync, Response<B> *result);
235234

236235
template<class B, class N>
237236
friend
@@ -530,8 +529,7 @@ inputBufGC(Connection<BUFFER, NetProvider> &conn)
530529

531530
template<class BUFFER, class NetProvider>
532531
DecodeStatus
533-
processResponse(Connection<BUFFER, NetProvider> &conn,
534-
Response<BUFFER> *result)
532+
processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result)
535533
{
536534
//Decode response. In case of success - fill in feature map
537535
//and adjust end-of-decoded data pointer. Call GC if needed.
@@ -563,7 +561,7 @@ processResponse(Connection<BUFFER, NetProvider> &conn,
563561
}
564562
LOG_DEBUG("Header: sync=", response.header.sync, ", code=",
565563
response.header.code, ", schema=", response.header.schema_id);
566-
if (result != nullptr) {
564+
if (result != nullptr && response.header.sync == req_sync) {
567565
*result = std::move(response);
568566
} else {
569567
conn.impl->futures.insert({response.header.sync,

src/Client/Connector.hpp

+13-9
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,17 @@ class Connector
8989
std::set<Connection<BUFFER, NetProvider>> m_ReadyToSend;
9090
void close(Connection<BUFFER, NetProvider> &conn);
9191
void close(ConnectionImpl<BUFFER, NetProvider> &conn);
92+
9293
private:
9394
/**
9495
* A helper to decode responses of a connection.
9596
* Can be called when the connection is not ready to decode - it's just no-op.
97+
* If `result` is not `nullptr`, it is used to return response for a request with
98+
* `req_sync` sync. If `result` is `nullptr` - `req_sync` is ignored.
9699
* Returns -1 in the case of any error, 0 on success.
97100
*/
98-
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
99-
Response<BUFFER> *result);
101+
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result);
102+
100103
private:
101104
NetProvider m_NetProvider;
102105
/**
@@ -170,7 +173,7 @@ Connector<BUFFER, NetProvider>::close(ConnectionImpl<BUFFER, NetProvider> &conn)
170173

171174
template<class BUFFER, class NetProvider>
172175
int
173-
Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
176+
Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn, int req_sync,
174177
Response<BUFFER> *result)
175178
{
176179
if (!hasDataToDecode(conn))
@@ -181,7 +184,7 @@ Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, Net
181184

182185
int rc = 0;
183186
while (hasDataToDecode(conn)) {
184-
DecodeStatus status = processResponse(conn, result);
187+
DecodeStatus status = processResponse(conn, req_sync, result);
185188
if (status == DECODE_ERR) {
186189
rc = -1;
187190
break;
@@ -211,9 +214,10 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
211214
Timer timer{timeout};
212215
timer.start();
213216
static constexpr int INVALID_SYNC = -1;
217+
int req_sync = static_cast<int>(future);
214218
if (result != NULL)
215219
result->header.sync = INVALID_SYNC;
216-
if (connectionDecodeResponses(conn, result) != 0)
220+
if (connectionDecodeResponses(conn, req_sync, result) != 0)
217221
return -1;
218222
if (result != NULL && result->header.sync != INVALID_SYNC) {
219223
LOG_DEBUG("Future ", future, " is ready and decoded");
@@ -225,7 +229,7 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
225229
strerror(errno), errno);
226230
return -1;
227231
}
228-
if (connectionDecodeResponses(conn, result) != 0)
232+
if (connectionDecodeResponses(conn, req_sync, result) != 0)
229233
return -1;
230234
if (result != NULL && result->header.sync != INVALID_SYNC) {
231235
LOG_DEBUG("Future ", future, " is ready and decoded");
@@ -264,7 +268,7 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
264268
strerror(errno), errno);
265269
return -1;
266270
}
267-
if (connectionDecodeResponses(conn, nullptr) != 0)
271+
if (connectionDecodeResponses(conn, 0, nullptr) != 0)
268272
return -1;
269273
bool finish = true;
270274
for (size_t i = last_not_ready; i < futures.size(); ++i) {
@@ -304,7 +308,7 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
304308
}
305309
Connection<BUFFER, NetProvider> conn = *m_ReadyToDecode.begin();
306310
assert(hasDataToDecode(conn));
307-
if (connectionDecodeResponses(conn, nullptr) != 0)
311+
if (connectionDecodeResponses(conn, 0, nullptr) != 0)
308312
return std::nullopt;
309313
return conn;
310314
}
@@ -323,7 +327,7 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
323327
strerror(errno), errno);
324328
return -1;
325329
}
326-
if (connectionDecodeResponses(conn, nullptr) != 0)
330+
if (connectionDecodeResponses(conn, 0, nullptr) != 0)
327331
return -1;
328332
if ((conn.getFutureCount() - ready_futures) >= future_count)
329333
return 0;

test/ClientTest.cpp

+21
Original file line numberDiff line numberDiff line change
@@ -1303,6 +1303,27 @@ test_wait(Connector<BUFFER, NetProvider> &client)
13031303
fail_unless(result.header.sync == static_cast<int>(f));
13041304
fail_unless(result.header.code == 0);
13051305

1306+
TEST_CASE("wait with argument result - several requests");
1307+
/* Obtain in direct order. */
1308+
f1 = conn.ping();
1309+
f2 = conn.ping();
1310+
fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0);
1311+
fail_unless(result.header.sync == static_cast<int>(f1));
1312+
fail_unless(result.header.code == 0);
1313+
fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0);
1314+
fail_unless(result.header.sync == static_cast<int>(f2));
1315+
fail_unless(result.header.code == 0);
1316+
1317+
/* Obtain in reversed order. */
1318+
f1 = conn.ping();
1319+
f2 = conn.ping();
1320+
fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0);
1321+
fail_unless(result.header.sync == static_cast<int>(f2));
1322+
fail_unless(result.header.code == 0);
1323+
fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0);
1324+
fail_unless(result.header.sync == static_cast<int>(f1));
1325+
fail_unless(result.header.code == 0);
1326+
13061327
client.close(conn);
13071328
}
13081329

0 commit comments

Comments
 (0)