Skip to content

Commit add9e29

Browse files
committed
feat: Adding support for SSE on the WebClient
Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
1 parent a02afbb commit add9e29

File tree

3 files changed

+765
-0
lines changed

3 files changed

+765
-0
lines changed
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Copyright The WildFly Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package io.vertx.ext.web.client.sse;
6+
7+
import io.vertx.core.AsyncResult;
8+
import io.vertx.core.Future;
9+
import io.vertx.core.Handler;
10+
import io.vertx.core.Promise;
11+
import io.vertx.core.buffer.Buffer;
12+
import io.vertx.core.streams.WriteStream;
13+
import io.vertx.ext.web.codec.BodyCodec;
14+
import io.vertx.ext.web.codec.spi.BodyStream;
15+
import java.nio.charset.StandardCharsets;
16+
17+
public class SseBodyCodec implements BodyCodec<Void> {
18+
19+
private final WriteStream<SseEvent> eventHandler;
20+
21+
private SseBodyCodec(WriteStream<SseEvent> eventHandler) {
22+
this.eventHandler = eventHandler;
23+
}
24+
25+
private static final int MAX_BUFFER = 8000000;
26+
27+
@Override
28+
public void create(Handler<AsyncResult<BodyStream<Void>>> handler) {
29+
handler.handle(Future.succeededFuture(new BodyStream<Void>() {
30+
private Buffer lineBuffer = Buffer.buffer();
31+
private SseEvent.Builder eventBuilder = SseEvent.builder();
32+
private Handler<Throwable> exceptionHandler;
33+
private Handler<Void> drainHandler;
34+
private int maxQueueSize = 8192;
35+
private boolean ended = false;
36+
37+
@Override
38+
public Future<Void> result() {
39+
return Future.succeededFuture();
40+
}
41+
42+
@Override
43+
public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
44+
this.exceptionHandler = handler;
45+
return this;
46+
}
47+
48+
@Override
49+
public Future<Void> write(Buffer data) {
50+
Promise<Void> promise = Promise.promise();
51+
write(data, promise);
52+
return promise.future();
53+
}
54+
55+
@Override
56+
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
57+
if (ended) {
58+
handler.handle(Future.failedFuture("Stream is ended"));
59+
return;
60+
}
61+
62+
try {
63+
for (byte b : data.getBytes()) {
64+
if (b == '\n' || b == '\r') {
65+
String line = lineBuffer.toString(StandardCharsets.UTF_8);
66+
lineBuffer = Buffer.buffer();
67+
if (line.isEmpty()) {
68+
// Empty line dispatches the event
69+
if (eventHandler != null && eventBuilder.isReady()) {
70+
eventHandler.write(eventBuilder.build());
71+
}
72+
eventBuilder = SseEvent.builder();
73+
} else {
74+
parseLine(line, eventBuilder);
75+
}
76+
} else {
77+
if (lineBuffer.length() > MAX_BUFFER) {
78+
handler.handle(Future.failedFuture("Data is too big"));
79+
}
80+
lineBuffer.appendByte(b);
81+
}
82+
}
83+
handler.handle(Future.succeededFuture());
84+
} catch (Exception e) {
85+
if (exceptionHandler != null) {
86+
exceptionHandler.handle(e);
87+
}
88+
handler.handle(Future.failedFuture(e));
89+
}
90+
}
91+
92+
@Override
93+
public void end(Handler<AsyncResult<Void>> handler) {
94+
ended = true;
95+
// Process any remaining data in buffer
96+
if (lineBuffer.length() > 0) {
97+
String line = lineBuffer.toString(StandardCharsets.UTF_8);
98+
if (!line.isEmpty()) {
99+
parseLine(line, eventBuilder);
100+
}
101+
}
102+
// Dispatch final event if there's data
103+
if (eventHandler != null && eventBuilder.isReady()) {
104+
eventHandler.end(eventBuilder.build());
105+
}
106+
handler.handle(Future.succeededFuture());
107+
}
108+
109+
@Override
110+
public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
111+
this.maxQueueSize = maxSize;
112+
return this;
113+
}
114+
115+
@Override
116+
public boolean writeQueueFull() {
117+
return lineBuffer.length() >= maxQueueSize;
118+
}
119+
120+
@Override
121+
public WriteStream<Buffer> drainHandler(Handler<Void> handler) {
122+
this.drainHandler = handler;
123+
return this;
124+
}
125+
126+
@Override
127+
public void handle(Throwable event) {
128+
if (exceptionHandler != null) {
129+
exceptionHandler.handle(event);
130+
}
131+
}
132+
}));
133+
}
134+
135+
void parseLine(String line, SseEvent.Builder builder) {
136+
int colonIndex = line.indexOf(':');
137+
if (colonIndex == 0) {
138+
return;
139+
}
140+
if (colonIndex == -1) {
141+
processField(builder, line, "");
142+
return;
143+
}
144+
String field = line.substring(0, colonIndex);
145+
String value = line.substring(colonIndex + 1);
146+
// Remove leading space from value if present (SSE spec)
147+
if (value.startsWith(" ")) {
148+
value = value.substring(1);
149+
}
150+
processField(builder, field, value);
151+
}
152+
153+
void processField(SseEvent.Builder builder, String field, String value) {
154+
// Field names must be compared literally, with no case folding performed.
155+
switch (field) {
156+
case "event":
157+
builder.event(value);
158+
break;
159+
case "data":
160+
builder.data(value);
161+
break;
162+
case "id":
163+
builder.id(value);
164+
break;
165+
case "retry":
166+
// If the field value consists of only ASCII digits, then interpret the field value as an
167+
// integer in base ten, and set the event stream's reconnection time to that integer.
168+
// Otherwise, ignore the field.
169+
try {
170+
builder.retry(Integer.parseInt(value));
171+
} catch (NumberFormatException ex) {
172+
throw new RuntimeException("Invalid \"retry\" value:" + value);
173+
}
174+
break;
175+
default:
176+
// Ignore unknown fields as per SSE spec
177+
break;
178+
}
179+
}
180+
181+
public static BodyCodec<Void> sseStream(WriteStream<SseEvent> handler) {
182+
return new SseBodyCodec(handler);
183+
}
184+
185+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright The WildFly Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package io.vertx.ext.web.client.sse;
6+
7+
public class SseEvent {
8+
9+
private final String id;
10+
private final String event;
11+
private final String data;
12+
private final int retry;
13+
14+
public SseEvent(String id, String event, String data, int retry) {
15+
this.id = id;
16+
this.event = event;
17+
this.data = data;
18+
this.retry = retry;
19+
}
20+
21+
public String id() {
22+
return id;
23+
}
24+
25+
public String event() {
26+
return event;
27+
}
28+
29+
public String data() {
30+
return data;
31+
}
32+
33+
public int retry() {
34+
return retry;
35+
}
36+
37+
@Override
38+
public boolean equals(Object obj) {
39+
if (this == obj) {
40+
return true;
41+
}
42+
if (obj == null || getClass() != obj.getClass()) {
43+
return false;
44+
}
45+
SseEvent sseEvent = (SseEvent) obj;
46+
return retry == sseEvent.retry
47+
&& java.util.Objects.equals(id, sseEvent.id)
48+
&& java.util.Objects.equals(event, sseEvent.event)
49+
&& java.util.Objects.equals(data, sseEvent.data);
50+
}
51+
52+
@Override
53+
public int hashCode() {
54+
return java.util.Objects.hash(id, event, data, retry);
55+
}
56+
57+
@Override
58+
public String toString() {
59+
return "SseEvent{"
60+
+ "id='" + id + '\''
61+
+ ", event='" + event + '\''
62+
+ ", data='" + data + '\''
63+
+ ", retry=" + retry
64+
+ '}';
65+
}
66+
67+
public static Builder builder() {
68+
return new Builder();
69+
}
70+
71+
public static class Builder {
72+
73+
private String id;
74+
private String event;
75+
private StringBuilder data = new StringBuilder();
76+
private int retry;
77+
78+
public Builder id(String id) {
79+
this.id = id;
80+
return this;
81+
}
82+
83+
public Builder event(String event) {
84+
this.event = event;
85+
return this;
86+
}
87+
88+
public Builder data(String data) {
89+
this.data.append(data);
90+
return this;
91+
}
92+
93+
public Builder retry(int retry) {
94+
this.retry = retry;
95+
return this;
96+
}
97+
98+
public SseEvent build() {
99+
return new SseEvent(this.id, this.event, this.data.toString(), this.retry);
100+
}
101+
102+
public boolean isReady() {
103+
return id != null || event != null || data.length() > 0 || retry > 0;
104+
}
105+
}
106+
}

0 commit comments

Comments
 (0)