Skip to content

Commit 9274840

Browse files
committed
Refactoring to remove the sse package
Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
1 parent ae27dfd commit 9274840

File tree

10 files changed

+111
-94
lines changed

10 files changed

+111
-94
lines changed

vertx-web-client/src/main/asciidoc/index.adoc

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -374,19 +374,11 @@ WARNING: this is only valid for the response decoded as a buffer.
374374

375375
=== Server-Sent Events
376376

377-
Using a specific body codec: {@link io.vertx.ext.web.codec.sse.SseBodyCodec}, you can decode a Server-Sent Events stream into a list of events:
378-
377+
Using a specific body codec: {@link io.vertx.ext.web.codec.impl.SseBodyCodec}, you can decode a Server-Sent Events stream into a list of events:
379378
[source,$lang]
380379
----
381-
client.get("/eventsource").as(SseBodyCodec.sseStream(stream -> {
382-
stream.handler(event -> //process the event);
383-
stream.endHandler(v -> {
384-
async.complete();
385-
});
386-
})).send();
380+
{@link examples.WebClientExamples#receiveResponseAsServerSentEvents(Vertx vertx, int servicePort)}
387381
----
388-
389-
390382
[[http-response-expectations]]
391383
=== Response expectations
392384

vertx-web-client/src/main/java/examples/WebClientExamples.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,11 @@
3838
import io.vertx.ext.web.client.WebClient;
3939
import io.vertx.ext.web.client.WebClientOptions;
4040
import io.vertx.ext.web.codec.BodyCodec;
41-
import io.vertx.ext.web.codec.sse.SseBodyCodec;
42-
import io.vertx.ext.web.codec.sse.SseEvent;
4341
import io.vertx.ext.web.multipart.MultipartForm;
4442
import io.vertx.uritemplate.ExpandOptions;
4543
import io.vertx.uritemplate.UriTemplate;
46-
import java.util.ArrayList;
4744

4845
import java.util.HashMap;
49-
import java.util.List;
5046
import java.util.Map;
5147

5248
/**
@@ -766,7 +762,7 @@ public static void clientSideConsistentHashing(Vertx vertx, int servicePort) {
766762
server.listen(servicePort);
767763
}
768764

769-
public static void receiveServerSideEvents(Vertx vertx, int servicePort) {
765+
public static void receiveResponseAsServerSentEvents(Vertx vertx, int servicePort) {
770766
WebClient client = WebClient.create(vertx, new WebClientOptions().setDefaultPort(servicePort).setDefaultHost("localhost"));
771767

772768
HttpServer server = vertx.createHttpServer()
@@ -795,9 +791,9 @@ public void handle(Long timerId) {
795791
});
796792
});
797793
server.listen(servicePort);
798-
final List<SseEvent> events = new ArrayList<>();
799-
client.get("/basic?count=5").as(SseBodyCodec.sseStream(stream -> {
800-
stream.handler(events::add);
794+
795+
client.get("/basic?count=5").as(BodyCodec.sseStream(stream -> {
796+
stream.handler(v -> System.out.println("Event received " + v));
801797
stream.endHandler(v -> System.out.println("End of stream " + v));
802798
})).send().expecting(HttpResponseExpectation.SC_OK)
803799
.onSuccess(res ->

vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/SseClientTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
import io.vertx.ext.unit.junit.VertxUnitRunner;
99
import io.vertx.ext.web.client.WebClient;
1010
import io.vertx.ext.web.client.WebClientOptions;
11-
import io.vertx.ext.web.codec.sse.SseBodyCodec;
12-
import io.vertx.ext.web.codec.sse.SseEvent;
11+
import io.vertx.ext.web.codec.spi.SseEvent;
1312
import java.util.ArrayList;
1413
import java.util.List;
1514
import java.util.concurrent.atomic.AtomicInteger;

vertx-web-common/src/main/java/io/vertx/ext/web/codec/BodyCodec.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,19 @@
1717

1818
import io.vertx.codegen.annotations.GenIgnore;
1919
import io.vertx.codegen.annotations.VertxGen;
20+
import io.vertx.core.Handler;
2021
import io.vertx.core.buffer.Buffer;
2122
import io.vertx.core.json.JsonArray;
2223
import io.vertx.core.json.JsonObject;
2324
import io.vertx.core.parsetools.JsonParser;
25+
import io.vertx.core.streams.ReadStream;
2426
import io.vertx.core.streams.WriteStream;
2527
import io.vertx.ext.web.codec.impl.BodyCodecImpl;
2628
import io.vertx.ext.web.codec.impl.JsonStreamBodyCodec;
29+
import io.vertx.ext.web.codec.impl.SseBodyCodec;
2730
import io.vertx.ext.web.codec.impl.StreamingBodyCodec;
2831
import io.vertx.ext.web.codec.spi.BodyStream;
32+
import io.vertx.ext.web.codec.impl.SseEventImpl;
2933

3034
import java.util.function.Function;
3135

@@ -137,6 +141,16 @@ static BodyCodec<Void> jsonStream(JsonParser parser) {
137141
return new JsonStreamBodyCodec(parser);
138142
}
139143

144+
/**
145+
* A body codec that parse the response as a Server-SentEvent stream.
146+
*
147+
* @param handler the non-null hander to handle the stream of Server-Sent Events.
148+
* @return the body codec for a write stream
149+
*/
150+
static BodyCodec<Void> sseStream(Handler<ReadStream<SseEventImpl>> handler) {
151+
return new SseBodyCodec(handler);
152+
}
153+
140154
/**
141155
* Create the {@link BodyStream}.
142156
* <p>

vertx-web-common/src/main/java/io/vertx/ext/web/codec/sse/SseBodyCodecImpl.java renamed to vertx-web-common/src/main/java/io/vertx/ext/web/codec/impl/SseBodyCodec.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
1-
package io.vertx.ext.web.codec.sse;
1+
package io.vertx.ext.web.codec.impl;
22

33
import io.vertx.codegen.annotations.Nullable;
44
import io.vertx.core.Future;
55
import io.vertx.core.Handler;
66
import io.vertx.core.buffer.Buffer;
77
import io.vertx.core.streams.ReadStream;
88
import io.vertx.core.streams.WriteStream;
9+
import io.vertx.ext.web.codec.BodyCodec;
910
import io.vertx.ext.web.codec.spi.BodyStream;
1011
import java.util.concurrent.atomic.AtomicLong;
1112

1213
/**
1314
* A codec for processing and decoding Server Sent Event from streaming HTTP body content .
1415
*/
15-
public class SseBodyCodecImpl implements SseBodyCodec<Void> {
16+
public class SseBodyCodec implements BodyCodec<Void> {
1617

17-
private final Handler<ReadStream<SseEvent>> handler;
18+
private final Handler<ReadStream<SseEventImpl>> handler;
1819

19-
SseBodyCodecImpl(Handler<ReadStream<SseEvent>> handler) {
20+
public SseBodyCodec(Handler<ReadStream<SseEventImpl>> handler) {
2021
this.handler = handler;
2122
}
2223

@@ -27,12 +28,12 @@ public BodyStream<Void> stream() throws Exception {
2728
return stream;
2829
}
2930

30-
static class SseBodyStream implements BodyStream<Void>, ReadStream<SseEvent> {
31+
static class SseBodyStream implements BodyStream<Void>, ReadStream<SseEventImpl> {
3132

3233
private static final int LOW_WATERMARK = 1024;
3334
private static final int HIGH_WATERMARK = 4 * 1024;
3435

35-
private Handler<SseEvent> handler;
36+
private Handler<SseEventImpl> handler;
3637
private Handler<Void> endHandler;
3738
private final AtomicLong demand = new AtomicLong(Long.MAX_VALUE);
3839
private Buffer content = Buffer.buffer();
@@ -43,26 +44,26 @@ static class SseBodyStream implements BodyStream<Void>, ReadStream<SseEvent> {
4344
private volatile boolean failed;
4445

4546
@Override
46-
public ReadStream<SseEvent> handler(@Nullable Handler<SseEvent> handler) {
47+
public ReadStream<SseEventImpl> handler(@Nullable Handler<SseEventImpl> handler) {
4748
this.handler = handler;
4849
return this;
4950
}
5051

5152
@Override
52-
public ReadStream<SseEvent> pause() {
53+
public ReadStream<SseEventImpl> pause() {
5354
demand.set(0L);
5455
return this;
5556
}
5657

5758
@Override
58-
public ReadStream<SseEvent> resume() {
59+
public ReadStream<SseEventImpl> resume() {
5960
demand.set(Long.MAX_VALUE);
6061
check();
6162
return this;
6263
}
6364

6465
@Override
65-
public ReadStream<SseEvent> fetch(long l) {
66+
public ReadStream<SseEventImpl> fetch(long l) {
6667
if (l <= 0) {
6768
return this;
6869
}
@@ -72,13 +73,13 @@ public ReadStream<SseEvent> fetch(long l) {
7273
}
7374

7475
@Override
75-
public ReadStream<SseEvent> endHandler(@Nullable Handler<Void> handler) {
76+
public ReadStream<SseEventImpl> endHandler(@Nullable Handler<Void> handler) {
7677
this.endHandler = handler;
7778
return this;
7879
}
7980

80-
SseEvent nextSseEvent() {
81-
SseEvent.Builder eventBuilder = SseEvent.builder();
81+
SseEventImpl nextSseEvent() {
82+
SseEventImpl.Builder eventBuilder = SseEventImpl.builder();
8283
int lineStart = 0;
8384
byte[] bytes = content.getBytes();
8485

@@ -110,7 +111,7 @@ void check() {
110111
if (demand.get() == 0L) {
111112
break;
112113
}
113-
SseEvent event;
114+
SseEventImpl event;
114115
try {
115116
synchronized (this) {
116117
event = nextSseEvent();
@@ -129,7 +130,7 @@ void check() {
129130
break;
130131
}
131132
demand.updateAndGet(d -> d == Long.MAX_VALUE ? d : d - 1);
132-
Handler<SseEvent> h = handler;
133+
Handler<SseEventImpl> h = handler;
133134
if (h != null) {
134135
h.handle(event);
135136
}

vertx-web-common/src/main/java/io/vertx/ext/web/codec/sse/SseEvent.java renamed to vertx-web-common/src/main/java/io/vertx/ext/web/codec/impl/SseEventImpl.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
1-
package io.vertx.ext.web.codec.sse;
1+
package io.vertx.ext.web.codec.impl;
22

33
import io.vertx.codegen.annotations.DataObject;
4+
import io.vertx.ext.web.codec.spi.SseEvent;
45

56
/**
67
* This represents a Server-Sent Event.
78
* @see https://html.spec.whatwg.org/multipage/server-sent-events.html
89
*/
910
@DataObject
10-
public class SseEvent {
11+
public class SseEventImpl implements SseEvent {
1112

1213
private final String id;
1314
private final String event;
1415
private final String data;
1516
private final int retry;
1617

17-
public SseEvent(String id, String event, String data, int retry) {
18+
public SseEventImpl(String id, String event, String data, int retry) {
1819
this.id = id;
1920
this.event = event;
2021
this.data = data;
@@ -25,6 +26,7 @@ public SseEvent(String id, String event, String data, int retry) {
2526
* Returns the event id.
2627
* @return the event id.
2728
*/
29+
@Override
2830
public String id() {
2931
return id;
3032
}
@@ -33,6 +35,7 @@ public String id() {
3335
* Returns the type of the event.
3436
* @return the type of the event.
3537
*/
38+
@Override
3639
public String event() {
3740
return event;
3841
}
@@ -41,6 +44,7 @@ public String event() {
4144
* Returns the payload of the event.
4245
* @return the payload of the event.
4346
*/
47+
@Override
4448
public String data() {
4549
return data;
4650
}
@@ -49,6 +53,7 @@ public String data() {
4953
* Returns the reconnection time.
5054
* @return the reconnection time.
5155
*/
56+
@Override
5257
public int retry() {
5358
return retry;
5459
}
@@ -61,7 +66,7 @@ public boolean equals(Object o) {
6166
if (o == null || getClass() != o.getClass()) {
6267
return false;
6368
}
64-
SseEvent sseEvent = (SseEvent) o;
69+
SseEventImpl sseEvent = (SseEventImpl) o;
6570
return retry == sseEvent.retry
6671
&& java.util.Objects.equals(id, sseEvent.id)
6772
&& java.util.Objects.equals(event, sseEvent.event)
@@ -82,33 +87,33 @@ public String toString() {
8287
+ '\n';
8388
}
8489

85-
public static Builder builder() {
90+
static Builder builder() {
8691
return new Builder();
8792
}
8893

89-
public static class Builder {
94+
static class Builder {
9095

9196
private String id;
9297
private String event = "message";
9398
private StringBuilder data = new StringBuilder();
9499
private int retry;
95100

96-
public Builder id(String id) {
101+
Builder id(String id) {
97102
this.id = id;
98103
return this;
99104
}
100105

101-
public Builder event(String event) {
106+
Builder event(String event) {
102107
this.event = event;
103108
return this;
104109
}
105110

106-
public Builder data(String data) {
111+
Builder data(String data) {
107112
this.data.append(data);
108113
return this;
109114
}
110115

111-
public Builder retry(int retry) {
116+
Builder retry(int retry) {
112117
this.retry = retry;
113118
return this;
114119
}
@@ -159,8 +164,8 @@ private void processField(String field, String value) {
159164
}
160165
}
161166

162-
public SseEvent build() {
163-
return new SseEvent(this.id, this.event, this.data.toString(), this.retry);
167+
public SseEventImpl build() {
168+
return new SseEventImpl(this.id, this.event, this.data.toString(), this.retry);
164169
}
165170
}
166171
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.vertx.ext.web.codec.spi;
2+
3+
import io.vertx.codegen.annotations.DataObject;
4+
5+
/**
6+
* This represents a Server-Sent Event.
7+
* @see https://html.spec.whatwg.org/multipage/server-sent-events.html
8+
*/
9+
@DataObject
10+
public interface SseEvent {
11+
12+
/**
13+
* Returns the payload of the event.
14+
* @return the payload of the event.
15+
*/
16+
String data();
17+
18+
/**
19+
* Returns the type of the event.
20+
* @return the type of the event.
21+
*/
22+
String event();
23+
24+
/**
25+
* Returns the event id.
26+
* @return the event id.
27+
*/
28+
String id();
29+
30+
/**
31+
* Returns the reconnection time.
32+
* @return the reconnection time.
33+
*/
34+
int retry();
35+
36+
}

vertx-web-common/src/main/java/io/vertx/ext/web/codec/sse/SseBodyCodec.java

Lines changed: 0 additions & 24 deletions
This file was deleted.

0 commit comments

Comments
 (0)