Skip to content

Commit ffb995e

Browse files
committed
Add parallelReadStreamBuf for parallel read
1 parent c66d8c2 commit ffb995e

File tree

5 files changed

+545
-93
lines changed

5 files changed

+545
-93
lines changed
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright (C) 2018-2026 Intel Corporation
2+
// SPDX-License-Identifier: Apache-2.0
3+
//
4+
5+
#pragma once
6+
7+
#include <algorithm>
8+
#include <cstring>
9+
#include <stdexcept>
10+
#include <streambuf>
11+
12+
#ifdef _WIN32
13+
# ifndef NOMINMAX
14+
# define NOMINMAX
15+
# endif
16+
# ifndef WIN32_LEAN_AND_MEAN
17+
# define WIN32_LEAN_AND_MEAN
18+
# endif
19+
# include <windows.h>
20+
#endif
21+
22+
#include "openvino/core/parallel.hpp"
23+
24+
namespace ov {
25+
namespace util {
26+
27+
/// @brief A std::streambuf that reads from an in-memory buffer using parallel
28+
/// memcpy for large reads.
29+
///
30+
/// Intended for mmap-backed tensors: the tensor's raw memory is already mapped
31+
/// into the process but pages may not yet be resident. For large reads,
32+
/// splitting the copy across N threads triggers concurrent page faults, raising
33+
/// the OS I/O queue depth and saturating NVMe bandwidth.
34+
///
35+
/// On Windows, after each large copy the consumed source pages are released
36+
/// from the process working-set via VirtualFree(MEM_RESET) to relieve RAM
37+
/// pressure when loading multi-GB models.
38+
///
39+
/// Usage:
40+
/// @code
41+
/// // In plugin::import_model(const ov::Tensor& model, ...):
42+
/// ov::util::ParallelMemStreamBuf par_buf(model.data(), model.get_byte_size());
43+
/// std::istream stream(&par_buf);
44+
/// // pass stream to BinaryInputBuffer or any std::istream& consumer
45+
/// @endcode
46+
class ParallelMemStreamBuf : public std::streambuf {
47+
public:
48+
static constexpr size_t DEFAULT_THRESHOLD = 4UL * 1024 * 1024; // 4 MB
49+
50+
/// @param data Pointer to the start of the memory region.
51+
/// @param size Total size of the memory region in bytes.
52+
/// @param threshold Minimum read size to engage parallel memcpy.
53+
ParallelMemStreamBuf(const void* data, size_t size, size_t threshold = DEFAULT_THRESHOLD)
54+
: m_begin(static_cast<const char*>(data)),
55+
m_end(static_cast<const char*>(data) + size),
56+
m_current(static_cast<const char*>(data)),
57+
m_threshold(threshold) {}
58+
59+
~ParallelMemStreamBuf() override = default;
60+
61+
ParallelMemStreamBuf(const ParallelMemStreamBuf&) = delete;
62+
ParallelMemStreamBuf& operator=(const ParallelMemStreamBuf&) = delete;
63+
64+
protected:
65+
// -----------------------------------------------------------------------
66+
// xsgetn: hot path — called by sgetn() for all bulk reads
67+
// -----------------------------------------------------------------------
68+
std::streamsize xsgetn(char_type* dst, std::streamsize n) override {
69+
if (m_current >= m_end) {
70+
return 0;
71+
}
72+
const std::streamsize avail = static_cast<std::streamsize>(m_end - m_current);
73+
const std::streamsize to_copy = std::min(n, avail);
74+
75+
if (static_cast<size_t>(to_copy) >= m_threshold) {
76+
parallel_copy(dst, m_current, static_cast<size_t>(to_copy));
77+
} else {
78+
std::memcpy(dst, m_current, static_cast<size_t>(to_copy));
79+
}
80+
81+
m_current += to_copy;
82+
return to_copy;
83+
}
84+
85+
// -----------------------------------------------------------------------
86+
// underflow: single-char peek path
87+
// -----------------------------------------------------------------------
88+
int_type underflow() override {
89+
if (m_current >= m_end) {
90+
return traits_type::eof();
91+
}
92+
return traits_type::to_int_type(*m_current);
93+
}
94+
95+
int_type uflow() override {
96+
if (m_current >= m_end) {
97+
return traits_type::eof();
98+
}
99+
return traits_type::to_int_type(*m_current++);
100+
}
101+
102+
// -----------------------------------------------------------------------
103+
// Seek support
104+
// -----------------------------------------------------------------------
105+
pos_type seekoff(off_type off,
106+
std::ios_base::seekdir way,
107+
std::ios_base::openmode /* which */) override {
108+
const char* new_pos = nullptr;
109+
if (way == std::ios_base::beg) {
110+
new_pos = m_begin + off;
111+
} else if (way == std::ios_base::cur) {
112+
new_pos = m_current + off;
113+
} else {
114+
new_pos = m_end + off;
115+
}
116+
117+
if (new_pos < m_begin || new_pos > m_end) {
118+
return pos_type(off_type(-1));
119+
}
120+
121+
m_current = new_pos;
122+
return pos_type(static_cast<off_type>(m_current - m_begin));
123+
}
124+
125+
pos_type seekpos(pos_type pos, std::ios_base::openmode /* which */) override {
126+
return seekoff(off_type(pos), std::ios_base::beg, std::ios_base::in);
127+
}
128+
129+
std::streamsize showmanyc() override {
130+
const std::streamsize avail = static_cast<std::streamsize>(m_end - m_current);
131+
return avail > 0 ? avail : -1;
132+
}
133+
134+
private:
135+
void parallel_copy(char* dst, const char* src, size_t size) {
136+
constexpr size_t MIN_CHUNK = 2UL * 1024 * 1024; // 2 MB minimum per thread
137+
const size_t num_chunks = std::max(size_t{1}, size / MIN_CHUNK);
138+
const size_t chunk_size = (size + num_chunks - 1) / num_chunks;
139+
140+
#ifdef _WIN32
141+
// Prefetch: trigger page faults up-front to maximise NVMe queue depth.
142+
WIN32_MEMORY_RANGE_ENTRY prefetch_range{const_cast<char*>(src), size};
143+
PrefetchVirtualMemory(GetCurrentProcess(), 1, &prefetch_range, 0);
144+
#endif
145+
146+
ov::parallel_for(num_chunks, [&](size_t i) {
147+
const size_t offset = i * chunk_size;
148+
const size_t copy_size = (i + 1 == num_chunks) ? (size - offset) : chunk_size;
149+
std::memcpy(dst + offset, src + offset, copy_size);
150+
});
151+
152+
#ifdef _WIN32
153+
// Release consumed mmap pages from the working-set to avoid RAM pressure
154+
// when loading multi-GB models. MEM_RESET marks pages as no longer needed;
155+
// the kernel may reclaim them without writing to the page file.
156+
constexpr uintptr_t PAGE_MASK = ~static_cast<uintptr_t>(4095u);
157+
const char* reset_begin =
158+
reinterpret_cast<const char*>(reinterpret_cast<uintptr_t>(src) & PAGE_MASK);
159+
const char* reset_end = reinterpret_cast<const char*>(
160+
(reinterpret_cast<uintptr_t>(src) + size) & PAGE_MASK);
161+
if (reset_begin < reset_end) {
162+
VirtualFree(const_cast<char*>(reset_begin),
163+
static_cast<SIZE_T>(reset_end - reset_begin),
164+
MEM_RESET);
165+
}
166+
#endif
167+
}
168+
169+
const char* m_begin;
170+
const char* m_end;
171+
const char* m_current;
172+
size_t m_threshold;
173+
};
174+
175+
} // namespace util
176+
} // namespace ov

0 commit comments

Comments
 (0)