15
15
*/
16
16
package rx .internal .reactivestreams ;
17
17
18
- import java .util .concurrent .ConcurrentHashMap ;
19
- import java .util .concurrent .ConcurrentMap ;
20
- import java .util .concurrent .atomic .AtomicBoolean ;
21
- import java .util .concurrent .atomic .AtomicLong ;
18
+ import java .util .concurrent .atomic .*;
22
19
23
- import org .reactivestreams .Publisher ;
24
- import org .reactivestreams .Subscriber ;
25
- import org .reactivestreams .Subscription ;
20
+ import org .reactivestreams .*;
26
21
27
22
import rx .Observable ;
28
23
import rx .internal .operators .BackpressureUtils ;
@@ -31,94 +26,84 @@ public class PublisherAdapter<T> implements Publisher<T> {
31
26
32
27
private final Observable <T > observable ;
33
28
34
- private final ConcurrentMap <Subscriber <?>, Object > subscribers = new ConcurrentHashMap <Subscriber <?>, Object >();
35
-
36
29
public PublisherAdapter (final Observable <T > observable ) {
37
30
this .observable = observable .serialize ();
38
31
}
39
32
40
33
@ Override
41
34
public void subscribe (final Subscriber <? super T > s ) {
42
- if (subscribers .putIfAbsent (s , s ) == null ) {
43
- observable .subscribe (new rx .Subscriber <T >() {
44
- private final AtomicBoolean done = new AtomicBoolean ();
45
- private final AtomicLong childRequested = new AtomicLong ();
46
- private void doRequest (long n ) {
47
- if (!done .get ()) {
48
- BackpressureUtils .getAndAddRequest (childRequested , n );
49
- request (n );
50
- }
35
+ observable .subscribe (new rx .Subscriber <T >() {
36
+ private final AtomicBoolean done = new AtomicBoolean ();
37
+ private final AtomicLong childRequested = new AtomicLong ();
38
+ private void doRequest (long n ) {
39
+ if (!done .get ()) {
40
+ BackpressureUtils .getAndAddRequest (childRequested , n );
41
+ request (n );
51
42
}
52
-
53
- @ Override
54
- public void onStart () {
55
- final AtomicBoolean requested = new AtomicBoolean ();
56
- s .onSubscribe (new Subscription () {
57
- @ Override
58
- public void request (long n ) {
59
- if (n < 1 ) {
60
- unsubscribe ();
61
- onError (new IllegalArgumentException ("3.9 While the Subscription is not cancelled, Subscription.request(long n) MUST throw a java.lang.IllegalArgumentException if the argument is <= 0." ));
62
- return ;
63
- }
64
-
65
- requested .set (true );
66
- doRequest (n );
67
- }
68
-
69
- @ Override
70
- public void cancel () {
43
+ }
44
+
45
+ @ Override
46
+ public void onStart () {
47
+ final AtomicBoolean requested = new AtomicBoolean ();
48
+ s .onSubscribe (new Subscription () {
49
+ @ Override
50
+ public void request (long n ) {
51
+ if (n < 1 ) {
71
52
unsubscribe ();
72
- fireDone ();
53
+ onError (new IllegalArgumentException ("3.9 While the Subscription is not cancelled, Subscription.request(long n) MUST throw a java.lang.IllegalArgumentException if the argument is <= 0." ));
54
+ return ;
73
55
}
74
- });
75
56
76
- if (! requested .get ()) {
77
- request ( 0 );
57
+ requested .set ( true );
58
+ doRequest ( n );
78
59
}
79
- }
80
60
81
- private boolean fireDone () {
82
- boolean first = done . compareAndSet ( false , true );
83
- if ( first ) {
84
- subscribers . remove ( s );
61
+ @ Override
62
+ public void cancel () {
63
+ unsubscribe ();
64
+ fireDone ( );
85
65
}
86
- return first ;
87
- }
66
+ });
88
67
89
- @ Override
90
- public void onCompleted () {
91
- if (fireDone ()) {
92
- s .onComplete ();
93
- }
68
+ if (!requested .get ()) {
69
+ request (0 );
94
70
}
71
+ }
95
72
96
- @ Override
97
- public void onError (Throwable e ) {
98
- if (fireDone ()) {
99
- s .onError (e );
100
- }
73
+ boolean fireDone () {
74
+ return done .compareAndSet (false , true );
75
+ }
76
+
77
+ @ Override
78
+ public void onCompleted () {
79
+ if (fireDone ()) {
80
+ s .onComplete ();
101
81
}
82
+ }
102
83
103
- @ Override
104
- public void onNext (T t ) {
105
- if (!done .get ()) {
106
- if (childRequested .get () > 0 ) {
107
- s .onNext (t );
108
- childRequested .decrementAndGet ();
109
- } else {
110
- try {
111
- onError (new IllegalStateException ("1.1 source doesn't respect backpressure" ));
112
- } finally {
113
- unsubscribe ();
114
- }
84
+ @ Override
85
+ public void onError (Throwable e ) {
86
+ if (fireDone ()) {
87
+ s .onError (e );
88
+ }
89
+ }
90
+
91
+ @ Override
92
+ public void onNext (T t ) {
93
+ if (!done .get ()) {
94
+ if (childRequested .get () > 0 ) {
95
+ s .onNext (t );
96
+ childRequested .decrementAndGet ();
97
+ } else {
98
+ try {
99
+ onError (new IllegalStateException ("1.1 source doesn't respect backpressure" ));
100
+ } finally {
101
+ unsubscribe ();
115
102
}
116
103
}
117
104
}
118
- });
119
- } else {
120
- s .onError (new IllegalArgumentException ("1.10 Subscriber cannot subscribe more than once" ));
121
- }
105
+ }
106
+ });
122
107
}
123
108
124
109
}
0 commit comments