Skip to content

Commit a9ed8e0

Browse files
chore: rmove unpin to save read latency
1 parent ed97f4a commit a9ed8e0

1 file changed

Lines changed: 12 additions & 36 deletions

File tree

mooncake-store/src/p2p_client_service.cpp

Lines changed: 12 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1525,8 +1525,7 @@ async_simple::coro::Lazy<ErrorCode> P2PClientService::RunForwardReadOnRoute(
15251525
cleanup.key = req->key;
15261526
cleanup.read_operation_id = pin.value().read_operation_id;
15271527
tl::expected<void, ErrorCode> cleanup_unpin;
1528-
for (int attempt = 0; attempt < kRevokeRetryMaxCnt;
1529-
++attempt) {
1528+
for (int attempt = 0; attempt < kRevokeRetryMaxCnt; ++attempt) {
15301529
cleanup_unpin = co_await route.peer->AsyncUnPinKey(cleanup);
15311530
if (cleanup_unpin) {
15321531
break;
@@ -1548,31 +1547,8 @@ async_simple::coro::Lazy<ErrorCode> P2PClientService::RunForwardReadOnRoute(
15481547
}
15491548
co_return tr.error();
15501549
}
1551-
UnPinKeyRequest unpin_req;
1552-
unpin_req.key = req->key;
1553-
unpin_req.read_operation_id = pin.value().read_operation_id;
1554-
tl::expected<void, ErrorCode> unpin_res;
1555-
for (int attempt = 0; attempt < kRevokeRetryMaxCnt;
1556-
++attempt) {
1557-
unpin_res = co_await route.peer->AsyncUnPinKey(unpin_req);
1558-
if (unpin_res) {
1559-
break;
1560-
}
1561-
if (unpin_res.error() == ErrorCode::LEASE_EXPIRED) {
1562-
unpin_res = tl::expected<void, ErrorCode>{};
1563-
break;
1564-
}
1565-
if (attempt + 1 < kRevokeRetryMaxCnt) {
1566-
LOG(WARNING) << "AsyncUnPinKey retry after forward read, key="
1567-
<< req->key << ", attempt=" << (attempt + 1)
1568-
<< ", error=" << unpin_res.error();
1569-
}
1570-
}
1571-
if (!unpin_res) {
1572-
LOG(ERROR) << "AsyncUnPinKey failed after forward read, key="
1573-
<< req->key << ", error=" << unpin_res.error();
1574-
co_return unpin_res.error();
1575-
}
1550+
// Success: skip UnPinKey to save RPC latency; owner lease / scanner
1551+
// releases pin.
15761552
tl::expected<void, ErrorCode> ok;
15771553
promise->setValue(std::move(ok));
15781554
co_return ErrorCode::OK;
@@ -1591,13 +1567,13 @@ async_simple::coro::Lazy<void> P2PClientService::RunReadWithRetry(
15911567
transfer_direction_mode_ == TransferDirectionMode::FORWARD;
15921568
ErrorCode route_result;
15931569
if (forward_read) {
1594-
route_result = co_await RunForwardReadOnRoute(*route, req,
1595-
promise);
1570+
route_result =
1571+
co_await RunForwardReadOnRoute(*route, req, promise);
15961572
} else {
15971573
auto result =
15981574
co_await route->peer->AsyncReadRemoteData(*req);
1599-
route_result = result.has_value() ? ErrorCode::OK
1600-
: result.error();
1575+
route_result =
1576+
result.has_value() ? ErrorCode::OK : result.error();
16011577
}
16021578

16031579
if (route_result == ErrorCode::OK) {
@@ -1616,11 +1592,12 @@ async_simple::coro::Lazy<void> P2PClientService::RunReadWithRetry(
16161592
<< ", client_id: " << route->proxy.client_id
16171593
<< ", segment_id: " << route->proxy.segment_id
16181594
<< ", is_cached: " << route->is_cached;
1619-
// Request/local constraint errors; another route cannot help.
1595+
// Request/local constraint errors; another route cannot
1596+
// help.
16201597
if (route_result == ErrorCode::INVALID_PARAMS ||
16211598
route_result == ErrorCode::NOT_IMPLEMENTED) {
1622-
promise->setValue(
1623-
tl::make_unexpected(route_result));
1599+
promise->setValue(tl::expected<void, ErrorCode>(
1600+
tl::unexpected(route_result)));
16241601
co_return;
16251602
}
16261603
}
@@ -2042,8 +2019,7 @@ P2PClientService::RemoteForwardWriteOp::RunForwardRemotePut(
20422019
revoke_req.key = write_req->key;
20432020
revoke_req.write_operation_id = pre.value().write_operation_id;
20442021
tl::expected<void, ErrorCode> revoke_res;
2045-
for (int attempt = 0; attempt < kRevokeRetryMaxCnt;
2046-
++attempt) {
2022+
for (int attempt = 0; attempt < kRevokeRetryMaxCnt; ++attempt) {
20472023
revoke_res = co_await peer->AsyncWriteRevoke(revoke_req);
20482024
if (revoke_res) {
20492025
break;

0 commit comments

Comments
 (0)