-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Expand file tree
/
Copy pathhandler.py
More file actions
401 lines (334 loc) · 15.4 KB
/
handler.py
File metadata and controls
401 lines (334 loc) · 15.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
"""Action handler for processing AI model outputs."""
import ast
import re
import subprocess
import time
from dataclasses import dataclass
from typing import Any, Callable
from phone_agent.config.timing import TIMING_CONFIG
from phone_agent.device_factory import get_device_factory
@dataclass
class ActionResult:
"""Result of an action execution."""
success: bool
should_finish: bool
message: str | None = None
requires_confirmation: bool = False
class ActionHandler:
    """
    Handles execution of actions from AI model output.

    Args:
        device_id: Optional ADB device ID for multi-device setups.
        confirmation_callback: Optional callback for sensitive action confirmation.
            Should return True to proceed, False to cancel.
        takeover_callback: Optional callback for takeover requests (login, captcha).
    """

    def __init__(
        self,
        device_id: str | None = None,
        confirmation_callback: Callable[[str], bool] | None = None,
        takeover_callback: Callable[[str], None] | None = None,
    ):
        self.device_id = device_id
        # Fall back to the console-based prompts when no callback is supplied.
        self.confirmation_callback = confirmation_callback or self._default_confirmation
        self.takeover_callback = takeover_callback or self._default_takeover

    def execute(
        self, action: dict[str, Any], screen_width: int, screen_height: int
    ) -> ActionResult:
        """
        Execute an action from the AI model.

        Args:
            action: The action dictionary from the model.
            screen_width: Current screen width in pixels.
            screen_height: Current screen height in pixels.

        Returns:
            ActionResult indicating success and whether to finish. Exceptions
            raised by a handler are caught and reported as a failed,
            non-finishing result rather than propagated.
        """
        action_type = action.get("_metadata")

        if action_type == "finish":
            return ActionResult(
                success=True, should_finish=True, message=action.get("message")
            )

        if action_type != "do":
            return ActionResult(
                success=False,
                should_finish=True,
                message=f"Unknown action type: {action_type}",
            )

        action_name = action.get("action")
        handler_method = self._get_handler(action_name)

        if handler_method is None:
            return ActionResult(
                success=False,
                should_finish=False,
                message=f"Unknown action: {action_name}",
            )

        try:
            return handler_method(action, screen_width, screen_height)
        except Exception as e:
            # Top-level boundary: a failing handler must not abort the caller.
            return ActionResult(
                success=False, should_finish=False, message=f"Action failed: {e}"
            )

    def _get_handler(self, action_name: str | None) -> Callable | None:
        """Return the bound handler for ``action_name``, or None if unknown."""
        handlers = {
            "Launch": self._handle_launch,
            "Tap": self._handle_tap,
            "Type": self._handle_type,
            "Type_Name": self._handle_type,
            "Swipe": self._handle_swipe,
            "Back": self._handle_back,
            "Home": self._handle_home,
            "Double Tap": self._handle_double_tap,
            "Long Press": self._handle_long_press,
            "Wait": self._handle_wait,
            "Take_over": self._handle_takeover,
            "Note": self._handle_note,
            "Call_API": self._handle_call_api,
            "Interact": self._handle_interact,
        }
        return handlers.get(action_name)

    def _convert_relative_to_absolute(
        self, element: list[int], screen_width: int, screen_height: int
    ) -> tuple[int, int]:
        """Convert relative coordinates (0-1000) to absolute pixels."""
        x = int(element[0] / 1000 * screen_width)
        y = int(element[1] / 1000 * screen_height)
        return x, y

    def _handle_launch(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle app launch action."""
        app_name = action.get("app")
        if not app_name:
            return ActionResult(False, False, "No app name specified")

        device_factory = get_device_factory()
        success = device_factory.launch_app(app_name, self.device_id)
        if success:
            return ActionResult(True, False)
        return ActionResult(False, False, f"App not found: {app_name}")

    def _handle_tap(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle tap action, asking for confirmation when a message is attached."""
        element = action.get("element")
        if not element:
            return ActionResult(False, False, "No element coordinates")

        x, y = self._convert_relative_to_absolute(element, width, height)

        # A "message" key marks the tap as a sensitive operation that must be
        # confirmed before it is sent to the device.
        if "message" in action:
            if not self.confirmation_callback(action["message"]):
                return ActionResult(
                    success=False,
                    should_finish=True,
                    message="User cancelled sensitive operation",
                )

        device_factory = get_device_factory()
        device_factory.tap(x, y, self.device_id)
        return ActionResult(True, False)

    def _handle_type(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle text input action.

        Switches to the ADB keyboard, clears the focused field, types the
        text and restores the previous keyboard. The restore step runs even
        when clearing or typing raises, so a failure does not leave the
        device on the ADB keyboard.
        """
        text = action.get("text", "")
        device_factory = get_device_factory()

        # Switch to ADB keyboard
        original_ime = device_factory.detect_and_set_adb_keyboard(self.device_id)
        try:
            time.sleep(TIMING_CONFIG.action.keyboard_switch_delay)

            # Clear existing text and type new text
            device_factory.clear_text(self.device_id)
            time.sleep(TIMING_CONFIG.action.text_clear_delay)

            # The text is handed to the device layer unmodified.
            device_factory.type_text(text, self.device_id)
            time.sleep(TIMING_CONFIG.action.text_input_delay)
        finally:
            # Restore original keyboard
            device_factory.restore_keyboard(original_ime, self.device_id)
            time.sleep(TIMING_CONFIG.action.keyboard_restore_delay)

        return ActionResult(True, False)

    def _handle_swipe(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle swipe action."""
        start = action.get("start")
        end = action.get("end")
        if not start or not end:
            return ActionResult(False, False, "Missing swipe coordinates")

        start_x, start_y = self._convert_relative_to_absolute(start, width, height)
        end_x, end_y = self._convert_relative_to_absolute(end, width, height)

        device_factory = get_device_factory()
        device_factory.swipe(start_x, start_y, end_x, end_y, device_id=self.device_id)
        return ActionResult(True, False)

    def _handle_back(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle back button action."""
        device_factory = get_device_factory()
        device_factory.back(self.device_id)
        return ActionResult(True, False)

    def _handle_home(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle home button action."""
        device_factory = get_device_factory()
        device_factory.home(self.device_id)
        return ActionResult(True, False)

    def _handle_double_tap(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle double tap action."""
        element = action.get("element")
        if not element:
            return ActionResult(False, False, "No element coordinates")

        x, y = self._convert_relative_to_absolute(element, width, height)
        device_factory = get_device_factory()
        device_factory.double_tap(x, y, self.device_id)
        return ActionResult(True, False)

    def _handle_long_press(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle long press action."""
        element = action.get("element")
        if not element:
            return ActionResult(False, False, "No element coordinates")

        x, y = self._convert_relative_to_absolute(element, width, height)
        device_factory = get_device_factory()
        device_factory.long_press(x, y, device_id=self.device_id)
        return ActionResult(True, False)

    @staticmethod
    def _parse_wait_duration(raw: Any, default: float = 1.0) -> float:
        """Turn a model-supplied wait duration into a sleepable number of seconds.

        Args:
            raw: Duration as a number or a string such as ``"2 seconds"``.
            default: Value returned when ``raw`` cannot be interpreted.

        Returns:
            A finite, non-negative float. Negative values are clamped to 0.0;
            unparseable, NaN or infinite values yield ``default``.
        """
        if isinstance(raw, (int, float)):
            value = float(raw)
        else:
            # Accept "seconds"/"second" in any letter case; anything that is
            # still not a number after stripping the unit uses the default.
            text = str(raw).lower().replace("seconds", "").replace("second", "")
            try:
                value = float(text.strip())
            except ValueError:
                return default

        # time.sleep() rejects NaN and cannot sleep forever.
        if value != value or value == float("inf"):
            return default
        return max(value, 0.0)

    def _handle_wait(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle wait action by sleeping for the requested duration."""
        duration = self._parse_wait_duration(action.get("duration", "1 seconds"))
        time.sleep(duration)
        return ActionResult(True, False)

    def _handle_takeover(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle takeover request (login, captcha, etc.)."""
        message = action.get("message", "User intervention required")
        self.takeover_callback(message)
        return ActionResult(True, False)

    def _handle_note(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle note action (placeholder for content recording)."""
        # This action is typically used for recording page content
        # Implementation depends on specific requirements
        return ActionResult(True, False)

    def _handle_call_api(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle API call action (placeholder for summarization)."""
        # This action is typically used for content summarization
        # Implementation depends on specific requirements
        return ActionResult(True, False)

    def _handle_interact(self, action: dict, width: int, height: int) -> ActionResult:
        """Handle interaction request (user choice needed)."""
        # This action signals that user input is needed
        return ActionResult(True, False, message="User interaction required")

    def _send_keyevent(self, keycode: str) -> None:
        """Send a keyevent to the device.

        ADB devices receive ``input keyevent <keycode>``. HDC devices use the
        ``uitest uiInput keyEvent`` command; only ENTER is mapped to its
        HarmonyOS code (2054), other ``KEYCODE_*`` names fall back to the
        ADB-style ``input keyevent`` command and anything else is passed
        through as a numeric code.
        """
        from phone_agent.device_factory import DeviceType, get_device_factory

        device_factory = get_device_factory()

        if device_factory.device_type != DeviceType.HDC:
            # ADB devices use standard input keyevent command
            cmd_prefix = ["adb", "-s", self.device_id] if self.device_id else ["adb"]
            subprocess.run(
                cmd_prefix + ["shell", "input", "keyevent", keycode],
                capture_output=True,
                text=True,
            )
            return

        # Imported here so ADB-only setups never load the HDC module.
        from phone_agent.hdc.connection import _run_hdc_command

        hdc_prefix = ["hdc", "-t", self.device_id] if self.device_id else ["hdc"]

        def send_uitest_key(code: str) -> None:
            # HarmonyOS-specific keyEvent command.
            _run_hdc_command(
                hdc_prefix + ["shell", "uitest", "uiInput", "keyEvent", code],
                capture_output=True,
                text=True,
            )

        def send_input_keyevent() -> None:
            # ADB-style command issued through hdc for unsupported keys.
            subprocess.run(
                hdc_prefix + ["shell", "input", "keyevent", keycode],
                capture_output=True,
                text=True,
            )

        # KEYCODE_ENTER (66) -> 2054 (HarmonyOS Enter key code)
        if keycode == "KEYCODE_ENTER" or keycode == "66":
            send_uitest_key("2054")
            return

        try:
            if keycode.startswith("KEYCODE_"):
                # For now, only handle ENTER, other keys may need mapping
                if "ENTER" in keycode:
                    send_uitest_key("2054")
                else:
                    send_input_keyevent()
            else:
                # Assume it's a numeric code
                send_uitest_key(str(keycode))
        except Exception:
            # Best effort: retry with the ADB-style command.
            send_input_keyevent()

    @staticmethod
    def _default_confirmation(message: str) -> bool:
        """Default confirmation callback using console input.

        Only an answer of "Y" (any case, surrounding whitespace ignored)
        confirms the operation.
        """
        response = input(f"Sensitive operation: {message}\nConfirm? (Y/N): ")
        return response.strip().upper() == "Y"

    @staticmethod
    def _default_takeover(message: str) -> None:
        """Default takeover callback using console input."""
        input(f"{message}\nPress Enter after completing manual operation...")
def parse_action(response: str) -> dict[str, Any]:
    """
    Parse action from model response.

    Three shapes are recognised after surrounding whitespace and an
    enclosing ``<answer>``/``</answer>`` wrapper have been removed:

    * ``do(action="Type", text="...")`` (or ``Type_Name``): the text is
      sliced out verbatim, without interpreting escapes, and the action is
      reported as ``"Type"``.
    * any other ``do(...)`` call: keyword arguments are read with
      ``ast.literal_eval``; the response is never executed.
    * ``finish(message="...")``: the message is sliced out verbatim.

    Args:
        response: Raw response string from the model.

    Returns:
        Parsed action dictionary. It always contains ``"_metadata"`` set to
        ``"do"`` or ``"finish"``.

    Raises:
        ValueError: If the response cannot be parsed.
    """
    print(f"Parsing action: {response}")
    try:
        response = response.strip()
        # Drop an enclosing answer wrapper up front so that every branch,
        # not only the generic do() branch, sees the bare call.
        response = response.removeprefix("<answer>").removesuffix("</answer>")
        response = response.strip()

        if response.startswith(('do(action="Type"', 'do(action="Type_Name"')):
            # Slice between the opening quote after text= and the closing `")`.
            text = response.split("text=", 1)[1][1:-2]
            return {"_metadata": "do", "action": "Type", "text": text}

        if response.startswith("do"):
            # Use AST parsing instead of eval for safety
            try:
                # Escape special characters (newlines, tabs, etc.) for valid Python syntax
                response = response.replace('\n', '\\n')
                response = response.replace('\r', '\\r')
                response = response.replace('\t', '\\t')
                response = response.replace("</answer>", "")
                response = response.replace("<answer>", "")

                tree = ast.parse(response, mode="eval")
                if not isinstance(tree.body, ast.Call):
                    raise ValueError("Expected a function call")

                # Extract keyword arguments safely
                action: dict[str, Any] = {"_metadata": "do"}
                for keyword in tree.body.keywords:
                    action[keyword.arg] = ast.literal_eval(keyword.value)
                return action
            except (SyntaxError, ValueError) as e:
                raise ValueError(f"Failed to parse do() action: {e}") from e

        if response.startswith("finish"):
            return {
                "_metadata": "finish",
                "message": response.replace("finish(message=", "")[1:-2],
            }

        raise ValueError(f"Failed to parse action: {response}")
    except Exception as e:
        # Callers only need to handle ValueError; keep the cause attached.
        raise ValueError(f"Failed to parse action: {e}") from e
def do(**kwargs) -> dict[str, Any]:
    """Build a 'do' action dict from keyword arguments.

    The ``"_metadata"`` entry is always ``"do"``, overriding any value the
    caller passed under that key.
    """
    return {**kwargs, "_metadata": "do"}
def finish(**kwargs) -> dict[str, Any]:
    """Build a 'finish' action dict from keyword arguments.

    The ``"_metadata"`` entry is always ``"finish"``, overriding any value
    the caller passed under that key.
    """
    return {**kwargs, "_metadata": "finish"}