
Commit 8a82325

[ntuple] Reduce memory usage of RPageSinkBuf
When IMT is turned on and RPageSinkBuf has an RTaskScheduler, we would previously buffer all pages and create tasks to seal / compress them. While this exposes the maximum amount of parallel work, it wastes memory if the other threads cannot process the tasks fast enough. Instead, heuristically assume that there is enough work once we already buffer more uncompressed bytes than the approximate zipped cluster size.

In a small test, writing random data with ROOT::EnableImplicitMT(1) and therefore no extra worker thread, the application used 500 MB before this change for the default cluster size of 128 MiB. After this change, memory usage is reduced to around 430 MB (compared to 360 MB without IMT).

The compression factor is around 2.1x in this case, which roughly checks out: instead of buffering the full uncompressed cluster (around compression factor * zipped cluster size = 270 MiB), we now buffer uncompressed pages only up to the approximate zipped cluster size (128 MiB) and then start compressing pages immediately. The compressed result also needs to be buffered, but it is much smaller: (1 - 1 / compression factor) * zipped cluster size = 67 MiB. Accordingly, the gain will be higher for larger compression factors.

(cherry picked from commit c421df1)
1 parent ed75267
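A quick back-of-envelope check of the arithmetic in the commit message (a sketch only: the compression factor of ~2.1 and the default 128 MiB approximate zipped cluster size come from the measurement above, and the variable names are purely illustrative):

#include <cstdio>

int main()
{
   const double compressionFactor = 2.1;    // measured for the random-data test above
   const double zippedClusterMiB = 128.0;   // default approximate zipped cluster size

   // Before the change: the whole uncompressed cluster is buffered.
   const double beforeMiB = compressionFactor * zippedClusterMiB; // ~269 MiB

   // After the change: uncompressed pages up to the zipped cluster size, plus the
   // compressed output of the remaining pages of the cluster.
   const double afterMiB = zippedClusterMiB + (1 - 1 / compressionFactor) * zippedClusterMiB; // ~195 MiB

   // The difference of ~74 MiB is roughly in line with the observed 500 MB -> 430 MB.
   std::printf("before ~%.0f MiB, after ~%.0f MiB, saved ~%.0f MiB\n", beforeMiB, afterMiB, beforeMiB - afterMiB);
   return 0;
}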

File tree: 2 files changed (+21 −1 lines)

tree/ntuple/inc/ROOT/RPageSinkBuf.hxx

Lines changed: 4 additions & 0 deletions
@@ -19,6 +19,8 @@
 #include <ROOT/RNTupleMetrics.hxx>
 #include <ROOT/RPageStorage.hxx>
 
+#include <atomic>
+#include <cstddef>
 #include <deque>
 #include <functional>
 #include <iterator>
@@ -109,6 +111,8 @@ private:
    /// The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
    /// For the unbuffered case, the RNTupleModel is instead managed by a RNTupleWriter.
    std::unique_ptr<ROOT::RNTupleModel> fInnerModel;
+   /// The sum of uncompressed bytes in buffered pages. Used to heuristically reduce the memory usage.
+   std::atomic<std::size_t> fBufferedUncompressed = 0;
    /// Vector of buffered column pages. Indexed by column id.
    std::vector<RColumnBuf> fBufferedColumns;
    /// Columns committed as suppressed are stored and passed to the inner sink at cluster commit

tree/ntuple/src/RPageSinkBuf.cxx

Lines changed: 17 additions & 1 deletion
@@ -174,7 +174,13 @@ void ROOT::Internal::RPageSinkBuf::CommitPage(ColumnHandle_t columnHandle, const
       }
    };
 
-   if (!fTaskScheduler) {
+   // If we already buffer more uncompressed bytes than the approximate zipped cluster size, we assume there is enough
+   // work for other threads to pick up. This limits the buffer usage when sealing / compression tasks are not processed
+   // fast enough, and heuristically reduces the memory usage, especially for big compression factors.
+   std::size_t bufferedUncompressed = fBufferedUncompressed.load();
+   bool enoughWork = bufferedUncompressed > GetWriteOptions().GetApproxZippedClusterSize();
+
+   if (!fTaskScheduler || enoughWork) {
       allocateBuf();
       // Seal the page right now, avoiding the allocation and copy, but making sure that the page buffer is not aliased.
       RSealPageConfig config;
@@ -193,16 +199,25 @@ void ROOT::Internal::RPageSinkBuf::CommitPage(ColumnHandle_t columnHandle, const
       return;
    }
 
+   // We will buffer the uncompressed page. Unless work is consumed fast enough, the next page might be compressed
+   // directly.
+   fBufferedUncompressed += page.GetNBytes();
+
    // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
    zipItem.fPage = fPageAllocator->NewPage(page.GetElementSize(), page.GetNElements());
    // make sure the page is aware of how many elements it will have
    zipItem.fPage.GrowUnchecked(page.GetNElements());
+   assert(zipItem.fPage.GetNBytes() == page.GetNBytes());
    memcpy(zipItem.fPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
 
    fCounters->fParallelZip.SetValue(1);
    // Thread safety: Each thread works on a distinct zipItem which owns its
    // compression buffer.
    fTaskScheduler->AddTask([this, &zipItem, &sealedPage, &element, allocateBuf, shrinkSealedPage] {
+      // The task will consume the uncompressed page. Decrease the atomic counter early so that more work has arrived
+      // when we are done.
+      fBufferedUncompressed -= zipItem.fPage.GetNBytes();
+
       allocateBuf();
       RSealPageConfig config;
       config.fPage = &zipItem.fPage;
@@ -240,6 +255,7 @@ void ROOT::Internal::RPageSinkBuf::CommitSealedPageV(
 void ROOT::Internal::RPageSinkBuf::FlushClusterImpl(std::function<void(void)> FlushClusterFn)
 {
    WaitForAllTasks();
+   assert(fBufferedUncompressed == 0 && "all buffered pages should have been processed");
 
    std::vector<RSealedPageGroup> toCommit;
    toCommit.reserve(fBufferedColumns.size());