Skip to content

Commit 9432b35

Browse files
authored
Merge pull request #4897 from andymc12/sseBroadcasterFix_18003
SseBroadcaster Fix for 18.0.0.3
2 parents 6ac82d1 + e8334cc commit 9432b35

File tree

1 file changed

+80
-0
lines changed

1 file changed

+80
-0
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.cxf.jaxrs.sse;
20+
21+
import java.util.ArrayList;
22+
import java.util.Collection;
23+
import java.util.Set;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.CompletionStage;
26+
import java.util.concurrent.CopyOnWriteArraySet;
27+
import java.util.function.BiConsumer;
28+
import java.util.function.Consumer;
29+
30+
import javax.ws.rs.sse.OutboundSseEvent;
31+
import javax.ws.rs.sse.SseBroadcaster;
32+
import javax.ws.rs.sse.SseEventSink;
33+
34+
public class SseBroadcasterImpl implements SseBroadcaster {
35+
private final Set<SseEventSink> subscribers = new CopyOnWriteArraySet<>();
36+
37+
private final Set<Consumer<SseEventSink>> closers =
38+
new CopyOnWriteArraySet<>();
39+
40+
private final Set<BiConsumer<SseEventSink, Throwable>> exceptioners =
41+
new CopyOnWriteArraySet<>();
42+
43+
@Override
44+
public void register(SseEventSink sink) {
45+
subscribers.add(sink);
46+
}
47+
48+
@Override
49+
public CompletionStage<?> broadcast(OutboundSseEvent event) {
50+
final Collection<CompletableFuture<?>> futures = new ArrayList<>();
51+
52+
for (SseEventSink sink: subscribers) {
53+
try {
54+
futures.add(sink.send(event).toCompletableFuture());
55+
} catch (final Exception ex) {
56+
exceptioners.forEach(exceptioner -> exceptioner.accept(sink, ex));
57+
}
58+
}
59+
60+
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
61+
}
62+
63+
@Override
64+
public void onClose(Consumer<SseEventSink> subscriber) {
65+
closers.add(subscriber);
66+
}
67+
68+
@Override
69+
public void onError(BiConsumer<SseEventSink, Throwable> exceptioner) {
70+
exceptioners.add(exceptioner);
71+
}
72+
73+
@Override
74+
public void close() {
75+
subscribers.forEach(subscriber -> {
76+
subscriber.close();
77+
closers.forEach(closer -> closer.accept(subscriber));
78+
});
79+
}
80+
}

0 commit comments

Comments
 (0)