@@ -446,9 +446,9 @@ struct data : public primitive_base<data> {
446446 }
447447
448448 used_fast_io = ov::util::read_binary_file_parallel (ov::util::make_path (weights_path),
449- mem->buffer_ptr (),
450- data_size,
451- (size_t )cur_offset + offset_compensation);
449+ mem->buffer_ptr (),
450+ data_size,
451+ (size_t )cur_offset + offset_compensation);
452452 if (used_fast_io) {
453453 ib.get_stream ().seekg (data_size, std::ios::cur);
454454 }
@@ -465,146 +465,81 @@ struct data : public primitive_base<data> {
465465 ib >> make_data (_buf.data (), data_size);
466466 mem->copy_from (strm, _buf.data ());
467467 } else {
468- bool used_fast_io = false ;
468+ // Pre-calculate file offset if weights_path is available for fast parallel IO
469+ bool can_use_fast_io = !weights_path.empty () && data_size >= FAST_IO_THRESHOLD;
470+ size_t file_base_offset = 0 ;
471+ auto file_path = ov::util::make_path (weights_path);
469472
470- // Try parallel file read + pipelined async GPU copy for non-host-accessible memory
471- if (!weights_path.empty () && data_size >= FAST_IO_THRESHOLD) {
473+ if (can_use_fast_io) {
472474 auto cur_offset = ib.get_stream ().tellg ();
473-
474- // Auto-detect header offset compensation (same as host-accessible path)
475475 size_t offset_compensation = 0 ;
476476 auto restore_pos = ib.get_stream ().tellg ();
477477 ib.get_stream ().seekg (0 , std::ios::end);
478478 auto stream_end = (size_t )ib.get_stream ().tellg ();
479479 ib.get_stream ().seekg (restore_pos, std::ios::beg);
480480
481- int64_t phys_size = ov::util::file_size (ov::util::make_path (weights_path) );
481+ int64_t phys_size = ov::util::file_size (file_path );
482482 size_t physical_size = (phys_size >= 0 ) ? static_cast <size_t >(phys_size) : 0 ;
483+
483484 if (physical_size > stream_end) {
484485 offset_compensation = physical_size - stream_end;
485486 }
487+ file_base_offset = (size_t )cur_offset + offset_compensation;
488+ }
486489
487- size_t file_base_offset = (size_t )cur_offset + offset_compensation;
488- auto file_path = ov::util::make_path (weights_path);
489-
490- // Pipelined: parallel file read into double-buffered CPU staging + async GPU copy
491- // This overlaps reading chunk N+1 from disk with GPU DMA of chunk N
492- const size_t PIPE_CHUNK_SIZE = 32 * 1024 * 1024 ; // 32MB per pipeline stage
493- size_t actual_chunk = std::min (PIPE_CHUNK_SIZE, data_size);
494- std::vector<uint8_t > staging_a (actual_chunk);
495- std::vector<uint8_t > staging_b (actual_chunk);
496-
497- size_t num_blocks = (data_size + PIPE_CHUNK_SIZE - 1 ) / PIPE_CHUNK_SIZE;
498- size_t first_chunk = std::min (PIPE_CHUNK_SIZE, data_size);
499-
500- // Use ITaskExecutor to dispatch file read tasks instead of std::async
501- auto io_executor_cfg = ov::threading::IStreamsExecutor::Config{" GPUWeightLoadIO" , 1 };
502- auto io_executor = ov::threading::executor_manager ()->get_idle_cpu_streams_executor (io_executor_cfg);
503-
504- // Helper: submit a file read task via executor, return a future<bool>
505- auto submit_read = [&io_executor](const std::filesystem::path& path, uint8_t * buf,
506- size_t size, size_t offset) {
507- auto promise = std::make_shared<std::promise<bool >>();
508- auto future = promise->get_future ();
509- io_executor->run ([promise, path, buf, size, offset]() {
510- try {
511- promise->set_value (
512- ov::util::read_binary_file_parallel (path, buf, size, offset));
513- } catch (...) {
514- promise->set_exception (std::current_exception ());
515- }
516- });
517- return future;
518- };
519-
520- // Kick off first parallel file read
521- auto read_future = submit_read (file_path, staging_a.data (),
522- first_chunk, file_base_offset);
523-
524- event::ptr gpu_ev = nullptr ;
525- size_t dst_offset = 0 ;
526- bool cur_is_a = true ;
527- used_fast_io = true ;
528-
529- for (size_t block = 0 ; block < num_blocks && used_fast_io; block++) {
530- size_t chunk_size = std::min (PIPE_CHUNK_SIZE, data_size - dst_offset);
531- uint8_t * cur_buf = cur_is_a ? staging_a.data () : staging_b.data ();
532-
533- // Wait for current chunk's file read to complete
534- if (!read_future.get ()) {
535- used_fast_io = false ;
536- break ;
490+ // Double-buffered sequential stream read + async GPU copy.
491+ // Chunk size is DATA_BLOCK_SIZE per pipeline stage; double buffering overlaps reading the next chunk with the in-flight async GPU copy. NOTE(review): verify the cache-fit claim against the actual DATA_BLOCK_SIZE value before asserting "4MB / fits in L3".
492+ auto block_layout = layout (ov::PartialShape{static_cast <int64_t >(DATA_BLOCK_SIZE)}, data_types::u8 , format::bfyx);
493+ auto buf1_mem = ib.get_engine ().allocate_memory (block_layout, allocation_type::usm_host, false );
494+ auto buf2_mem = ib.get_engine ().allocate_memory (block_layout, allocation_type::usm_host, false );
495+ uint8_t * _buf1 = reinterpret_cast <uint8_t *>(buf1_mem->buffer_ptr ());
496+ uint8_t * _buf2 = reinterpret_cast <uint8_t *>(buf2_mem->buffer_ptr ());
497+ bool buf_flag = true ;
498+ event::ptr ev1, ev2;
499+ ev1 = ev2 = nullptr ;
500+ size_t dst_offset = 0 ;
501+
502+ while (dst_offset < data_size) {
503+ const bool is_blocking = false ;
504+ const size_t src_offset = 0 ;
505+ size_t copy_size = (data_size > (dst_offset + DATA_BLOCK_SIZE)) ? DATA_BLOCK_SIZE : (data_size - dst_offset);
506+
507+ if (buf_flag) {
508+ if (can_use_fast_io) {
509+ ov::util::read_binary_file_parallel (file_path, _buf1, copy_size, file_base_offset + dst_offset);
510+ } else {
511+ ib >> make_data (_buf1, copy_size);
537512 }
538-
539- // Start next chunk's parallel file read into the OTHER staging buffer
540- // This overlaps with the GPU copy of the current chunk below
541- if (block + 1 < num_blocks) {
542- size_t next_offset = dst_offset + chunk_size;
543- size_t next_chunk = std::min (PIPE_CHUNK_SIZE, data_size - next_offset);
544- uint8_t * next_buf = cur_is_a ? staging_b.data () : staging_a.data ();
545- size_t next_file_off = file_base_offset + next_offset;
546- read_future = submit_read (file_path, next_buf,
547- next_chunk, next_file_off);
513+ if (ev2 != nullptr ) {
514+ ev2->wait ();
515+ ev2 = nullptr ;
548516 }
549-
550- // Wait for previous GPU copy to complete before reusing its staging buffer
551- if (gpu_ev != nullptr ) {
552- gpu_ev->wait ();
553- gpu_ev = nullptr ;
517+ ev1 = mem->copy_from (strm, *buf1_mem, src_offset, dst_offset, copy_size, is_blocking);
518+ } else {
519+ if (can_use_fast_io) {
520+ ov::util::read_binary_file_parallel (file_path, _buf2, copy_size, file_base_offset + dst_offset);
521+ } else {
522+ ib >> make_data (_buf2, copy_size);
554523 }
555-
556- // Issue async GPU copy from current staging buffer
557- gpu_ev = mem->copy_from (strm, cur_buf, 0 , dst_offset, chunk_size, false );
558-
559- dst_offset += chunk_size;
560- cur_is_a = !cur_is_a;
561- }
562-
563- if (gpu_ev != nullptr ) {
564- gpu_ev->wait ();
565- }
566-
567- if (used_fast_io) {
568- ib.get_stream ().seekg (data_size, std::ios::cur);
524+ if (ev1 != nullptr ) {
525+ ev1->wait ();
526+ ev1 = nullptr ;
527+ }
528+ ev2 = mem->copy_from (strm, *buf2_mem, src_offset, dst_offset, copy_size, is_blocking);
569529 }
530+ dst_offset += DATA_BLOCK_SIZE;
531+ buf_flag = !buf_flag;
532+ }
533+ if (ev2 != nullptr ) {
534+ ev2->wait ();
535+ }
536+ if (ev1 != nullptr ) {
537+ ev1->wait ();
570538 }
571539
572- if (!used_fast_io) {
573- // Fallback: double-buffered sequential stream read + async GPU copy
574- std::vector<uint8_t > _buf1 (DATA_BLOCK_SIZE);
575- std::vector<uint8_t > _buf2 (DATA_BLOCK_SIZE);
576- bool buf_flag = true ;
577- event::ptr ev1, ev2;
578- ev1 = ev2 = nullptr ;
579- size_t dst_offset = 0 ;
580- while (dst_offset < data_size) {
581- const bool is_blocking = false ;
582- const size_t src_offset = 0 ;
583- size_t copy_size = (data_size > (dst_offset + DATA_BLOCK_SIZE)) ? DATA_BLOCK_SIZE : (data_size - dst_offset);
584- if (buf_flag) {
585- ib >> make_data (_buf1.data (), copy_size);
586- if (ev2 != nullptr ) {
587- ev2->wait ();
588- ev2 = nullptr ;
589- }
590- ev1 = mem->copy_from (strm, _buf1.data (), src_offset, dst_offset, copy_size, is_blocking);
591- } else {
592- ib >> make_data (_buf2.data (), copy_size);
593- if (ev1 != nullptr ) {
594- ev1->wait ();
595- ev1 = nullptr ;
596- }
597- ev2 = mem->copy_from (strm, _buf2.data (), src_offset, dst_offset, copy_size, is_blocking);
598- }
599- dst_offset += DATA_BLOCK_SIZE;
600- buf_flag = !buf_flag;
601- }
602- if (ev2 != nullptr ) {
603- ev2->wait ();
604- }
605- if (ev1 != nullptr ) {
606- ev1->wait ();
607- }
540+ if (can_use_fast_io) {
541+ // Advance the global stream pointer by the amount we read directly
542+ ib.get_stream ().seekg (data_size, std::ios::cur);
608543 }
609544 }
610545 }
0 commit comments