-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathrunprompt
More file actions
executable file
·2600 lines (2274 loc) · 92.5 KB
/
runprompt
File metadata and controls
executable file
·2600 lines (2274 loc) · 92.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
runprompt: A single-file runner for .prompt (Dotprompt) files.
This utility parses .prompt files containing YAML frontmatter and Handlebars
templates, executes pre-processing shell commands, attaches file context,
and manages LLM interactions with support for tool calling and caching.
Key Functions:
- main(): Entry point handling CLI arguments and the execution loop.
- render_template(): A subset of Handlebars for dynamic prompt generation.
- load_tools(): Dynamically imports Python and builtin functions as LLM tools.
- make_request(): Handles API communication across multiple providers.
- get_conf(): Implements the configuration priority cascade.
"""
import sys
import os
import json
import re
import hashlib
import time
import urllib.request
import urllib.error
import importlib.util
import inspect
import argparse
import glob
import base64
import mimetypes
import socket
# Try to use PyYAML if available, otherwise use minimal parser
try:
import yaml as _yaml
_HAS_PYYAML = True
except ImportError:
_HAS_PYYAML = False
# Global config state (loaded once at startup by init_config()).
# Three layers, consulted by get_conf() in priority order args > env > files.
CONFIG = {
    "files": {},  # Merged config from all config files
    "env": {},    # Parsed from RUNPROMPT_* env vars
    "args": {},   # From command line arguments
}
# Config keys that support the cascade; RUNPROMPT_* env vars naming any
# other key are treated as prompt-specific overrides (see apply_overrides).
CONFIG_KEYS = {
    "model", "default_model", "tool_path", "base_url", "ollama_base_url", "cache", "cache_dir",
    "safe_yes", "verbose", "anthropic_api_key", "openai_api_key",
    "google_api_key", "openrouter_api_key", "chat",
}
# Supported providers: each maps to its chat endpoint URL and the native
# environment variable holding its API key (fallback for get_api_key()).
PROVIDERS = {
    "openrouter": {
        "url": "https://openrouter.ai/api/v1/chat/completions",
        "env": "OPENROUTER_API_KEY",
    },
    "googleai": {
        "url": "https://generativelanguage.googleapis.com/v1beta/openai/chat/completions",
        "env": "GOOGLE_API_KEY",
    },
    "anthropic": {
        "url": "https://api.anthropic.com/v1/messages",
        "env": "ANTHROPIC_API_KEY",
    },
    "openai": {
        "url": "https://api.openai.com/v1/chat/completions",
        "env": "OPENAI_API_KEY",
    },
}
# ANSI escape codes for terminal output coloring.
RED = "\033[31m"
YELLOW = "\033[33m"
GREEN = "\033[32m"
CYAN = "\033[36m"
RESET = "\033[0m"
# HTTP request timeout in seconds.
TIMEOUT = 120
# Session cost accumulator. SESSION_COST_KNOWN presumably flips to False
# when a response lacks cost info — maintained elsewhere in the file; confirm.
SESSION_COST = 0.0
SESSION_COST_KNOWN = True
# === CONFIGURATION CASCADE ===
def load_config_files():
    """Merge config from the cascade of config file locations.

    Later locations override earlier ones (lowest to highest priority):
    ~/.runprompt, then $XDG_CONFIG_HOME/runprompt, then ./.runprompt.
    Keys are normalized (lowercased, hyphens become underscores).
    Unreadable/unparsable files produce a warning and are skipped.
    """
    home = os.path.expanduser("~")
    xdg = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
    candidates = (
        os.path.join(home, ".runprompt", "config.yml"),
        os.path.join(xdg, "runprompt", "config.yml"),
        os.path.join(".", ".runprompt", "config.yml"),
    )
    merged = {}
    for candidate in candidates:
        if not os.path.exists(candidate):
            continue
        try:
            with open(candidate, "r") as handle:
                parsed = parse_yaml(handle.read())
            if parsed:
                merged.update(
                    (normalize_key(k), v) for k, v in parsed.items())
        except Exception as exc:
            print("%sWarning: Could not load config from %s: %s%s" %
                  (YELLOW, candidate, exc, RESET), file=sys.stderr)
    return merged
def load_config_env():
    """Collect config overrides from RUNPROMPT_* environment variables.

    Values are YAML-coerced (e.g. "true" -> True, "3" -> 3), except for
    cache_dir, which is always kept as a raw string (it is a path).
    """
    prefix = "RUNPROMPT_"
    config = {}
    for name, raw in os.environ.items():
        if not name.startswith(prefix):
            continue
        key = normalize_key(name[len(prefix):])
        config[key] = raw if key == "cache_dir" else parse_yaml_value(raw)
    return config
def normalize_key(key):
    """Canonicalize a config key: hyphens become underscores, then lowercase."""
    return key.replace("-", "_").lower()
def get_conf(key, default=None):
    """Resolve *key* through the config cascade.

    Layers are consulted highest-priority first: CLI args, then
    RUNPROMPT_* env vars, then config files. Returns *default* when the
    key appears in none of them.
    """
    key = normalize_key(key)
    for layer in ("args", "env", "files"):
        values = CONFIG[layer]
        if key in values:
            return values[key]
    return default
def get_api_key(provider):
    """Return the API key for *provider*.

    The config cascade (<provider>_api_key) wins; otherwise fall back to
    the provider's native environment variable (e.g. ANTHROPIC_API_KEY).
    Returns None for unknown providers with no configured key.
    """
    configured = get_conf(provider + "_api_key")
    if configured:
        return configured
    native_var = PROVIDERS.get(provider, {}).get("env")
    return os.environ.get(native_var) if native_var else None
def init_config(args):
    """Populate the global CONFIG layers from files, env vars and CLI args."""
    CONFIG["files"] = load_config_files()
    CONFIG["env"] = load_config_env()
    # Only set CLI-layer keys that were actually given, so unset options
    # never shadow lower-priority layers in the cascade.
    cli = {}
    for flag in ("verbose", "cache", "safe_yes", "chat"):
        if getattr(args, flag):
            cli[flag] = True
    for opt in ("base_url", "ollama_base_url", "tool_path"):
        value = getattr(args, opt)
        if value:
            cli[opt] = value
    # --key=value overrides also land in the (highest-priority) args layer.
    for key, value in args.overrides.items():
        cli[normalize_key(key)] = value
    CONFIG["args"] = cli
# === MAIN ENTRY POINT ===
def main():
    """Entry point: parse args, render the prompt, run the LLM loop.

    High-level flow: load config -> parse the .prompt file -> collect
    variables from STDIN/ARGS -> run before: commands -> render the
    template -> load tools -> call the provider in a loop, executing tool
    calls until the model stops requesting them (or, in chat mode, until
    the user stops providing input).
    """
    args = parse_args(sys.argv[1:])
    init_config(args)
    if args.clear_cache:
        clear_cache()
        sys.exit(0)
    use_cache = get_conf("cache", False)
    # A prompt file is required unless running in pure chat mode.
    if len(args.remaining) < 1 and not get_conf("chat"):
        print("Usage: runprompt [options] <prompt_file>", file=sys.stderr)
        print("Try 'runprompt --help' for more information.", file=sys.stderr)
        sys.exit(1)
    prompt_path = args.remaining[0] if args.remaining else None
    if prompt_path:
        meta, template = parse_prompt_file(prompt_path)
    else:
        meta, template = {}, ""
    meta = apply_overrides(meta)
    # Apply model from config cascade if not in frontmatter
    if not meta.get("model"):
        if get_conf("model"):
            meta["model"] = get_conf("model")
        elif get_conf("default_model"):
            meta["model"] = get_conf("default_model")
    # CLI --key=value overrides beat both frontmatter and config.
    for key, value in args.overrides.items():
        log("Override from arg --%s: %s" % (key, value))
        if key == "tools" and isinstance(value, str):
            # Parse comma-separated tools
            meta[key] = [t.strip() for t in value.split(",")]
        else:
            meta[key] = value
    model_str = meta.get("model", "")
    if not model_str:
        print("No model specified in prompt file", file=sys.stderr)
        sys.exit(1)
    provider, model = parse_model_string(model_str)
    if not provider:
        print("No provider in model string", file=sys.stderr)
        sys.exit(1)
    # Collect extra args after prompt file
    extra_args = args.remaining[1:] if len(args.remaining) > 1 else []
    args_str = " ".join(extra_args)
    log("Extra args: %s" % args_str)
    raw_stdin = read_stdin()
    variables = {"STDIN": raw_stdin or "", "ARGS": args_str}
    # Determine INPUT: prefer STDIN if provided, otherwise use ARGS
    if raw_stdin:
        variables["INPUT"] = raw_stdin
    else:
        variables["INPUT"] = args_str
    # Parse STDIN as JSON if provided
    if raw_stdin:
        try:
            parsed = json.loads(raw_stdin)
            variables.update(parsed)
            log("Parsed STDIN as JSON")
        except ValueError:
            # Non-JSON stdin is bound to the first schema field, if any.
            log("STDIN is not JSON, treating as raw string")
            input_schema = meta.get("input", {}).get("schema", {})
            if input_schema:
                first_key = list(input_schema.keys())[0]
                variables[first_key] = raw_stdin
            else:
                variables["input"] = raw_stdin
    # Parse ARGS as JSON if provided (and no STDIN to avoid conflicts)
    elif args_str:
        try:
            parsed = json.loads(args_str)
            variables.update(parsed)
            log("Parsed ARGS as JSON")
        except ValueError:
            log("ARGS is not JSON, treating as raw string")
            input_schema = meta.get("input", {}).get("schema", {})
            if input_schema:
                first_key = list(input_schema.keys())[0]
                variables[first_key] = args_str
    # Execute before: shell commands if present
    before_commands = meta.get("before", {})
    if before_commands:
        execute_before_commands(before_commands, variables)
    validate_required_inputs(meta, variables)
    prompt = render_template(template, variables)
    log("Rendered prompt: %s" % prompt)
    output_config = meta.get("output", {})
    # Build tool search paths: cwd, prompt dir (and its symlink target),
    # configured tool_path entries, then the default config tool dirs.
    search_paths = []
    search_paths.append(os.getcwd())
    if prompt_path:
        prompt_dir = os.path.dirname(os.path.abspath(prompt_path))
        if prompt_dir not in search_paths:
            search_paths.append(prompt_dir)
        # If prompt is a symlink, also add the real file's directory
        real_prompt_path = os.path.realpath(prompt_path)
        if real_prompt_path != os.path.abspath(prompt_path):
            real_prompt_dir = os.path.dirname(real_prompt_path)
            if real_prompt_dir not in search_paths:
                search_paths.append(real_prompt_dir)
    # Add tool paths from config cascade
    conf_tool_paths = get_conf("tool_path", [])
    if isinstance(conf_tool_paths, str):
        conf_tool_paths = [conf_tool_paths]
    for tp in conf_tool_paths:
        abs_tp = os.path.abspath(tp)
        if abs_tp not in search_paths:
            search_paths.append(abs_tp)
    # Add default tool paths from config directories
    default_tool_dirs = [
        os.path.join(".", ".runprompt", "tools"),
        os.path.join(
            os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")),
            "runprompt", "tools"
        ),
        os.path.join(os.path.expanduser("~"), ".runprompt", "tools"),
    ]
    for td in default_tool_dirs:
        abs_td = os.path.abspath(td)
        if abs_td not in search_paths and os.path.isdir(abs_td):
            search_paths.append(abs_td)
    log("Tool search paths: %s" % search_paths)
    # Load tools
    tool_specs = meta.get("tools", [])
    if isinstance(tool_specs, str):
        tool_specs = [t.strip() for t in tool_specs.split(",")]
    tools = {}
    if tool_specs:
        tools = load_tools(tool_specs, search_paths)
        log("Loaded %d tools: %s" % (len(tools), list(tools.keys())))
    # Load shell_tools
    shell_tool_specs = meta.get("shell_tools", {})
    if shell_tool_specs:
        shell_tools = load_shell_tools(shell_tool_specs)
        tools.update(shell_tools)
        log("Loaded %d shell tools: %s" % (len(shell_tools), list(shell_tools.keys())))
    # Determine effective provider early for file reading.
    # Any custom base_url implies an OpenAI-compatible endpoint.
    base_url = (get_conf("base_url") or get_conf("ollama_base_url") or
                meta.get("base_url") or meta.get("ollama_base_url") or
                get_base_url())
    if base_url:
        effective_provider = "openai"
    else:
        effective_provider = provider
    read_patterns = []
    if meta.get("files") is not None:
        if not isinstance(meta.get("files"), list):
            print("%sError: 'files' frontmatter must be a list%s" %
                  (RED, RESET), file=sys.stderr)
            sys.exit(1)
        # Render template variables in files: patterns
        raw_patterns = meta.get("files") or []
        for pattern in raw_patterns:
            rendered_pattern = render_template(pattern, variables)
            read_patterns.append(rendered_pattern)
    if args.read:
        read_patterns.extend(args.read)
    # Read files from --read flags and prompt frontmatter
    read_files = []
    if read_patterns:
        read_files = read_files_for_context(read_patterns, effective_provider)
        log("Read %d file(s) for context" % len(read_files))
    is_chat = get_conf("chat") or meta.get("chat", False)
    # NOTE(review): model_str was already required above (sys.exit), so this
    # branch appears unreachable — confirm whether the earlier check should
    # be conditional on chat mode instead.
    if not model_str and is_chat:
        print("No model specified. Use --model or set one in config.",
              file=sys.stderr)
        sys.exit(1)
    # "test" provider: replay a canned response instead of calling an API.
    if provider == "test":
        response = load_test_response(prompt_path)
        test_provider = response.get("_provider", "openai")
        result = extract_response(response, output_config, test_provider)
        print(result)
        return
    if base_url:
        url, api_key = get_provider_config(provider, base_url)
    else:
        url, api_key = get_provider_config(provider)
    # Check cache
    cached_response = None
    key = None
    if use_cache:
        key = cache_key(prompt, meta)
        log("Cache key: %s" % key)
        cached_response = cache_get(key)
    if cached_response:
        response = cached_response
        cached_provider = response.get("_provider", effective_provider)
        result = extract_response(response, output_config, cached_provider)
        print(result)
        return
    # Build initial messages with optional file attachments
    messages = []
    if prompt:
        user_content = build_content_with_files(prompt, read_files,
                                                effective_provider)
        messages = [{"role": "user", "content": user_content}]
    # In chat mode with no initial prompt, ask for first input now
    if is_chat and not messages:
        first_input = chat_read_input()
        if first_input is None:
            return
        messages = [{"role": "user", "content": first_input}]
    # Tool execution / Chat loop
    while True:
        response = make_request(url, api_key, model, messages, output_config,
                                effective_provider, tools if tools else None)
        # Print usage info
        print_usage(response)
        # Print any text content immediately
        text_content = extract_text_content(response, effective_provider)
        if text_content:
            print(text_content)
        # Check for tool calls
        tool_calls = extract_tool_calls(response, effective_provider)
        # Filter out 'extract' tool calls - those are for structured output
        user_tool_calls = [tc for tc in tool_calls if tc["name"] != "extract"]
        if not user_tool_calls:
            # No more tool calls
            if use_cache and key:
                cache_set(key, response, effective_provider)
            if args.save_response:
                save_response(response, effective_provider, args.save_response)
            # If there was an extract tool call, output that and exit
            for tc in tool_calls:
                if tc["name"] == "extract":
                    print(json.dumps(tc["arguments"], indent=2))
                    return
            # Otherwise we already printed text content above
            if not text_content:
                result = extract_response(response, output_config,
                                          effective_provider)
                if result:
                    print(result)
            if not is_chat:
                return
            # In chat mode, prompt for next input
            user_input = chat_read_input()
            if user_input is None:
                return
            messages.append(build_assistant_message(response, effective_provider))
            messages.append({"role": "user", "content": user_input})
            continue
        # Add assistant message to conversation
        messages.append(build_assistant_message(response, effective_provider))
        # Process each tool call
        for tc in user_tool_calls:
            tool_name = tc["name"]
            tool_args = tc["arguments"]
            if tool_name not in tools:
                error_msg = "Unknown tool: %s" % tool_name
                print("%s%s%s" % (RED, error_msg, RESET), file=sys.stderr)
                messages.append(build_tool_result_message(
                    tc, None, error_msg, effective_provider))
                continue
            tool_func = tools[tool_name]["func"]
            # Always print tool call summary
            print_tool_call(tool_name, tool_args)
            # Check if safe-yes is enabled and tool is safe
            if get_conf("safe_yes") and is_tool_safe(tool_func):
                log("Auto-approving safe tool: %s" % tool_name)
                approved = True
            else:
                # Prompt user for confirmation
                approved = prompt_user_for_tool(tool_name, tool_args)
            if not approved:
                error_msg = "Tool execution declined by user"
                messages.append(build_tool_result_message(
                    tc, None, error_msg, effective_provider))
                continue
            # Execute the tool
            result, error = execute_tool(tool_func, tool_args)
            if error:
                print("%s%s%s" % (RED, error, RESET), file=sys.stderr)
                messages.append(build_tool_result_message(
                    tc, result, error, effective_provider))
            # NOTE(review): when execute_tool succeeds (error is None) no
            # tool result is appended to messages — most providers reject a
            # tool_use without a matching tool_result. Confirm whether a
            # success-path append is missing here.
# Free-form text appended to --help output; RawDescriptionHelpFormatter in
# parse_args() preserves its layout verbatim.
HELP_EPILOG = """\
Input:
Pipe JSON to set template variables: echo '{"name": "World"}' | runprompt hello.prompt
Pipe text for {{STDIN}} variable: echo "some text" | runprompt summarize.prompt
Pass args for {{ARGS}} variable: runprompt hello.prompt Some text here
Config files (lowest to highest priority):
~/.runprompt/config.yml
$XDG_CONFIG_HOME/runprompt/config.yml (default: ~/.config/runprompt/config.yml)
./.runprompt/config.yml
Config file example:
model: openai/gpt-4o
tool_path:
- ./tools
- /shared/tools
cache: true
safe_yes: true
openai_api_key: sk-...
Environment:
ANTHROPIC_API_KEY API key for Anthropic models
OPENAI_API_KEY API key for OpenAI models
GOOGLE_API_KEY API key for Google AI models
OPENROUTER_API_KEY API key for OpenRouter models
OLLAMA_BASE_URL Custom Ollama endpoint URL (e.g. http://localhost:11434/v1)
OPENAI_BASE_URL Custom OpenAI-compatible endpoint URL
OPENAI_API_BASE Custom endpoint URL (legacy OpenAI SDK v0.x style)
BASE_URL Custom endpoint URL (fallback)
RUNPROMPT_<KEY> Override config (e.g. RUNPROMPT_MODEL=openai/gpt-4o)
Config priority (highest wins): CLI flags > env vars > ./.runprompt > ~/.config > ~/.runprompt
Tools:
Mark a function as safe by setting fn.safe = True after the definition:
def my_safe_tool(x: str):
\"\"\"A safe tool that only reads data.\"\"\"
return "result"
my_safe_tool.safe = True
Safe tools are auto-approved when --safe-yes is passed.
Examples:
runprompt hello.prompt
runprompt hello.prompt Some text to process
echo '{"name": "World"}' | runprompt hello.prompt
runprompt --model openai/gpt-4o hello.prompt
runprompt -v --save-response out.json hello.prompt
runprompt --cache hello.prompt
runprompt --safe-yes tool_prompt.prompt
runprompt --read "src/*.py" review.prompt
runprompt --read "*.png" describe.prompt
runprompt --read README.md --read "src/**/*.py" analyze.prompt
OPENAI_BASE_URL=http://localhost:11434/v1 runprompt hello.prompt
Overrides:
--<key>=<value> Override frontmatter value (e.g. --model=openai/gpt-4o)
--<key> <value> Override frontmatter value (e.g. --model openai/gpt-4o)
"""
def parse_args(args):
    """Parse CLI arguments.

    Known options go through argparse; every unknown --key[=value] flag
    becomes a frontmatter override (parsed.overrides) and any other token
    is collected as a positional (parsed.remaining).
    """
    parser = argparse.ArgumentParser(
        description="Run Dotprompt (.prompt) files from the command line.",
        add_help=True, formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=HELP_EPILOG)
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Show request/response details")
    parser.add_argument("-c", "--cache", action="store_true",
                        help="Enable response caching")
    parser.add_argument("--clear-cache", action="store_true",
                        help="Clear the response cache and exit")
    parser.add_argument("--safe-yes", action="store_true",
                        help="Auto-approve tool calls for safe functions")
    parser.add_argument("--save-response", metavar="FILE",
                        help="Save raw API response to file")
    parser.add_argument("--base-url", "--openai-base-url", metavar="URL",
                        help="Use custom OpenAI-compatible endpoint")
    parser.add_argument("--ollama-base-url", metavar="URL",
                        help="Use custom Ollama endpoint")
    parser.add_argument("--tool-path", action="append", default=[],
                        metavar="PATH", help="Add directory to tool import path")
    parser.add_argument("--chat", action="store_true",
                        help="Interactive chat mode: prompt for input after each response")
    parser.add_argument("--read", "--file", action="append", default=[],
                        metavar="FILE",
                        help="Read file(s) into context (supports globs)")
    parsed, unknown = parser.parse_known_args(args)
    # Sort leftover tokens into overrides (--key=value, --key value,
    # bare --key as boolean True) and positionals.
    parsed.overrides = {}
    parsed.remaining = []
    idx = 0
    total = len(unknown)
    while idx < total:
        token = unknown[idx]
        if not token.startswith("--"):
            parsed.remaining.append(token)
        elif "=" in token:
            key, _, val = token[2:].partition("=")
            parsed.overrides[normalize_key(key)] = parse_yaml_value(val)
        elif idx + 1 < total and not unknown[idx + 1].startswith("-"):
            parsed.overrides[normalize_key(token[2:])] = parse_yaml_value(
                unknown[idx + 1])
            idx += 1  # consume the value token too
        else:
            parsed.overrides[normalize_key(token[2:])] = True
        idx += 1
    return parsed
def read_stdin():
    """Read piped input from stdin.

    Returns the stripped text, or None when stdin is an interactive TTY
    or the piped content is empty/whitespace-only.
    """
    if sys.stdin.isatty():
        return None
    return sys.stdin.read().strip() or None
# === PROMPT FILE HANDLING ===
def parse_prompt_file(path):
    """Split a .prompt file into (frontmatter dict, template string).

    Accepts an optional shebang line, the standard ---meta---template
    layout, a bare meta---template layout (no opening delimiter), or a
    file with no frontmatter at all.
    """
    with open(path, "r") as handle:
        text = handle.read()
    # Drop a leading shebang line so it is never parsed as content.
    if text.startswith("#!"):
        _, _, text = text.partition("\n")
    # Standard layout: ---\nfrontmatter\n---\ntemplate
    if text.startswith("---"):
        pieces = text.split("---", 2)
        if len(pieces) < 3:
            # Opening delimiter without a closing one: all template.
            return {}, text.strip()
        return parse_yaml(pieces[1].strip()), pieces[2].strip()
    # Bare layout: frontmatter---template (no opening ---)
    if "---" in text:
        head, _, tail = text.partition("---")
        return parse_yaml(head.strip()), tail.strip()
    # No delimiter at all: the entire file is the template.
    return {}, text.strip()
def _parse_yaml_minimal(s):
    """Minimal YAML parser for dotprompt frontmatter.

    Fallback used when PyYAML is unavailable. Supports nested mappings
    (by indentation), flat lists of scalars, and literal/folded multiline
    strings introduced by | or >. Scalars are coerced via
    parse_yaml_value(). Not a general YAML parser: no anchors, no flow
    collections, no nested lists.
    """
    result = {}
    # Stack of (container dict, its key's indent); -1 sentinel = root.
    stack = [(result, -1)]
    current_list = None            # list currently receiving "- item" lines
    current_list_indent = -1       # indent of the key that opened the list
    current_multiline = None       # state dict while inside a |/> block
    current_multiline_indent = -1  # indent of the key that opened the block
    current_multiline_base_indent = -1  # indent of the block's first line
    lines = s.split("\n")
    i = 0
    while i < len(lines):
        line = lines[i]
        # Skip blanks and comment lines (except inside a multiline block,
        # which is handled before we get here on the next iteration).
        if not line.strip() or line.strip().startswith("#"):
            i += 1
            continue
        indent = len(line) - len(line.lstrip())
        # Handle multiline string continuation
        if current_multiline is not None:
            if indent > current_multiline_indent or not line.strip():
                # First content line - establish base indent
                if current_multiline_base_indent == -1 and line.strip():
                    current_multiline_base_indent = indent
                # Continuation of multiline string; strip the base indent.
                if line.strip():
                    current_multiline["lines"].append(line[current_multiline_base_indent:])
                else:
                    current_multiline["lines"].append("")
                i += 1
                continue
            else:
                # End of multiline string - flush accumulated lines.
                # NOTE: the "style" (| vs >) is recorded but both are
                # joined with newlines; > folding is not implemented.
                parent = current_multiline["parent"]
                key = current_multiline["key"]
                parent[key] = "\n".join(current_multiline["lines"])
                current_multiline = None
                current_multiline_indent = -1
                current_multiline_base_indent = -1
                # Don't increment i, process this line normally
        # Check if this is a list item
        list_match = re.match(r"^(\s*)-\s*(.*)", line)
        if list_match:
            item_value = list_match.group(2).strip()
            if current_list is not None and indent >= current_list_indent:
                current_list.append(parse_yaml_value(item_value) if item_value else item_value)
                i += 1
                continue
        # Not a list item - reset list tracking if we've dedented
        if current_list is not None and indent <= current_list_indent:
            current_list = None
            current_list_indent = -1
        # Pop stack for dedented lines
        while stack and indent <= stack[-1][1]:
            stack.pop()
        if not stack:
            stack = [(result, -1)]
        match = re.match(r"^(\s*)([^:]+):\s*(.*)", line)
        if not match:
            i += 1
            continue
        key = match.group(2).strip()
        value = match.group(3).strip()
        parent = stack[-1][0]
        # Check for multiline string indicator (| or >)
        if value in ("|", ">"):
            current_multiline = {
                "parent": parent,
                "key": key,
                "lines": [],
                "style": value
            }
            current_multiline_indent = indent
            current_multiline_base_indent = -1
            i += 1
            continue
        if value:
            # Inline scalar value.
            parent[key] = parse_yaml_value(value)
            # Reset list tracking when we have a value
            current_list = None
            current_list_indent = -1
        else:
            # Bare "key:" - look ahead to decide between list and mapping.
            is_list_parent = False
            if i + 1 < len(lines):
                for next_line in lines[i + 1:]:
                    if not next_line.strip() or next_line.strip().startswith("#"):
                        continue
                    next_indent = len(next_line) - len(next_line.lstrip())
                    if next_indent <= indent:
                        break
                    if re.match(r"^\s*-\s*", next_line):
                        is_list_parent = True
                        break
            if is_list_parent:
                parent[key] = []
                current_list = parent[key]
                current_list_indent = indent
            else:
                parent[key] = {}
                stack.append((parent[key], indent))
                current_list = None
                current_list_indent = -1
        i += 1
    # Handle any remaining multiline string (block ran to end of input)
    if current_multiline is not None:
        parent = current_multiline["parent"]
        key = current_multiline["key"]
        parent[key] = "\n".join(current_multiline["lines"])
        current_multiline_base_indent = -1
    return result
def parse_yaml(s):
    """Parse a YAML document; prefers PyYAML, falls back to the minimal parser."""
    if not _HAS_PYYAML:
        return _parse_yaml_minimal(s)
    return _yaml.safe_load(s) or {}
def parse_yaml_value(s):
    """Coerce a scalar string roughly the way YAML would.

    Attempts, in order: empty -> None, booleans, ints, floats, JSON (for
    object-like or multi-line input), nested YAML, quote stripping, and
    finally the raw string itself.
    """
    s = s.strip()
    if not s:
        return None
    lowered = s.lower()
    if lowered == "true":
        return True
    if lowered == "false":
        return False
    # s is already stripped, so fullmatch is equivalent to the anchored forms.
    if re.fullmatch(r"-?\d+", s):
        return int(s)
    if re.fullmatch(r"-?\d+\.\d+", s):
        return float(s)
    if s.startswith("{") or "\n" in s:
        try:
            return json.loads(s)
        except ValueError:
            pass
    nested = parse_yaml(s)
    if nested:
        return nested
    # Strip surrounding quotes (single or double) to match PyYAML behavior.
    if len(s) >= 2 and s[0] == s[-1] and s[0] in "\"'":
        return s[1:-1]
    return s
def get_required_input_fields(meta):
    """Extract required field names from the input schema.

    Schema keys ending with "?" are optional; all others are required and
    returned in schema order. Tolerates a missing or null "input"/"schema"
    section: PyYAML parses a bare "input:" line as None, which the
    previous chained .get(..., {}) calls crashed on with AttributeError.
    """
    input_schema = (meta.get("input") or {}).get("schema") or {}
    return [key for key in input_schema if not key.endswith("?")]
def validate_required_inputs(meta, variables):
    """Ensure every required input field is present and non-empty.

    On failure, prints the missing field names plus the expected schema to
    stderr and exits with status 1. (Fixes: the schema lookup now tolerates
    a null "input"/"schema" section like get_required_input_fields does,
    and the leftover "DEBUG ..." log lines have been removed.)
    """
    required = get_required_input_fields(meta)
    log("Required input fields: %s" % required)
    # A field is missing when absent or bound to the empty string.
    missing = [f for f in required if variables.get(f, "") == ""]
    if not missing:
        return
    print("%sError: Missing required input field(s): %s%s" %
          (RED, ", ".join(missing), RESET), file=sys.stderr)
    # Same None-tolerant lookup as get_required_input_fields.
    input_schema = (meta.get("input") or {}).get("schema") or {}
    print("Expected input schema:", file=sys.stderr)
    for key, value in input_schema.items():
        opt = " (optional)" if key.endswith("?") else " (required)"
        clean_key = key.rstrip("?")
        print(" %s: %s%s" % (clean_key, value, opt), file=sys.stderr)
    sys.exit(1)
def apply_overrides(meta):
    """Apply prompt-specific RUNPROMPT_* env overrides onto *meta*.

    Keys belonging to the global config cascade (CONFIG_KEYS) are handled
    by get_conf() instead and skipped here; None values are ignored.
    Returns the mutated *meta* for convenience.
    """
    for key, value in CONFIG["env"].items():
        if key in CONFIG_KEYS or value is None:
            continue
        log("Override from env RUNPROMPT_%s: %s" % (key.upper(), value))
        meta[key] = value
    return meta
# === TEMPLATE RENDERING ===
def render_template(template, variables):
def lookup(name, ctx):
name = name.strip()
if name == ".":
return ctx
# Handle @index, @first, @last, @key
if name.startswith("@"):
return ctx.get(name, "")
for part in name.split("."):
if isinstance(ctx, dict):
ctx = ctx.get(part, "")
else:
return ""
return ctx
def render(tmpl, ctx):
# Remove comments: {{! ... }}
tmpl = re.sub(r"\{\{!.*?\}\}", "", tmpl, flags=re.DOTALL)
# Process {{#each key}}...{{/each}}
def each_replace(match):
key = match.group(1)
inner = match.group(2)
val = lookup(key, ctx)
result = []
if isinstance(val, list):
for i, item in enumerate(val):
if isinstance(item, dict):
item_ctx = dict(item)
else:
item_ctx = {}
item_ctx["@index"] = i
item_ctx["@first"] = (i == 0)
item_ctx["@last"] = (i == len(val) - 1)
item_ctx["."] = item
result.append(render(inner, item_ctx))
elif isinstance(val, dict):
keys = list(val.keys())
for i, k in enumerate(keys):
item = val[k]
if isinstance(item, dict):
item_ctx = dict(item)
else:
item_ctx = {}
item_ctx["@key"] = k
item_ctx["@index"] = i
item_ctx["@first"] = (i == 0)
item_ctx["@last"] = (i == len(keys) - 1)
item_ctx["."] = item
result.append(render(inner, item_ctx))
return "".join(result)
tmpl = re.sub(
r"\{\{#each\s+(\w+)\}\}(.*?)\{\{/each\}\}",
each_replace,
tmpl,
flags=re.DOTALL
)
# Process sections: {{#key}}...{{/key}}
def section_replace(match):
key = match.group(1)
inner = match.group(2)
val = lookup(key, ctx)
if isinstance(val, list):
result = []
for i, item in enumerate(val):
if isinstance(item, dict):
item_ctx = dict(item)
else:
item_ctx = {"_value": item}
item_ctx["@index"] = i
item_ctx["@first"] = (i == 0)
item_ctx["@last"] = (i == len(val) - 1)
item_ctx["."] = item
result.append(render(inner, item_ctx))
return "".join(result)
if val:
new_ctx = val if isinstance(val, dict) else ctx
return render(inner, new_ctx)
return ""
# Process inverted sections: {{^key}}...{{/key}}
def inverted_replace(match):
key = match.group(1)
inner = match.group(2)
val = lookup(key, ctx)
if not val or (isinstance(val, list) and len(val) == 0):
return render(inner, ctx)
return ""
# Process {{#if}} and {{#unless}} with shared logic
def conditional_replace(match, invert=False):
key = match.group(1)
inner = match.group(2)
val = lookup(key, ctx)
is_truthy = bool(val) and val != "" and \
not (isinstance(val, list) and len(val) == 0)
if invert:
is_truthy = not is_truthy
# Find {{else}} not inside nested conditionals
depth = 0
else_pos = None
i = 0
tag = "{{#unless" if invert else "{{#if"
end_tag = "{{/unless}}" if invert else "{{/if}}"
while i < len(inner):
if inner[i:].startswith(tag):
depth += 1
elif inner[i:].startswith(end_tag):
depth -= 1
elif inner[i:].startswith("{{else}}") and depth == 0:
else_pos = i
break
i += 1
if else_pos is not None:
return inner[:else_pos] if is_truthy else inner[else_pos + 8:]
return inner if is_truthy else ""
# Process if/unless in a single loop to handle mixed nesting
if_pattern = re.compile(
r"\{\{#if\s+([\w.]+)\}\}((?:(?!\{\{#if)(?!\{\{/if\}\})(?!\{\{#unless)(?!\{\{/unless\}\}).)*?)\{\{/if\}\}",
re.DOTALL
)
unless_pattern = re.compile(
r"\{\{#unless\s+([\w.]+)\}\}((?:(?!\{\{#if)(?!\{\{/if\}\})(?!\{\{#unless)(?!\{\{/unless\}\}).)*?)\{\{/unless\}\}",
re.DOTALL
)
while True:
if_match = if_pattern.search(tmpl)
unless_match = unless_pattern.search(tmpl)
if not if_match and not unless_match:
break
if if_match and (not unless_match or if_match.start() < unless_match.start()):
tmpl = if_pattern.sub(lambda m: conditional_replace(m, False), tmpl, count=1)
else:
tmpl = unless_pattern.sub(lambda m: conditional_replace(m, True), tmpl, count=1)
# Process sections first (innermost first via non-greedy)
tmpl = re.sub(
r"\{\{#(@?\w+)\}\}(.*?)\{\{/\1\}\}",
section_replace,
tmpl,
flags=re.DOTALL
)
tmpl = re.sub(
r"\{\{\^(@?\w+)\}\}(.*?)\{\{/\1\}\}",
inverted_replace,
tmpl,
flags=re.DOTALL
)
# Process variables
def var_replace(match):