Skip to content

Commit d499cde

Browse files
committed
Single and Completable converters (#150)
* Add Single and Completable converters. * Added remaining conversion classes and tests
1 parent fa725e1 commit d499cde

9 files changed

+817
-2
lines changed

Diff for: rxjava-reactive-streams/src/main/java/rx/RxReactiveStreams.java

+64-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
package rx;
1717

1818
import org.reactivestreams.Publisher;
19-
import rx.internal.reactivestreams.PublisherAdapter;
20-
import rx.internal.reactivestreams.SubscriberAdapter;
19+
20+
import rx.internal.reactivestreams.*;
2121

2222
/**
2323
* This type provides static factory methods for converting to and from RxJava types and Reactive Streams types.
@@ -70,4 +70,66 @@ public static <T> org.reactivestreams.Subscriber<T> toSubscriber(final rx.Subscr
7070
return new SubscriberAdapter<T>(rxSubscriber);
7171
}
7272

73+
/**
74+
* Converts an RxJava Completable into a Publisher that emits only onError or onComplete.
75+
* @param <T> the target value type
76+
* @param completable the Completable instance to convert
77+
* @return the new Publisher instance
78+
* @since 1.1
79+
* @throws NullPointerException if completable is null
80+
*/
81+
public static <T> Publisher<T> toPublisher(Completable completable) {
82+
if (completable == null) {
83+
throw new NullPointerException("completable");
84+
}
85+
return new CompletableAsPublisher<T>(completable);
86+
}
87+
88+
/**
89+
* Converst a Publisher into a Completable by ignoring all onNext values and emitting
90+
* onError or onComplete only.
91+
* @param publisher the Publisher instance to convert
92+
* @return the Completable instance
93+
* @since 1.1
94+
* @throws NullPointerException if publisher is null
95+
*/
96+
public static Completable toCompletable(Publisher<?> publisher) {
97+
if (publisher == null) {
98+
throw new NullPointerException("publisher");
99+
}
100+
return Completable.create(new PublisherAsCompletable(publisher));
101+
}
102+
103+
/**
104+
* Converts a Single into a Publisher which emits an onNext+onComplete if
105+
* the source Single signals a non-null onSuccess; or onError if the source signals
106+
* onError(NullPointerException) or a null value.
107+
* @param single the Single instance to convert
108+
* @return the Publisher instance
109+
* @since 1.1
110+
* @throws NullPointerException if single is null
111+
*/
112+
public static <T> Publisher<T> toPublisher(Single<T> single) {
113+
if (single == null) {
114+
throw new NullPointerException("single");
115+
}
116+
return new SingleAsPublisher<T>(single);
117+
}
118+
119+
/**
120+
* Converts a Publisher into a Single which emits onSuccess if the
121+
* Publisher signals an onNext+onComplete; or onError if the publisher signals an
122+
* onError, the source Publisher is empty (NoSuchElementException) or the
123+
* source Publisher signals more than one onNext (IndexOutOfBoundsException).
124+
* @param publisher the Publisher instance to convert
125+
* @return the Single instance
126+
* @since 1.1
127+
* @throws NullPointerException if publisher is null
128+
*/
129+
public static <T> Single<T> toSingle(Publisher<T> publisher) {
130+
if (publisher == null) {
131+
throw new NullPointerException("publisher");
132+
}
133+
return Single.create(new PublisherAsSingle<T>(publisher));
134+
}
73135
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.reactivestreams;
18+
19+
import org.reactivestreams.*;
20+
21+
import rx.Completable;
22+
23+
/**
24+
* Wraps a Completable and exposes it as a Publisher.
25+
*
26+
* @param <T> the value type of the publisher
27+
*/
28+
public final class CompletableAsPublisher<T> implements Publisher<T> {
29+
30+
final Completable completable;
31+
32+
public CompletableAsPublisher(Completable completable) {
33+
this.completable = completable;
34+
}
35+
36+
@Override
37+
public void subscribe(Subscriber<? super T> s) {
38+
completable.subscribe(new CompletableAsPublisherSubscriber<T>(s));
39+
}
40+
41+
static final class CompletableAsPublisherSubscriber<T>
42+
implements Completable.CompletableSubscriber, Subscription {
43+
44+
final Subscriber<? super T> actual;
45+
46+
rx.Subscription d;
47+
48+
public CompletableAsPublisherSubscriber(Subscriber<? super T> actual) {
49+
this.actual = actual;
50+
}
51+
52+
@Override
53+
public void onSubscribe(rx.Subscription d) {
54+
this.d = d;
55+
actual.onSubscribe(this);
56+
}
57+
58+
@Override
59+
public void onError(Throwable e) {
60+
actual.onError(e);
61+
}
62+
63+
@Override
64+
public void onCompleted() {
65+
actual.onComplete();
66+
}
67+
68+
@Override
69+
public void request(long n) {
70+
// No values will be emitted
71+
}
72+
73+
@Override
74+
public void cancel() {
75+
d.unsubscribe();
76+
}
77+
}
78+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.reactivestreams;
18+
19+
import org.reactivestreams.*;
20+
21+
import rx.Completable.CompletableSubscriber;
22+
23+
/**
24+
* Wraps an arbitrary Publisher and exposes it as a Completable, ignoring any onNext events.
25+
*/
26+
public final class PublisherAsCompletable implements rx.Completable.CompletableOnSubscribe {
27+
28+
final Publisher<?> publisher;
29+
30+
public PublisherAsCompletable(Publisher<?> publisher) {
31+
this.publisher = publisher;
32+
}
33+
34+
@Override
35+
public void call(CompletableSubscriber t) {
36+
publisher.subscribe(new PublisherAsCompletableSubscriber(t));
37+
}
38+
39+
static final class PublisherAsCompletableSubscriber implements Subscriber<Object>, rx.Subscription {
40+
41+
final CompletableSubscriber actual;
42+
43+
Subscription s;
44+
45+
volatile boolean unsubscribed;
46+
47+
public PublisherAsCompletableSubscriber(CompletableSubscriber actual) {
48+
this.actual = actual;
49+
}
50+
51+
@Override
52+
public void onSubscribe(Subscription s) {
53+
this.s = s;
54+
actual.onSubscribe(this);
55+
s.request(Long.MAX_VALUE);
56+
}
57+
58+
@Override
59+
public void onNext(Object t) {
60+
// values are ignored
61+
}
62+
63+
@Override
64+
public void onError(Throwable t) {
65+
actual.onError(t);
66+
}
67+
68+
@Override
69+
public void onComplete() {
70+
actual.onCompleted();
71+
}
72+
73+
@Override
74+
public boolean isUnsubscribed() {
75+
return unsubscribed;
76+
}
77+
78+
@Override
79+
public void unsubscribe() {
80+
if (!unsubscribed) {
81+
unsubscribed = true;
82+
s.cancel();
83+
}
84+
}
85+
}
86+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.reactivestreams;
18+
19+
import java.util.NoSuchElementException;
20+
21+
import org.reactivestreams.Publisher;
22+
import org.reactivestreams.Subscriber;
23+
import org.reactivestreams.Subscription;
24+
25+
import rx.*;
26+
27+
/**
28+
* Wraps a Publisher and exposes it as a Single, signalling NoSuchElementException
29+
* if the Publisher is empty or IndexOutOfBoundsExcepion if the Publisher produces
30+
* more than one element.
31+
*
32+
* @param <T> the value type
33+
*/
34+
public final class PublisherAsSingle<T> implements Single.OnSubscribe<T> {
35+
36+
final Publisher<T> publisher;
37+
38+
public PublisherAsSingle(Publisher<T> publisher) {
39+
this.publisher = publisher;
40+
}
41+
42+
@Override
43+
public void call(SingleSubscriber<? super T> t) {
44+
publisher.subscribe(new PublisherAsSingleSubscriber<T>(t));
45+
}
46+
47+
static final class PublisherAsSingleSubscriber<T> implements Subscriber<T>, rx.Subscription {
48+
49+
final SingleSubscriber<? super T> actual;
50+
51+
Subscription s;
52+
53+
T value;
54+
55+
boolean hasValue;
56+
57+
boolean done;
58+
59+
public PublisherAsSingleSubscriber(SingleSubscriber<? super T> actual) {
60+
this.actual = actual;
61+
}
62+
63+
@Override
64+
public void onSubscribe(Subscription s) {
65+
this.s = s;
66+
67+
actual.add(this);
68+
69+
s.request(Long.MAX_VALUE);
70+
}
71+
72+
@Override
73+
public void onNext(T t) {
74+
if (done) {
75+
return;
76+
}
77+
if (hasValue) {
78+
done = true;
79+
s.cancel();
80+
actual.onError(new IndexOutOfBoundsException("The source Publisher emitted multiple values"));
81+
} else {
82+
value = t;
83+
hasValue = true;
84+
}
85+
}
86+
87+
@Override
88+
public void onError(Throwable t) {
89+
if (done) {
90+
return;
91+
}
92+
actual.onError(t);
93+
}
94+
95+
@Override
96+
public void onComplete() {
97+
if (done) {
98+
return;
99+
}
100+
if (hasValue) {
101+
T v = value;
102+
value = null;
103+
actual.onSuccess(v);
104+
} else {
105+
actual.onError(new NoSuchElementException("The source Publisher was empty"));
106+
}
107+
}
108+
109+
@Override
110+
public boolean isUnsubscribed() {
111+
return actual.isUnsubscribed();
112+
}
113+
114+
@Override
115+
public void unsubscribe() {
116+
s.cancel();
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)