1- import axios from 'axios ' ;
1+ import { z } from 'zod ' ;
22import { Message , TriggerPayload } from '../types/proPresenter' ;
33
4- const isValidUrl = ( inputUrl : string ) : boolean => {
4+ // Custom error class for stream failures with typed status code
5+ export class StreamError extends Error {
6+ readonly statusCode : number ;
7+ readonly name = 'StreamError' ;
8+
9+ constructor ( message : string , statusCode : number ) {
10+ super ( message ) ;
11+ this . statusCode = statusCode ;
12+ Object . setPrototypeOf ( this , StreamError . prototype ) ;
13+ }
14+ }
15+
16+ // Zod schema for Message validation
17+ const messageIdSchema = z . object ( {
18+ uuid : z . string ( ) ,
19+ index : z . number ( ) ,
20+ name : z . string ( )
21+ } ) ;
22+
23+ const messageObjectSchema = z . object ( {
24+ id : messageIdSchema ,
25+ message : z . string ( ) ,
26+ tokens : z . array ( z . any ( ) ) , // tokens have complex optional structure, validate as array of any
27+ visible_on_network : z . boolean ( )
28+ } ) ;
29+
30+ // Accept either a single message or an array of messages
31+ const messageSchema = z . union ( [
32+ messageObjectSchema ,
33+ z . array ( messageObjectSchema )
34+ ] ) ;
35+
36+ const validateMessage = ( data : unknown ) => {
37+ const result = messageSchema . safeParse ( data ) ;
38+ // Cast to proper Message type since we've validated the structure
39+ return {
40+ success : result . success ,
41+ data : result . success ? ( result . data as Message | Message [ ] ) : undefined ,
42+ error : result . error
43+ } ;
44+ } ;
45+
46+ const isValidUrl = ( inputUrl : string ) : boolean => {
547 try {
648 // Use URL constructor for basic format validation
749 new URL ( inputUrl ) ;
@@ -11,50 +53,216 @@ import { Message, TriggerPayload } from '../types/proPresenter';
1153 }
1254 } ;
1355
14- export const getMessages = async ( url : string ) : Promise < Message [ ] > => {
15- if ( ! isValidUrl ( url ) ) {
16- throw new Error ( 'Invalid URL format' ) ;
56+ // Detect if an error is an AbortError
57+ const isAbortError = ( err : unknown ) : boolean => {
58+ if ( err instanceof DOMException && err . name === 'AbortError' ) {
59+ return true ;
1760 }
18-
19- const resource = `${ url } /v1/messages` ;
20- try {
21- const response = await axios . get < Message [ ] > ( resource ) ;
22- return response . data ;
23- } catch ( error ) {
24- handleApiError ( error ) ;
25- throw new Error ( `Failed to fetch messages from ${ resource } ` ) ;
61+ if ( ( err as { name ?: string } ) ?. name === 'AbortError' ) {
62+ return true ;
2663 }
64+ return false ;
2765} ;
2866
67+ // Stream messages using the chunked endpoint. Returns an AbortController to stop the stream.
68+ // The function returns the controller synchronously and runs the async connection logic in the background.
69+ export const streamMessages = (
70+ url : string ,
71+ onChunk : ( data : Message [ ] | Message ) => void ,
72+ onOpen ?: ( ) => void ,
73+ onClose ?: ( ) => void ,
74+ onError ?: ( err : unknown ) => void ,
75+ ) : AbortController => {
76+ const controller = new AbortController ( ) ;
77+
78+ // Run connection logic asynchronously in the background without awaiting
79+ ( async ( ) => {
80+ // Validate URL synchronously before starting async work
81+ if ( ! isValidUrl ( url ) ) {
82+ // Defer error/close callbacks to after controller is returned to caller
83+ queueMicrotask ( ( ) => {
84+ onError && onError ( new Error ( 'Invalid URL format' ) ) ;
85+ onClose && onClose ( ) ;
86+ } ) ;
87+ return ;
88+ }
89+
90+ const resource = `${ url } /v1/messages?chunked=true` ;
91+
92+ try {
93+ const response = await fetch ( resource , { signal : controller . signal } ) ;
94+ if ( ! response . ok ) {
95+ throw new StreamError ( `Failed to stream messages: ${ response . status } ` , response . status ) ;
96+ }
97+
98+ onOpen && onOpen ( ) ;
99+
100+ const reader = response . body ?. getReader ( ) ;
101+ if ( ! reader ) {
102+ throw new Error ( 'Stream reader not available' ) ;
103+ }
104+
105+ const decoder = new TextDecoder ( ) ;
106+ let buffer = '' ;
107+
108+ const extractNextJson = ( buf : string ) : { json : string ; rest : string } | null => {
109+ // Find first non-whitespace char
110+ const startIdx = buf . search ( / \S / ) ;
111+ if ( startIdx === - 1 ) return null ;
112+ const startChar = buf [ startIdx ] ;
113+ if ( startChar !== '{' && startChar !== '[' ) return null ;
114+
115+ let depth = 0 ;
116+ let inString = false ;
117+ let escape = false ;
118+ for ( let i = startIdx ; i < buf . length ; i ++ ) {
119+ const ch = buf [ i ] ;
120+ if ( inString ) {
121+ if ( escape ) {
122+ escape = false ;
123+ } else if ( ch === '\\' ) {
124+ escape = true ;
125+ } else if ( ch === '"' ) {
126+ inString = false ;
127+ }
128+ continue ;
129+ }
130+
131+ if ( ch === '"' ) {
132+ inString = true ;
133+ continue ;
134+ }
135+
136+ if ( ch === '{' || ch === '[' ) {
137+ depth ++ ;
138+ } else if ( ch === '}' || ch === ']' ) {
139+ depth -- ;
140+ if ( depth === 0 ) {
141+ const json = buf . slice ( startIdx , i + 1 ) ;
142+ const rest = buf . slice ( i + 1 ) ;
143+ return { json, rest } ;
144+ }
145+ }
146+ }
147+
148+ return null ;
149+ } ;
150+
151+ // Helper to process an extracted JSON chunk
152+ const processChunkBuffer = (
153+ extracted : { json : string ; rest : string } | null ,
154+ errorContext : string
155+ ) : { parsedData : Message | Message [ ] | undefined ; rest : string } => {
156+ let parsedData : Message | Message [ ] | undefined ;
157+
158+ if ( ! extracted ) {
159+ return { parsedData : undefined , rest : '' } ;
160+ }
161+
162+ try {
163+ const parsed = JSON . parse ( extracted . json ) ;
164+ const validation = validateMessage ( parsed ) ;
165+ if ( validation . success && validation . data ) {
166+ parsedData = validation . data ;
167+ } else {
168+ console . error ( `Invalid message format in ${ errorContext } ` , validation . error ) ;
169+ }
170+ } catch ( err ) {
171+ console . error ( `Failed to parse ${ errorContext } ` , err ) ;
172+ }
173+
174+ // Call onChunk outside try/catch to handle callback errors separately
175+ if ( parsedData !== undefined ) {
176+ try {
177+ onChunk ( parsedData ) ;
178+ } catch ( err ) {
179+ onError && onError ( err ) ;
180+ }
181+ }
182+
183+ return { parsedData, rest : extracted . rest } ;
184+ } ;
185+
186+ const pump = async ( ) => {
187+ try {
188+ while ( true ) {
189+ const { value, done } = await reader . read ( ) ;
190+ if ( done ) {
191+ // Flush the TextDecoder to get any remaining bytes
192+ buffer += decoder . decode ( ) ;
193+
194+ // Drain the entire buffer using the same while-loop as streaming
195+ let extracted = extractNextJson ( buffer ) ;
196+ while ( extracted ) {
197+ const { rest } = processChunkBuffer ( extracted , 'chunk' ) ;
198+ buffer = rest ;
199+ extracted = extractNextJson ( buffer ) ;
200+ }
201+ onClose && onClose ( ) ;
202+ break ;
203+ }
204+
205+ buffer += decoder . decode ( value , { stream : true } ) ;
206+
207+ let extracted = extractNextJson ( buffer ) ;
208+ while ( extracted ) {
209+ const { rest } = processChunkBuffer ( extracted , 'stream' ) ;
210+ buffer = rest ;
211+ extracted = extractNextJson ( buffer ) ;
212+ }
213+ }
214+ } catch ( err ) {
215+ // Only call onError if this is not an abort error (user/code explicitly cancelled)
216+ if ( ! isAbortError ( err ) ) {
217+ onError && onError ( err ) ;
218+ }
219+ }
220+ } ;
221+
222+ // Start background pump (don't await)
223+ void pump ( ) ;
224+ } catch ( error ) {
225+ // Only call onError if this is not an abort error (user/code explicitly cancelled)
226+ if ( ! isAbortError ( error ) ) {
227+ onError && onError ( error ) ;
228+ }
229+ }
230+ } ) ( ) ;
231+
232+ return controller ;
233+ } ;
29234export const triggerMessage = async ( url : string , id : string , payload : TriggerPayload ) : Promise < void > => {
30235 const resource = `${ url } /v1/message/${ id } /trigger` ;
31236 try {
32- await axios . post ( resource , payload ) ;
237+ const response = await fetch ( resource , {
238+ method : 'POST' ,
239+ headers : { 'Content-Type' : 'application/json' } ,
240+ body : JSON . stringify ( payload )
241+ } ) ;
242+ if ( ! response . ok ) {
243+ throw new StreamError ( `Failed to trigger message: ${ response . status } ` , response . status ) ;
244+ }
33245 } catch ( error ) {
34- handleApiError ( error ) ;
246+ if ( error instanceof StreamError ) {
247+ throw error ;
248+ }
249+ console . error ( 'Error triggering message:' , error ) ;
35250 throw new Error ( `Failed to trigger message at ${ resource } with payload: ${ JSON . stringify ( payload ) } ` ) ;
36251 }
37252} ;
38253
39254export const clearMessage = async ( url : string , id : string ) : Promise < void > => {
40255 const resource = `${ url } /v1/message/${ id } /clear` ;
41256 try {
42- await axios . get ( resource ) ;
257+ const response = await fetch ( resource , { method : 'GET' } ) ;
258+ if ( ! response . ok ) {
259+ throw new StreamError ( `Failed to clear message: ${ response . status } ` , response . status ) ;
260+ }
43261 } catch ( error ) {
44- handleApiError ( error ) ;
45- throw new Error ( `Failed to clear message at ${ resource } ` ) ;
46- }
47- } ;
48-
49- const handleApiError = ( error : unknown ) => {
50- if ( axios . isAxiosError ( error ) ) {
51- // Handle Axios-specific errors
52- console . error ( 'API Error:' , error . message ) ;
53- if ( error . response ) {
54- console . error ( 'Response Data:' , error . response . data ) ;
262+ if ( error instanceof StreamError ) {
263+ throw error ;
55264 }
56- } else {
57- // Handle other types of errors
58- console . error ( 'Unexpected Error:' , error ) ;
265+ console . error ( 'Error clearing message:' , error ) ;
266+ throw new Error ( `Failed to clear message at ${ resource } ` ) ;
59267 }
60268} ;
0 commit comments