Skip to content

Commit 88ca8df

Browse files
committed
rework pending waiter signaling logic
1 parent 62fe4f3 commit 88ca8df

File tree

2 files changed

+37
-14
lines changed

2 files changed

+37
-14
lines changed

server/src/unifyfs_p2p_rpc.c

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,6 @@ int wait_for_p2p_request(p2p_request* preq)
147147

148148
void cleanup_p2p_request(p2p_request* preq)
149149
{
150-
/* cleanup p2p rpc state */
151-
int rc = cleanup_rpc_state(preq->req_state);
152-
if (rc != UNIFYFS_SUCCESS) {
153-
LOGERR("failed to cleanup rpc state for p2p request(%p)", preq);
154-
}
155-
156150
/* cleanup pending client reqs */
157151
if (NULL != preq->pending_client_reqs) {
158152
/* NOTE: normally, the pending client list should
@@ -173,12 +167,28 @@ void cleanup_p2p_request(p2p_request* preq)
173167
preq->pending_client_reqs = NULL;
174168
}
175169

176-
if (ABT_COND_NULL != preq->pending_cond) {
177-
ABT_cond_free(&(preq->pending_cond));
178-
}
170+
/* release other pending state */
179171
if (ABT_MUTEX_NULL != preq->pending_sync) {
172+
if (ABT_COND_NULL != preq->pending_cond) {
173+
do {
174+
int waiters = 0;
175+
ABT_mutex_lock(preq->pending_sync);
176+
waiters = preq->pending_waiters;
177+
ABT_mutex_unlock(preq->pending_sync);
178+
if (waiters) {
179+
ABT_cond_broadcast(preq->pending_cond);
180+
}
181+
while (waiters > 0);
182+
ABT_cond_free(&(preq->pending_cond));
183+
}
180184
ABT_mutex_free(&(preq->pending_sync));
181185
}
186+
187+
/* cleanup p2p rpc state */
188+
int rc = cleanup_rpc_state(preq->req_state);
189+
if (rc != UNIFYFS_SUCCESS) {
190+
LOGERR("failed to cleanup rpc state for p2p request(%p)", preq);
191+
}
182192
}
183193

184194
int add_pending_remote_request(int peer_rank,
@@ -941,6 +951,7 @@ int unifyfs_invoke_get_extents_rpc(int gfid,
941951
clock_gettime(CLOCK_REALTIME, &timeout);
942952
timeout.tv_sec += 5;
943953
ABT_mutex_lock(preq->pending_sync);
954+
preq->pending_waiters++;
944955
LOGDBG("waiting on pending get_extents condition for preq(%p)", preq);
945956
rc = ABT_cond_timedwait(preq->pending_cond, preq->pending_sync,
946957
&timeout);
@@ -951,11 +962,14 @@ int unifyfs_invoke_get_extents_rpc(int gfid,
951962
LOGERR("failed to wait on condition (err=%d)", rc);
952963
ret = UNIFYFS_ERROR_MARGO;
953964
} else {
954-
LOGDBG("pending get_extents condition for preq(%p) was signaled",
965+
LOGDBG("pending get_extents condition for preq(%p) was signaled",
955966
preq);
956967
}
968+
preq->pending_waiters--;
957969
ABT_mutex_unlock(preq->pending_sync);
958970
return ret;
971+
} else {
972+
LOGDBG("added pending get_extents for gfid=%d - preq(%p)", gfid, preq);
959973
}
960974

961975
assert(preq->req_state != NULL);
@@ -1004,15 +1018,23 @@ int unifyfs_invoke_get_extents_rpc(int gfid,
10041018
}
10051019
}
10061020

1007-
ABT_cond_broadcast(preq->pending_cond);
1008-
10091021
clear_pending_extents_get:
1010-
LOGDBG("clearing pending get_extents for gfid=%d", gfid);
1022+
LOGDBG("clearing pending get_extents for preq(%p) gfid=%d", preq, gfid);
10111023
rc = clear_pending_remote_request(preq);
10121024
if (rc != UNIFYFS_SUCCESS) {
10131025
LOGWARN("failed to clear pending metaget for gfid=%d", gfid);
10141026
}
10151027

1028+
int waiters = 0;
1029+
ABT_mutex_lock(preq->pending_sync);
1030+
waiters = preq->pending_waiters;
1031+
ABT_mutex_unlock(preq->pending_sync);
1032+
if (waiters > 1) {
1033+
ABT_cond_broadcast(preq->pending_cond);
1034+
} else if (waiters == 1) {
1035+
ABT_cond_signal(preq->pending_cond);
1036+
}
1037+
10161038
cleanup_p2p_request(preq);
10171039

10181040
return ret;

server/src/unifyfs_p2p_rpc.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ typedef struct {
3838
client_rpc_req_t* client_req; // for only one
3939
arraylist_t* pending_client_reqs; // for more than one
4040

41-
ABT_cond pending_cond; /* condition to signal upon pending completion */
41+
ABT_cond pending_cond; /* condition to signal upon pending completion */
4242
ABT_mutex pending_sync; /* mutex for above condition variable */
43+
int pending_waiters; /* track number of pending waiters */
4344
} p2p_request;
4445

4546
/* helper method to initialize peer rpc request */

0 commit comments

Comments
 (0)