Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
- summary: |
Expose SSE event `id` and `retry` metadata in the TypeScript SDK's `Stream` class.
Users can now access `stream.lastEventId` during iteration, or use `stream.events()`
to iterate over `Stream.ServerSentEvent<T>` objects containing `data`, `event`,
`eventId`, and `retry` fields. This achieves parity with the Python SDK's
`ServerSentEvent` dataclass.
type: feat
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,22 @@ export declare namespace Stream {
streamTerminator?: string;
eventDiscriminator?: string;
}

/**
* Represents a parsed Server-Sent Event with metadata.
*/
interface ServerSentEvent<T> {
data: T;
event?: string;
eventId?: string;
retry?: number;
}
}

const DATA_PREFIX = "data:";
const EVENT_PREFIX = "event:";
const ID_PREFIX = "id:";
const RETRY_PREFIX = "retry:";

export class Stream<T> implements AsyncIterable<T> {
<% if (streamType === "wrapper") { %>
Expand All @@ -57,6 +69,7 @@ export class Stream<T> implements AsyncIterable<T> {
private eventDiscriminator: string | undefined;
private controller: AbortController = new AbortController();
private decoder: TextDecoder | undefined;
private _lastEventId: string | undefined;

constructor({ stream, parse, eventShape, signal }: Stream.Args & { parse: (val: unknown) => Promise<T> }) {
this.stream = stream;
Expand All @@ -77,18 +90,41 @@ export class Stream<T> implements AsyncIterable<T> {
}
}

/**
* The ID of the last SSE event received, per the SSE spec.
* Persists across events and is updated when an `id:` field is parsed.
*/
get lastEventId(): string | undefined {
return this._lastEventId;
}

/**
* Iterates over full SSE events including metadata (event, id, retry).
* This is the SSE-aware counterpart of the default async iteration which only yields parsed data.
*/
async *events(): AsyncGenerator<Stream.ServerSentEvent<T>, void> {
yield* this.iterFullEvents();
}

private async *iterMessages(): AsyncGenerator<T, void> {
for await (const event of this.iterFullEvents()) {
yield event.data;
}
}

private async *iterFullEvents(): AsyncGenerator<Stream.ServerSentEvent<T>, void> {
if (this.eventDiscriminator != null) {
yield* this.iterSseEvents();
} else {
yield* this.iterDataMessages();
}
}

private async *iterDataMessages(): AsyncGenerator<T, void> {
private async *iterDataMessages(): AsyncGenerator<Stream.ServerSentEvent<T>, void> {
const stream = readableStreamAsyncIterable<any>(this.stream);
let buf = "";
let prefixSeen = false;
let retryValue: number | undefined;
for await (const chunk of stream) {
buf += this.decodeChunk(chunk);

Expand All @@ -101,6 +137,22 @@ export class Stream<T> implements AsyncIterable<T> {
continue;
}

if (this.prefix != null && line.startsWith(ID_PREFIX)) {
const val = line.slice(ID_PREFIX.length).trim();
if (!val.includes("\0")) {
this._lastEventId = val;
}
continue;
}

if (this.prefix != null && line.startsWith(RETRY_PREFIX)) {
const val = parseInt(line.slice(RETRY_PREFIX.length).trim(), 10);
if (!isNaN(val)) {
retryValue = val;
}
continue;
}

if (!prefixSeen && this.prefix != null) {
const prefixIndex = line.indexOf(this.prefix);
if (prefixIndex === -1) {
Expand All @@ -114,17 +166,23 @@ export class Stream<T> implements AsyncIterable<T> {
return;
}
const message = await this.parse(fromJson(line));
yield message;
yield {
data: message,
eventId: this._lastEventId,
retry: retryValue,
};
prefixSeen = false;
Comment on lines 157 to 161

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 event: field is not tracked in the non-discriminated SSE path

In iterDataMessages, event: lines are not explicitly recognized — they fall through to the data-prefix check at line 151-158 and get silently skipped (since event: ... doesn't contain data:). The yielded ServerSentEvent<T> from this path never includes the event field. This is architecturally consistent with how iterDataMessages works (line-by-line without event-block semantics), but it means events() on a non-discriminated SSE stream provides less metadata than the discriminated path. The event: line association requires event-block boundaries (blank-line separation) to work correctly, so fixing this would require refactoring iterDataMessages to understand event blocks, effectively merging it with iterSseEvents. This is related to but distinct from the reported retry bug.

(Refers to lines 123-168)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged — this is an inherent architectural limitation of iterDataMessages. This path processes data: lines independently (no blank-line event block accumulation), so there's no way to associate an event: line with a specific data: line without refactoring to full event-block parsing. Since non-discriminated SSE doesn't use the event: field for dispatch (that's what iterSseEvents with eventDiscriminator is for), this is by design. Users who need event: metadata should configure eventDiscriminator in their API definition, which routes to the full-featured iterSseEvents path.

retryValue = undefined;
}
}
}

private async *iterSseEvents(): AsyncGenerator<T, void> {
private async *iterSseEvents(): AsyncGenerator<Stream.ServerSentEvent<T>, void> {
const stream = readableStreamAsyncIterable<any>(this.stream);
let buf = "";
let eventType: string | undefined;
let dataValue: string | undefined;
let retryValue: number | undefined;

for await (const chunk of stream) {
buf += this.decodeChunk(chunk);
Expand All @@ -136,14 +194,20 @@ export class Stream<T> implements AsyncIterable<T> {

if (!line.trim()) {
if (dataValue != null) {
const message = await this.dispatchSseEvent(dataValue, eventType);
if (message == null) {
const parsed = await this.dispatchSseEvent(dataValue, eventType);
if (parsed == null) {
return;
}
yield message;
yield {
data: parsed,
event: eventType,
eventId: this._lastEventId,
retry: retryValue,
};
}
eventType = undefined;
dataValue = undefined;
retryValue = undefined;
continue;
}

Expand All @@ -152,14 +216,29 @@ export class Stream<T> implements AsyncIterable<T> {
} else if (line.startsWith(DATA_PREFIX)) {
const val = line.slice(DATA_PREFIX.length).trim();
dataValue = dataValue != null ? `${dataValue}\n${val}` : val;
} else if (line.startsWith(ID_PREFIX)) {
const val = line.slice(ID_PREFIX.length).trim();
if (!val.includes("\0")) {
this._lastEventId = val;
}
} else if (line.startsWith(RETRY_PREFIX)) {
const val = parseInt(line.slice(RETRY_PREFIX.length).trim(), 10);
if (!isNaN(val)) {
retryValue = val;
}
}
}
}

if (dataValue != null) {
const message = await this.dispatchSseEvent(dataValue, eventType);
if (message != null) {
yield message;
const parsed = await this.dispatchSseEvent(dataValue, eventType);
if (parsed != null) {
yield {
data: parsed,
event: eventType,
eventId: this._lastEventId,
retry: retryValue,
};
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,61 @@ describe("Stream", () => {

expect(messages).toEqual([{ value: 1 }, { value: 2 }]);
});

it("should track lastEventId from id: fields", async () => {
const mockStream = createReadableStream([
'id: evt-1\ndata: {"value": 1}\nid: evt-2\ndata: {"value": 2}\n',
]);
const stream = new Stream({
stream: mockStream,
parse: async (val: unknown) => val as { value: number },
eventShape: { type: "sse" },
});

for await (const _message of stream) {
// consume
}

expect(stream.lastEventId).toBe("evt-2");
});

it("should ignore id: fields containing null characters", async () => {
const mockStream = createReadableStream([
'id: valid-1\ndata: {"value": 1}\nid: bad\0id\ndata: {"value": 2}\n',
]);
const stream = new Stream({
stream: mockStream,
parse: async (val: unknown) => val as { value: number },
eventShape: { type: "sse" },
});

for await (const _message of stream) {
// consume
}

expect(stream.lastEventId).toBe("valid-1");
});

it("should expose events() with SSE metadata for non-discriminated SSE", async () => {
const mockStream = createReadableStream([
'id: evt-1\nretry: 5000\ndata: {"value": 1}\nid: evt-2\ndata: {"value": 2}\n',
]);
const stream = new Stream({
stream: mockStream,
parse: async (val: unknown) => val as { value: number },
eventShape: { type: "sse" },
});

const events: unknown[] = [];
for await (const event of stream.events()) {
events.push(event);
}

expect(events).toEqual([
{ data: { value: 1 }, eventId: "evt-1", retry: 5000 },
{ data: { value: 2 }, eventId: "evt-2", retry: undefined },
]);
});
});

describe("SSE event-level discrimination (inject discriminator)", () => {
Expand Down Expand Up @@ -385,6 +440,88 @@ describe("Stream", () => {

expect(messages).toEqual([{ type: "", content: "hello" }]);
});

it("should parse id: and retry: fields and expose via events()", async () => {
const mockStream = createReadableStream([
'event: completion\nid: evt-1\nretry: 3000\ndata: {"content": "hello"}\n\nevent: completion\nid: evt-2\ndata: {"content": "world"}\n\n',
]);
const stream = new Stream({
stream: mockStream,
parse: async (val: unknown) => val,
eventShape: { type: "sse", eventDiscriminator: "type" },
});

const events: unknown[] = [];
for await (const event of stream.events()) {
events.push(event);
}

expect(events).toEqual([
{ data: { type: "completion", content: "hello" }, event: "completion", eventId: "evt-1", retry: 3000 },
{ data: { type: "completion", content: "world" }, event: "completion", eventId: "evt-2", retry: undefined },
]);
expect(stream.lastEventId).toBe("evt-2");
});

it("should persist lastEventId across events per SSE spec", async () => {
const mockStream = createReadableStream([
'event: a\nid: persistent-id\ndata: {"v": 1}\n\nevent: b\ndata: {"v": 2}\n\n',
]);
const stream = new Stream({
stream: mockStream,
parse: async (val: unknown) => val,
eventShape: { type: "sse", eventDiscriminator: "type" },
});

const events: unknown[] = [];
for await (const event of stream.events()) {
events.push(event);
}

expect(events).toEqual([
{ data: { type: "a", v: 1 }, event: "a", eventId: "persistent-id", retry: undefined },
{ data: { type: "b", v: 2 }, event: "b", eventId: "persistent-id", retry: undefined },
]);
});

it("should ignore invalid retry values", async () => {
const mockStream = createReadableStream([
'event: a\nretry: not-a-number\ndata: {"v": 1}\n\n',
]);
const stream = new Stream({
stream: mockStream,
parse: async (val: unknown) => val,
eventShape: { type: "sse", eventDiscriminator: "type" },
});

const events: unknown[] = [];
for await (const event of stream.events()) {
events.push(event);
}

expect(events).toEqual([
{ data: { type: "a", v: 1 }, event: "a", eventId: undefined, retry: undefined },
]);
});

it("should still yield data-only from default iteration after adding metadata", async () => {
const mockStream = createReadableStream([
'event: completion\nid: evt-1\nretry: 1000\ndata: {"content": "hello"}\n\n',
]);
const stream = new Stream({
stream: mockStream,
parse: async (val: unknown) => val,
eventShape: { type: "sse", eventDiscriminator: "type" },
});

const messages: unknown[] = [];
for await (const message of stream) {
messages.push(message);
}

expect(messages).toEqual([{ type: "completion", content: "hello" }]);
expect(stream.lastEventId).toBe("evt-1");
});
});

describe("encoding and decoding", () => {
Expand Down
3 changes: 1 addition & 2 deletions seed/ts-sdk/server-sent-event-examples/.fern/metadata.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading