17
17
package org .springframework .integration .channel ;
18
18
19
19
import java .time .Duration ;
20
+ import java .util .ArrayList ;
21
+ import java .util .List ;
20
22
import java .util .concurrent .TimeUnit ;
21
23
import java .util .concurrent .atomic .AtomicReference ;
22
24
import java .util .concurrent .locks .LockSupport ;
30
32
import reactor .core .publisher .Sinks ;
31
33
import reactor .util .context .ContextView ;
32
34
35
+ import org .springframework .context .Lifecycle ;
33
36
import org .springframework .core .log .LogMessage ;
34
37
import org .springframework .integration .IntegrationMessageHeaderAccessor ;
35
38
import org .springframework .integration .StaticMessageHeaderAccessor ;
42
45
/**
43
46
* The {@link AbstractMessageChannel} implementation for the
44
47
* Reactive Streams {@link Publisher} based on the Project Reactor {@link Flux}.
48
+ * <p>
49
+ * This class implements {@link Lifecycle} to control subscriptions to publishers
50
+ * attached via {@link #subscribeTo(Publisher)}, when this channel is restarted.
45
51
*
46
52
* @author Artem Bilan
47
53
* @author Gary Russell
50
56
* @since 5.0
51
57
*/
52
58
public class FluxMessageChannel extends AbstractMessageChannel
53
- implements Publisher <Message <?>>, ReactiveStreamsSubscribableChannel {
59
+ implements Publisher <Message <?>>, ReactiveStreamsSubscribableChannel , Lifecycle {
54
60
55
61
private final Sinks .Many <Message <?>> sink = Sinks .many ().multicast ().onBackpressureBuffer (1 , false );
56
62
57
- private final Disposable .Composite upstreamSubscriptions = Disposables .composite ();
63
+ private final List <Publisher <? extends Message <?>>> sourcePublishers = new ArrayList <>();
64
+
65
+ private volatile Disposable .Composite upstreamSubscriptions = Disposables .composite ();
58
66
59
67
private volatile boolean active = true ;
60
68
@@ -111,19 +119,22 @@ public void subscribe(Subscriber<? super Message<?>> subscriber) {
111
119
.subscribe (subscriber );
112
120
}
113
121
114
- private void addPublisherToSubscribe (Flux <?> publisher ) {
115
- AtomicReference <Disposable > disposableReference = new AtomicReference <>();
122
+ @ Override
123
+ public void start () {
124
+ this .active = true ;
125
+ this .upstreamSubscriptions = Disposables .composite ();
126
+ this .sourcePublishers .forEach (this ::doSubscribeTo );
127
+ }
116
128
117
- Disposable disposable =
118
- publisher
119
- .doOnTerminate (() -> disposeUpstreamSubscription (disposableReference ))
120
- .subscribe ();
129
+ @ Override
130
+ public void stop () {
131
+ this .active = false ;
132
+ this .upstreamSubscriptions .dispose ();
133
+ }
121
134
122
- if (!disposable .isDisposed ()) {
123
- if (this .upstreamSubscriptions .add (disposable )) {
124
- disposableReference .set (disposable );
125
- }
126
- }
135
+ @ Override
136
+ public boolean isRunning () {
137
+ return this .active ;
127
138
}
128
139
129
140
private void disposeUpstreamSubscription (AtomicReference <Disposable > disposableReference ) {
@@ -136,8 +147,14 @@ private void disposeUpstreamSubscription(AtomicReference<Disposable> disposableR
136
147
137
148
@ Override
138
149
public void subscribeTo (Publisher <? extends Message <?>> publisher ) {
150
+ this .sourcePublishers .add (publisher );
151
+ doSubscribeTo (publisher );
152
+ }
153
+
154
+ private void doSubscribeTo (Publisher <? extends Message <?>> publisher ) {
139
155
Flux <Object > upstreamPublisher =
140
156
Flux .from (publisher )
157
+ .doOnComplete (() -> this .sourcePublishers .remove (publisher ))
141
158
.delaySubscription (
142
159
Mono .fromCallable (this .sink ::currentSubscriberCount )
143
160
.filter ((value ) -> value > 0 )
@@ -152,6 +169,21 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
152
169
addPublisherToSubscribe (upstreamPublisher );
153
170
}
154
171
172
+ private void addPublisherToSubscribe (Flux <?> publisher ) {
173
+ AtomicReference <Disposable > disposableReference = new AtomicReference <>();
174
+
175
+ Disposable disposable =
176
+ publisher
177
+ .doOnTerminate (() -> disposeUpstreamSubscription (disposableReference ))
178
+ .subscribe ();
179
+
180
+ if (!disposable .isDisposed ()) {
181
+ if (this .upstreamSubscriptions .add (disposable )) {
182
+ disposableReference .set (disposable );
183
+ }
184
+ }
185
+ }
186
+
155
187
private void sendReactiveMessage (Message <?> message ) {
156
188
Message <?> messageToSend = message ;
157
189
// We have just restored Reactor context, so no need in a header anymore.
@@ -169,14 +201,15 @@ private void sendReactiveMessage(Message<?> message) {
169
201
}
170
202
}
171
203
catch (Exception ex ) {
172
- logger .warn (ex , LogMessage .format ("Error during processing event: %s" , messageToSend ));
204
+ logger .error (ex , LogMessage .format ("Error during processing event: %s" , messageToSend ));
173
205
}
174
206
}
175
207
176
208
@ Override
177
209
public void destroy () {
178
210
this .active = false ;
179
211
this .upstreamSubscriptions .dispose ();
212
+ this .sourcePublishers .clear ();
180
213
this .sink .emitComplete (Sinks .EmitFailureHandler .busyLooping (Duration .ofSeconds (1 )));
181
214
super .destroy ();
182
215
}
0 commit comments