Skip to content

Commit 2eb4144

Browse files
committed
Merge pull request #145 from akarnokd/SerializedProducer
Example synchronized serializing producer+subscription to
2 parents d581cdf + b2e2332 commit 2eb4144

File tree

2 files changed

+115
-20
lines changed

2 files changed

+115
-20
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.reactivestreams;
17+
18+
import java.util.*;
19+
20+
import org.reactivestreams.Subscription;
21+
22+
public final class RxJavaSynchronizedProducer implements rx.Producer, rx.Subscription {
23+
private final Subscription subscription;
24+
private volatile boolean unsubscribed;
25+
/** Guarded by this. */
26+
private boolean emitting;
27+
/** Guarded by this. */
28+
private List<Long> requests;
29+
30+
public RxJavaSynchronizedProducer(Subscription subscription) {
31+
if (subscription == null) {
32+
throw new NullPointerException("subscription");
33+
}
34+
this.subscription = subscription;
35+
}
36+
@Override
37+
public boolean isUnsubscribed() {
38+
return unsubscribed;
39+
}
40+
@Override
41+
public void request(long n) {
42+
if (n > 0 && !unsubscribed) {
43+
synchronized (this) {
44+
if (unsubscribed) {
45+
return;
46+
}
47+
if (emitting) {
48+
if (requests == null) {
49+
requests = new ArrayList<Long>(4);
50+
}
51+
requests.add(n);
52+
return;
53+
}
54+
emitting = true;
55+
}
56+
boolean skipFinal = false;
57+
try {
58+
subscription.request(n);
59+
for (;;) {
60+
List<Long> list;
61+
synchronized (this) {
62+
list = requests;
63+
requests = null;
64+
if (list == null) {
65+
emitting = false;
66+
skipFinal = true;
67+
return;
68+
}
69+
}
70+
for (Long v : list) {
71+
if (v.longValue() == 0L) {
72+
unsubscribed = true;
73+
subscription.cancel();
74+
skipFinal = true;
75+
return;
76+
} else {
77+
subscription.request(v);
78+
}
79+
}
80+
}
81+
} finally {
82+
if (!skipFinal) {
83+
synchronized (this) {
84+
emitting = false;
85+
}
86+
}
87+
}
88+
}
89+
}
90+
@Override
91+
public void unsubscribe() {
92+
if (!unsubscribed) {
93+
synchronized (this) {
94+
if (unsubscribed) {
95+
return;
96+
}
97+
if (emitting) {
98+
// replace all pending requests with this single cancel indicator
99+
requests = new ArrayList<Long>(4);
100+
requests.add(0L);
101+
return;
102+
}
103+
emitting = true;
104+
}
105+
unsubscribed = true;
106+
subscription.cancel();
107+
// no need to leave emitting as this is a terminal state
108+
}
109+
}
110+
}

Diff for: rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/SubscriberAdapter.java

+5-20
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,10 @@
1515
*/
1616
package rx.internal.reactivestreams;
1717

18-
import org.reactivestreams.Subscriber;
19-
import org.reactivestreams.Subscription;
20-
import rx.Producer;
21-
import rx.functions.Action0;
22-
import rx.subscriptions.Subscriptions;
23-
2418
import java.util.concurrent.atomic.AtomicBoolean;
2519

20+
import org.reactivestreams.*;
21+
2622
public class SubscriberAdapter<T> implements Subscriber<T> {
2723

2824
private final rx.Subscriber<? super T> rxSubscriber;
@@ -39,21 +35,10 @@ public void onSubscribe(final Subscription rsSubscription) {
3935
}
4036

4137
if (started.compareAndSet(false, true)) {
42-
rxSubscriber.add(Subscriptions.create(new Action0() {
43-
@Override
44-
public void call() {
45-
rsSubscription.cancel();
46-
}
47-
}));
38+
RxJavaSynchronizedProducer sp = new RxJavaSynchronizedProducer(rsSubscription);
39+
rxSubscriber.add(sp);
4840
rxSubscriber.onStart();
49-
rxSubscriber.setProducer(new Producer() {
50-
@Override
51-
public void request(long n) {
52-
if (n > 0) {
53-
rsSubscription.request(n);
54-
}
55-
}
56-
});
41+
rxSubscriber.setProducer(sp);
5742
} else {
5843
rsSubscription.cancel();
5944
}

0 commit comments

Comments
 (0)