1
1
package rx .observers ;
2
2
3
- import java .util .ArrayList ;
4
-
5
3
import rx .Observer ;
6
- import rx .operators .NotificationLite ;
7
4
8
5
/**
9
6
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
@@ -22,137 +19,166 @@ public class SerializedObserver<T> implements Observer<T> {
22
19
23
20
private boolean emitting = false ;
24
21
private boolean terminated = false ;
25
- private ArrayList <Object > queue = new ArrayList <Object >();
26
- private NotificationLite <T > on = NotificationLite .instance ();
22
+ private FastList queue ;
23
+
24
+ private static final int MAX_DRAIN_ITERATION = Integer .MAX_VALUE ;
25
+ private static final Object NULL_SENTINEL = new Object ();
26
+ private static final Object COMPLETE_SENTINEL = new Object ();
27
+
28
+ static final class FastList {
29
+ Object [] array ;
30
+ int size ;
31
+
32
+ public void add (Object o ) {
33
+ int s = size ;
34
+ Object [] a = array ;
35
+ if (a == null ) {
36
+ a = new Object [16 ];
37
+ array = a ;
38
+ } else if (s == a .length ) {
39
+ Object [] array2 = new Object [s + (s >> 2 )];
40
+ System .arraycopy (a , 0 , array2 , 0 , s );
41
+ a = array2 ;
42
+ array = a ;
43
+ }
44
+ a [s ] = o ;
45
+ size = s + 1 ;
46
+ }
47
+ }
48
+
49
+ private static final class ErrorSentinel {
50
+ final Throwable e ;
51
+
52
+ ErrorSentinel (Throwable e ) {
53
+ this .e = e ;
54
+ }
55
+ }
27
56
28
57
public SerializedObserver (Observer <? super T > s ) {
29
58
this .actual = s ;
30
59
}
31
60
32
61
@ Override
33
62
public void onCompleted () {
34
- boolean canEmit = false ;
35
- ArrayList <Object > list = null ;
63
+ FastList list ;
36
64
synchronized (this ) {
37
65
if (terminated ) {
38
66
return ;
39
67
}
40
68
terminated = true ;
41
- if (!emitting ) {
42
- // emit immediately
43
- emitting = true ;
44
- canEmit = true ;
45
- if (queue .size () > 0 ) {
46
- list = queue ; // copy reference
47
- queue = new ArrayList <Object >(); // new version;
48
- }
49
- } else {
50
- // someone else is already emitting so just queue it
51
- queue .add (on .completed ());
52
- }
53
- }
54
-
55
- if (canEmit ) {
56
- // we won the right to emit
57
- try {
58
- drainQueue (list );
59
- actual .onCompleted ();
60
- } finally {
61
- synchronized (this ) {
62
- emitting = false ;
69
+ if (emitting ) {
70
+ if (queue == null ) {
71
+ queue = new FastList ();
63
72
}
73
+ queue .add (COMPLETE_SENTINEL );
74
+ return ;
64
75
}
76
+ emitting = true ;
77
+ list = queue ;
78
+ queue = null ;
65
79
}
80
+ drainQueue (list );
81
+ actual .onCompleted ();
66
82
}
67
83
68
84
@ Override
69
85
public void onError (final Throwable e ) {
70
- boolean canEmit = false ;
71
- ArrayList <Object > list = null ;
86
+ FastList list ;
72
87
synchronized (this ) {
73
88
if (terminated ) {
74
89
return ;
75
90
}
76
91
terminated = true ;
77
- if (!emitting ) {
78
- // emit immediately
79
- emitting = true ;
80
- canEmit = true ;
81
- if (queue .size () > 0 ) {
82
- list = queue ; // copy reference
83
- queue = new ArrayList <Object >(); // new version;
84
- }
85
- } else {
86
- // someone else is already emitting so just queue it ... after eliminating the queue to shortcut
87
- queue .clear ();
88
- queue .add (on .error (e ));
89
- }
90
- }
91
- if (canEmit ) {
92
- // we won the right to emit
93
- try {
94
- drainQueue (list );
95
- actual .onError (e );
96
- } finally {
97
- synchronized (this ) {
98
- emitting = false ;
92
+ if (emitting ) {
93
+ if (queue == null ) {
94
+ queue = new FastList ();
99
95
}
96
+ queue .add (new ErrorSentinel (e ));
97
+ return ;
100
98
}
99
+ emitting = true ;
100
+ list = queue ;
101
+ queue = null ;
101
102
}
103
+ drainQueue (list );
104
+ actual .onError (e );
102
105
}
103
106
104
107
@ Override
105
108
public void onNext (T t ) {
106
- boolean canEmit = false ;
107
- ArrayList < Object > list = null ;
109
+ FastList list ;
110
+
108
111
synchronized (this ) {
109
112
if (terminated ) {
110
113
return ;
111
114
}
112
- if (!emitting ) {
113
- // emit immediately
114
- emitting = true ;
115
- canEmit = true ;
116
- if (queue .size () > 0 ) {
117
- list = queue ; // copy reference
118
- queue = new ArrayList <Object >(); // new version;
115
+ if (emitting ) {
116
+ if (queue == null ) {
117
+ queue = new FastList ();
119
118
}
120
- } else {
121
- // someone else is already emitting so just queue it
122
- queue . add ( on . next ( t )) ;
119
+ queue . add ( t != null ? t : NULL_SENTINEL );
120
+ // another thread is emitting so we add to the queue and return
121
+ return ;
123
122
}
123
+ // we can emit
124
+ emitting = true ;
125
+ // reference to the list to drain before emitting our value
126
+ list = queue ;
127
+ queue = null ;
124
128
}
125
- if (canEmit ) {
126
- // we won the right to emit
127
- try {
129
+
130
+ // we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above
131
+ try {
132
+ int iter = MAX_DRAIN_ITERATION ;
133
+ do {
128
134
drainQueue (list );
129
- actual .onNext (t );
130
- } finally {
131
- synchronized (this ) {
132
- if (terminated ) {
133
- list = queue ; // copy reference
134
- queue = new ArrayList <Object >(); // new version;
135
- } else {
136
- // release this thread
137
- emitting = false ;
138
- canEmit = false ;
135
+ if (iter == MAX_DRAIN_ITERATION ) {
136
+ // after the first draining we emit our own value
137
+ actual .onNext (t );
138
+ }
139
+ --iter ;
140
+ if (iter > 0 ) {
141
+ synchronized (this ) {
142
+ list = queue ;
143
+ queue = null ;
144
+ }
145
+ if (list == null ) {
146
+ break ;
139
147
}
140
148
}
149
+ } while (iter > 0 );
150
+ } finally {
151
+ synchronized (this ) {
152
+ if (terminated ) {
153
+ list = queue ;
154
+ queue = null ;
155
+ } else {
156
+ emitting = false ;
157
+ list = null ;
158
+ }
141
159
}
142
- }
143
-
144
- // if terminated this will still be true so let's drain the rest of the queue
145
- if (canEmit ) {
160
+ // this will only drain if terminated (done here outside of synchronized block)
146
161
drainQueue (list );
147
162
}
148
163
}
149
164
150
- public void drainQueue (ArrayList < Object > list ) {
151
- if (list == null || list .size () == 0 ) {
165
+ void drainQueue (FastList list ) {
166
+ if (list == null || list .size == 0 ) {
152
167
return ;
153
168
}
154
- for (Object v : list ) {
155
- on .accept (actual , v );
169
+ for (Object v : list .array ) {
170
+ if (v == null ) {
171
+ break ;
172
+ }
173
+ if (v == NULL_SENTINEL ) {
174
+ actual .onNext (null );
175
+ } else if (v == COMPLETE_SENTINEL ) {
176
+ actual .onCompleted ();
177
+ } else if (v .getClass () == ErrorSentinel .class ) {
178
+ actual .onError (((ErrorSentinel ) v ).e );
179
+ } else {
180
+ actual .onNext ((T ) v );
181
+ }
156
182
}
157
183
}
158
- }
184
+ }
0 commit comments