Skip to content

Commit 9be86a4

Browse files
committed
Upgrade to RxJava 3.0.0-RC3, adjust internals
1 parent 0b90b7f commit 9be86a4

25 files changed

+156
-259
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ RxJava 3.x implementation of extra sources, operators and components and ports o
1313

1414
```
1515
dependencies {
16-
compile "com.github.akarnokd:rxjava3-extensions:3.0.0-RC2"
16+
compile "com.github.akarnokd:rxjava3-extensions:3.0.0-RC3"
1717
}
1818
```
1919

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dependencies {
4949
signature 'org.codehaus.mojo.signature:java16:1.1@signature'
5050

5151
compile "org.reactivestreams:reactive-streams:1.0.3"
52-
compile "io.reactivex.rxjava3:rxjava:3.0.0-RC2"
52+
compile "io.reactivex.rxjava3:rxjava:3.0.0-RC3"
5353

5454
testCompile group: 'junit', name: 'junit', version: '4.12'
5555

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=3.0.0-RC2
1+
version=3.0.0-RC3

src/main/java/hu/akarnokd/rxjava3/basetypes/NonoConcat.java

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import io.reactivex.rxjava3.internal.queue.*;
2626
import io.reactivex.rxjava3.internal.subscriptions.*;
2727
import io.reactivex.rxjava3.internal.util.*;
28-
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2928

3029
/**
3130
* Concatenate sources emitted by a Publisher one after another and complete after each complete.
@@ -64,7 +63,7 @@ abstract static class AbstractConcatSubscriber extends BasicIntQueueSubscription
6463

6564
final int limit;
6665

67-
final AtomicThrowable error;
66+
final AtomicThrowable errors;
6867

6968
final InnerSubscriber inner;
7069

@@ -86,7 +85,7 @@ abstract static class AbstractConcatSubscriber extends BasicIntQueueSubscription
8685
this.downstream = downstream;
8786
this.prefetch = prefetch;
8887
this.limit = prefetch - (prefetch >> 2);
89-
this.error = new AtomicThrowable();
88+
this.errors = new AtomicThrowable();
9089
this.inner = new InnerSubscriber();
9190
}
9291

@@ -240,14 +239,14 @@ static final class ConcatImmediateSubscriber extends AbstractConcatSubscriber {
240239

241240
@Override
242241
public void onError(Throwable t) {
243-
cancel();
244-
HalfSerializer.onError(downstream, t, this, error);
242+
cancelIf(true);
243+
HalfSerializer.onError(downstream, t, this, errors);
245244
}
246245

247246
@Override
248247
public void innerError(Throwable t) {
249-
cancel();
250-
HalfSerializer.onError(downstream, t, this, error);
248+
cancelIf(true);
249+
HalfSerializer.onError(downstream, t, this, errors);
251250
}
252251

253252
@Override
@@ -258,13 +257,21 @@ public void onComplete() {
258257

259258
@Override
260259
public void cancel() {
260+
cancelIf(false);
261+
}
262+
263+
void cancelIf(boolean error) {
261264
cancelled = true;
262265
upstream.cancel();
263266
inner.dispose();
267+
if (!error) {
268+
errors.tryTerminateAndReport();
269+
}
264270

265271
if (wip.getAndIncrement() == 0) {
266272
queue.clear();
267273
}
274+
268275
}
269276

270277
@Override
@@ -289,14 +296,14 @@ public void drain() {
289296
Exceptions.throwIfFatal(ex);
290297
upstream.cancel();
291298
queue.clear();
292-
HalfSerializer.onError(downstream, ex, this, error);
299+
HalfSerializer.onError(downstream, ex, this, errors);
293300
return;
294301
}
295302

296303
boolean empty = np == null;
297304

298305
if (d && empty) {
299-
HalfSerializer.onComplete(downstream, this, error);
306+
HalfSerializer.onComplete(downstream, this, errors);
300307
return;
301308
}
302309

@@ -324,11 +331,9 @@ static final class ConcatDelayedSubscriber extends AbstractConcatSubscriber {
324331

325332
@Override
326333
public void onError(Throwable t) {
327-
if (error.addThrowable(t)) {
334+
if (errors.tryAddThrowableOrReport(t)) {
328335
done = true;
329336
drain();
330-
} else {
331-
RxJavaPlugins.onError(t);
332337
}
333338
}
334339

@@ -343,6 +348,7 @@ public void cancel() {
343348
cancelled = true;
344349
upstream.cancel();
345350
inner.dispose();
351+
errors.tryTerminateAndReport();
346352

347353
if (getAndIncrement() == 0) {
348354
queue.clear();
@@ -362,9 +368,9 @@ void drain() {
362368
}
363369

364370
if (!active) {
365-
if (!tillTheEnd && error.get() != null) {
371+
if (!tillTheEnd && errors.get() != null) {
366372
queue.clear();
367-
downstream.onError(error.terminate());
373+
errors.tryTerminateConsumer(downstream);
368374
return;
369375
}
370376

@@ -378,21 +384,16 @@ void drain() {
378384
Exceptions.throwIfFatal(ex);
379385
upstream.cancel();
380386
queue.clear();
381-
error.addThrowable(ex);
387+
errors.tryAddThrowableOrReport(ex);
382388

383-
downstream.onError(error.terminate());
389+
errors.tryTerminateConsumer(downstream);
384390
return;
385391
}
386392

387393
boolean empty = np == null;
388394

389395
if (d && empty) {
390-
Throwable ex = error.terminate();
391-
if (ex != null) {
392-
downstream.onError(ex);
393-
} else {
394-
downstream.onComplete();
395-
}
396+
errors.tryTerminateConsumer(downstream);
396397
return;
397398
}
398399

@@ -408,14 +409,12 @@ void drain() {
408409

409410
@Override
410411
void innerError(Throwable t) {
411-
if (error.addThrowable(t)) {
412+
if (errors.tryAddThrowableOrReport(t)) {
412413
if (!tillTheEnd) {
413414
upstream.cancel();
414415
}
415416
active = false;
416417
drain();
417-
} else {
418-
RxJavaPlugins.onError(t);
419418
}
420419
}
421420
}

src/main/java/hu/akarnokd/rxjava3/basetypes/NonoConcatArray.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ static final class ConcatSubscriber extends BasicRefQueueSubscription<Void, Subs
7070
@Override
7171
public void cancel() {
7272
SubscriptionHelper.cancel(this);
73+
if (errors != null) {
74+
errors.tryTerminateAndReport();
75+
}
7376
}
7477

7578
@Override
@@ -86,9 +89,10 @@ public void onNext(Void t) {
8689
public void onError(Throwable t) {
8790
AtomicThrowable err = errors;
8891
if (err != null) {
89-
err.addThrowable(t);
90-
active = false;
91-
drain();
92+
if (err.tryAddThrowableOrReport(t)) {
93+
active = false;
94+
drain();
95+
}
9296
} else {
9397
downstream.onError(t);
9498
}
@@ -113,9 +117,8 @@ void drain() {
113117
if (!active) {
114118
int idx = index;
115119
if (idx == sources.length) {
116-
Throwable ex = errors != null ? errors.terminate() : null;
117-
if (ex != null) {
118-
downstream.onError(ex);
120+
if (errors != null) {
121+
errors.tryTerminateConsumer(downstream);
119122
} else {
120123
downstream.onComplete();
121124
}
@@ -129,8 +132,8 @@ void drain() {
129132
if (np == null) {
130133
NullPointerException npe = new NullPointerException("One of the sources is null");
131134
if (errors != null) {
132-
errors.addThrowable(npe);
133-
downstream.onError(errors.terminate());
135+
errors.tryAddThrowableOrReport(npe);
136+
errors.tryTerminateConsumer(downstream);
134137
} else {
135138
downstream.onError(npe);
136139
}

src/main/java/hu/akarnokd/rxjava3/basetypes/NonoConcatIterable.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ static final class ConcatSubscriber extends BasicRefQueueSubscription<Void, Subs
8181
@Override
8282
public void cancel() {
8383
SubscriptionHelper.cancel(this);
84+
if (errors != null) {
85+
errors.tryTerminateAndReport();
86+
}
8487
}
8588

8689
@Override
@@ -97,9 +100,10 @@ public void onNext(Void t) {
97100
public void onError(Throwable t) {
98101
AtomicThrowable err = errors;
99102
if (err != null) {
100-
err.addThrowable(t);
101-
active = false;
102-
drain();
103+
if (err.tryAddThrowableOrReport(t)) {
104+
active = false;
105+
drain();
106+
}
103107
} else {
104108
downstream.onError(t);
105109
}
@@ -133,17 +137,16 @@ void drain() {
133137
} catch (Throwable ex) {
134138
Exceptions.throwIfFatal(ex);
135139
if (errors != null) {
136-
errors.addThrowable(ex);
137-
downstream.onError(errors.terminate());
140+
errors.tryAddThrowableOrReport(ex);
141+
errors.tryTerminateConsumer(downstream);
138142
} else {
139143
downstream.onError(ex);
140144
}
141145
return;
142146
}
143147
if (!b) {
144-
Throwable ex = errors != null ? errors.terminate() : null;
145-
if (ex != null) {
146-
downstream.onError(ex);
148+
if (errors != null) {
149+
errors.tryTerminateConsumer(downstream);
147150
} else {
148151
downstream.onComplete();
149152
}

src/main/java/hu/akarnokd/rxjava3/basetypes/NonoMerge.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,11 @@ public void onNext(Nono t) {
9898

9999
@Override
100100
public void onError(Throwable t) {
101-
if (errors.addThrowable(t)) {
101+
if (errors.tryAddThrowable(t)) {
102102
if (!delayErrors) {
103103
set.dispose();
104104

105-
Throwable ex = errors.terminate();
106-
if (ex != ExceptionHelper.TERMINATED) {
107-
downstream.onError(ex);
108-
}
105+
errors.tryTerminateConsumer(downstream);
109106
} else {
110107
onComplete();
111108
}
@@ -117,12 +114,7 @@ public void onError(Throwable t) {
117114
@Override
118115
public void onComplete() {
119116
if (decrementAndGet() == 0) {
120-
Throwable ex = errors.terminate();
121-
if (ex != null) {
122-
downstream.onError(ex);
123-
} else {
124-
downstream.onComplete();
125-
}
117+
errors.tryTerminateConsumer(downstream);
126118
}
127119
}
128120

@@ -146,7 +138,7 @@ void complete() {
146138

147139
void innerError(Disposable inner, Throwable error) {
148140
set.delete(inner);
149-
if (errors.addThrowable(error)) {
141+
if (errors.tryAddThrowableOrReport(error)) {
150142
if (!delayErrors) {
151143
set.dispose();
152144

@@ -157,15 +149,14 @@ void innerError(Disposable inner, Throwable error) {
157149
} else {
158150
complete();
159151
}
160-
} else {
161-
RxJavaPlugins.onError(error);
162152
}
163153
}
164154

165155
@Override
166156
public void cancel() {
167157
upstream.cancel();
168158
set.dispose();
159+
errors.tryTerminateAndReport();
169160
}
170161

171162
final class MergeInnerSubscriber extends AtomicReference<Subscription> implements Subscriber<Void>, Disposable {

src/main/java/hu/akarnokd/rxjava3/basetypes/NonoMergeArray.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222

2323
import hu.akarnokd.rxjava3.util.CompositeSubscription;
2424
import io.reactivex.rxjava3.internal.subscriptions.*;
25-
import io.reactivex.rxjava3.internal.util.*;
26-
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
25+
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
2726

2827
/**
2928
* Run Nono sources in parallel and complete when all complete.
@@ -108,6 +107,7 @@ public void request(long n) {
108107
public void cancel() {
109108
cancelled = true;
110109
set.cancel();
110+
errors.tryTerminateAndReport();
111111
}
112112

113113
void subscribe(int n) {
@@ -143,16 +143,13 @@ void subscribe(int n) {
143143
Nono np = srcs[i];
144144

145145
if (np == null) {
146-
errors.addThrowable(new NullPointerException("A source is null"));
146+
errors.tryAddThrowableOrReport(new NullPointerException("A source is null"));
147147
if (delayErrors) {
148148
i = f;
149149
break;
150150
}
151151
set.cancel();
152-
Throwable ex = errors.terminate();
153-
if (ex != ExceptionHelper.TERMINATED) {
154-
downstream.onError(ex);
155-
}
152+
errors.tryTerminateConsumer(downstream);
156153
return;
157154
}
158155

@@ -190,20 +187,15 @@ void subscribe(int n) {
190187
@Override
191188
public void innerError(InnerSubscriber inner, Throwable ex) {
192189
set.delete(inner);
193-
if (errors.addThrowable(ex)) {
190+
if (errors.tryAddThrowableOrReport(ex)) {
194191
if (!delayErrors) {
195192
set.cancel();
196193

197-
ex = errors.terminate();
198-
if (ex != ExceptionHelper.TERMINATED) {
199-
downstream.onError(ex);
200-
}
194+
errors.tryTerminateConsumer(downstream);
201195
} else {
202196
subscribe(1);
203197
complete();
204198
}
205-
} else {
206-
RxJavaPlugins.onError(ex);
207199
}
208200
}
209201

0 commit comments

Comments
 (0)