11import type { AnyMessage , Stream } from "@agentclientprotocol/sdk" ;
22
3+ const ACP_CONNECTION_HEADER = "Acp-Connection-Id" ;
34const ACP_SESSION_HEADER = "Acp-Session-Id" ;
45
6+ /**
7+ * Creates an ACP Stream that communicates over the Streamable HTTP transport
8+ * defined in RFD 721.
9+ *
10+ * Protocol flow:
11+ * 1. `initialize` → POST (no Acp-Connection-Id), returns per-request SSE
12+ * with `Acp-Connection-Id` in response headers.
13+ * 2. JSON-RPC requests → POST with `Acp-Connection-Id`, returns per-request
14+ * SSE that delivers notifications + the final response, then closes.
15+ * 3. Notifications / client responses → POST with `Acp-Connection-Id`,
16+ * returns 202 Accepted (no body).
17+ * 4. `session/new`, `session/load`, `session/fork` responses carry
18+ * `Acp-Session-Id` in the response headers (informational).
19+ */
520export function createHttpStream ( serverUrl : string ) : Stream {
6- let sessionId : string | null = null ;
21+ let connectionId : string | null = null ;
722 const incoming : AnyMessage [ ] = [ ] ;
823 const waiters : Array < ( ) => void > = [ ] ;
9- const sseAbort = new AbortController ( ) ;
24+ const abortController = new AbortController ( ) ;
1025
1126 function pushMessage ( msg : AnyMessage ) {
1227 incoming . push ( msg ) ;
@@ -19,14 +34,18 @@ export function createHttpStream(serverUrl: string): Stream {
1934 return new Promise < void > ( ( r ) => waiters . push ( r ) ) ;
2035 }
2136
22- async function consumeSSE ( response : Response ) {
37+ async function consumeSSE (
38+ response : Response ,
39+ signal ?: AbortSignal ,
40+ ) : Promise < void > {
2341 if ( ! response . body ) return ;
2442 const reader = response . body . getReader ( ) ;
2543 const decoder = new TextDecoder ( ) ;
2644 let buffer = "" ;
2745
2846 try {
2947 while ( true ) {
48+ if ( signal ?. aborted ) break ;
3049 const { done, value } = await reader . read ( ) ;
3150 if ( done ) break ;
3251 buffer += decoder . decode ( value , { stream : true } ) ;
@@ -36,23 +55,39 @@ export function createHttpStream(serverUrl: string): Stream {
3655
3756 for ( const part of parts ) {
3857 for ( const line of part . split ( "\n" ) ) {
39- if ( line . startsWith ( "data: " ) ) {
40- try {
41- const msg = JSON . parse ( line . slice ( 6 ) ) as AnyMessage ;
42- pushMessage ( msg ) ;
43- } catch {
44- // ignore malformed JSON
58+ if ( line . startsWith ( "data: " ) || line . startsWith ( "data:" ) ) {
59+ const dataStr = line . startsWith ( "data: " )
60+ ? line . slice ( 6 )
61+ : line . slice ( 5 ) ;
62+ if ( dataStr . trim ( ) ) {
63+ try {
64+ pushMessage ( JSON . parse ( dataStr ) as AnyMessage ) ;
65+ } catch {
66+ // ignore malformed JSON
67+ }
4568 }
4669 }
4770 }
4871 }
4972 }
5073 } catch ( e : unknown ) {
5174 if ( e instanceof DOMException && e . name === "AbortError" ) return ;
75+ throw e ;
5276 }
5377 }
5478
55- let isFirstRequest = true ;
79+ function isJsonRpcRequest ( msg : AnyMessage ) : boolean {
80+ return (
81+ "method" in msg &&
82+ "id" in msg &&
83+ msg . id !== undefined &&
84+ msg . id !== null
85+ ) ;
86+ }
87+
88+ function isInitializeRequest ( msg : AnyMessage ) : boolean {
89+ return isJsonRpcRequest ( msg ) && "method" in msg && msg . method === "initialize" ;
90+ }
5691
5792 const readable = new ReadableStream < AnyMessage > ( {
5893 async pull ( controller ) {
@@ -65,54 +100,66 @@ export function createHttpStream(serverUrl: string): Stream {
65100
66101 const writable = new WritableStream < AnyMessage > ( {
67102 async write ( msg ) {
68- const isRequest =
69- "method" in msg &&
70- "id" in msg &&
71- msg . id !== undefined &&
72- msg . id !== null ;
73-
74103 const headers : Record < string , string > = {
75104 "Content-Type" : "application/json" ,
76105 Accept : "application/json, text/event-stream" ,
77106 } ;
78- if ( sessionId ) {
79- headers [ ACP_SESSION_HEADER ] = sessionId ;
107+ if ( connectionId ) {
108+ headers [ ACP_CONNECTION_HEADER ] = connectionId ;
80109 }
81110
82- if ( isFirstRequest && isRequest ) {
83- isFirstRequest = false ;
84-
111+ if ( isInitializeRequest ( msg ) ) {
112+ // Initialize: no Acp-Connection-Id, returns SSE with the header.
85113 const response = await fetch ( `${ serverUrl } /acp` , {
86114 method : "POST" ,
87115 headers,
88116 body : JSON . stringify ( msg ) ,
89- signal : sseAbort . signal ,
117+ signal : abortController . signal ,
90118 } ) ;
91119
92- const sid = response . headers . get ( ACP_SESSION_HEADER ) ;
93- if ( sid ) sessionId = sid ;
120+ const connId = response . headers . get ( ACP_CONNECTION_HEADER ) ;
121+ if ( connId ) connectionId = connId ;
94122
95- consumeSSE ( response ) ;
96- } else if ( isRequest ) {
97- const abort = new AbortController ( ) ;
98- fetch ( `${ serverUrl } /acp` , {
123+ await consumeSSE ( response , abortController . signal ) ;
124+ } else if ( isJsonRpcRequest ( msg ) ) {
125+ // JSON-RPC request: returns a per-request SSE stream.
126+ const response = await fetch ( `${ serverUrl } /acp` , {
99127 method : "POST" ,
100128 headers,
101129 body : JSON . stringify ( msg ) ,
102- signal : abort . signal ,
103- } ) . catch ( ( ) => { } ) ;
104- setTimeout ( ( ) => abort . abort ( ) , 200 ) ;
130+ signal : abortController . signal ,
131+ } ) ;
132+
133+ // session/new, session/load, session/fork may return Acp-Session-Id
134+ const sessionId = response . headers . get ( ACP_SESSION_HEADER ) ;
135+ if ( sessionId ) {
136+ // Informational — the SDK tracks sessionId in the response body.
137+ }
138+
139+ await consumeSSE ( response , abortController . signal ) ;
105140 } else {
141+ // Notification or client response: fire-and-forget, expect 202.
106142 await fetch ( `${ serverUrl } /acp` , {
107143 method : "POST" ,
108144 headers,
109145 body : JSON . stringify ( msg ) ,
146+ signal : abortController . signal ,
110147 } ) ;
111148 }
112149 } ,
113150
114151 close ( ) {
115- sseAbort . abort ( ) ;
152+ // Terminate the connection.
153+ if ( connectionId ) {
154+ const headers : Record < string , string > = {
155+ [ ACP_CONNECTION_HEADER ] : connectionId ,
156+ } ;
157+ fetch ( `${ serverUrl } /acp` , {
158+ method : "DELETE" ,
159+ headers,
160+ } ) . catch ( ( ) => { } ) ;
161+ }
162+ abortController . abort ( ) ;
116163 } ,
117164 } ) ;
118165
0 commit comments