@@ -163,10 +163,11 @@ void SharedArbitrator::reserveMemory(MemoryPool* pool, uint64_t /*unused*/) {
163163 pool->grow (reserveBytes);
164164}
165165
166- void SharedArbitrator::releaseMemory (MemoryPool* pool) {
166+ uint64_t SharedArbitrator::releaseMemory (MemoryPool* pool, uint64_t bytes ) {
167167 std::lock_guard<std::mutex> l (mutex_);
168- const uint64_t freedBytes = pool->shrink (0 );
168+ const uint64_t freedBytes = pool->shrink (bytes );
169169 incrementFreeCapacityLocked (freedBytes);
170+ return freedBytes;
170171}
171172
172173std::vector<SharedArbitrator::Candidate> SharedArbitrator::getCandidateStats (
@@ -245,10 +246,7 @@ bool SharedArbitrator::ensureCapacity(
245246 if (checkCapacityGrowth (*requestor, targetBytes)) {
246247 return true ;
247248 }
248- const uint64_t reclaimedBytes = reclaim (requestor, targetBytes);
249- // NOTE: return the reclaimed bytes back to the arbitrator and let the memory
250- // arbitration process to grow the requestor's memory capacity accordingly.
251- incrementFreeCapacity (reclaimedBytes);
249+ reclaim (requestor, targetBytes);
252250 // Check if the requestor has been aborted in reclaim operation above.
253251 if (requestor->aborted ()) {
254252 ++numFailures_;
@@ -293,51 +291,57 @@ bool SharedArbitrator::arbitrateMemory(
293291 const uint64_t growTarget = std::min (
294292 maxGrowBytes (*requestor),
295293 std::max (memoryPoolTransferCapacity_, targetBytes));
296- uint64_t freedBytes = decrementFreeCapacity (growTarget);
297- if (freedBytes >= targetBytes) {
298- requestor->grow (freedBytes);
299- return true ;
300- }
301- VELOX_CHECK_LT (freedBytes, growTarget);
294+ uint64_t unusedFreedBytes = decrementFreeCapacity (growTarget);
302295
303296 auto freeGuard = folly::makeGuard ([&]() {
304297 // Returns the unused freed memory capacity back to the arbitrator.
305- if (freedBytes > 0 ) {
306- incrementFreeCapacity (freedBytes );
298+ if (unusedFreedBytes > 0 ) {
299+ incrementFreeCapacity (unusedFreedBytes );
307300 }
308301 });
309302
310- freedBytes +=
311- reclaimFreeMemoryFromCandidates (candidates, growTarget - freedBytes);
312- if (freedBytes >= targetBytes) {
313- const uint64_t bytesToGrow = std::min (growTarget, freedBytes);
314- requestor->grow (bytesToGrow);
315- freedBytes -= bytesToGrow;
303+ if (unusedFreedBytes >= targetBytes) {
304+ requestor->grow (unusedFreedBytes);
305+ unusedFreedBytes = 0 ;
306+ return true ;
307+ }
308+ VELOX_CHECK_LT (unusedFreedBytes, growTarget);
309+
310+ reclaimFreeMemoryFromCandidates (candidates, growTarget - unusedFreedBytes);
311+ unusedFreedBytes += decrementFreeCapacity (growTarget - unusedFreedBytes);
312+ if (unusedFreedBytes >= targetBytes) {
313+ requestor->grow (unusedFreedBytes);
314+ unusedFreedBytes = 0 ;
316315 return true ;
317316 }
318317
319- VELOX_CHECK_LT (freedBytes, growTarget);
320- freedBytes += reclaimUsedMemoryFromCandidates (
321- requestor, candidates, growTarget - freedBytes);
318+ VELOX_CHECK_LT (unusedFreedBytes, growTarget);
319+ reclaimUsedMemoryFromCandidates (
320+ requestor, candidates, growTarget - unusedFreedBytes);
321+ unusedFreedBytes += decrementFreeCapacity (growTarget - unusedFreedBytes);
322322 if (requestor->aborted ()) {
323323 ++numFailures_;
324324 VELOX_MEM_POOL_ABORTED (" The requestor pool has been aborted." );
325325 }
326326
327327 VELOX_CHECK (!requestor->aborted ());
328328
329- if (freedBytes < targetBytes) {
329+ if (unusedFreedBytes < targetBytes) {
330330 VELOX_MEM_LOG (WARNING)
331331 << " Failed to arbitrate sufficient memory for memory pool "
332332 << requestor->name () << " , request " << succinctBytes (targetBytes)
333- << " , only " << succinctBytes (freedBytes )
333+ << " , only " << succinctBytes (unusedFreedBytes )
334334 << " has been freed, Arbitrator state: " << toString ();
335335 return false ;
336336 }
337337
338- const uint64_t bytesToGrow = std::min (freedBytes, growTarget);
339- requestor->grow (bytesToGrow);
340- freedBytes -= bytesToGrow;
338+ if (unusedFreedBytes > growTarget) {
339+ requestor->grow (growTarget);
340+ unusedFreedBytes -= growTarget;
341+ return true ;
342+ }
343+ requestor->grow (unusedFreedBytes);
344+ unusedFreedBytes = 0 ;
341345 return true ;
342346}
343347
@@ -358,7 +362,9 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
358362 if (bytesToShrink <= 0 ) {
359363 break ;
360364 }
361- freedBytes += candidate.pool ->shrink (bytesToShrink);
365+ uint64_t shrunk = candidate.pool ->shrink (bytesToShrink);
366+ incrementFreeCapacity (shrunk);
367+ freedBytes += shrunk;
362368 if (freedBytes >= targetBytes) {
363369 break ;
364370 }
@@ -398,6 +404,7 @@ uint64_t SharedArbitrator::reclaim(
398404 uint64_t freedBytes{0 };
399405 try {
400406 freedBytes = pool->shrink (targetBytes);
407+ incrementFreeCapacity (freedBytes);
401408 if (freedBytes < targetBytes) {
402409 pool->reclaim (targetBytes - freedBytes);
403410 }
@@ -407,7 +414,7 @@ uint64_t SharedArbitrator::reclaim(
407414 abort (pool, std::current_exception ());
408415 // Free up all the free capacity from the aborted pool as the associated
409416 // query has failed at this point.
410- pool->shrink ();
417+ incrementFreeCapacity ( pool->shrink () );
411418 }
412419 const uint64_t newCapacity = pool->capacity ();
413420 VELOX_CHECK_GE (oldCapacity, newCapacity);
0 commit comments