Skip to content

Commit 2528f09

Browse files
authored
OAK-12033 : replaced Guava's Monitor with ReentrantLock (#2654)
1 parent b7d25c5 commit 2528f09

File tree

2 files changed

+42
-31
lines changed

2 files changed

+42
-31
lines changed

oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/FileStoreIT.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,10 @@
4242
import java.util.concurrent.TimeUnit;
4343
import java.util.concurrent.TimeoutException;
4444
import java.util.concurrent.atomic.AtomicBoolean;
45+
import java.util.concurrent.locks.Condition;
46+
import java.util.concurrent.locks.Lock;
47+
import java.util.concurrent.locks.ReentrantLock;
4548

46-
import org.apache.jackrabbit.guava.common.util.concurrent.Monitor;
47-
import org.apache.jackrabbit.guava.common.util.concurrent.Monitor.Guard;
4849
import org.apache.jackrabbit.oak.api.Blob;
4950
import org.apache.jackrabbit.oak.plugins.memory.AbstractBlob;
5051
import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
@@ -257,20 +258,23 @@ public void blockingBlob() throws Exception {
257258
/* A blob that blocks on read until unblocked */
258259
class BlockingBlob extends AbstractBlob {
259260
private final AtomicBoolean blocking = new AtomicBoolean(true);
260-
private final Monitor readMonitor = new Monitor();
261+
private final Lock lock = new ReentrantLock();
262+
private final Condition readingCondition = lock.newCondition();
261263
private boolean reading = false;
262264

263265
public boolean waitForRead(int time, TimeUnit unit) throws InterruptedException {
264-
readMonitor.enter();
266+
long nanos = unit.toNanos(time);
267+
lock.lock();
265268
try {
266-
return readMonitor.waitFor(new Guard(readMonitor) {
267-
@Override
268-
public boolean isSatisfied() {
269-
return reading;
269+
while (!reading) {
270+
if (nanos <= 0L) {
271+
return false; // timed out
270272
}
271-
}, time, unit);
273+
nanos = readingCondition.awaitNanos(nanos);
274+
}
275+
return true; // reading == true
272276
} finally {
273-
readMonitor.leave();
277+
lock.unlock();
274278
}
275279
}
276280

@@ -295,13 +299,15 @@ public int read(@NotNull byte[] b, int off, int len) throws IOException {
295299

296300
private int readOrEnd() {
297301
if (blocking.get()) {
298-
if (!reading) {
299-
readMonitor.enter();
300-
try {
302+
lock.lock();
303+
try {
304+
if (!reading) {
301305
reading = true;
302-
} finally {
303-
readMonitor.leave();
306+
// wake up any threads waiting in waitForRead
307+
readingCondition.signalAll();
304308
}
309+
} finally {
310+
lock.unlock();
305311
}
306312
return 0;
307313
} else {

oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreIT.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.apache.jackrabbit.oak.plugins.document;
1818

1919
import java.io.InputStream;
20-
import java.util.Date;
2120
import java.util.List;
2221
import java.util.concurrent.Callable;
2322
import java.util.concurrent.ExecutorService;
@@ -26,8 +25,9 @@
2625
import java.util.concurrent.TimeUnit;
2726
import java.util.concurrent.TimeoutException;
2827
import java.util.concurrent.atomic.AtomicBoolean;
29-
30-
import org.apache.jackrabbit.guava.common.util.concurrent.Monitor;
28+
import java.util.concurrent.locks.Condition;
29+
import java.util.concurrent.locks.Lock;
30+
import java.util.concurrent.locks.ReentrantLock;
3131

3232
import org.apache.jackrabbit.oak.api.CommitFailedException;
3333
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
@@ -454,20 +454,23 @@ private int getNodesFindCountOfAnUpdate(CountingDocumentStore cds, DocumentNodeS
454454
*/
455455
class BlockingBlob extends AbstractBlob {
456456
private final AtomicBoolean blocking = new AtomicBoolean(true);
457-
private final Monitor readMonitor = new Monitor();
457+
private final Lock lock = new ReentrantLock();
458+
private final Condition readingCondition = lock.newCondition();
458459
private boolean reading = false;
459460

460461
boolean waitForRead(int time, TimeUnit unit) throws InterruptedException {
461-
readMonitor.enter();
462+
long nanos = unit.toNanos(time);
463+
lock.lock();
462464
try {
463-
return readMonitor.waitFor(new Monitor.Guard(readMonitor) {
464-
@Override
465-
public boolean isSatisfied() {
466-
return reading;
465+
while (!reading) {
466+
if (nanos <= 0L) {
467+
return false; // timed out
467468
}
468-
}, time, unit);
469+
nanos = readingCondition.awaitNanos(nanos);
470+
}
471+
return true; // reading == true
469472
} finally {
470-
readMonitor.leave();
473+
lock.unlock();
471474
}
472475
}
473476

@@ -483,13 +486,15 @@ public InputStream getNewStream() {
483486
@Override
484487
public int read() {
485488
while (blocking.get()) {
486-
if (!reading) {
487-
readMonitor.enter();
488-
try {
489+
lock.lock();
490+
try {
491+
if (!reading) {
489492
reading = true;
490-
} finally {
491-
readMonitor.leave();
493+
// wake up anyone waiting in waitForRead()
494+
readingCondition.signalAll();
492495
}
496+
} finally {
497+
lock.unlock();
493498
}
494499
}
495500
return -1;

0 commit comments

Comments
 (0)