Skip to content

Commit 40605ad

Browse files
committed
fix: SSE 在 AUTO 模式下无法识别某些字符串格式的问题
1 parent d9a651e commit 40605ad

File tree

3 files changed

+256
-10
lines changed

3 files changed

+256
-10
lines changed

forest-core/src/main/java/com/dtflys/forest/http/ForestSSE.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -925,16 +925,69 @@ protected EventSource readMultiLines(SSEEventList eventList, final ForestRespons
925925
}
926926
continue;
927927
}
928-
final EventSource eventSource = parseEventSource(eventList, response, line);
928+
929+
final char firstChar = line.trim().charAt(0);
930+
EventSource eventSource = null;
931+
if (firstChar == '[' || firstChar == '{' || firstChar == '<' || firstChar == '"' || firstChar == '\'') {
932+
eventSource = new EventSource(eventList, this, "", request, response, line, line);
933+
} else {
934+
final String[] group = line.split("\\:", 2);
935+
if (group.length == 1) {
936+
eventSource = new EventSource(eventList, this, "", request, response, line, line);
937+
} else {
938+
final String name = group[0].trim();
939+
final String data = StringUtils.trimBegin(group[1]);
940+
if (eventList.contains(name)) {
941+
if (lastEventSource != null) {
942+
doOnMessage(lastEventSource, lastEventSource.name(), lastEventSource.value(), sink);
943+
if (state != SSEState.LISTENING || SSEMessageResult.CLOSE.equals(lastEventSource.messageResult())) {
944+
return null;
945+
}
946+
eventList = new SSEEventList(this, request, response);
947+
EventSource thisEvent = new EventSource(eventList, this, name, request, response, line, data);
948+
doOnReceiveEventSource(eventList, thisEvent, thisEvent.name(), thisEvent.value(), sink);
949+
if (state != SSEState.LISTENING || SSEMessageResult.CLOSE.equals(thisEvent.messageResult())) {
950+
return thisEvent;
951+
}
952+
lastEventSource = null;
953+
continue;
954+
} else {
955+
EventSource event = eventList.lastEventSource();
956+
if (event != null) {
957+
doOnMessage(event, event.name(), event.value(), sink);
958+
if (state != SSEState.LISTENING || SSEMessageResult.CLOSE.equals(event.messageResult())) {
959+
return null;
960+
}
961+
}
962+
eventList = new SSEEventList(this, request, response);
963+
EventSource thisEvent = new EventSource(eventList, this, name, request, response, line, data);
964+
doOnReceiveEventSource(eventList, thisEvent, thisEvent.name(), thisEvent.value(), sink);
965+
if (state != SSEState.LISTENING || SSEMessageResult.CLOSE.equals(thisEvent.messageResult())) {
966+
return thisEvent;
967+
}
968+
lastEventSource = null;
969+
continue;
970+
}
971+
} else {
972+
eventSource = new EventSource(eventList, this, name, request, response, line, data);
973+
}
974+
}
975+
}
976+
929977
doOnReceiveEventSource(eventList, eventSource, eventSource.name(), eventSource.value(), sink);
930978
lastEventSource = eventSource;
931979
if (SSEState.LISTENING != state) {
932980
break;
933981
}
934982
}
983+
if (lastEventSource == null && !eventList.isEmpty()) {
984+
return eventList.lastEventSource();
985+
}
935986
return lastEventSource;
936987
}
937988

989+
990+
938991
protected void decodeLinesWithMultiLinesMode(final ForestResponse<InputStream> response, final InputStream in, final String charset, final SSESink sink) {
939992
EventSource lastEventSource = null;
940993

forest-core/src/main/java/com/dtflys/forest/sse/SSEEventList.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66
import com.dtflys.forest.utils.TypeReference;
77

88
import java.util.ArrayList;
9+
import java.util.LinkedHashMap;
910
import java.util.List;
11+
import java.util.Map;
12+
import java.util.stream.Collectors;
1013

1114
public class SSEEventList {
1215

@@ -18,7 +21,7 @@ public class SSEEventList {
1821

1922
private final ForestResponse response;
2023

21-
private final List<EventSource> eventSources = new ArrayList<>(MAX_EVENTS_CAPACITY);
24+
private final Map<String, EventSource> eventSources = new LinkedHashMap<>(MAX_EVENTS_CAPACITY);
2225

2326
public ForestSSE sse() {
2427
return sse;
@@ -39,16 +42,19 @@ public SSEEventList(ForestSSE sse, ForestRequest request, ForestResponse respons
3942
}
4043

4144
void addEventSource(EventSource eventSource) {
42-
eventSources.add(eventSource);
45+
eventSources.put(eventSource.name(), eventSource);
4346
}
4447

4548
public EventSource eventSource(String name) {
46-
for (EventSource eventSource : eventSources) {
47-
if (eventSource.name().equals(name)) {
48-
return eventSource;
49-
}
50-
}
51-
return null;
49+
return eventSources.get(name);
50+
}
51+
52+
public EventSource lastEventSource() {
53+
return new ArrayList<>(eventSources.values()).get(size() - 1);
54+
}
55+
56+
public boolean contains(String name) {
57+
return eventSources.containsKey(name);
5258
}
5359

5460
public int size() {

forest-reactor/src/test/java/com/dtflys/forest/reactor/test/FluxTest.java

Lines changed: 188 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void testFlux_sse() {
5959
Forest.get("http://localhost:{}", server.getPort())
6060
.execute(new TypeReference<Flux<String>>() {})
6161
.subscribe(data -> {
62-
System.out.print(data);
62+
System.out.println(data);
6363
});
6464
}
6565

@@ -100,6 +100,193 @@ public void testFlux_sse_toFlux() {
100100
);
101101
}
102102

103+
@Test
104+
public void testFlux_sse_toFlux_multilines() {
105+
int len = "{\"name\": \"a\"}\n".getBytes().length;
106+
server.enqueue(new MockResponse().setResponseCode(200)
107+
.setBody(
108+
"id:1\n" +
109+
"event:name\n" +
110+
"data:a\n" +
111+
"\n" +
112+
"id:2\n" +
113+
"event:name\n" +
114+
"data:b\n" +
115+
"\n" +
116+
"id:3\n" +
117+
"event:name\n" +
118+
"data:c\n" +
119+
"\n" +
120+
"id:4\n" +
121+
"event:name\n" +
122+
"data:d\n" +
123+
"\n" +
124+
"id:5\n" +
125+
"event:name\n" +
126+
"data:e\n" +
127+
"\n" +
128+
"id:6\n" +
129+
"event:name\n" +
130+
"data:f\n"
131+
).throttleBody(len, 1, TimeUnit.SECONDS));
132+
StringBuffer buffer = new StringBuffer();
133+
134+
Forest.get("http://localhost:{}", server.getPort())
135+
.sse(ReactorSSE.class)
136+
.setOnMessage((event, sink) -> {
137+
String id = event.id();
138+
String name = event.data(String.class);
139+
sink.next("id: " + id + ", name: " + name);
140+
})
141+
.toFlux(String.class)
142+
.subscribe(name -> {
143+
buffer.append("data -> " + name + "\n");
144+
System.out.println("data -> " + name);
145+
});
146+
147+
assertThat(buffer.toString()).isEqualTo(
148+
"data -> id: 1, name: a\n" +
149+
"data -> id: 2, name: b\n" +
150+
"data -> id: 3, name: c\n" +
151+
"data -> id: 4, name: d\n" +
152+
"data -> id: 5, name: e\n" +
153+
"data -> id: 6, name: f\n"
154+
);
155+
}
156+
157+
@Test
158+
public void testFlux_sse_toFlux_auto_multilines() {
159+
int len = ("id:1\n" +
160+
"event:name\n" +
161+
"data:a\n").getBytes().length;
162+
server.enqueue(new MockResponse().setResponseCode(200)
163+
.setBody(
164+
"id:1\n" +
165+
"event:name\n" +
166+
"data:a\n" +
167+
"id:2\n" +
168+
"event:name\n" +
169+
"data:b\n" +
170+
"id:3\n" +
171+
"event:name\n" +
172+
"data:c\n" +
173+
"id:4\n" +
174+
"event:name\n" +
175+
"data:d\n" +
176+
"id:5\n" +
177+
"event:name\n" +
178+
"data:e\n" +
179+
"id:6\n" +
180+
"event:name\n" +
181+
"data:f\n"
182+
).throttleBody(len, 1, TimeUnit.SECONDS));
183+
StringBuffer buffer = new StringBuffer();
184+
185+
Forest.get("http://localhost:{}", server.getPort())
186+
.sse(ReactorSSE.class)
187+
.setOnMessage((event, sink) -> {
188+
String id = event.id();
189+
String type = event.event();
190+
String value = event.data(String.class);
191+
sink.next("id: " + id + ", event: " + type + ", data: " + value);
192+
})
193+
.toFlux(String.class)
194+
.subscribe(name -> {
195+
buffer.append("value -> " + name + "\n");
196+
System.out.println("value -> " + name);
197+
});
198+
199+
assertThat(buffer.toString()).isEqualTo(
200+
"value -> id: 1, event: name, data: a\n" +
201+
"value -> id: 2, event: name, data: b\n" +
202+
"value -> id: 3, event: name, data: c\n" +
203+
"value -> id: 4, event: name, data: d\n" +
204+
"value -> id: 5, event: name, data: e\n" +
205+
"value -> id: 6, event: name, data: f\n"
206+
);
207+
}
208+
209+
210+
@Test
211+
public void testFlux_sse_toFlux_auto_2lines() {
212+
int len = ("id:1\n" +
213+
"data:a\n").getBytes().length;
214+
server.enqueue(new MockResponse().setResponseCode(200)
215+
.setBody(
216+
"id:1\n" +
217+
"data:a\n" +
218+
"id:2\n" +
219+
"data:b\n" +
220+
"id:3\n" +
221+
"data:c\n" +
222+
"id:4\n" +
223+
"data:d\n" +
224+
"id:5\n" +
225+
"data:e\n" +
226+
"id:6\n" +
227+
"data:f\n"
228+
).throttleBody(len, 1, TimeUnit.SECONDS));
229+
StringBuffer buffer = new StringBuffer();
230+
231+
Forest.get("http://localhost:{}", server.getPort())
232+
.sse(ReactorSSE.class)
233+
.setOnMessage((event, sink) -> {
234+
String id = event.id();
235+
String value = event.data(String.class);
236+
sink.next("id: " + id + ", data: " + value);
237+
})
238+
.toFlux(String.class)
239+
.subscribe(name -> {
240+
buffer.append("value -> " + name + "\n");
241+
System.out.println("value -> " + name);
242+
});
243+
244+
assertThat(buffer.toString()).isEqualTo(
245+
"value -> id: 1, data: a\n" +
246+
"value -> id: 2, data: b\n" +
247+
"value -> id: 3, data: c\n" +
248+
"value -> id: 4, data: d\n" +
249+
"value -> id: 5, data: e\n" +
250+
"value -> id: 6, data: f\n"
251+
);
252+
}
253+
254+
255+
@Test
256+
public void testFlux_sse_toFlux_auto_singleLine() {
257+
int len = ("data:a\n").getBytes().length;
258+
server.enqueue(new MockResponse().setResponseCode(200)
259+
.setBody(
260+
"data:a\n" +
261+
"data:b\n" +
262+
"data:c\n" +
263+
"data:d\n" +
264+
"data:e\n" +
265+
"data:f\n"
266+
).throttleBody(len, 1, TimeUnit.SECONDS));
267+
StringBuffer buffer = new StringBuffer();
268+
269+
Forest.get("http://localhost:{}", server.getPort())
270+
.sse(ReactorSSE.class)
271+
.setOnMessage((event, sink) -> {
272+
String data = event.data(String.class);
273+
sink.next("data: " + data);
274+
})
275+
.toFlux(String.class)
276+
.subscribe(name -> {
277+
buffer.append("value -> " + name + "\n");
278+
System.out.println("value -> " + name);
279+
});
280+
281+
assertThat(buffer.toString()).isEqualTo(
282+
"value -> data: a\n" +
283+
"value -> data: b\n" +
284+
"value -> data: c\n" +
285+
"value -> data: d\n" +
286+
"value -> data: e\n" +
287+
"value -> data: f\n"
288+
);
289+
}
103290

104291

105292
@Test

0 commit comments

Comments
 (0)