-
Notifications
You must be signed in to change notification settings - Fork 188
Expand file tree
/
Copy pathactions.py
More file actions
415 lines (353 loc) · 14 KB
/
actions.py
File metadata and controls
415 lines (353 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
"""
JARVIS Action Executor — AppleScript-based system actions.
Execute actions IMMEDIATELY, before generating any LLM response.
Each function returns {"success": bool, "confirmation": str}.
"""
import asyncio
import logging
import os
import re
import time
from pathlib import Path
from urllib.parse import quote
# Module-level logger; every action in this file reports through "jarvis.actions".
log = logging.getLogger("jarvis.actions")
# Desktop folder inside the current user's home directory.
DESKTOP_PATH = Path.home() / "Desktop"
# True unless the JARVIS_SKIP_PERMISSIONS environment variable is set to
# "0", "false" or "no" (case-insensitive). An unset variable counts as "true".
# When true, the `claude` command lines built in this module get the
# `--dangerously-skip-permissions` flag appended.
_SKIP_PERMISSIONS = os.getenv("JARVIS_SKIP_PERMISSIONS", "true").lower() not in ("0", "false", "no")
async def _mark_terminal_as_jarvis(revert_after: float = 5.0):
    """Temporarily switch the front Terminal window to the "Ocean" profile.

    Gives the user a visual cue that JARVIS is active in that window. The
    window's current profile name is read first and restored
    ``revert_after`` seconds later. This is strictly best-effort: every
    failure is logged at debug level and otherwise ignored, so callers never
    see an exception from this function.

    Args:
        revert_after: Seconds to wait before restoring the original profile.
    """
    read_profile_script = (
        'tell application "Terminal"\n'
        ' return name of current settings of front window\n'
        'end tell'
    )
    set_ocean_script = (
        'tell application "Terminal"\n'
        ' set current settings of front window to settings set "Ocean"\n'
        'end tell'
    )
    try:
        read_proc = await asyncio.create_subprocess_exec(
            "osascript", "-e", read_profile_script,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await read_proc.communicate()
        original_profile = stdout.decode().strip()

        if read_proc.returncode != 0 or not original_profile:
            # Without the original profile name the change could never be
            # undone, which would leave the window on Ocean permanently.
            log.debug("Could not read the Terminal profile; skipping theme marker")
            return
        if original_profile == "Ocean":
            # Already on the marker theme: nothing to set and nothing to revert.
            return

        set_proc = await asyncio.create_subprocess_exec(
            "osascript", "-e", set_ocean_script,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        await set_proc.communicate()
        if set_proc.returncode != 0:
            log.debug("Could not switch the Terminal window to the Ocean profile")
            return

        # We are inside a coroutine, so a running loop is guaranteed.
        loop = asyncio.get_running_loop()
        loop.call_later(
            revert_after,
            lambda: loop.create_task(_revert_terminal_theme(original_profile)),
        )
    except Exception as exc:
        # Purely cosmetic feature: never let it break the calling action.
        log.debug("Marking the Terminal window failed: %s", exc)
async def _revert_terminal_theme(profile_name: str):
    """Set the front Terminal window back to the profile ``profile_name``.

    Best-effort: failures are logged at debug level and never raised.

    Args:
        profile_name: Name of the Terminal settings set to restore.
    """
    # Use the shared escaper so backslashes are handled too, not just quotes.
    escaped = applescript_escape(profile_name)
    script = (
        'tell application "Terminal"\n'
        f' set current settings of front window to settings set "{escaped}"\n'
        'end tell'
    )
    try:
        proc = await asyncio.create_subprocess_exec(
            "osascript", "-e", script,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await proc.communicate()
        if proc.returncode != 0:
            log.debug("Reverting Terminal profile failed: %s", stderr.decode(errors="replace")[:200])
    except Exception as exc:
        # Cosmetic cleanup only; swallow but leave a trace for debugging.
        log.debug("Reverting Terminal profile failed: %s", exc)
def applescript_escape(s: str) -> str:
    """Make ``s`` safe to place between double quotes in AppleScript source.

    Backslashes and double quotes are backslash-escaped, carriage returns
    are dropped, and newlines become single spaces.
    """
    # Single pass over the input; each character is mapped independently,
    # so the inserted backslashes are never escaped a second time.
    char_map = str.maketrans({
        "\\": "\\\\",
        '"': '\\"',
        "\r": None,
        "\n": " ",
    })
    return s.translate(char_map)
async def open_terminal(command: str = "") -> dict:
    """Bring Terminal.app to the front, optionally running ``command`` in it.

    On success the front window is briefly marked via
    ``_mark_terminal_as_jarvis``.

    Args:
        command: Shell command to run with ``do script``; empty means just
            activate Terminal.

    Returns:
        ``{"success": bool, "confirmation": str}``.
    """
    script_lines = ['tell application "Terminal"', " activate"]
    if command:
        script_lines.append(f' do script "{applescript_escape(command)}"')
    script_lines.append("end tell")

    proc = await asyncio.create_subprocess_exec(
        "osascript", "-e", "\n".join(script_lines),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, err_output = await proc.communicate()

    if proc.returncode != 0:
        log.error(f"open_terminal failed: {err_output.decode()}")
        return {"success": False, "confirmation": "I had trouble opening Terminal, sir."}

    await _mark_terminal_as_jarvis()
    return {"success": True, "confirmation": "Terminal is open, sir."}
async def open_browser(url: str, browser: str = "chrome") -> dict:
    """Open ``url`` in Google Chrome or Firefox via AppleScript.

    Args:
        url: Location to open. It is escaped with ``applescript_escape`` so
            quotes or backslashes cannot break out of the AppleScript string.
        browser: ``"firefox"`` (case-insensitive) selects Firefox; any other
            value, including ``None``, selects Google Chrome.

    Returns:
        ``{"success": bool, "confirmation": str}``.
    """
    # app_name is the short name used in messages; app_id is what AppleScript
    # needs in `tell application`.
    if (browser or "").lower() == "firefox":
        app_name, app_id = "Firefox", "Firefox"
    else:
        app_name, app_id = "Chrome", "Google Chrome"

    script = (
        f'tell application "{app_id}"\n'
        " activate\n"
        f' open location "{applescript_escape(url)}"\n'
        "end tell"
    )
    proc = await asyncio.create_subprocess_exec(
        "osascript", "-e", script,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    success = proc.returncode == 0
    if not success:
        log.error(f"open_browser ({app_name}) failed: {stderr.decode()}")
    return {
        "success": success,
        "confirmation": f"Pulled that up in {app_name}, sir." if success else f"{app_name} ran into a problem, sir.",
    }
# Backward-compatible entry point for callers that predate open_browser().
async def open_chrome(url: str) -> dict:
    """Open ``url`` in Chrome by delegating to ``open_browser``."""
    outcome = await open_browser(url, browser="chrome")
    return outcome
async def open_claude_in_project(project_dir: str, prompt: str) -> dict:
    """Open Terminal, cd into ``project_dir`` and start Claude Code interactively.

    The prompt is written to ``CLAUDE.md`` inside the project directory
    (intended to be picked up by claude on startup) rather than passed on the
    command line, so the prompt text itself needs no shell escaping. On
    success the Terminal window is briefly marked via
    ``_mark_terminal_as_jarvis``.

    Args:
        project_dir: Existing directory to run claude in.
        prompt: Task description written into ``CLAUDE.md``.

    Returns:
        ``{"success": bool, "confirmation": str}``.
    """
    import shlex

    claude_md = Path(project_dir) / "CLAUDE.md"
    claude_md.write_text(
        f"# Task\n\n{prompt}\n\nBuild this completely. If web app, make index.html work standalone.\n",
        encoding="utf-8",
    )
    skip_flag = " --dangerously-skip-permissions" if _SKIP_PERMISSIONS else ""
    # Two layers of quoting: first for the shell that runs `cd` (paths with
    # spaces or metacharacters), then for the AppleScript string literal.
    shell_dir = shlex.quote(str(project_dir))
    escaped_dir = applescript_escape(shell_dir)
    script = (
        'tell application "Terminal"\n'
        " activate\n"
        f' do script "cd {escaped_dir} && claude{skip_flag}"\n'
        "end tell"
    )
    proc = await asyncio.create_subprocess_exec(
        "osascript", "-e", script,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    success = proc.returncode == 0
    if not success:
        log.error(f"open_claude_in_project failed: {stderr.decode()}")
    else:
        await _mark_terminal_as_jarvis()
    return {
        "success": success,
        "confirmation": "Claude Code is running in Terminal, sir. You can watch the progress."
        if success
        else "Had trouble spawning Claude Code, sir.",
    }
async def prompt_existing_terminal(project_name: str, prompt: str) -> dict:
    """Find a Terminal window matching a project name and type a prompt into it.

    Uses System Events keystrokes to type into an already running session
    (e.g. Claude Code) rather than ``do script``, which would open a new shell.
    The whole find/focus/type sequence runs as one AppleScript with a
    15-second limit; on timeout the ``osascript`` process is killed so it
    cannot keep typing after this function has reported failure.

    Args:
        project_name: Substring looked for in Terminal window names.
        prompt: Text to type, followed by Return. Newlines are flattened to
            spaces by ``applescript_escape``.

    Returns:
        ``{"success": bool, "confirmation": str}``.
    """
    escaped_name = applescript_escape(project_name)
    escaped_prompt = applescript_escape(prompt)
    # Single atomic script: find window, focus it, type into it
    script = f'''
tell application "Terminal"
set matched to false
set targetWindow to missing value
repeat with w in windows
if name of w contains "{escaped_name}" then
set targetWindow to w
set matched to true
exit repeat
end if
end repeat
if not matched then
return "NOT_FOUND"
end if
-- Bring the matched window to front
set index of targetWindow to 1
set selected tab of targetWindow to selected tab of targetWindow
activate
end tell
-- Wait for window to be fully focused
delay 1
-- Now type into it
tell application "System Events"
tell process "Terminal"
set frontmost to true
delay 0.3
keystroke "{escaped_prompt}"
delay 0.2
keystroke return
end tell
end tell
return "OK"
'''
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            "osascript", "-e", script,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15)
        result = stdout.decode().strip()
        if result == "NOT_FOUND":
            return {
                "success": False,
                "confirmation": f"Couldn't find a terminal for {project_name}, sir.",
            }
        success = proc.returncode == 0
        if success:
            await _mark_terminal_as_jarvis()
        else:
            log.error(f"prompt_existing_terminal failed: {stderr.decode()[:200]}")
        return {
            "success": success,
            "confirmation": f"Sent that to {project_name}, sir." if success
            else f"Had trouble typing into {project_name}, sir.",
        }
    except asyncio.TimeoutError:
        # wait_for() only cancels communicate(); the child would otherwise
        # keep running and could still send keystrokes later.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the check and the kill
            await proc.wait()
        log.error("prompt_existing_terminal timed out")
        return {"success": False, "confirmation": "Terminal operation timed out, sir."}
    except Exception as e:
        log.error(f"prompt_existing_terminal failed: {e}")
        return {"success": False, "confirmation": "Something went wrong reaching that terminal, sir."}
async def get_chrome_tab_info() -> dict:
    """Read the title and URL of Chrome's active tab via AppleScript.

    Title and URL are returned by the script on separate lines and split on
    the last line break. A ``"|"`` separator is deliberately avoided because
    page titles frequently contain that character.

    Returns:
        ``{"title": str, "url": str}`` on success, otherwise ``{}``.
    """
    script = (
        'tell application "Google Chrome"\n'
        " set tabTitle to title of active tab of front window\n"
        " set tabURL to URL of active tab of front window\n"
        " return tabTitle & linefeed & tabURL\n"
        "end tell"
    )
    try:
        proc = await asyncio.create_subprocess_exec(
            "osascript", "-e", script,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await proc.communicate()
        if proc.returncode != 0:
            return {}
        raw = stdout.decode()
        # Drop only the single newline osascript appends, so an empty title
        # or URL still yields two parts.
        if raw.endswith("\n"):
            raw = raw[:-1]
        # Split on the LAST newline so a stray line break inside the title
        # stays with the title.
        parts = raw.rsplit("\n", 1)
        if len(parts) == 2:
            return {"title": parts[0].strip(), "url": parts[1].strip()}
        return {}
    except Exception as e:
        log.warning(f"get_chrome_tab_info failed: {e}")
        return {}
async def monitor_build(
    project_dir: str,
    ws=None,
    synthesize_fn=None,
    *,
    timeout: float = 600.0,
    poll_interval: float = 5.0,
) -> None:
    """Poll a project directory until the build reports completion.

    Every ``poll_interval`` seconds the file ``.jarvis_output.txt`` in
    ``project_dir`` is checked for the marker line
    ``--- JARVIS TASK COMPLETE ---``. When it appears, and both ``ws`` and
    ``synthesize_fn`` are given, a spoken "build complete" message is
    sent over the WebSocket. Gives up with a warning after ``timeout`` seconds.

    Args:
        project_dir: Directory being built.
        ws: Object with an awaitable ``send_json(dict)`` method, or ``None``.
        synthesize_fn: Awaitable callable turning text into audio bytes, or
            ``None``.
        timeout: Maximum seconds to keep polling (default 10 minutes).
        poll_interval: Seconds between checks.
    """
    import base64

    marker = "--- JARVIS TASK COMPLETE ---"
    output_file = Path(project_dir) / ".jarvis_output.txt"
    # Monotonic clock: wall-clock adjustments must not shorten or extend the wait.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        await asyncio.sleep(poll_interval)
        try:
            # The file may be missing or only partly written; read it
            # directly and tolerate failures instead of checking exists() first.
            content = output_file.read_text(encoding="utf-8", errors="replace")
        except FileNotFoundError:
            continue
        except OSError as e:
            log.warning(f"Could not read {output_file}: {e}")
            continue
        if marker not in content:
            continue

        log.info(f"Build complete in {project_dir}")
        if ws and synthesize_fn:
            try:
                msg = "The build is complete, sir."
                audio_bytes = await synthesize_fn(msg)
                if audio_bytes:
                    encoded = base64.b64encode(audio_bytes).decode()
                    await ws.send_json({"type": "status", "state": "speaking"})
                    await ws.send_json({"type": "audio", "data": encoded, "text": msg})
                    await ws.send_json({"type": "status", "state": "idle"})
            except Exception as e:
                # Notification is a nicety; the build itself already finished.
                log.warning(f"Build notification failed: {e}")
        return

    log.warning(f"Build timed out in {project_dir}")
async def execute_action(intent: dict, projects: list = None) -> dict:
    """Route a classified intent to the matching action function.

    Args:
        intent: ``{"action": str, "target": str}`` from classify_intent().
            Missing or ``None`` values are treated as ``"chat"`` and ``""``.
        projects: List of known project dicts for resolving working dirs.
            Accepted for interface compatibility; not used by this function.

    Returns:
        ``{"success": bool, "confirmation": str, "project_dir": str | None}``.
        Unrecognised actions (including ``"chat"``) return
        ``success=False`` with an empty confirmation.
    """
    # `or` also covers keys that are present but None, which a plain
    # .get(key, default) would pass through and crash on below.
    action = intent.get("action") or "chat"
    target = intent.get("target") or ""

    if action == "open_terminal":
        claude_cmd = "claude --dangerously-skip-permissions" if _SKIP_PERMISSIONS else "claude"
        result = await open_terminal(claude_cmd)
        result["project_dir"] = None
        return result

    if action == "browse":
        if target.startswith(("http://", "https://")):
            url = target
        else:
            # Anything that is not a URL becomes a Google search.
            url = f"https://www.google.com/search?q={quote(target)}"
        # Firefox is used only when the request mentions it; Chrome otherwise.
        browser = "firefox" if "firefox" in target.lower() else "chrome"
        result = await open_browser(url, browser)
        result["project_dir"] = None
        return result

    if action == "build":
        # Create project folder on Desktop, spawn Claude Code
        project_name = _generate_project_name(target)
        project_dir = str(DESKTOP_PATH / project_name)
        os.makedirs(project_dir, exist_ok=True)
        result = await open_claude_in_project(project_dir, target)
        result["project_dir"] = project_dir
        return result

    return {"success": False, "confirmation": "", "project_dir": None}
def _generate_project_name(prompt: str) -> str:
"""Generate a kebab-case project folder name from the prompt."""
# First: check for a quoted name like "tiktok-analytics-dashboard"
quoted = re.search(r'"([^"]+)"', prompt)
if quoted:
name = quoted.group(1).strip()
# Already kebab-case or close to it
name = re.sub(r"[^a-zA-Z0-9\s-]", "", name).strip()
if name:
return re.sub(r"[\s]+", "-", name.lower())
# Second: check for "called X" or "named X" pattern
called = re.search(r'(?:called|named)\s+(\S+(?:[-_]\S+)*)', prompt, re.IGNORECASE)
if called:
name = re.sub(r"[^a-zA-Z0-9-]", "", called.group(1))
if len(name) > 3:
return name.lower()
# Fallback: extract meaningful words
words = re.sub(r"[^a-zA-Z0-9\s]", "", prompt.lower()).split()
skip = {"a", "the", "an", "me", "build", "create", "make", "for", "with", "and",
"to", "of", "i", "want", "need", "new", "project", "directory", "called",
"on", "desktop", "that", "application", "app", "full", "stack", "simple",
"web", "page", "site", "named"}
meaningful = [w for w in words if w not in skip and len(w) > 2][:4]
return "-".join(meaningful) if meaningful else "jarvis-project"