Skip to content

Commit c268b7e

Browse files
feat: add eventstreams optionaldata endpoint (#39)
1 parent 6d34d96 commit c268b7e

File tree

2 files changed

+62
-22
lines changed

2 files changed

+62
-22
lines changed

cmd/server/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func main() {
6969
r.HandleFunc("/eventstreams/differentdataschemas-flat", eventstreams.HandleEventStreamDifferentDataSchemasFlatten).Methods(http.MethodPost)
7070
r.HandleFunc("/eventstreams/stayopen", eventstreams.HandleEventStreamStayOpen).Methods(http.MethodPost)
7171
r.HandleFunc("/eventstreams/partial-with-comments", eventstreams.HandleEventStreamPartialWithComments).Methods(http.MethodPost)
72+
r.HandleFunc("/eventstreams/optionaldata", eventstreams.HandleEventStreamOptionalData).Methods(http.MethodPost)
7273
r.HandleFunc("/jsonl", jsonLines.HandleJSONLinesRich).Methods(http.MethodGet)
7374
r.HandleFunc("/jsonl/deserialization_verification", jsonLines.HandleJsonLinesDeserializationVerification).Methods(http.MethodGet)
7475
r.HandleFunc("/jsonl/chunks", jsonLines.HandleJSONLinesChunksRich).Methods(http.MethodGet)

internal/eventstreams/service.go

Lines changed: 61 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ func HandleEventStreamStayOpen(rw http.ResponseWriter, r *http.Request) {
237237
fmt.Fprintln(rw, "")
238238
fmt.Fprintln(rw, "data: event 3")
239239
fmt.Fprintln(rw, "")
240-
240+
241241
if f, ok := rw.(http.Flusher); ok {
242242
f.Flush()
243243
}
@@ -246,7 +246,7 @@ func HandleEventStreamStayOpen(rw http.ResponseWriter, r *http.Request) {
246246
time.Sleep(100 * time.Millisecond)
247247
fmt.Fprintln(rw, "data: event 4")
248248
fmt.Fprintln(rw, "")
249-
249+
250250
if f, ok := rw.(http.Flusher); ok {
251251
f.Flush()
252252
}
@@ -255,7 +255,7 @@ func HandleEventStreamStayOpen(rw http.ResponseWriter, r *http.Request) {
255255
time.Sleep(100 * time.Millisecond)
256256
fmt.Fprintln(rw, "data: [SENTINEL]")
257257
fmt.Fprintln(rw, "")
258-
258+
259259
if f, ok := rw.(http.Flusher); ok {
260260
f.Flush()
261261
}
@@ -271,35 +271,35 @@ func HandleEventStreamPartialWithComments(rw http.ResponseWriter, _ *http.Reques
271271
// Send the first packet with a partial message and a comment
272272
fmt.Fprint(rw, ": This is a comment\n")
273273
fmt.Fprint(rw, "data: {\"message\": \"Hello ")
274-
274+
275275
if f, ok := rw.(http.Flusher); ok {
276276
f.Flush()
277277
}
278-
278+
279279
time.Sleep(100 * time.Millisecond)
280-
280+
281281
// Complete the first message with LF,LF boundary and add another comment
282282
fmt.Fprint(rw, "from SSE\"}\n\n")
283283
fmt.Fprint(rw, ": Another comment line\n")
284-
284+
285285
if f, ok := rw.(http.Flusher); ok {
286286
f.Flush()
287287
}
288-
288+
289289
time.Sleep(100 * time.Millisecond)
290-
290+
291291
// Send a complete event with CR,CR boundary
292292
fmt.Fprint(rw, "id: msg-2\n")
293293
fmt.Fprint(rw, "event: update\n")
294294
fmt.Fprint(rw, ": Comment before data\n")
295295
fmt.Fprint(rw, "data: {\"status\": \"processing\", \"progress\": 50}\r\r")
296-
296+
297297
if f, ok := rw.(http.Flusher); ok {
298298
f.Flush()
299299
}
300-
300+
301301
time.Sleep(100 * time.Millisecond)
302-
302+
303303
// Send with CR,LF,CR,LF boundary
304304
fmt.Fprint(rw, ": This is a multiline\r\n")
305305
fmt.Fprint(rw, ": comment that spans\r\n")
@@ -308,41 +308,80 @@ func HandleEventStreamPartialWithComments(rw http.ResponseWriter, _ *http.Reques
308308
fmt.Fprint(rw, "data: {\"status\": \"complete\",\r\n")
309309
fmt.Fprint(rw, "data: \"progress\": 100,\r\n")
310310
fmt.Fprint(rw, "data: \"result\": \"Success\"}\r\n\r\n")
311-
311+
312312
if f, ok := rw.(http.Flusher); ok {
313313
f.Flush()
314314
}
315-
315+
316316
time.Sleep(100 * time.Millisecond)
317-
317+
318318
// Mix boundaries within same message group - CR for lines, LF,LF for message end
319319
fmt.Fprint(rw, ": Mixed line endings\r")
320320
fmt.Fprint(rw, "event: mixed\n")
321321
fmt.Fprint(rw, "id: msg-4\r")
322322
fmt.Fprint(rw, "data: {\"test\": \"mixed boundaries\"}\n\n")
323-
323+
324324
if f, ok := rw.(http.Flusher); ok {
325325
f.Flush()
326326
}
327-
327+
328328
time.Sleep(100 * time.Millisecond)
329-
329+
330330
// Another variant with CR,CR ending
331331
fmt.Fprint(rw, "data: {\"another\": \"test\"}\r")
332332
fmt.Fprint(rw, ": Comment with CR\r")
333333
fmt.Fprint(rw, "id: msg-5\r\r")
334-
334+
335335
if f, ok := rw.(http.Flusher); ok {
336336
f.Flush()
337337
}
338-
338+
339339
time.Sleep(100 * time.Millisecond)
340-
340+
341341
// Send a final comment and done signal with standard LF,LF
342342
fmt.Fprint(rw, ": Stream ending\n")
343343
fmt.Fprint(rw, "data: [DONE]\n\n")
344-
344+
345345
if f, ok := rw.(http.Flusher); ok {
346346
f.Flush()
347347
}
348348
}
349+
350+
func HandleEventStreamOptionalData(rw http.ResponseWriter, _ *http.Request) {
351+
rw.Header().Add("Content-Type", "text/event-stream")
352+
353+
pushEvents(rw, [][]string{
354+
{
355+
// Event with data field present
356+
`event: message`,
357+
`data: {"content": "Hello, this event has data"}`,
358+
`id: event-1`,
359+
},
360+
361+
{
362+
// Event without data field (data is optional)
363+
`event: heartbeat`,
364+
`id: event-2`,
365+
},
366+
367+
{
368+
// Event with data field present
369+
`event: message`,
370+
`data: {"content": "Another message with data"}`,
371+
`id: event-3`,
372+
},
373+
374+
{
375+
// Event without data field (data is optional)
376+
`event: ping`,
377+
`id: event-4`,
378+
},
379+
380+
{
381+
// Final event with data
382+
`event: complete`,
383+
`data: {"content": "Stream finished"}`,
384+
`id: event-5`,
385+
},
386+
})
387+
}

0 commit comments

Comments
 (0)