Skip to content

Commit 4141575

Browse files
committed
reimplement client node-local support
1 parent c75da91 commit 4141575

File tree

7 files changed

+148
-34
lines changed

7 files changed

+148
-34
lines changed

client/src/client_read.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ int process_gfid_reads(unifyfs_client* client,
701701

702702
meta->needs_reads_sync = 0;
703703

704-
/* MJB TODO - rewrite to request local extents for single gfid */
704+
/* request local extents for single gfid */
705705
size_t chunk_count = 0;
706706
unifyfs_data_chunk_t* chunks = NULL;
707707
rc = invoke_client_node_local_extents_get_rpc(client,

client/src/margo_client.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -958,8 +958,8 @@ int invoke_client_node_local_extents_get_rpc(unifyfs_client* client,
958958
LOGDBG("%s got response ret=%" PRIi32, rpc_name, out.ret);
959959
ret = (int) out.ret;
960960
if (ret == (int) UNIFYFS_SUCCESS) {
961-
*chunk_count = out.chunk_count;
962-
void* out_buffer = pull_margo_bulk(rpc->handle, out.bulk_data,
961+
*chunk_count = out.ext_count;
962+
void* out_buffer = pull_margo_bulk(rpc->handle, out.bulk_extents,
963963
out.bulk_size, NULL);
964964
*chunks = (unifyfs_data_chunk_t*) out_buffer;
965965
}

common/src/unifyfs_client_rpcs.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -332,9 +332,9 @@ MERCURY_GEN_PROC(unifyfs_node_local_extents_get_in_t,
332332
((int32_t)(gfid)))
333333
MERCURY_GEN_PROC(unifyfs_node_local_extents_get_out_t,
334334
((int32_t)(ret))
335-
((hg_size_t)(chunk_count))
336-
((hg_bulk_t)(bulk_data))
337-
((hg_size_t)(bulk_size)))
335+
((hg_size_t)(ext_count))
336+
((hg_size_t)(bulk_size))
337+
((hg_bulk_t)(bulk_extents)))
338338
DECLARE_MARGO_RPC_HANDLER(unifyfs_node_local_extents_get_rpc)
339339

340340
/* unifyfs_get_gfids_rpc (client => server)

server/src/unifyfs_client_rpc.c

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,6 @@ void process_client_gfids_rpc(client_rpc_req_t* creq)
748748
// files, but only use the gfids. The client must then issue
749749
// a separate metaget request for each gfid.
750750
unifyfs_file_attr_t* file_attrs = NULL;
751-
hg_bulk_t bulk_gfids = HG_BULK_NULL;
752751
int* new_gfid_list = NULL;
753752
int num_file_attrs = 0;
754753
ret = unifyfs_invoke_broadcast_metaget_all(&file_attrs,
@@ -759,24 +758,26 @@ void process_client_gfids_rpc(client_rpc_req_t* creq)
759758
// Package all the gfids up into one list
760759
new_gfid_list = (int*) calloc(num_file_attrs, sizeof(int));
761760
if (NULL != new_gfid_list) {
762-
/* initialize bulk handle for the gfid_list */
761+
for (int i=0; i < num_file_attrs; i++) {
762+
new_gfid_list[i] = file_attrs[i].gfid;
763+
}
763764

764-
hg_size_t sizes[1] = { num_file_attrs * sizeof(int) };
765-
void* ptrs[1] = { (void*)new_gfid_list };
765+
/* initialize bulk for the gfid_list */
766+
hg_bulk_t bulk_gfids = HG_BULK_NULL;
767+
hg_size_t bulk_sz = (hg_size_t) num_file_attrs * sizeof(int);
766768
hret = margo_bulk_create(unifyfsd_rpc_context->shm_mid,
767-
1, ptrs, sizes,
769+
1, (void**)&new_gfid_list, &bulk_sz,
768770
HG_BULK_READ_ONLY, &bulk_gfids);
769771
if (hret != HG_SUCCESS) {
770772
LOGDBG("margo_bulk_create() failed - %s",
771773
HG_Error_to_string(hret));
772774
free(new_gfid_list);
773775
ret = UNIFYFS_ERROR_MARGO;
774776
} else {
775-
for (int i=0; i < num_file_attrs; i++) {
776-
new_gfid_list[i] = file_attrs[i].gfid;
777-
}
778777
out->bulk_gfids = bulk_gfids;
779-
creq->req_state->bulk = bulk_gfids; // free on rpc cleanup
778+
779+
/* request bulk free on rpc cleanup */
780+
creq->req_state->bulk = bulk_gfids;
780781
}
781782
} else {
782783
ret = ENOMEM;
@@ -806,9 +807,38 @@ void process_client_node_local_extents_rpc(client_rpc_req_t* creq)
806807
unifyfs_node_local_extents_get_out_t* out = creq->req_state->outputs;
807808
assert((in != NULL) && (out != NULL));
808809

809-
/* MJB TODO - rewrite to take a single gfid and return the local extents
810-
* if the file is laminated and has not been reverse-synced */
811-
ret = UNIFYFS_ERROR_NYI;
810+
unifyfs_fops_ctx_t ctx = {
811+
.client_req = creq,
812+
.app_id = in->app_id,
813+
.client_id = in->client_id,
814+
};
815+
816+
int gfid = (int) in->gfid;
817+
size_t n_extents = 0;
818+
unifyfs_data_chunk_t* extents = NULL;
819+
ret = unifyfs_fops_local_extents(&ctx, gfid, &n_extents, &extents);
820+
if (ret != UNIFYFS_SUCCESS) {
821+
LOGERR("unifyfs_fops_local_extents() failed");
822+
} else {
823+
out->ext_count = (hg_size_t) n_extents;
824+
825+
/* register user buffer for bulk access */
826+
hg_bulk_t bulk_local;
827+
hg_size_t bulk_sz = (hg_size_t) n_extents * sizeof(unifyfs_data_chunk_t);
828+
hg_return_t hret = margo_bulk_create(unifyfsd_rpc_context->shm_mid,
829+
1, (void**)&extents, &bulk_sz,
830+
HG_BULK_READ_ONLY, &bulk_local);
831+
if (hret != HG_SUCCESS) {
832+
LOGERR("margo_bulk_create() failed");
833+
ret = UNIFYFS_ERROR_MARGO;
834+
} else {
835+
out->bulk_extents = bulk_local;
836+
out->bulk_size = bulk_sz;
837+
838+
/* request bulk free on rpc cleanup */
839+
creq->req_state->bulk = bulk_local;
840+
}
841+
}
812842

813843
out->ret = (int32_t) ret;
814844

@@ -1163,7 +1193,7 @@ static void unifyfs_node_local_extents_get_rpc(hg_handle_t handle)
11631193
if (NULL == creq) {
11641194
unifyfs_node_local_extents_get_out_t out;
11651195
out.ret = (int32_t) ENOMEM;
1166-
out.chunk_count = 0;
1196+
out.ext_count = 0;
11671197
hg_return_t hret = margo_respond(handle, &out);
11681198
if (hret != HG_SUCCESS) {
11691199
LOGERR("margo_respond() failed - %s", HG_Error_to_string(hret));

server/src/unifyfs_fops.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ typedef int (*unifyfs_fops_read_t)(unifyfs_fops_ctx_t* ctx,
5858
size_t* coverage_begin_offset,
5959
size_t* coverage_end_offset);
6060

61-
6261
typedef int (*unifyfs_fops_transfer_t)(unifyfs_fops_ctx_t* ctx,
6362
int transfer_id,
6463
int gfid,
@@ -72,13 +71,19 @@ typedef int (*unifyfs_fops_unlink_t)(unifyfs_fops_ctx_t* ctx, int gfid);
7271

7372
typedef int (*unifyfs_fops_get_gfids_t)(int** gfid_list, int* num_gfids);
7473

74+
typedef int (*unifyfs_fops_local_extents_t)(unifyfs_fops_ctx_t* ctx,
75+
int gfid,
76+
size_t* ext_count,
77+
unifyfs_data_chunk_t** extents);
78+
7579
struct unifyfs_fops {
7680
const char* name;
7781
unifyfs_fops_init_t init;
7882
unifyfs_fops_filesize_t filesize;
7983
unifyfs_fops_fsync_t fsync;
8084
unifyfs_fops_get_gfids_t get_gfids;
8185
unifyfs_fops_laminate_t laminate;
86+
unifyfs_fops_local_extents_t local_extents;
8287
unifyfs_fops_metaget_t metaget;
8388
unifyfs_fops_metaset_t metaset;
8489
unifyfs_fops_mread_t mread;
@@ -145,6 +150,18 @@ static inline int unifyfs_fops_laminate(unifyfs_fops_ctx_t* ctx, int gfid)
145150
return global_fops_tab->laminate(ctx, gfid);
146151
}
147152

153+
static inline int unifyfs_fops_local_extents(unifyfs_fops_ctx_t* ctx,
154+
int gfid,
155+
size_t* ext_count,
156+
unifyfs_data_chunk_t** extents)
157+
{
158+
if (!global_fops_tab->local_extents) {
159+
return ENOSYS;
160+
}
161+
162+
return global_fops_tab->local_extents(ctx, gfid, ext_count, extents);
163+
}
164+
148165
static inline int unifyfs_fops_metaget(unifyfs_fops_ctx_t* ctx,
149166
int gfid, unifyfs_file_attr_t* attr)
150167
{

server/src/unifyfs_fops_rpc.c

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -490,20 +490,86 @@ int rpc_get_gfids(
490490
return unifyfs_get_gfids(num_gfids, gfid_list);
491491
}
492492

493+
static
494+
int rpc_local_extents(unifyfs_fops_ctx_t* ctx,
495+
int gfid,
496+
size_t* ext_count,
497+
unifyfs_data_chunk_t** extents)
498+
{
499+
int ret;
500+
501+
assert((NULL != ext_count) && (NULL != extents));
502+
503+
*ext_count = 0;
504+
*extents = NULL;
505+
506+
/* define extent covering whole file */
507+
unifyfs_file_attr_t attrs;
508+
memset(&attrs, 0, sizeof(attrs));
509+
int rc = sm_get_fileattr(gfid, &attrs);
510+
if (rc == UNIFYFS_SUCCESS) {
511+
unifyfs_extent_t whole_file = {
512+
.gfid = gfid,
513+
.length = attrs.size,
514+
.offset = 0
515+
};
516+
517+
/* get all file extents */
518+
unsigned int n_chunks = 0;
519+
unifyfs_data_chunk_t* chunks = NULL;
520+
int coverage;
521+
rc = unifyfs_inode_get_extent_chunks(&whole_file,
522+
&n_chunks, &chunks,
523+
&coverage);
524+
if ((UNIFYFS_SUCCESS == rc) && (n_chunks > 0)) {
525+
/* find local chunks */
526+
unsigned int n_local = 0;
527+
unifyfs_data_chunk_t* local_chunks = (unifyfs_data_chunk_t*)
528+
calloc(n_chunks, sizeof(unifyfs_data_chunk_t));
529+
if (NULL == local_chunks) {
530+
ret = ENOMEM;
531+
} else {
532+
for (unsigned int i=0; i < n_chunks; i++) {
533+
unifyfs_data_chunk_t* chk = chunks + i;
534+
unifyfs_data_chunk_t* lchk = local_chunks + n_local;
535+
if (chk->log_server == glb_pmi_rank) {
536+
*lchk = *chk;
537+
n_local++;
538+
}
539+
}
540+
ret = UNIFYFS_SUCCESS;
541+
}
542+
*ext_count = (size_t) n_local;
543+
*extents = local_chunks;
544+
545+
/* release chunks array */
546+
free(chunks);
547+
} else {
548+
ret = rc;
549+
}
550+
} else {
551+
/* gfid not found, so we have no local extents */
552+
ret = UNIFYFS_SUCCESS;
553+
}
554+
555+
return ret;
556+
}
557+
493558
static struct unifyfs_fops _fops_rpc = {
494-
.name = "rpc",
495-
.init = rpc_init,
496-
.filesize = rpc_filesize,
497-
.fsync = rpc_fsync,
498-
.get_gfids = rpc_get_gfids,
499-
.laminate = rpc_laminate,
500-
.metaget = rpc_metaget,
501-
.metaset = rpc_metaset,
502-
.mread = rpc_mread,
503-
.read = rpc_read,
504-
.transfer = rpc_transfer,
505-
.truncate = rpc_truncate,
506-
.unlink = rpc_unlink
559+
.name = "rpc",
560+
.init = rpc_init,
561+
.filesize = rpc_filesize,
562+
.fsync = rpc_fsync,
563+
.get_gfids = rpc_get_gfids,
564+
.laminate = rpc_laminate,
565+
.local_extents = rpc_local_extents,
566+
.metaget = rpc_metaget,
567+
.metaset = rpc_metaset,
568+
.mread = rpc_mread,
569+
.read = rpc_read,
570+
.transfer = rpc_transfer,
571+
.truncate = rpc_truncate,
572+
.unlink = rpc_unlink
507573
};
508574

509575
struct unifyfs_fops* unifyfs_fops_impl = &_fops_rpc;

server/src/unifyfs_inode.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,8 @@ int get_extent_cache_chunks(unifyfs_extent_t* extent,
777777
/* found begin and end extents, convert to chunks */
778778
int gap_found = 0;
779779
int n_chk = 1 + (em_end - em_begin);
780-
unifyfs_data_chunk_t* chks = calloc(n_chk, sizeof(*chks));
780+
unifyfs_data_chunk_t* chks = (unifyfs_data_chunk_t*)
781+
calloc(n_chk, sizeof(unifyfs_data_chunk_t));
781782
if (NULL != chks) {
782783
extent_metadata* em_prev;
783784
extent_metadata* em_iter = em_begin;

0 commit comments

Comments
 (0)