Skip to content

Commit a7ecb43

Browse files
committed
testing whether cache copy is needed
1 parent 4c0a81b commit a7ecb43

File tree

2 files changed

+42
-50
lines changed

2 files changed

+42
-50
lines changed

server/src/unifyfs_group_rpc.c

Lines changed: 36 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,57 +1271,49 @@ int unifyfs_invoke_broadcast_extents_cache(int gfid)
12711271
hg_size_t buf_size = n_extents * sizeof(*extents);
12721272
hg_bulk_t extents_bulk;
12731273
void* buf = (void*) extents;
1274-
// MJB TESTING: make a copy to avoid reuse of cache as bulk across
1275-
// concurrent bcasts
1276-
//void* buf = malloc((size_t)buf_size);
1277-
//if (NULL != buf) {
1278-
//memcpy(buf, (void*)extents, (size_t)buf_size);
1279-
hg_return_t hret = margo_bulk_create(unifyfsd_rpc_context->svr_mid, 1,
1280-
&buf, &buf_size,
1281-
HG_BULK_READ_ONLY, &extents_bulk);
1282-
if (hret != HG_SUCCESS) {
1283-
LOGERR("margo_bulk_create() failed - %s",
1284-
HG_Error_to_string(hret));
1285-
ret = UNIFYFS_ERROR_MARGO;
1286-
//free(buf);
1274+
hg_return_t hret = margo_bulk_create(unifyfsd_rpc_context->svr_mid, 1,
1275+
&buf, &buf_size,
1276+
HG_BULK_READ_ONLY, &extents_bulk);
1277+
if (hret != HG_SUCCESS) {
1278+
LOGERR("margo_bulk_create() failed - %s",
1279+
HG_Error_to_string(hret));
1280+
ret = UNIFYFS_ERROR_MARGO;
1281+
} else {
1282+
coll_request* coll = NULL;
1283+
extent_cache_bcast_in_t* in = calloc(1, sizeof(*in));
1284+
if (NULL == in) {
1285+
ret = ENOMEM;
12871286
} else {
1288-
coll_request* coll = NULL;
1289-
extent_cache_bcast_in_t* in = calloc(1, sizeof(*in));
1290-
if (NULL == in) {
1287+
/* set input params */
1288+
in->root = (int32_t) glb_pmi_rank;
1289+
in->gfid = (int32_t) gfid;
1290+
in->extents = extents_bulk;
1291+
in->num_extents = (int32_t) n_extents;
1292+
in->timestamp = ts;
1293+
1294+
server_rpc_e rpc = UNIFYFS_SERVER_BCAST_RPC_EXTENTS_CACHE;
1295+
coll = collective_create(rpc, HG_HANDLE_NULL, op_hgid,
1296+
glb_pmi_rank, (void*)in, NULL,
1297+
sizeof(extent_cache_bcast_out_t),
1298+
HG_BULK_NULL, extents_bulk, NULL);
1299+
if (NULL == coll) {
12911300
ret = ENOMEM;
12921301
} else {
1293-
/* set input params */
1294-
in->root = (int32_t) glb_pmi_rank;
1295-
in->gfid = (int32_t) gfid;
1296-
in->extents = extents_bulk;
1297-
in->num_extents = (int32_t) n_extents;
1298-
in->timestamp = ts;
1299-
1300-
server_rpc_e rpc = UNIFYFS_SERVER_BCAST_RPC_EXTENTS_CACHE;
1301-
coll = collective_create(rpc, HG_HANDLE_NULL, op_hgid,
1302-
glb_pmi_rank, (void*)in, NULL,
1303-
sizeof(extent_cache_bcast_out_t),
1304-
HG_BULK_NULL, extents_bulk, NULL//buf
1305-
);
1306-
if (NULL == coll) {
1307-
ret = ENOMEM;
1308-
} else {
1309-
/* start the broadcast */
1310-
ret = collective_forward(coll);
1311-
if (ret == UNIFYFS_SUCCESS) {
1312-
/* progress/finish the bcast operation */
1313-
LOGDBG("BCAST_RPC: bcast progress collective(%p)",
1314-
coll);
1315-
ret = collective_finish(coll);
1316-
if (ret != UNIFYFS_SUCCESS) {
1317-
LOGERR("finish failed for coll(%p) (rc=%d)",
1318-
coll, ret);
1319-
}
1302+
/* start the broadcast */
1303+
ret = collective_forward(coll);
1304+
if (ret == UNIFYFS_SUCCESS) {
1305+
/* progress/finish the bcast operation */
1306+
LOGDBG("BCAST_RPC: bcast progress collective(%p)",
1307+
coll);
1308+
ret = collective_finish(coll);
1309+
if (ret != UNIFYFS_SUCCESS) {
1310+
LOGERR("finish failed for coll(%p) (rc=%d)",
1311+
coll, ret);
13201312
}
13211313
}
13221314
}
13231315
}
1324-
//}
1316+
}
13251317

13261318
return ret;
13271319
}

server/src/unifyfs_p2p_rpc.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,12 +1119,12 @@ static void process_get_extents_rpc(server_rpc_req_t* sreq)
11191119
assert(mid != MARGO_INSTANCE_NULL);
11201120

11211121
hg_size_t buf_sz = num_extents * sizeof(extent_metadata);
1122-
//void* buf = (void*) extents;
1122+
void* buf = (void*) extents;
11231123
// MJB TESTING: make a copy to avoid reuse of cache as bulk
11241124
// across concurrent bcasts
1125-
void* buf = malloc((size_t)buf_sz);
1126-
if (NULL != buf) {
1127-
memcpy(buf, (void*)extents, (size_t)buf_sz);
1125+
//void* buf = malloc((size_t)buf_sz);
1126+
//if (NULL != buf) {
1127+
//memcpy(buf, (void*)extents, (size_t)buf_sz);
11281128
hg_bulk_t bulk_handle = HG_BULK_NULL;
11291129
hg_return_t hret = margo_bulk_create(mid, 1, &buf, &buf_sz,
11301130
HG_BULK_READ_ONLY,
@@ -1133,7 +1133,7 @@ static void process_get_extents_rpc(server_rpc_req_t* sreq)
11331133
LOGERR("margo_bulk_create() failed - %s",
11341134
HG_Error_to_string(hret));
11351135
ret = UNIFYFS_ERROR_MARGO;
1136-
free(buf);
1136+
//free(buf);
11371137
} else {
11381138
/* set request output bulk for auto-free at cleanup */
11391139
sreq->req_state->bulk = bulk_handle;
@@ -1142,7 +1142,7 @@ static void process_get_extents_rpc(server_rpc_req_t* sreq)
11421142
LOGDBG("returning %zu extents for gfid=%d to rank=%d",
11431143
num_extents, gfid, sender);
11441144
}
1145-
}
1145+
//}
11461146
}
11471147
}
11481148
} else {

0 commit comments

Comments
 (0)