Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/common/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ if (WIN32)
target_link_libraries(${TARGET_NAME} PRIVATE Shlwapi)
endif()

find_package(Threads REQUIRED)
target_link_libraries(${TARGET_NAME} PRIVATE Threads::Threads)

target_include_directories(${TARGET_NAME} PUBLIC $<BUILD_INTERFACE:${UTIL_INCLUDE_DIR}>)

ov_add_clang_format_target(${TARGET_NAME}_clang FOR_TARGETS ${TARGET_NAME})
Expand Down
10 changes: 10 additions & 0 deletions src/common/util/include/openvino/util/file_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,16 @@ std::filesystem::path get_plugin_path(const std::filesystem::path& plugin,
*/
std::vector<uint8_t> load_binary(const std::filesystem::path& path);

/**
* @brief Reads data from file into buffer with optimized method (Parallel IO)
* @param path File path
* @param buffer Destination buffer
* @param size Number of bytes to read
* @param offset Offset in file
* @return true if read successful
*/
bool read_binary_file_parallel(const std::filesystem::path& path, void* buffer, size_t size, size_t offset);

/**
* @brief save binary data to file
* @param path - binary file path to store
Expand Down
196 changes: 194 additions & 2 deletions src/common/util/src/file_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@
#include "openvino/util/file_util.hpp"

#include <algorithm>
#include <atomic>
#include <climits>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <future>
#include <iostream>
#include <sstream>
#include <string_view>
#include <thread>
#include <vector>

#include "openvino/util/common_util.hpp"

Expand All @@ -18,11 +24,16 @@
# define NOMINMAX
# endif
# include <direct.h>
# include <malloc.h>
# include <shlwapi.h>
# include <windows.h>
#else
# include <dirent.h>
# include <dlfcn.h>
# include <fcntl.h>
# include <sys/stat.h>
# include <sys/types.h>
# include <unistd.h>
#endif

std::filesystem::path ov::util::get_directory(const std::filesystem::path& path) {
Expand Down Expand Up @@ -250,12 +261,193 @@ std::filesystem::path ov::util::get_plugin_path(const std::filesystem::path& plu
std::vector<uint8_t> ov::util::load_binary(const std::filesystem::path& path) {
std::vector<uint8_t> buffer;
if (auto input = std::ifstream(path, std::ios::binary); input.is_open()) {
buffer.reserve(std::filesystem::file_size(path));
input.read(reinterpret_cast<char*>(buffer.data()), buffer.capacity());
buffer.resize(std::filesystem::file_size(path));
input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
}
return buffer;
}

#ifdef _WIN32
bool ov::util::read_binary_file_parallel(const std::filesystem::path& path, void* buffer, size_t size, size_t offset) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this implementation to dedicated file for windows under os folder

if (path.empty())
return false;

// CreateFileW expects wchar_t*
const std::wstring& wpath = path.native();

HANDLE hFile = CreateFileW(wpath.c_str(),
Comment on lines +276 to +278
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const std::wstring& wpath = path.native();
HANDLE hFile = CreateFileW(wpath.c_str(),
HANDLE hFile = CreateFileW(path.c_str(),

GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
NULL);
if (hFile == INVALID_HANDLE_VALUE)
return false;

// Safety check: File size
LARGE_INTEGER fileSize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LARGE_INTEGER fileSize;
LARGE_INTEGER file_size;

Use snake_case for variables

if (GetFileSizeEx(hFile, &fileSize)) {
if (static_cast<unsigned long long>(fileSize.QuadPart) < offset + size) {
CloseHandle(hFile);
return false;
}
}
CloseHandle(hFile);

const size_t num_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()), size / (1024 * 1024));
if (num_threads <= 1) {
// Fallback to single threaded read if size is small or bad concurrency
HANDLE s_hFile = CreateFileW(wpath.c_str(),
GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
NULL);
if (s_hFile == INVALID_HANDLE_VALUE)
return false;

OVERLAPPED ov = {0};
ov.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
ov.OffsetHigh = static_cast<DWORD>((offset >> 32) & 0xFFFFFFFF);
DWORD bytesRead = 0;
// Note: ReadFile takes DWORD (32-bit) for size. If size > 4GB, this simple fallback needs loop.
// But for single threaded simple read we might just fail or loop.
// Let's implement loop for correctness.

char* current_ptr = static_cast<char*>(buffer);
size_t remaining_size = size;
size_t current_file_offset = offset;
bool success = true;

while (remaining_size > 0) {
DWORD to_read = static_cast<DWORD>(std::min(remaining_size, static_cast<size_t>(UINT_MAX - 1024)));
ov.Offset = static_cast<DWORD>(current_file_offset & 0xFFFFFFFF);
ov.OffsetHigh = static_cast<DWORD>((current_file_offset >> 32) & 0xFFFFFFFF);

if (!ReadFile(s_hFile, current_ptr, to_read, &bytesRead, &ov) || bytesRead != to_read) {
if (GetLastError() != ERROR_IO_PENDING) {
success = false;
break;
}
}
remaining_size -= bytesRead;
current_ptr += bytesRead;
current_file_offset += bytesRead;
}
CloseHandle(s_hFile);
return success;
}

std::vector<std::future<void>> futures;
size_t chunk_size = size / num_threads;
chunk_size = (chunk_size + 4095) & ~4095;

size_t current_offset = 0;
std::atomic<bool> overall_status{true};

for (size_t i = 0; i < num_threads; i++) {
size_t read_size = (i == num_threads - 1) ? (size - current_offset) : chunk_size;
if (read_size == 0)
break;

void* ptr = static_cast<char*>(buffer) + current_offset;
size_t file_offset = offset + current_offset;

futures.emplace_back(std::async(std::launch::async, [wpath, file_offset, ptr, read_size, &overall_status] {
HANDLE t_hFile = CreateFileW(wpath.c_str(),
GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
NULL);
if (t_hFile == INVALID_HANDLE_VALUE) {
overall_status = false;
return;
}

size_t remaining_size = read_size;
char* current_ptr = static_cast<char*>(ptr);
size_t current_file_offset = file_offset;

while (remaining_size > 0 && overall_status) {
DWORD to_read = static_cast<DWORD>(std::min(remaining_size, static_cast<size_t>(UINT_MAX - 1024)));

OVERLAPPED ov = {0};
ov.Offset = static_cast<DWORD>(current_file_offset & 0xFFFFFFFF);
ov.OffsetHigh = static_cast<DWORD>((current_file_offset >> 32) & 0xFFFFFFFF);

DWORD bytesRead = 0;
if (!ReadFile(t_hFile, current_ptr, to_read, &bytesRead, &ov) || bytesRead != to_read) {
if (GetLastError() != ERROR_IO_PENDING) {
overall_status = false;
break;
}
}

remaining_size -= bytesRead;
current_ptr += bytesRead;
current_file_offset += bytesRead;
}
CloseHandle(t_hFile);
}));

current_offset += read_size;
}

for (auto& f : futures) {
f.get();
}
return overall_status;
}
#else
bool ov::util::read_binary_file_parallel(const std::filesystem::path& path, void* buffer, size_t size, size_t offset) {
std::ifstream ifs(path, std::ios::binary);
if (!ifs.is_open())
return false;

const size_t num_threads = std::min(static_cast<size_t>(std::thread::hardware_concurrency()), size / (1024 * 1024));
// Fallback to single thread if not enough work or threads
if (num_threads <= 1) {
ifs.seekg(offset, std::ios::beg);
ifs.read(static_cast<char*>(buffer), size);
return ifs.good();
}

std::vector<std::future<void>> futures;
size_t chunk_size = size / num_threads;
chunk_size = (chunk_size + 4095) & ~4095;
size_t current_offset = 0;

// We open file in each thread to have independent file pointers
for (size_t i = 0; i < num_threads; i++) {
size_t read_size = (i == num_threads - 1) ? (size - current_offset) : chunk_size;
if (read_size == 0)
break;

void* ptr = static_cast<char*>(buffer) + current_offset;
size_t file_offset = offset + current_offset;

futures.emplace_back(std::async(std::launch::async, [path, file_offset, ptr, read_size] {
std::ifstream t_ifs(path, std::ios::binary);
if (t_ifs.is_open()) {
t_ifs.seekg(file_offset, std::ios::beg);
t_ifs.read(static_cast<char*>(ptr), read_size);
}
}));

current_offset += read_size;
}

for (auto& f : futures) {
f.get();
}
return true; // Simplified error handling for parallel ifstream
}
#endif

void ov::util::save_binary(const std::filesystem::path& path, const void* binary, size_t bin_size) {
if (std::ofstream out_file(path, std::ios::binary); out_file.is_open()) {
out_file.write(reinterpret_cast<const char*>(binary), bin_size);
Expand Down
13 changes: 13 additions & 0 deletions src/inference/src/dev/core_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,19 @@ ov::SoPtr<ov::ICompiledModel> ov::CoreImpl::load_model_from_cache(
}
}

// Pass the cached blob file path to plugins that support it (e.g. GPU plugin)
// so they can use optimized parallel I/O to read weights directly from the blob file
if (!cacheContent.m_blob_id.empty() && util::contains(plugin.get_property(ov::supported_properties),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The core logic should be avoided, especially for device specific properties.
The cache entry is manage by cache manger and here should not be any logic to add such property or bypass what is opened cache manger. Also use any hardcoded path is not correct.

The proper solution is open the stream (fast version) or (mmap) version which allow to read the parallel better read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At early time, I also hope to do so, but ifstream cannot meet requirement of parallel read due to each thread need seek to different offset to read data. Could you give some test sample of such parallel read with stream?

ov::PropertyName("GPU_CACHED_BLOB_PATH"))) {
if (auto cache_dir_it = config.find(ov::cache_dir.name()); cache_dir_it != config.end()) {
auto blob_path = std::filesystem::path(cache_dir_it->second.as<std::string>()) /
(cacheContent.m_blob_id + ".blob");
if (ov::util::file_exists(blob_path)) {
update_config["GPU_CACHED_BLOB_PATH"] = util::path_to_string(blob_path);
}
}
}

ov::util::VariantVisitor model_importer{
[&](const ov::Tensor& compiled_blob) -> ov::SoPtr<ov::ICompiledModel> {
const ov::Tensor compiled_blob_without_header{compiled_blob,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ class BinaryInputBuffer : public InputBuffer<BinaryInputBuffer> {
virtual void read(void* const data, std::streamsize size) {
auto const read_size = _stream.rdbuf()->sgetn(reinterpret_cast<char*>(data), size);
OPENVINO_ASSERT(read_size == size,
"[GPU] Failed to read " + std::to_string(size) + " bytes from stream! Read " + std::to_string(read_size));
"[GPU] Failed to read " + std::to_string(size) + " bytes to stream! Read " + std::to_string(read_size));
}

std::istream& get_stream() { return _stream; }

void setKernelImplParams(void* impl_params) { _impl_params = impl_params; }
void* getKernelImplParams() const { return _impl_params; }

Expand Down
Loading
Loading