11/// Streaming support for OVSM LISP
22///
3- /// This module provides built-in functions for real-time blockchain event streaming:
4- /// - `(stream-connect url :programs ["pumpfun"] :tokens ["USDC"])` - Connect to streaming server
5- /// - `(stream-poll stream-id)` - Poll for new events (non-blocking)
6- /// - `(stream-wait stream-id timeout)` - Wait for next event (blocking with timeout)
7- /// - `(stream-close stream-id)` - Close streaming connection
3+ /// This module provides built-in functions for real-time blockchain event streaming via WebSocket :
4+ /// - `(stream-connect url :programs ["pumpfun"] :tokens ["USDC"])` - Connect to WebSocket stream
5+ /// - `(stream-poll stream-id :limit 50 )` - Poll buffered events (non-blocking)
6+ /// - `(stream-wait stream-id : timeout 30 )` - Wait for next event (blocking with timeout)
7+ /// - `(stream-close stream-id)` - Close WebSocket connection
88///
99/// Example usage:
1010/// ```lisp
11- /// ;; Connect to Pump.fun event stream
12- /// (define stream (stream-connect "http ://localhost:8080" :programs ["pumpfun"]))
11+ /// ;; Connect to Pump.fun event stream via WebSocket
12+ /// (define stream (stream-connect "ws ://localhost:8080/ws " :programs ["pumpfun"]))
1313///
1414/// ;; Poll for events in a loop
1515/// (while true
16- /// (define events (stream-poll stream))
16+ /// (define events (stream-poll stream :limit 50 ))
1717/// (for (event events)
18- /// (if (= (get event "type") "log_message ")
19- /// (log :message "Transaction :" :value (get event "signature "))
18+ /// (if (= (get event "type") "token_transfer ")
19+ /// (log :message "Transfer :" :value (get event "amount "))
2020/// null)))
2121/// ```
2222
2323use crate :: error:: { Error , Result } ;
2424use crate :: runtime:: Value ;
25+ use futures_util:: { SinkExt , StreamExt } ;
2526use serde_json:: Value as JsonValue ;
2627use std:: collections:: HashMap ;
2728use std:: sync:: { Arc , Mutex } ;
28- use std:: time:: { Duration , SystemTime } ;
29+ use std:: thread;
30+ use std:: time:: Duration ;
31+ use tokio_tungstenite:: { connect_async, tungstenite:: protocol:: Message } ;
2932
3033/// Stream connection handle
3134#[ derive( Clone , Debug ) ]
3235pub struct StreamHandle {
3336 pub id : String ,
3437 pub url : String ,
3538 pub filters : StreamFilters ,
36- pub last_poll : SystemTime ,
3739 pub event_buffer : Arc < Mutex < Vec < JsonValue > > > ,
40+ pub is_connected : Arc < Mutex < bool > > ,
3841}
3942
4043/// Stream filtering options
@@ -61,12 +64,12 @@ fn generate_stream_id() -> String {
6164 format ! ( "stream_{}" , id)
6265}
6366
64- /// Connect to streaming server
67+ /// Connect to WebSocket streaming server
6568///
6669/// Syntax: `(stream-connect url &key programs tokens accounts event-types success-only)`
6770///
6871/// Parameters:
69- /// - `url`: Server URL (e.g., "http ://localhost:8080")
72+ /// - `url`: WebSocket URL (e.g., "ws ://localhost:8080/ws ")
7073/// - `:programs` (optional): Array of program aliases or IDs
7174/// - `:tokens` (optional): Array of token symbols or mint addresses
7275/// - `:accounts` (optional): Array of account addresses
@@ -135,23 +138,100 @@ pub fn stream_connect(args: &[Value]) -> Result<Value> {
135138
136139 // Create stream handle
137140 let stream_id = generate_stream_id ( ) ;
141+ let event_buffer = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
142+ let is_connected = Arc :: new ( Mutex :: new ( true ) ) ;
143+
138144 let handle = StreamHandle {
139145 id : stream_id. clone ( ) ,
140146 url : url. clone ( ) ,
141- filters,
142- last_poll : SystemTime :: now ( ) ,
143- event_buffer : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
147+ filters : filters . clone ( ) ,
148+ event_buffer : event_buffer . clone ( ) ,
149+ is_connected : is_connected . clone ( ) ,
144150 } ;
145151
146152 // Register stream
147153 {
148154 let mut registry = STREAM_REGISTRY . lock ( ) . unwrap ( ) ;
149- registry. insert ( stream_id. clone ( ) , handle) ;
155+ registry. insert ( stream_id. clone ( ) , handle. clone ( ) ) ;
150156 }
151157
158+ // Start WebSocket connection in background thread
159+ let url_clone = url. clone ( ) ;
160+ let buffer_clone = event_buffer. clone ( ) ;
161+ let connected_clone = is_connected. clone ( ) ;
162+ let filters_clone = filters. clone ( ) ;
163+
164+ thread:: spawn ( move || {
165+ let rt = tokio:: runtime:: Runtime :: new ( ) . unwrap ( ) ;
166+ rt. block_on ( async move {
167+ if let Err ( e) = websocket_client_loop (
168+ & url_clone,
169+ buffer_clone,
170+ connected_clone,
171+ filters_clone,
172+ )
173+ . await
174+ {
175+ eprintln ! ( "WebSocket error: {}" , e) ;
176+ }
177+ } ) ;
178+ } ) ;
179+
180+ // Wait a bit for connection to establish
181+ thread:: sleep ( Duration :: from_millis ( 500 ) ) ;
182+
152183 Ok ( Value :: String ( stream_id) )
153184}
154185
186+ /// WebSocket client loop (runs in background)
187+ async fn websocket_client_loop (
188+ url : & str ,
189+ event_buffer : Arc < Mutex < Vec < JsonValue > > > ,
190+ is_connected : Arc < Mutex < bool > > ,
191+ filters : StreamFilters ,
192+ ) -> Result < ( ) > {
193+ let ( ws_stream, _) = connect_async ( url)
194+ . await
195+ . map_err ( |e| Error :: runtime ( format ! ( "WebSocket connection failed: {}" , e) ) ) ?;
196+
197+ let ( _write, mut read) = ws_stream. split ( ) ;
198+
199+ while let Some ( message) = read. next ( ) . await {
200+ match message {
201+ Ok ( Message :: Text ( text) ) => {
202+ // Parse JSON event
203+ if let Ok ( json_value) = serde_json:: from_str :: < JsonValue > ( & text) {
204+ // Apply filters
205+ if filter_event ( & json_value, & filters) {
206+ // Add to buffer
207+ let mut buffer = event_buffer. lock ( ) . unwrap ( ) ;
208+ buffer. push ( json_value) ;
209+
210+ // Limit buffer size to prevent memory issues
211+ if buffer. len ( ) > 10000 {
212+ buffer. drain ( 0 ..5000 ) ; // Remove oldest 5000 events
213+ }
214+ }
215+ }
216+ }
217+ Ok ( Message :: Close ( _) ) => {
218+ let mut connected = is_connected. lock ( ) . unwrap ( ) ;
219+ * connected = false ;
220+ break ;
221+ }
222+ Err ( e) => {
223+ eprintln ! ( "WebSocket read error: {}" , e) ;
224+ let mut connected = is_connected. lock ( ) . unwrap ( ) ;
225+ * connected = false ;
226+ break ;
227+ }
228+ _ => { }
229+ }
230+ }
231+
232+ Ok ( ( ) )
233+ }
234+
155235/// Poll for new events (non-blocking)
156236///
157237/// Syntax: `(stream-poll stream-id &key limit)`
@@ -194,8 +274,20 @@ pub fn stream_poll(args: &[Value]) -> Result<Value> {
194274 . ok_or_else ( || Error :: runtime ( format ! ( "stream-poll: stream not found: {}" , stream_id) ) ) ?
195275 } ;
196276
197- // Poll HTTP endpoint
198- let events = poll_events_sync ( & handle, limit) ?;
277+ // Check if still connected
278+ {
279+ let connected = handle. is_connected . lock ( ) . unwrap ( ) ;
280+ if !* connected {
281+ return Err ( Error :: runtime ( "stream-poll: WebSocket connection closed" . to_string ( ) ) ) ;
282+ }
283+ }
284+
285+ // Drain events from buffer
286+ let events = {
287+ let mut buffer = handle. event_buffer . lock ( ) . unwrap ( ) ;
288+ let drain_count = buffer. len ( ) . min ( limit) ;
289+ buffer. drain ( 0 ..drain_count) . collect :: < Vec < _ > > ( )
290+ } ;
199291
200292 // Convert events to OVSM Value array
201293 let event_values: Vec < Value > = events
@@ -248,23 +340,34 @@ pub fn stream_wait(args: &[Value]) -> Result<Value> {
248340 . ok_or_else ( || Error :: runtime ( format ! ( "stream-wait: stream not found: {}" , stream_id) ) ) ?
249341 } ;
250342
251- // Poll with retries until event or timeout
252- let start = SystemTime :: now ( ) ;
343+ // Wait for event with timeout
344+ let start = std :: time :: Instant :: now ( ) ;
253345 let timeout_duration = Duration :: from_secs ( timeout_secs) ;
254346
255- loop {
256- let events = poll_events_sync ( & handle, 1 ) ?;
257- if !events. is_empty ( ) {
258- return Ok ( json_to_value ( & events[ 0 ] ) ) ;
347+ while start. elapsed ( ) < timeout_duration {
348+ // Check buffer
349+ {
350+ let mut buffer = handle. event_buffer . lock ( ) . unwrap ( ) ;
351+ if !buffer. is_empty ( ) {
352+ let event = buffer. remove ( 0 ) ;
353+ return Ok ( json_to_value ( & event) ) ;
354+ }
259355 }
260356
261- if start. elapsed ( ) . unwrap_or ( Duration :: ZERO ) >= timeout_duration {
262- return Ok ( Value :: Null ) ;
357+ // Check if still connected
358+ {
359+ let connected = handle. is_connected . lock ( ) . unwrap ( ) ;
360+ if !* connected {
361+ return Err ( Error :: runtime ( "stream-wait: WebSocket connection closed" . to_string ( ) ) ) ;
362+ }
263363 }
264364
265- // Sleep for 100ms before retry
266- std :: thread:: sleep ( Duration :: from_millis ( 100 ) ) ;
365+ // Sleep briefly before checking again
366+ thread:: sleep ( Duration :: from_millis ( 100 ) ) ;
267367 }
368+
369+ // Timeout - return null
370+ Ok ( Value :: Null )
268371}
269372
270373/// Close streaming connection
@@ -274,7 +377,7 @@ pub fn stream_wait(args: &[Value]) -> Result<Value> {
274377/// Parameters:
275378/// - `stream-id`: Stream ID returned from stream-connect
276379///
277- /// Returns: true on success
380+ /// Returns: Boolean indicating success
278381pub fn stream_close ( args : & [ Value ] ) -> Result < Value > {
279382 if args. is_empty ( ) {
280383 return Err ( Error :: runtime ( "stream-close requires stream-id argument" . to_string ( ) ) ) ;
@@ -286,10 +389,12 @@ pub fn stream_close(args: &[Value]) -> Result<Value> {
286389 } ;
287390
288391 // Remove from registry
289- let mut registry = STREAM_REGISTRY . lock ( ) . unwrap ( ) ;
290- registry. remove ( & stream_id) ;
392+ let removed = {
393+ let mut registry = STREAM_REGISTRY . lock ( ) . unwrap ( ) ;
394+ registry. remove ( & stream_id) . is_some ( )
395+ } ;
291396
292- Ok ( Value :: Bool ( true ) )
397+ Ok ( Value :: Bool ( removed ) )
293398}
294399
295400/// Helper: Extract string array from Value
@@ -315,43 +420,6 @@ fn extract_string_array(value: &Value) -> Result<Vec<String>> {
315420 }
316421}
317422
318- /// Helper: Poll events from HTTP endpoint (synchronous)
319- fn poll_events_sync ( handle : & StreamHandle , limit : usize ) -> Result < Vec < JsonValue > > {
320- // Use blocking reqwest client to avoid nested runtime issues
321- let client = reqwest:: blocking:: Client :: builder ( )
322- . timeout ( Duration :: from_secs ( 10 ) )
323- . build ( )
324- . map_err ( |e| Error :: runtime ( format ! ( "Failed to create HTTP client: {}" , e) ) ) ?;
325-
326- let url = format ! ( "{}/events?limit={}" , handle. url, limit) ;
327-
328- let response = client
329- . get ( & url)
330- . send ( )
331- . map_err ( |e| Error :: runtime ( format ! ( "HTTP request failed: {}" , e) ) ) ?;
332-
333- let events: Vec < JsonValue > = response
334- . json ( )
335- . map_err ( |e| Error :: runtime ( format ! ( "Failed to parse JSON response: {}" , e) ) ) ?;
336-
337- // Apply filters if any
338- let filtered = if handle. filters . programs . is_empty ( )
339- && handle. filters . tokens . is_empty ( )
340- && handle. filters . accounts . is_empty ( )
341- && handle. filters . event_types . is_empty ( )
342- && !handle. filters . success_only
343- {
344- events
345- } else {
346- events
347- . into_iter ( )
348- . filter ( |event| filter_event ( event, & handle. filters ) )
349- . collect ( )
350- } ;
351-
352- Ok ( filtered)
353- }
354-
355423/// Helper: Filter event based on StreamFilters
356424fn filter_event ( event : & JsonValue , filters : & StreamFilters ) -> bool {
357425 // Filter by event type
@@ -374,8 +442,8 @@ fn filter_event(event: &JsonValue, filters: &StreamFilters) -> bool {
374442 }
375443 }
376444
377- // Note: Program/token/account filtering is done server-side via connection parameters
378- // These filters are redundant but kept for client-side double-checking
445+ // Note: Program/token/account filtering should be done server-side
446+ // These filters are for client-side double-checking if needed
379447
380448 true
381449}
0 commit comments