@@ -419,17 +419,41 @@ void service_local_reqs(
419419 /* copy data from local write log into user buffer */
420420 off_t log_offset = ext_log_pos + ext_byte_offset ;
421421 size_t nread = 0 ;
422- int rc = unifyfs_logio_read (client -> state .logio_ctx , log_offset ,
423- cover_length , req_ptr , & nread );
424- if (rc == UNIFYFS_SUCCESS ) {
425- /* update bytes we have filled in the request buffer */
426- update_read_req_coverage (req , req_byte_offset , nread );
422+ /* we need to use the logio_ctx from correct client */
423+ logio_context * logio_ctx = NULL ;
424+ if (next -> client_id == client -> state .client_id ) {
425+ logio_ctx = client -> state .logio_ctx ;
426+ } else if (client -> logio_ctx_ptrs [next -> client_id ] != NULL ) {
427+ logio_ctx = client -> logio_ctx_ptrs [next -> client_id ];
427428 } else {
428- LOGERR ("local log read failed for offset=%zu size=%zu" ,
429- (size_t )log_offset , cover_length );
430- req -> errcode = rc ;
429+ size_t shmem_size = 0 ;
430+ if (client -> state .logio_ctx -> shmem != NULL ) {
431+ shmem_size = client -> state .logio_ctx -> shmem -> size ;
432+ }
433+ char * spill_dir = NULL ;
434+ if (client -> state .logio_ctx -> spill_sz > 0 ) {
435+ spill_dir = client -> cfg .logio_spill_dir ;
436+ }
437+ unifyfs_logio_init (client -> state .app_id ,
438+ next -> client_id ,
439+ shmem_size ,
440+ client -> state .logio_ctx -> spill_sz ,
441+ spill_dir ,
442+ & client -> logio_ctx_ptrs [next -> client_id ]);
443+ logio_ctx = client -> logio_ctx_ptrs [next -> client_id ];
444+ }
445+ if (NULL != logio_ctx ) {
446+ int rc = unifyfs_logio_read (logio_ctx , log_offset ,
447+ cover_length , req_ptr , & nread );
448+ if (rc == UNIFYFS_SUCCESS ) {
449+ /* update bytes we have filled in the request buffer */
450+ update_read_req_coverage (req , req_byte_offset , nread );
451+ } else {
452+ LOGERR ("local log read failed for offset=%zu size=%zu" ,
453+ (size_t ) log_offset , cover_length );
454+ req -> errcode = rc ;
455+ }
431456 }
432-
433457 /* get the next element in the tree */
434458 next = seg_tree_iter (extents , next );
435459 }
@@ -560,7 +584,7 @@ static void update_read_req_result(unifyfs_client* client,
560584 */
561585int process_gfid_reads (unifyfs_client * client ,
562586 read_req_t * in_reqs ,
563- int in_count )
587+ size_t in_count )
564588{
565589 if (0 == in_count ) {
566590 return UNIFYFS_SUCCESS ;
@@ -597,9 +621,76 @@ int process_gfid_reads(unifyfs_client* client,
597621 /* this records the pointer to the temp request array if
598622 * we allocate one, we should free this later if not NULL */
599623 read_req_t * reqs = NULL ;
624+ if (client -> use_node_local_extents ) {
625+ extents_list_t * list = calloc (1 , sizeof (struct extents_list ));
626+ struct extents_list * cur = list ;
627+ int num_request_selected = 0 ;
628+ for (int i = 0 ; i < in_count ; ++ i ) {
629+ int fid = unifyfs_fid_from_gfid (client , in_reqs [i ].gfid );
630+ /* get meta for this file id */
631+ unifyfs_filemeta_t * meta = unifyfs_get_meta_from_fid (client , fid );
632+ if (meta != NULL ) {
633+ if (!meta -> attrs .is_laminated || !meta -> needs_reads_sync ) {
634+ /* do not proceed for this request as
635+ * it is not a laminated file or has already been synced.*/
636+ continue ;
637+ }
638+ num_request_selected ++ ;
639+ off_t filesize_offt = unifyfs_gfid_filesize (client ,
640+ in_reqs [i ].gfid );
641+ cur -> value .file_pos = 0 ;
642+ cur -> value .length = filesize_offt - 1 ;
643+ cur -> value .gfid = in_reqs [i ].gfid ;
644+ if (i < in_count - 1 ) {
645+ cur -> next = calloc (1 , sizeof (struct extents_list ));
646+ cur -> next -> next = NULL ;
647+ cur = cur -> next ;
648+ } else {
649+ cur -> next = NULL ;
650+ }
651+ meta -> needs_reads_sync = 0 ;
652+ }
653+ }
654+ if (num_request_selected > 0 ) {
655+ /* There are files which are laminated and
656+ * require sync of extents */
657+ size_t extent_count = 0 ;
658+ unifyfs_client_index_t * extents = NULL ;
659+ int rc =
660+ invoke_client_node_local_extents_get_rpc (client ,
661+ num_request_selected ,
662+ list ,
663+ & extent_count ,
664+ & extents );
665+ if (rc == UNIFYFS_SUCCESS && extent_count != 0 ) {
666+ for (int j = 0 ; j < extent_count ; ++ j ) {
667+ if (extents [j ].log_app_id ==
668+ client -> state .app_id ) {
669+ int fid = unifyfs_fid_from_gfid (client ,
670+ extents [j ].gfid );
671+ /* get meta for this file id */
672+ unifyfs_filemeta_t * meta = unifyfs_get_meta_from_fid (
673+ client ,
674+ fid );
675+ if (meta != NULL ) {
676+ seg_tree_add (& meta -> extents ,
677+ extents [j ].file_pos ,
678+ extents [j ].file_pos +
679+ extents [j ].length - 1 ,
680+ extents [j ].log_pos ,
681+ extents [j ].log_client_id );
682+ }
683+ }
684+ }
685+ }
686+ if (extents != NULL ) {
687+ free (extents );
688+ }
689+ }
690+ }
600691
601692 /* attempt to complete requests locally if enabled */
602- if (client -> use_local_extents ) {
693+ if (client -> use_local_extents || client -> use_node_local_extents ) {
603694 /* allocate space to make local and server copies of the requests,
604695 * each list will be at most in_count long */
605696 size_t reqs_size = 2 * in_count ;
@@ -743,7 +834,7 @@ int process_gfid_reads(unifyfs_client* client,
743834 /* if we attempted to service requests from our local extent map,
744835 * then we need to copy the resulting read requests from the local
745836 * and server arrays back into the user's original array */
746- if (client -> use_local_extents ) {
837+ if (client -> use_local_extents || client -> use_node_local_extents ) {
747838 /* TODO: would be nice to copy these back into the same order
748839 * in which we received them. */
749840
0 commit comments