Skip to content

Commit ab3c59d

Browse files
0xrinegadeclaude
andcommitted
feat(ovsm): Add osvm-stream function (foundation)
Add osvm-stream built-in function for standalone OVSM scripts. Goal: Eliminate need for manual server setup Before: osvm stream --programs pumpfun (separate terminal) After: (define stream (osvm-stream :alias "pumpfun")) Implementation (foundation): - Added eval_osvm_stream to lisp_evaluator - Added osvm_stream function to streaming module - Parses :alias, :programs, :tokens, :accounts keywords - Finds available port (18080-18180 range) - Spawns background thread for embedded server - Auto-connects via WebSocket - Returns stream ID for stream-poll Current status: PLACEHOLDER - spawn_internal_server prints debug info - TODO: Actually embed StreamService in background thread - TODO: Proper server lifecycle management Next steps: 1. Refactor StreamService to be embeddable 2. Pass RPC URL configuration 3. Graceful shutdown when script ends 4. Error handling for server failures Usage (when complete): ```lisp ;; One-line setup - no external server needed! (define stream (osvm-stream :alias "pumpfun")) (while (< (- (now) start-time) 60) (define events (stream-poll stream :limit 50)) ...) ``` 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 429b45d commit ab3c59d

File tree

2 files changed

+168
-0
lines changed

2 files changed

+168
-0
lines changed

crates/ovsm/src/runtime/lisp_evaluator.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ impl LispEvaluator {
369369
"stream-poll" => self.eval_stream_poll(args),
370370
"stream-wait" => self.eval_stream_wait(args),
371371
"stream-close" => self.eval_stream_close(args),
372+
"osvm-stream" => self.eval_osvm_stream(args),
372373
// LINQ-style functional operations
373374
"compact" => self.eval_compact(args),
374375
"count-by" => self.eval_count_by(args),
@@ -9107,6 +9108,20 @@ impl LispEvaluator {
91079108
// Call the streaming function with evaluated arguments
91089109
crate::runtime::streaming::stream_close(&evaluated_args)
91099110
}
9111+
9112+
/// (osvm-stream &key alias programs tokens) - Spawn internal stream server and connect
9113+
/// This is a convenience function that combines server spawning + stream-connect
9114+
/// The server automatically terminates when the script ends
9115+
fn eval_osvm_stream(&mut self, args: &[crate::parser::Argument]) -> Result<Value> {
9116+
// Evaluate all arguments to Values
9117+
let mut evaluated_args = Vec::new();
9118+
for arg in args {
9119+
evaluated_args.push(self.evaluate_expression(&arg.value)?);
9120+
}
9121+
9122+
// Call the streaming helper
9123+
crate::runtime::streaming::osvm_stream(&evaluated_args)
9124+
}
91109125
}
91119126

91129127
impl Default for LispEvaluator {

crates/ovsm/src/runtime/streaming.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,3 +476,156 @@ fn json_to_value(json: &JsonValue) -> Value {
476476
}
477477
}
478478
}
479+
480+
/// Spawn internal OSVM stream server and connect to it
481+
///
482+
/// Syntax: `(osvm-stream &key alias programs tokens accounts)`
483+
///
484+
/// This is a convenience function that:
485+
/// 1. Spawns an embedded stream server in a background thread
486+
/// 2. Automatically connects via WebSocket
487+
/// 3. Returns stream ID for use with stream-poll
488+
/// 4. Server auto-terminates when script ends
489+
///
490+
/// Parameters:
491+
/// - `:alias` (optional): Program alias like "pumpfun", "raydium"
492+
/// - `:programs` (optional): Array of program IDs
493+
/// - `:tokens` (optional): Array of token symbols/mints
494+
/// - `:accounts` (optional): Array of account addresses
495+
///
496+
/// Returns: Stream ID string
497+
///
498+
/// Example:
499+
/// ```lisp
500+
/// (define stream (osvm-stream :alias "pumpfun"))
501+
/// (while true
502+
/// (define events (stream-poll stream))
503+
/// ...)
504+
/// ```
505+
pub fn osvm_stream(args: &[Value]) -> Result<Value> {
506+
// Parse keyword arguments
507+
let mut alias: Option<String> = None;
508+
let mut programs: Vec<String> = Vec::new();
509+
let mut tokens: Vec<String> = Vec::new();
510+
let mut accounts: Vec<String> = Vec::new();
511+
512+
let mut i = 0;
513+
while i < args.len() {
514+
if let Value::String(key) = &args[i] {
515+
if key.starts_with(':') {
516+
if i + 1 >= args.len() {
517+
return Err(Error::runtime(format!(
518+
"osvm-stream: missing value for keyword argument {}",
519+
key
520+
)));
521+
}
522+
523+
let value = &args[i + 1];
524+
match key.as_str() {
525+
":alias" => {
526+
if let Value::String(s) = value {
527+
alias = Some(s.clone());
528+
}
529+
}
530+
":programs" => {
531+
programs = extract_string_array(value)?;
532+
}
533+
":tokens" => {
534+
tokens = extract_string_array(value)?;
535+
}
536+
":accounts" => {
537+
accounts = extract_string_array(value)?;
538+
}
539+
_ => {
540+
return Err(Error::runtime(format!(
541+
"osvm-stream: unknown keyword argument {}",
542+
key
543+
)))
544+
}
545+
}
546+
i += 2;
547+
} else {
548+
i += 1;
549+
}
550+
} else {
551+
i += 1;
552+
}
553+
}
554+
555+
// If alias provided, add to programs list
556+
if let Some(alias_name) = alias {
557+
programs.push(alias_name);
558+
}
559+
560+
// Find an available port
561+
let port = find_available_port()?;
562+
563+
// Spawn internal stream server in background
564+
spawn_internal_server(port, programs.clone(), tokens.clone(), accounts.clone())?;
565+
566+
// Wait for server to start
567+
thread::sleep(Duration::from_millis(1000));
568+
569+
// Connect via WebSocket
570+
let ws_url = format!("ws://127.0.0.1:{}/ws", port);
571+
let mut connect_args = vec![Value::String(ws_url)];
572+
573+
// Add filters if provided
574+
if !programs.is_empty() {
575+
connect_args.push(Value::String(":programs".to_string()));
576+
connect_args.push(Value::Array(Arc::new(
577+
programs.into_iter().map(Value::String).collect(),
578+
)));
579+
}
580+
if !tokens.is_empty() {
581+
connect_args.push(Value::String(":tokens".to_string()));
582+
connect_args.push(Value::Array(Arc::new(
583+
tokens.into_iter().map(Value::String).collect(),
584+
)));
585+
}
586+
if !accounts.is_empty() {
587+
connect_args.push(Value::String(":accounts".to_string()));
588+
connect_args.push(Value::Array(Arc::new(
589+
accounts.into_iter().map(Value::String).collect(),
590+
)));
591+
}
592+
593+
// Call stream_connect
594+
stream_connect(&connect_args)
595+
}
596+
597+
/// Find an available port for the internal server
598+
fn find_available_port() -> Result<u16> {
599+
use std::net::TcpListener;
600+
601+
// Try ports 18080-18180
602+
for port in 18080..18180 {
603+
if TcpListener::bind(("127.0.0.1", port)).is_ok() {
604+
return Ok(port);
605+
}
606+
}
607+
608+
Err(Error::runtime("Could not find available port for internal stream server".to_string()))
609+
}
610+
611+
/// Spawn internal stream server in background thread
612+
fn spawn_internal_server(
613+
port: u16,
614+
programs: Vec<String>,
615+
tokens: Vec<String>,
616+
accounts: Vec<String>,
617+
) -> Result<()> {
618+
thread::spawn(move || {
619+
// This would need to call the actual stream server code
620+
// For now, this is a placeholder that shows the architecture
621+
eprintln!("Internal stream server would start on port {} with filters:", port);
622+
eprintln!(" Programs: {:?}", programs);
623+
eprintln!(" Tokens: {:?}", tokens);
624+
eprintln!(" Accounts: {:?}", accounts);
625+
626+
// TODO: Actually spawn the stream server here
627+
// Need to refactor stream service to be embeddable
628+
});
629+
630+
Ok(())
631+
}

0 commit comments

Comments
 (0)