Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/dev w serve #1991

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 211 additions & 18 deletions js/sdk/src/v3/clients/retrieval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ import {
} from "../../types";
import { ensureSnakeCase } from "../../utils";

/**
 * Turn one raw SSE record (event name + data string) into a structured event.
 *
 * @param raw The event name and the still-serialized `data:` payload.
 * @returns `{ event, data }` with `data` JSON-decoded, or `null` when the
 *   record is a terminating "done" event or its payload is not valid JSON
 *   (the failure is logged via `console.error`).
 */
function parseSseEvent(raw: { event: string; data: string }) {
  const { event, data } = raw;

  // A "done" event only signals the end of the stream; nothing to surface.
  if (event === "done") {
    return null;
  }

  let payload;
  try {
    payload = JSON.parse(data);
  } catch (err) {
    // Best-effort: report the bad payload and let the caller skip it.
    console.error("Failed to parse SSE line:", data, err);
    return null;
  }

  return { event, data: payload };
}

export class RetrievalClient {
constructor(private client: r2rClient) {}

Expand Down Expand Up @@ -96,22 +112,149 @@ export class RetrievalClient {
}
}

/**
 * Stream a RAG response as parsed server-sent events (SSE).
 *
 * POSTs `ragData` to `retrieval/rag` with `responseType: "stream"`, reads the
 * returned byte stream line by line, and yields one `{ event, data }` object
 * for every `data:` line that `parseSseEvent` accepts. The most recent
 * `event:` line supplies the event name ("unknown" until one is seen).
 *
 * The underlying reader is always released; if the consumer stops iterating
 * early, or an error interrupts reading, the stream is cancelled so the
 * connection is not left open.
 *
 * @param ragData Request body forwarded unchanged to the endpoint.
 * @returns An async generator of parsed events that ends when the stream closes.
 * @throws Error if the request produces no response stream.
 */
private async *streamRag(
  ragData: Record<string, any>,
): AsyncGenerator<any, void, unknown> {
  // 1) Make the streaming request.
  const responseStream = await this.client.makeRequest<
    ReadableStream<Uint8Array>
  >("POST", "retrieval/rag", {
    data: ragData,
    headers: { "Content-Type": "application/json" },
    responseType: "stream",
  });

  if (!responseStream) {
    throw new Error("No response stream received");
  }

  const reader = responseStream.getReader();
  const textDecoder = new TextDecoder("utf-8");

  let buffer = "";
  let currentEventType = "unknown";
  let streamEnded = false;

  // Interpret a single complete line; returns a parsed event or null.
  const parseLine = (line: string) => {
    const trimmed = line.trim();
    if (!trimmed || trimmed.startsWith(":")) {
      // Blank separator line or SSE comment/heartbeat.
      return null;
    }
    if (trimmed.startsWith("event:")) {
      // e.g. event: final_answer
      currentEventType = trimmed.slice("event:".length).trim();
      return null;
    }
    if (trimmed.startsWith("data:")) {
      // e.g. data: {"generated_answer":"..."}
      const dataStr = trimmed.slice("data:".length).trim();
      return parseSseEvent({ event: currentEventType, data: dataStr });
    }
    return null;
  };

  try {
    while (true) {
      // 2) Read the next chunk.
      const { value, done } = await reader.read();
      if (done) {
        streamEnded = true;
        break;
      }

      // 3) Decode and append; `stream: true` keeps split multi-byte
      //    characters pending until the next chunk.
      buffer += textDecoder.decode(value, { stream: true });

      // 4) Split into lines, keeping the trailing partial line buffered.
      const lines = buffer.split("\n");
      buffer = lines.pop() ?? "";

      // 5) Process each complete line.
      for (const line of lines) {
        const parsedEvent = parseLine(line);
        if (parsedEvent !== null) {
          yield parsedEvent;
        }
      }
    }

    // 6) Flush the decoder and handle a final line that lacked a newline.
    buffer += textDecoder.decode();
    const lastEvent = parseLine(buffer);
    if (lastEvent !== null) {
      yield lastEvent;
    }
  } finally {
    if (!streamEnded) {
      // Consumer stopped early or reading failed: stop the transfer.
      try {
        await reader.cancel();
      } catch {
        // The stream was already errored or closed; nothing left to cancel.
      }
    }
    reader.releaseLock();
  }
}

// // In retrieval.ts:
// private async *streamRag(
// ragData: Record<string, any>,
// ): AsyncGenerator<any, void, unknown> {
// // 1) Make the streaming request -> returns a browser ReadableStream<Uint8Array>
// const responseStream =
// await this.client.makeRequest<ReadableStream<Uint8Array>>(
// "POST",
// "retrieval/rag",
// {
// data: ragData,
// headers: { "Content-Type": "application/json" },
// responseType: "stream",
// },
// );

// if (!responseStream) {
// throw new Error("No response stream received");
// }

// // 2) Get a reader from the stream
// const reader = responseStream.getReader();
// const textDecoder = new TextDecoder("utf-8");

// let buffer = "";
// let currentEventType = "unknown";

// // 3) Read chunks until done
// while (true) {
// const { value, done } = await reader.read();
// if (done) {
// break;
// }
// // Decode the chunk into a string
// const chunkStr = textDecoder.decode(value, { stream: true });
// buffer += chunkStr;

// // 4) Split on newlines
// const lines = buffer.split("\n");
// buffer = lines.pop() || ""; // keep the partial line in the buffer

// for (const line of lines) {
// const trimmed = line.trim();
// if (!trimmed || trimmed.startsWith(":")) {
// // SSE heartbeats or blank lines
// continue;
// }
// if (trimmed.startsWith("event:")) {
// currentEventType = trimmed.slice("event:".length).trim();
// } else if (trimmed.startsWith("data:")) {
// const dataStr = trimmed.slice("data:".length).trim();
// // Attempt to parse the SSE event
// const eventObj = parseSseEvent({ event: currentEventType, data: dataStr });
// if (eventObj != null) {
// yield eventObj;
// }
// }
// }
// }
// }

// private async streamRag(
// ragData: Record<string, any>,
// ): Promise<ReadableStream<Uint8Array>> {
// return this.client.makeRequest<ReadableStream<Uint8Array>>(
// "POST",
// "retrieval/rag",
// {
// data: ragData,
// headers: {
// "Content-Type": "application/json",
// },
// responseType: "stream",
// },
// );
// }

/**
* Engage with an intelligent RAG-powered conversational agent for complex
* information retrieval and analysis.
Expand Down Expand Up @@ -214,22 +357,72 @@ export class RetrievalClient {
}
}

/**
 * Stream an agent response as parsed server-sent events (SSE).
 *
 * POSTs `agentData` to `retrieval/agent` with `responseType: "stream"`, reads
 * the returned byte stream line by line, and yields one `{ event, data }`
 * object for every `data:` line that `parseSseEvent` accepts. The most recent
 * `event:` line supplies the event name ("unknown" until one is seen).
 *
 * The underlying reader is always released; if the consumer stops iterating
 * early, or an error interrupts reading, the stream is cancelled so the
 * connection is not left open.
 *
 * @param agentData Request body forwarded unchanged to the endpoint.
 * @returns An async generator of parsed events that ends when the stream closes.
 * @throws Error if the request produces no response stream.
 */
private async *streamAgent(
  agentData: Record<string, any>,
): AsyncGenerator<any, void, unknown> {
  // 1) Make the streaming request to the "retrieval/agent" endpoint.
  const responseStream = await this.client.makeRequest<
    ReadableStream<Uint8Array>
  >("POST", "retrieval/agent", {
    data: agentData,
    headers: { "Content-Type": "application/json" },
    responseType: "stream",
  });

  if (!responseStream) {
    throw new Error("No response stream received from agent endpoint");
  }

  // 2) Prepare to read the SSE stream line by line.
  const reader = responseStream.getReader();
  const textDecoder = new TextDecoder("utf-8");

  let buffer = "";
  let currentEventType = "unknown";
  let streamEnded = false;

  // Interpret a single complete line; returns a parsed event or null.
  const parseLine = (line: string) => {
    const trimmed = line.trim();
    if (!trimmed || trimmed.startsWith(":")) {
      // Blank separator line or SSE comment/heartbeat.
      return null;
    }
    if (trimmed.startsWith("event:")) {
      // e.g. "event: message"
      currentEventType = trimmed.slice("event:".length).trim();
      return null;
    }
    if (trimmed.startsWith("data:")) {
      // e.g. "data: {...}"
      const dataStr = trimmed.slice("data:".length).trim();
      return parseSseEvent({ event: currentEventType, data: dataStr });
    }
    return null;
  };

  try {
    // 3) Read chunks until the stream closes.
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        streamEnded = true;
        break;
      }

      // `stream: true` keeps split multi-byte characters pending until the
      // next chunk arrives.
      buffer += textDecoder.decode(value, { stream: true });

      // The last element may be a partial line, so re-buffer it.
      const lines = buffer.split("\n");
      buffer = lines.pop() ?? "";

      for (const line of lines) {
        const parsed = parseLine(line);
        if (parsed !== null) {
          yield parsed;
        }
      }
    }

    // 4) Flush the decoder and handle a final line that lacked a newline.
    buffer += textDecoder.decode();
    const lastEvent = parseLine(buffer);
    if (lastEvent !== null) {
      yield lastEvent;
    }
  } finally {
    if (!streamEnded) {
      // Consumer stopped early or reading failed: stop the transfer.
      try {
        await reader.cancel();
      } catch {
        // The stream was already errored or closed; nothing left to cancel.
      }
    }
    reader.releaseLock();
  }
}


/**
* Generate completions for a list of messages.
*
Expand Down
2 changes: 2 additions & 0 deletions py/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@


__all__ = [
"CitationEvent",
"Citation",
"R2RAgent",
"R2RStreamingAgent",
"SearchResultsCollector",
Expand Down
Loading
Loading