Skip to content

Commit 58fd517

Browse files
committed
free bulk response buffer
1 parent fbe6721 commit 58fd517

File tree

3 files changed

+15
-9
lines changed

3 files changed

+15
-9
lines changed

common/src/unifyfs_rpc_types.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ typedef struct rpc_state {
3434
size_t inputs_sz; // if non-zero, we are allocating space for input_args
3535
size_t outputs_sz; // if non-zero, we are allocating space for output_args
3636

37-
void* bulk_buf;
37+
void* bulk_buf; // set this to free buf on cleanup
3838
size_t bulk_sz;
3939
hg_bulk_t bulk; // set this to free bulk on cleanup
4040

common/src/unifyfs_rpc_util.c

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ create_rpc_response(hg_handle_t handle,
203203
if (NULL != new_rpc) {
204204
LOGDBG("created state for response rpc(%p) with handle(%p)",
205205
new_rpc, handle);
206-
206+
207207
new_rpc->initiator = 0;
208208
new_rpc->handle = handle;
209209

@@ -216,11 +216,11 @@ create_rpc_response(hg_handle_t handle,
216216
assert(hgi);
217217
new_rpc->rpc_id = hgi->id;
218218
}
219-
219+
220220
if (NULL != input) {
221221
new_rpc->inputs = input;
222222
new_rpc->have_input = 1;
223-
}
223+
}
224224

225225
if (NULL != output) {
226226
new_rpc->outputs = output;
@@ -268,7 +268,12 @@ int cleanup_rpc_state(rpc_state* rpc)
268268
if (HG_BULK_NULL != rpc->bulk) {
269269
LOGDBG("calling margo_bulk_free(%p) for rpc(%p)", rpc->bulk, rpc);
270270
margo_bulk_free(rpc->bulk);
271-
}
271+
}
272+
273+
if (NULL != rpc->bulk_buf) {
274+
LOGDBG("freeing bulk buffer(%p) for rpc(%p)", rpc->bulk_buf, rpc);
275+
free(rpc->bulk_buf);
276+
}
272277

273278
if ((NULL != rpc->inputs) && (0 != rpc->inputs_sz)) {
274279
/* free since we allocated it */
@@ -387,7 +392,7 @@ int async_rpc_request_finish(rpc_state* rpc)
387392
return EINVAL;
388393

389394
int ret = UNIFYFS_SUCCESS;
390-
395+
391396
hg_return_t hret = margo_wait(rpc->mreq);
392397
if (hret == HG_SUCCESS) {
393398
if (NULL != rpc->outputs) {
@@ -408,7 +413,7 @@ int async_rpc_request_finish(rpc_state* rpc)
408413
LOGERR("margo_wait(%p) failed - %s",
409414
rpc->mreq, HG_Error_to_string(hret));
410415
ret = UNIFYFS_ERROR_MARGO;
411-
}
416+
}
412417
return ret;
413418
}
414419

@@ -563,7 +568,7 @@ int push_margo_bulk(hg_handle_t rpc_hdl,
563568
HG_Error_to_string(hret));
564569
return UNIFYFS_ERROR_MARGO;
565570
}
566-
571+
567572
/* execute the transfer to push data from local buffer
568573
* into remote buffer.
569574
*
@@ -590,7 +595,7 @@ int push_margo_bulk(hg_handle_t rpc_hdl,
590595

591596
if (hret == HG_SUCCESS) {
592597
LOGDBG("successful bulk push (%zu bytes)", buf_sz);
593-
598+
594599
/* deregister our bulk transfer buffer */
595600
margo_bulk_free(bulk_local);
596601
} else {

server/src/unifyfs_p2p_rpc.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,6 +1127,7 @@ static void process_get_extents_rpc(server_rpc_req_t* sreq)
11271127
} else {
11281128
/* set request output bulk for auto-free at cleanup */
11291129
sreq->req_state->bulk = bulk_handle;
1130+
sreq->req_state->bulk_buf = buf;
11301131
bulk_resp_handle = bulk_handle;
11311132
LOGDBG("returning %zu extents for gfid=%d to rank=%d",
11321133
num_extents, gfid, sender);

0 commit comments

Comments
 (0)