Skip to content

Commit fbe6721

Browse files
committed
debugging bulk transfer failures
1 parent 912a728 commit fbe6721

File tree

3 files changed

+65
-50
lines changed

3 files changed

+65
-50
lines changed

common/src/unifyfs_rpc_util.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ void* pull_margo_bulk(hg_handle_t rpc_hdl,
517517
} while (remain > 0);
518518

519519
if (hret == HG_SUCCESS) {
520-
LOGDBG("successful bulk transfer (%zu bytes)", bulk_sz);
520+
LOGDBG("successful bulk pull (%zu bytes)", bulk_sz);
521521
if (local_bulk != NULL) {
522522
*local_bulk = bulk_local;
523523
} else {
@@ -526,7 +526,7 @@ void* pull_margo_bulk(hg_handle_t rpc_hdl,
526526
}
527527
return buffer;
528528
} else {
529-
LOGERR("failed bulk transfer (transferred %zu of %zu bytes) - %s",
529+
LOGERR("failed bulk pull (transferred %zu of %zu bytes) - %s",
530530
(bulk_sz - remain), bulk_sz, HG_Error_to_string(hret));
531531
free(buffer);
532532
return NULL;
@@ -589,12 +589,12 @@ int push_margo_bulk(hg_handle_t rpc_hdl,
589589
} while (remain > 0);
590590

591591
if (hret == HG_SUCCESS) {
592-
LOGDBG("successful bulk transfer (%zu bytes)", buf_sz);
592+
LOGDBG("successful bulk push (%zu bytes)", buf_sz);
593593

594594
/* deregister our bulk transfer buffer */
595595
margo_bulk_free(bulk_local);
596596
} else {
597-
LOGERR("failed bulk transfer (transferred %zu of %zu bytes) - %s",
597+
LOGERR("failed bulk push (transferred %zu of %zu bytes) - %s",
598598
(buf_sz - remain), buf_sz, HG_Error_to_string(hret));
599599
return UNIFYFS_ERROR_MARGO;
600600
}

server/src/unifyfs_group_rpc.c

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,40 +1267,48 @@ int unifyfs_invoke_broadcast_extents_cache(int gfid)
12671267

12681268
/* create bulk data structure containing the extents
12691269
* NOTE: bulk data is always read only at the root of the broadcast tree */
1270+
hg_id_t op_hgid = unifyfsd_rpc_context->rpcs.extent_cache_bcast_id;
12701271
hg_size_t buf_size = n_extents * sizeof(*extents);
12711272
hg_bulk_t extents_bulk;
1272-
void* buf = (void*) extents;
1273-
hg_return_t hret = margo_bulk_create(unifyfsd_rpc_context->svr_mid, 1,
1274-
&buf, &buf_size,
1275-
HG_BULK_READ_ONLY, &extents_bulk);
1276-
if (hret != HG_SUCCESS) {
1277-
LOGERR("margo_bulk_create() failed - %s", HG_Error_to_string(hret));
1278-
ret = UNIFYFS_ERROR_MARGO;
1279-
} else {
1280-
coll_request* coll = NULL;
1281-
extent_cache_bcast_in_t* in = calloc(1, sizeof(*in));
1282-
if (NULL == in) {
1283-
ret = ENOMEM;
1273+
//void* buf = (void*) extents;
1274+
// MJB TESTING: make a copy to avoid reuse of cache as bulk across
1275+
// concurrent bcasts
1276+
void* buf = malloc((size_t)buf_size);
1277+
if (NULL != buf) {
1278+
memcpy(buf, (void*)extents, (size_t)buf_size);
1279+
hg_return_t hret = margo_bulk_create(unifyfsd_rpc_context->svr_mid, 1,
1280+
&buf, &buf_size,
1281+
HG_BULK_READ_ONLY, &extents_bulk);
1282+
if (hret != HG_SUCCESS) {
1283+
LOGERR("margo_bulk_create() failed - %s",
1284+
HG_Error_to_string(hret));
1285+
ret = UNIFYFS_ERROR_MARGO;
1286+
free(buf);
12841287
} else {
1285-
/* set input params */
1286-
in->root = (int32_t) glb_pmi_rank;
1287-
in->gfid = (int32_t) gfid;
1288-
in->extents = extents_bulk;
1289-
in->num_extents = (int32_t) n_extents;
1290-
in->timestamp = ts;
1291-
1292-
hg_id_t op_hgid = unifyfsd_rpc_context->rpcs.extent_cache_bcast_id;
1293-
server_rpc_e rpc = UNIFYFS_SERVER_BCAST_RPC_EXTENTS_CACHE;
1294-
coll = collective_create(rpc, HG_HANDLE_NULL, op_hgid,
1295-
glb_pmi_rank, (void*)in,
1296-
NULL, sizeof(extent_cache_bcast_out_t),
1297-
HG_BULK_NULL, extents_bulk, NULL);
1298-
if (NULL == coll) {
1288+
coll_request* coll = NULL;
1289+
extent_cache_bcast_in_t* in = calloc(1, sizeof(*in));
1290+
if (NULL == in) {
12991291
ret = ENOMEM;
13001292
} else {
1301-
ret = collective_forward(coll);
1302-
if (ret == UNIFYFS_SUCCESS) {
1303-
ret = invoke_bcast_progress_rpc(coll);
1293+
/* set input params */
1294+
in->root = (int32_t) glb_pmi_rank;
1295+
in->gfid = (int32_t) gfid;
1296+
in->extents = extents_bulk;
1297+
in->num_extents = (int32_t) n_extents;
1298+
in->timestamp = ts;
1299+
1300+
server_rpc_e rpc = UNIFYFS_SERVER_BCAST_RPC_EXTENTS_CACHE;
1301+
coll = collective_create(rpc, HG_HANDLE_NULL, op_hgid,
1302+
glb_pmi_rank, (void*)in, NULL,
1303+
sizeof(extent_cache_bcast_out_t),
1304+
HG_BULK_NULL, extents_bulk, buf);
1305+
if (NULL == coll) {
1306+
ret = ENOMEM;
1307+
} else {
1308+
ret = collective_forward(coll);
1309+
if (ret == UNIFYFS_SUCCESS) {
1310+
ret = invoke_bcast_progress_rpc(coll);
1311+
}
13041312
}
13051313
}
13061314
}

server/src/unifyfs_p2p_rpc.c

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,9 +1029,9 @@ int unifyfs_invoke_get_extents_rpc(int gfid,
10291029
} else {
10301030
/* replace local cache with received extents */
10311031
em_arr = (extent_metadata*) buf;
1032+
ret = sm_cache_extents(gfid, n_ext, em_arr, &owner_stamp);
10321033
}
10331034
}
1034-
ret = sm_cache_extents(gfid, n_ext, em_arr, &owner_stamp);
10351035
}
10361036

10371037
clear_pending_extents_get:
@@ -1108,22 +1108,29 @@ static void process_get_extents_rpc(server_rpc_req_t* sreq)
11081108
margo_hg_handle_get_instance(sreq->req_state->handle);
11091109
assert(mid != MARGO_INSTANCE_NULL);
11101110

1111-
void* buf = (void*) extents;
1112-
size_t buf_sz = num_extents * sizeof(extent_metadata);
1113-
hg_bulk_t bulk_handle = HG_BULK_NULL;
1114-
hg_return_t hret = margo_bulk_create(mid, 1, &buf, &buf_sz,
1115-
HG_BULK_READ_ONLY,
1116-
&bulk_handle);
1117-
if (hret != HG_SUCCESS) {
1118-
LOGERR("margo_bulk_create() failed - %s",
1119-
HG_Error_to_string(hret));
1120-
ret = UNIFYFS_ERROR_MARGO;
1121-
} else {
1122-
/* set request output bulk for auto-free at cleanup */
1123-
sreq->req_state->bulk = bulk_handle;
1124-
bulk_resp_handle = bulk_handle;
1125-
LOGDBG("returning %zu extents for gfid=%d to rank=%d",
1126-
num_extents, gfid, sender);
1111+
hg_size_t buf_sz = num_extents * sizeof(extent_metadata);
1112+
//void* buf = (void*) extents;
1113+
// MJB TESTING: make a copy to avoid reuse of cache as bulk
1114+
// across concurrent bcasts
1115+
void* buf = malloc((size_t)buf_sz);
1116+
if (NULL != buf) {
1117+
memcpy(buf, (void*)extents, (size_t)buf_sz);
1118+
hg_bulk_t bulk_handle = HG_BULK_NULL;
1119+
hg_return_t hret = margo_bulk_create(mid, 1, &buf, &buf_sz,
1120+
HG_BULK_READ_ONLY,
1121+
&bulk_handle);
1122+
if (hret != HG_SUCCESS) {
1123+
LOGERR("margo_bulk_create() failed - %s",
1124+
HG_Error_to_string(hret));
1125+
ret = UNIFYFS_ERROR_MARGO;
1126+
free(buf);
1127+
} else {
1128+
/* set request output bulk for auto-free at cleanup */
1129+
sreq->req_state->bulk = bulk_handle;
1130+
bulk_resp_handle = bulk_handle;
1131+
LOGDBG("returning %zu extents for gfid=%d to rank=%d",
1132+
num_extents, gfid, sender);
1133+
}
11271134
}
11281135
}
11291136
}

0 commit comments

Comments
 (0)