Skip to content

Commit d290898

Browse files
authored
OAK-12072 : removed Guava's Monitor and Guard (#2721)
* OAK-12072 : removed Guava's Monitor and Guard * OAK-12072 : added unit cases for ChangeProcessor class
1 parent d7905a4 commit d290898

File tree

2 files changed

+235
-53
lines changed

2 files changed

+235
-53
lines changed

oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java

Lines changed: 97 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929

3030
import java.util.Map;
3131
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicBoolean;
3233
import java.util.concurrent.atomic.AtomicInteger;
3334
import java.util.concurrent.atomic.AtomicReference;
35+
import java.util.concurrent.locks.ReentrantLock;
3436

3537
import javax.jcr.observation.Event;
3638
import javax.jcr.observation.EventIterator;
@@ -73,9 +75,6 @@
7375
import org.slf4j.Logger;
7476
import org.slf4j.LoggerFactory;
7577

76-
import org.apache.jackrabbit.guava.common.util.concurrent.Monitor;
77-
import org.apache.jackrabbit.guava.common.util.concurrent.Monitor.Guard;
78-
7978
/**
8079
* A {@code ChangeProcessor} generates observation {@link javax.jcr.observation.Event}s
8180
* based on a {@link FilterProvider filter} and delivers them to an {@link EventListener}.
@@ -417,8 +416,8 @@ public boolean excludes(NodeState root, CommitInfo info) {
417416
});
418417
}
419418

420-
private final Monitor runningMonitor = new Monitor();
421-
private final RunningGuard running = new RunningGuard(runningMonitor);
419+
private final AtomicBoolean stopped = new AtomicBoolean(false);
420+
private final ReentrantLock runningLock = new ReentrantLock(false);
422421

423422
/**
424423
* Try to stop this change processor if running. This method will wait
@@ -437,18 +436,23 @@ public boolean excludes(NodeState root, CommitInfo info) {
437436
*/
438437
public synchronized boolean stopAndWait(int timeOut, TimeUnit unit) {
439438
Validate.checkState(registration != null, "Change processor not started");
440-
if (running.stop()) {
441-
if (runningMonitor.enter(timeOut, unit)) {
442-
registration.unregister();
443-
runningMonitor.leave();
444-
return true;
445-
} else {
446-
// Timed out
447-
return false;
448-
}
449-
} else {
450-
// Stopped already
439+
440+
if (!stopFlagSet()) {
441+
// already stopped, return from here
442+
return true;
443+
}
444+
445+
if (!enterWithTimeout(timeOut, unit)) {
446+
// timed out acquiring the running lock, return early
447+
return false;
448+
}
449+
450+
// lock has been acquired
451+
try {
452+
registration.unregister();
451453
return true;
454+
} finally {
455+
runningLock.unlock();
452456
}
453457
}
454458

@@ -460,9 +464,15 @@ public synchronized boolean stopAndWait(int timeOut, TimeUnit unit) {
460464
*/
461465
public synchronized void stop() {
462466
Validate.checkState(registration != null, "Change processor not started");
463-
if (running.stop()) {
464-
registration.unregister();
465-
runningMonitor.leave();
467+
468+
if (stopFlagSet()) {
469+
// Wait until no contentChanged is in the critical section
470+
runningLock.lock();
471+
try {
472+
registration.unregister();
473+
} finally {
474+
runningLock.unlock();
475+
}
466476
}
467477
}
468478

@@ -503,19 +513,24 @@ public void contentChanged(@NotNull NodeState before,
503513
long time = System.nanoTime();
504514
boolean hasEvents = events.hasNext();
505515
tracker.recordProducerTime(System.nanoTime() - time, TimeUnit.NANOSECONDS);
506-
if (hasEvents && runningMonitor.enterIf(running)) {
507-
if (commitRateLimiter != null) {
508-
commitRateLimiter.beforeNonBlocking();
509-
}
516+
if (hasEvents && enterIfRunning()) {
517+
// lock has been acquired
510518
try {
511-
CountingIterator countingEvents = new CountingIterator(events);
512-
eventListener.onEvent(countingEvents);
513-
countingEvents.updateCounters(eventCount, eventDuration);
514-
} finally {
515519
if (commitRateLimiter != null) {
516-
commitRateLimiter.afterNonBlocking();
520+
commitRateLimiter.beforeNonBlocking();
517521
}
518-
runningMonitor.leave();
522+
try {
523+
CountingIterator countingEvents = new CountingIterator(events);
524+
eventListener.onEvent(countingEvents);
525+
countingEvents.updateCounters(eventCount, eventDuration);
526+
} finally {
527+
if (commitRateLimiter != null) {
528+
commitRateLimiter.afterNonBlocking();
529+
}
530+
}
531+
} finally {
532+
// unlock now
533+
runningLock.unlock();
519534
}
520535
}
521536
}
@@ -602,29 +617,6 @@ public void remove() {
602617
}
603618
}
604619

605-
private static class RunningGuard extends Guard {
606-
private boolean stopped;
607-
608-
public RunningGuard(Monitor monitor) {
609-
super(monitor);
610-
}
611-
612-
@Override
613-
public boolean isSatisfied() {
614-
return !stopped;
615-
}
616-
617-
/**
618-
* @return {@code true} if this call set this guard to stopped,
619-
* {@code false} if another call set this guard to stopped before.
620-
*/
621-
public boolean stop() {
622-
boolean wasStopped = stopped;
623-
stopped = true;
624-
return !wasStopped;
625-
}
626-
}
627-
628620
@Override
629621
public String toString() {
630622
return "ChangeProcessor ["
@@ -634,7 +626,7 @@ public String toString() {
634626
+ ", eventCount=" + eventCount
635627
+ ", eventDuration=" + eventDuration
636628
+ ", commitRateLimiter=" + commitRateLimiter
637-
+ ", running=" + running.isSatisfied() + "]";
629+
+ ", running=" + !stopped.get() + "]";
638630
}
639631

640632
/** for logging only **/
@@ -689,4 +681,56 @@ private FilterResult evalPrefilter(NodeState root, CommitInfo info, ChangeSet ch
689681
return FilterResult.INCLUDE;
690682
}
691683
}
684+
685+
// helper methods for lock/unlocking
686+
687+
private boolean stopFlagSet() {
688+
// true only for the first caller that changes false -> true
689+
return stopped.compareAndSet(false, true);
690+
}
691+
692+
private boolean enterIfRunning() {
693+
runningLock.lock();
694+
boolean ok = false;
695+
try {
696+
ok = !stopped.get(); // guard: same as RunningGuard.isSatisfied()
697+
return ok;
698+
} finally {
699+
if (!ok) {
700+
runningLock.unlock();
701+
}
702+
}
703+
}
704+
705+
private boolean enterWithTimeout(long timeout, TimeUnit unit) {
706+
707+
// non-fair fast path
708+
if (runningLock.tryLock()) {
709+
return true;
710+
}
711+
712+
long timeoutNanos = unit.toNanos(timeout);
713+
714+
boolean interrupted = false;
715+
try {
716+
long start = System.nanoTime();
717+
long remaining = timeoutNanos;
718+
719+
while (remaining > 0L) {
720+
try {
721+
// timed out
722+
return runningLock.tryLock(remaining, TimeUnit.NANOSECONDS);
723+
} catch (InterruptedException e) {
724+
interrupted = true;
725+
long elapsed = System.nanoTime() - start;
726+
remaining = timeoutNanos - elapsed;
727+
}
728+
}
729+
return false; // timeout
730+
} finally {
731+
if (interrupted) {
732+
Thread.currentThread().interrupt();
733+
}
734+
}
735+
}
692736
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.jackrabbit.oak.jcr.observation;
20+
21+
import org.junit.Test;
22+
import org.mockito.Mockito;
23+
import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
24+
import org.apache.jackrabbit.oak.api.ContentSession;
25+
import org.apache.jackrabbit.oak.namepath.NamePathMapper;
26+
import org.apache.jackrabbit.commons.observation.ListenerTracker;
27+
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
28+
import org.apache.jackrabbit.oak.stats.StatisticManager;
29+
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
30+
import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider;
31+
import org.junit.Assert;
32+
33+
import java.lang.reflect.Field;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
36+
/**
37+
* Unit cases for {@link ChangeProcessor} class
38+
*/
39+
public class ChangeProcessorTest {
40+
41+
@Test
42+
public void testStopSetStoppedFlagCorrectly() {
43+
ChangeProcessor changeProcessor = createChangeProcessor();
44+
CompositeRegistration registration = Mockito.mock(CompositeRegistration.class);
45+
setRegistration(changeProcessor, registration);
46+
// Assert stopped flag is false before calling stop()
47+
Assert.assertFalse("stopped flag should be false before stop()", getStoppedFlag(changeProcessor).get());
48+
// Now call stop(), which should throw due to leave() without enter()
49+
changeProcessor.stop();
50+
Mockito.verify(registration).unregister();
51+
// Assert stopped flag is true
52+
Assert.assertTrue("stopped flag should be true after stop()", getStoppedFlag(changeProcessor).get());
53+
54+
55+
}
56+
57+
@Test
58+
public void testToStringRunningStateBeforeAndAfterStop() {
59+
ChangeProcessor changeProcessor = createChangeProcessor();
60+
CompositeRegistration registration = Mockito.mock(CompositeRegistration.class);
61+
setRegistration(changeProcessor, registration);
62+
63+
String toStringBeforeStop = changeProcessor.toString();
64+
System.out.println("Before stop: " + toStringBeforeStop);
65+
Assert.assertTrue("toString() before stop should indicate running=true: " + toStringBeforeStop,
66+
toStringBeforeStop.contains("running=true"));
67+
68+
changeProcessor.stop();
69+
70+
String toStringAfterStop = changeProcessor.toString();
71+
System.out.println("After stop: " + toStringAfterStop);
72+
Assert.assertTrue("toString() after stop should indicate running=false: " + toStringAfterStop,
73+
toStringAfterStop.contains("running=false"));
74+
}
75+
76+
@Test
77+
public void testStopIdempotency() {
78+
ChangeProcessor changeProcessor = createChangeProcessor();
79+
CompositeRegistration registration = Mockito.mock(CompositeRegistration.class);
80+
setRegistration(changeProcessor, registration);
81+
// Assert stopped flag is false before calling stop()
82+
Assert.assertFalse("stopped flag should be false before stop()", getStoppedFlag(changeProcessor).get());
83+
// Call stop() first time
84+
changeProcessor.stop();
85+
Assert.assertTrue("stopped flag should be true after first stop()", getStoppedFlag(changeProcessor).get());
86+
Mockito.verify(registration).unregister();
87+
// Call stop() second time
88+
changeProcessor.stop();
89+
Assert.assertTrue("stopped flag should remain true after second stop()", getStoppedFlag(changeProcessor).get());
90+
// unregister should only be called once
91+
Mockito.verify(registration, Mockito.times(1)).unregister();
92+
}
93+
94+
95+
// helper methods
96+
97+
private static ChangeProcessor createChangeProcessor() {
98+
ContentSession contentSession = Mockito.mock(ContentSession.class);
99+
NamePathMapper namePathMapper = Mockito.mock(NamePathMapper.class);
100+
ListenerTracker tracker = Mockito.mock(ListenerTracker.class);
101+
FilterProvider filter = Mockito.mock(FilterProvider.class);
102+
StatisticManager statisticManager = Mockito.mock(StatisticManager.class);
103+
int queueLength = 1;
104+
CommitRateLimiter commitRateLimiter = Mockito.mock(CommitRateLimiter.class);
105+
BlobAccessProvider blobAccessProvider = Mockito.mock(BlobAccessProvider.class);
106+
107+
return new ChangeProcessor(
108+
contentSession,
109+
namePathMapper,
110+
tracker,
111+
filter,
112+
statisticManager,
113+
queueLength,
114+
commitRateLimiter,
115+
blobAccessProvider
116+
);
117+
}
118+
119+
private static AtomicBoolean getStoppedFlag(ChangeProcessor changeProcessor) {
120+
try {
121+
Field stoppedField = ChangeProcessor.class.getDeclaredField("stopped");
122+
stoppedField.setAccessible(true);
123+
return (AtomicBoolean) stoppedField.get(changeProcessor);
124+
} catch (Exception e) {
125+
throw new RuntimeException(e);
126+
}
127+
}
128+
129+
private static void setRegistration(ChangeProcessor changeProcessor, CompositeRegistration registration) {
130+
try {
131+
Field regField = ChangeProcessor.class.getDeclaredField("registration");
132+
regField.setAccessible(true);
133+
regField.set(changeProcessor, registration);
134+
} catch (Exception e) {
135+
throw new RuntimeException(e);
136+
}
137+
}
138+
}

0 commit comments

Comments
 (0)