Skip to content

Commit 6a1156e

Browse files
committed
feat: Adding support for SSE on the WebClient
Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
1 parent a02afbb commit 6a1156e

File tree

3 files changed

+757
-0
lines changed

3 files changed

+757
-0
lines changed
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package io.vertx.ext.web.client.sse;
2+
3+
import io.vertx.core.AsyncResult;
4+
import io.vertx.core.Future;
5+
import io.vertx.core.Handler;
6+
import io.vertx.core.Promise;
7+
import io.vertx.core.buffer.Buffer;
8+
import io.vertx.core.streams.WriteStream;
9+
import io.vertx.ext.web.codec.BodyCodec;
10+
import io.vertx.ext.web.codec.spi.BodyStream;
11+
import java.nio.charset.StandardCharsets;
12+
13+
public class SseBodyCodec implements BodyCodec<Void> {
14+
15+
private final WriteStream<SseEvent> eventHandler;
16+
17+
private SseBodyCodec(WriteStream<SseEvent> eventHandler) {
18+
this.eventHandler = eventHandler;
19+
}
20+
21+
private static final int MAX_BUFFER = 8000000;
22+
23+
@Override
24+
public void create(Handler<AsyncResult<BodyStream<Void>>> handler) {
25+
handler.handle(Future.succeededFuture(new BodyStream<Void>() {
26+
private Buffer lineBuffer = Buffer.buffer();
27+
private SseEvent.Builder eventBuilder = SseEvent.builder();
28+
private Handler<Throwable> exceptionHandler;
29+
private Handler<Void> drainHandler;
30+
private int maxQueueSize = 8192;
31+
private boolean ended = false;
32+
33+
@Override
34+
public Future<Void> result() {
35+
return Future.succeededFuture();
36+
}
37+
38+
@Override
39+
public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
40+
this.exceptionHandler = handler;
41+
return this;
42+
}
43+
44+
@Override
45+
public Future<Void> write(Buffer data) {
46+
Promise<Void> promise = Promise.promise();
47+
write(data, promise);
48+
return promise.future();
49+
}
50+
51+
@Override
52+
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
53+
if (ended) {
54+
handler.handle(Future.failedFuture("Stream is ended"));
55+
return;
56+
}
57+
58+
try {
59+
for (byte b : data.getBytes()) {
60+
if (b == '\n' || b == '\r') {
61+
String line = lineBuffer.toString(StandardCharsets.UTF_8);
62+
lineBuffer = Buffer.buffer();
63+
if (line.isEmpty()) {
64+
// Empty line dispatches the event
65+
if (eventHandler != null && eventBuilder.isReady()) {
66+
eventHandler.write(eventBuilder.build());
67+
}
68+
eventBuilder = SseEvent.builder();
69+
} else {
70+
parseLine(line, eventBuilder);
71+
}
72+
} else {
73+
if (lineBuffer.length() > MAX_BUFFER) {
74+
handler.handle(Future.failedFuture("Data is too big"));
75+
}
76+
lineBuffer.appendByte(b);
77+
}
78+
}
79+
handler.handle(Future.succeededFuture());
80+
} catch (Exception e) {
81+
if (exceptionHandler != null) {
82+
exceptionHandler.handle(e);
83+
}
84+
handler.handle(Future.failedFuture(e));
85+
}
86+
}
87+
88+
@Override
89+
public void end(Handler<AsyncResult<Void>> handler) {
90+
ended = true;
91+
// Process any remaining data in buffer
92+
if (lineBuffer.length() > 0) {
93+
String line = lineBuffer.toString(StandardCharsets.UTF_8);
94+
if (!line.isEmpty()) {
95+
parseLine(line, eventBuilder);
96+
}
97+
}
98+
// Dispatch final event if there's data
99+
if (eventHandler != null && eventBuilder.isReady()) {
100+
eventHandler.end(eventBuilder.build());
101+
}
102+
handler.handle(Future.succeededFuture());
103+
}
104+
105+
@Override
106+
public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
107+
this.maxQueueSize = maxSize;
108+
return this;
109+
}
110+
111+
@Override
112+
public boolean writeQueueFull() {
113+
return lineBuffer.length() >= maxQueueSize;
114+
}
115+
116+
@Override
117+
public WriteStream<Buffer> drainHandler(Handler<Void> handler) {
118+
this.drainHandler = handler;
119+
return this;
120+
}
121+
122+
@Override
123+
public void handle(Throwable event) {
124+
if (exceptionHandler != null) {
125+
exceptionHandler.handle(event);
126+
}
127+
}
128+
}));
129+
}
130+
131+
void parseLine(String line, SseEvent.Builder builder) {
132+
int colonIndex = line.indexOf(':');
133+
if (colonIndex == 0) {
134+
return;
135+
}
136+
if (colonIndex == -1) {
137+
processField(builder, line, "");
138+
return;
139+
}
140+
String field = line.substring(0, colonIndex);
141+
String value = line.substring(colonIndex + 1);
142+
// Remove leading space from value if present (SSE spec)
143+
if (value.startsWith(" ")) {
144+
value = value.substring(1);
145+
}
146+
processField(builder, field, value);
147+
}
148+
149+
void processField(SseEvent.Builder builder, String field, String value) {
150+
// Field names must be compared literally, with no case folding performed.
151+
switch (field) {
152+
case "event":
153+
builder.event(value);
154+
break;
155+
case "data":
156+
builder.data(value);
157+
break;
158+
case "id":
159+
builder.id(value);
160+
break;
161+
case "retry":
162+
// If the field value consists of only ASCII digits, then interpret the field value as an
163+
// integer in base ten, and set the event stream's reconnection time to that integer.
164+
// Otherwise, ignore the field.
165+
try {
166+
builder.retry(Integer.parseInt(value));
167+
} catch (NumberFormatException ex) {
168+
throw new RuntimeException("Invalid \"retry\" value:" + value);
169+
}
170+
break;
171+
default:
172+
// Ignore unknown fields as per SSE spec
173+
break;
174+
}
175+
}
176+
177+
public static BodyCodec<Void> sseStream(WriteStream<SseEvent> handler) {
178+
return new SseBodyCodec(handler);
179+
}
180+
181+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package io.vertx.ext.web.client.sse;
2+
3+
public class SseEvent {
4+
5+
private final String id;
6+
private final String event;
7+
private final String data;
8+
private final int retry;
9+
10+
public SseEvent(String id, String event, String data, int retry) {
11+
this.id = id;
12+
this.event = event;
13+
this.data = data;
14+
this.retry = retry;
15+
}
16+
17+
public String id() {
18+
return id;
19+
}
20+
21+
public String event() {
22+
return event;
23+
}
24+
25+
public String data() {
26+
return data;
27+
}
28+
29+
public int retry() {
30+
return retry;
31+
}
32+
33+
@Override
34+
public boolean equals(Object obj) {
35+
if (this == obj) {
36+
return true;
37+
}
38+
if (obj == null || getClass() != obj.getClass()) {
39+
return false;
40+
}
41+
SseEvent sseEvent = (SseEvent) obj;
42+
return retry == sseEvent.retry
43+
&& java.util.Objects.equals(id, sseEvent.id)
44+
&& java.util.Objects.equals(event, sseEvent.event)
45+
&& java.util.Objects.equals(data, sseEvent.data);
46+
}
47+
48+
@Override
49+
public int hashCode() {
50+
return java.util.Objects.hash(id, event, data, retry);
51+
}
52+
53+
@Override
54+
public String toString() {
55+
return "SseEvent{"
56+
+ "id='" + id + '\''
57+
+ ", event='" + event + '\''
58+
+ ", data='" + data + '\''
59+
+ ", retry=" + retry
60+
+ '}';
61+
}
62+
63+
public static Builder builder() {
64+
return new Builder();
65+
}
66+
67+
public static class Builder {
68+
69+
private String id;
70+
private String event;
71+
private StringBuilder data = new StringBuilder();
72+
private int retry;
73+
74+
public Builder id(String id) {
75+
this.id = id;
76+
return this;
77+
}
78+
79+
public Builder event(String event) {
80+
this.event = event;
81+
return this;
82+
}
83+
84+
public Builder data(String data) {
85+
this.data.append(data);
86+
return this;
87+
}
88+
89+
public Builder retry(int retry) {
90+
this.retry = retry;
91+
return this;
92+
}
93+
94+
public SseEvent build() {
95+
return new SseEvent(this.id, this.event, this.data.toString(), this.retry);
96+
}
97+
98+
public boolean isReady() {
99+
return id != null || event != null || data.length() > 0 || retry > 0;
100+
}
101+
}
102+
}

0 commit comments

Comments
 (0)