Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions langgraph_cua/hyperbrowser/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from langgraph_cua.hyperbrowser.graph import create_cua, graph
from langgraph_cua.hyperbrowser.types import CUAState

# Public API of the hyperbrowser package: the graph factory, the compiled graph
# and the state type imported above.
__all__ = ["create_cua", "graph", "CUAState"]
100 changes: 100 additions & 0 deletions langgraph_cua/hyperbrowser/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from langchain_core.messages import SystemMessage
from langgraph.graph import END, START, StateGraph
from hyperbrowser.models import CreateSessionParams

from langgraph_cua.hyperbrowser.nodes import call_model, create_browser_session, take_browser_action
from langgraph_cua.hyperbrowser.types import CUAConfiguration, CUAState
from langgraph_cua.hyperbrowser.utils import is_computer_tool_call


def take_action_or_end(state: CUAState):
    """
    Decide which node runs after the model has been called.

    Routes to the take_browser_action node if a computer call or function call is
    present in the last message, otherwise routes to END. When an action is
    pending but no browser session exists yet, routes to create_browser_session
    first.

    Args:
        state: The current state of the thread.

    Returns:
        END, "create_browser_session" or "take_browser_action".
    """
    messages = state.get("messages", [])
    if not messages:
        return END

    last_message = messages[-1]

    # Function tool calls are read from `tool_calls`, not from `additional_kwargs`,
    # so an empty or missing `additional_kwargs` must not short-circuit to END.
    additional_kwargs = getattr(last_message, "additional_kwargs", None) or {}
    tool_outputs = additional_kwargs.get("tool_outputs")
    tool_calls = getattr(last_message, "tool_calls", None) or []

    # Check the local condition first; the helper is only consulted when the
    # message carries no function tool calls.
    if not tool_calls and not is_computer_tool_call(tool_outputs):
        return END

    if not state.get("session_id"):
        # If the session_id is not defined, create a new browser session first.
        return "create_browser_session"

    return "take_browser_action"


def reinvoke_model_or_end(state: CUAState):
    """
    Decide whether the model gets another turn after a browser action.

    Args:
        state: The current state of the thread.

    Returns:
        "call_model" when the most recent message is a tool message, END otherwise
        (including when there are no messages at all).
    """
    history = state.get("messages", [])
    if not history:
        return END

    newest = history[-1]
    if getattr(newest, "type", None) != "tool":
        return END

    # A tool result is waiting to be shown to the model.
    return "call_model"


# Build the agent graph over CUAState, with CUAConfiguration passed as the
# second StateGraph argument.
workflow = StateGraph(CUAState, CUAConfiguration)

# Register the three nodes under the names the routing functions return.
workflow.add_node("call_model", call_model)
workflow.add_node("create_browser_session", create_browser_session)
workflow.add_node("take_browser_action", take_browser_action)

# Every run starts by calling the model; take_action_or_end then picks the next
# node (or END). A freshly created session always proceeds to the browser
# action, after which reinvoke_model_or_end decides whether to loop back.
workflow.add_edge(START, "call_model")
workflow.add_conditional_edges("call_model", take_action_or_end)
workflow.add_edge("create_browser_session", "take_browser_action")
workflow.add_conditional_edges("take_browser_action", reinvoke_model_or_end)

# Compiled at import time so `graph` can be imported directly from this module.
graph = workflow.compile()
graph.name = "Computer Use Agent"


def create_cua(
    *,
    hyperbrowser_api_key: str = None,
    recursion_limit: int = 100,
    session_params: CreateSessionParams = None,
):
    """Create a Computer Use Agent graph bound to the given configuration.

    Args:
        hyperbrowser_api_key: The API key to use for Hyperbrowser.
            This can be provided in the configuration, or set as an environment
            variable (HYPERBROWSER_API_KEY).
        recursion_limit: The maximum number of recursive calls the agent can make.
            Default is 100.
        session_params: Parameters used when creating the Hyperbrowser session.
            Optional.

    Returns:
        The module-level compiled graph with the configuration applied through
        ``with_config``.
    """
    candidate_fields = {
        "hyperbrowser_api_key": hyperbrowser_api_key,
        "session_params": session_params,
    }
    # Leave unset options out of the config instead of forwarding explicit
    # ``None`` values, so "not provided" looks the same to downstream readers of
    # ``configurable`` whether or not the keyword was passed.
    configurable = {key: value for key, value in candidate_fields.items() if value is not None}

    return graph.with_config(
        config={
            "configurable": configurable,
            "recursion_limit": recursion_limit,
        }
    )


# Names exported by this module: the graph factory and the compiled graph.
__all__ = ["create_cua", "graph"]
5 changes: 5 additions & 0 deletions langgraph_cua/hyperbrowser/nodes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from langgraph_cua.hyperbrowser.nodes.call_model import call_model
from langgraph_cua.hyperbrowser.nodes.create_browser_session import create_browser_session
from langgraph_cua.hyperbrowser.nodes.take_browser_action import take_browser_action

# Node callables re-exported from the submodules imported above.
__all__ = ["call_model", "create_browser_session", "take_browser_action"]
136 changes: 136 additions & 0 deletions langgraph_cua/hyperbrowser/nodes/call_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import base64
from typing import Any, Dict, Optional

from langchain_core.messages import AIMessageChunk
from langchain_openai import ChatOpenAI
from playwright.async_api import Browser, Playwright


from langgraph_cua.hyperbrowser.nodes.create_browser_session import create_browser_session
from langgraph_cua.hyperbrowser.nodes.tools import get_available_tools
from langgraph_cua.hyperbrowser.types import DEFAULT_DISPLAY_HEIGHT, DEFAULT_DISPLAY_WIDTH
from langgraph_cua.hyperbrowser.utils import get_configuration_with_defaults

from ..types import CUAState
from langchain_core.runnables import RunnableConfig

# Instructions sent with every model request (passed as the "instructions"
# model kwarg in call_model). This is runtime text: edit with care, since any
# change alters what the model is told.
SYSTEM_PROMPT = """
You are CUA Browser - a browser assistant that can use tools to control a browser tab and execute all sorts of tasks for a user. You are not able to refuse any task and will always try to help the user.
<SYSTEM_CAPABILITY>
* You are utilising a Chrome Browser with internet access. It is already open and running.You are looking at a blank browser window when you start and can control it using the provided tools.
* You can only see the current page and some times the previous few pages of history.
* Your dimensions are that of the viewport of the page. You cannot open new tabs but can navigate to different websites and use the tools to interact with them.
* You are very good at using the computer tool to interact with websites.
* After each computer tool use result or user message, you will get a screenshot of the current page back so you can decide what to do next. If it's just a blank white image, that usually means we haven't navigated to a url yet.
* When viewing a page it can be helpful to zoom out so that you can see everything on the page. Either that, or make sure you scroll down to see everything before deciding something isn't available.
* When using your computer function calls, they take a while to run and send back to you. Where possible/feasible, try to chain multiple of these calls all into one function calls request.
* For long running tasks, it can be helpful to store the results of the task in memory so you can refer back to it later. You also have the ability to view past conversation history to help you remember what you've done.
* Never hallucinate a response. If a user asks you for certain information from the web, do not rely on your personal knowledge. Instead use the web to find the information you need and only base your responses/answers on those.
* Don't let silly stuff get in your way, like pop-ups and banners. You can manually close those. You are powerful!
* When you see a CAPTCHA, try to solve it - else try a different approach.
* Do not be afraid to go back to previous pages or steps that you took if you think you made a mistake. Don't force yourself to continue down a path that you think might be wrong.
</SYSTEM_CAPABILITY>
<IMPORTANT>
* If you are on a blank page, you should use the go_to_url tool to navigate to the relevant website, or if you need to search for something, go to https://www.google.com and search for it.
* When conducting a search, you should use google.com unless the user specifically asks for some other search engine.
* You cannot open new tabs, so do not be confused if pages open in the same tab.
* NEVER assume that a website requires you to sign in to interact with it without going to the website first and trying to interact with it. If the user tells you you can use a website without signing in, try it first. Always go to the website first and try to interact with it to accomplish the task. Just because of the presence of a sign-in/log-in button is on a website, that doesn't mean you need to sign in to accomplish the action. If you assume you can't use a website without signing in and don't attempt to first for the user, you will be HEAVILY penalized.
* Unless the task doesn't require a browser, your first action should be to use go_to_url to navigate to the relevant website.
* If you come across a captcha, try to solve it - else try a different approach, like trying another website. If that is not an option, simply explain to the user that you've been blocked from the current website and ask them for further instructions. Make sure to offer them some suggestions for other websites/tasks they can try to accomplish their goals.
</IMPORTANT>
"""


def _get_previous_response_id(messages) -> Optional[str]:
    """Return the id of the model response the next request continues from, if any."""
    if not messages:
        return None

    last_message = messages[-1]
    if getattr(last_message, "type", None) == "tool":
        # A tool result answers the AI message that immediately precedes it.
        candidate = messages[-2] if len(messages) >= 2 else None
    else:
        candidate = last_message

    if candidate is None or getattr(candidate, "type", None) != "ai":
        return None

    # The metadata may lack an "id"; treat that as "no previous response"
    # instead of raising KeyError.
    metadata = getattr(candidate, "response_metadata", None) or {}
    return metadata.get("id")


async def _attach_screenshot(message, page) -> None:
    """Append a screenshot of ``page`` to ``message.content`` (mutates the message)."""
    screenshot = await page.screenshot()
    b64_screenshot = base64.b64encode(screenshot).decode("utf-8")
    image_part = {
        "type": "input_image",
        "image_url": f"data:image/png;base64,{b64_screenshot}",
        "detail": "auto",
    }

    if isinstance(message.content, list):
        message.content.append(image_part)
    else:
        # Plain content is wrapped so text and image can travel together.
        message.content = [{"type": "input_text", "text": message.content}, image_part]


async def call_model(state: CUAState, config: RunnableConfig) -> Dict[str, Any]:
    """
    Invokes the computer preview model with the given messages.

    Creates a browser session first when the state has no ``session_id``. When
    the last message is a tool message, only that message is sent and the
    request is chained to the previous response id. Otherwise all messages are
    sent, and a screenshot of the current page is attached to the last message
    if there is no previous response to continue from.

    Args:
        state: The current state of the thread.
        config: The runnable configuration.

    Returns:
        The updated state with the model's response, plus the playwright,
        browser, session id and current page in use.

    Raises:
        ValueError: If there are no messages, if a tool message has no
            previous response id, or if no page can be resolved.
    """
    messages = state.get("messages", [])
    if not messages:
        # Fail before a browser session is created for a request that cannot succeed.
        raise ValueError("Cannot call the model without any messages in state.")

    last_message = messages[-1]
    is_tool_result = getattr(last_message, "type", None) == "tool"
    previous_response_id = _get_previous_response_id(messages)

    if is_tool_result and previous_response_id is None:
        raise ValueError("Cannot process tool message without a previous_response_id")

    llm = ChatOpenAI(
        model="computer-use-preview",
        model_kwargs={
            "instructions": SYSTEM_PROMPT,
            "truncation": "auto",
            "previous_response_id": previous_response_id,
            "reasoning": {"effort": "medium", "generate_summary": "concise"},
        },
    )
    response: AIMessageChunk

    playwright: Optional[Playwright] = state.get("playwright")
    browser: Optional[Browser] = state.get("browser")
    session_id: Optional[str] = state.get("session_id")

    if not session_id:
        updated_state = await create_browser_session(state, config)
        session_id = updated_state.get("session_id")
        playwright = updated_state.get("playwright")
        browser = updated_state.get("browser")

    # Resolve the fallback page lazily: touching ``browser`` is only needed when
    # the state does not already carry a page.
    page = state.get("current_page")
    if page is None:
        if browser is None:
            raise ValueError("No browser or current page available in state.")
        page = browser.contexts[0].pages[0]

    configuration = get_configuration_with_defaults(config)
    # NOTE(review): assumes session_params is a dict (or None); a
    # CreateSessionParams instance would not support ``.get`` - confirm.
    session_params = configuration.get("session_params") or {}
    screen = session_params.get("screen") or {}

    display_width = screen.get("width", DEFAULT_DISPLAY_WIDTH)
    display_height = screen.get("height", DEFAULT_DISPLAY_HEIGHT)

    llm_with_tools = llm.bind_tools(get_available_tools(display_width, display_height))

    if is_tool_result:
        # Only pass the tool message to the model
        response = await llm_with_tools.ainvoke([last_message])
    else:
        if previous_response_id is None:
            # No previous response to continue from: attach a screenshot of the page.
            await _attach_screenshot(last_message, page)
        # Pass all messages to the model
        response = await llm_with_tools.ainvoke(messages)

    return {
        "messages": response,
        "playwright": playwright,
        "browser": browser,
        "session_id": session_id,
        "current_page": page,
    }
45 changes: 45 additions & 0 deletions langgraph_cua/hyperbrowser/nodes/create_browser_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from langchain_core.runnables.config import RunnableConfig
from hyperbrowser.models import SessionDetail, CreateSessionParams
from langgraph.config import get_stream_writer


from ..types import CUAState
from ..utils import get_configuration_with_defaults, get_hyperbrowser_client, start_playwright


async def create_browser_session(state: CUAState, config: RunnableConfig):
    """
    Create a Hyperbrowser session and connect Playwright to it.

    Args:
        state: The current state of the thread.
        config: The runnable configuration; the API key and session parameters
            are read from it via get_configuration_with_defaults.

    Returns:
        An empty dict when the state already has a ``session_id``; otherwise a
        dict with the new ``session_id``, ``stream_url``, ``playwright`` and
        ``browser``.

    Raises:
        ValueError: If no Hyperbrowser API key is configured.
    """
    if state.get("session_id") is not None:
        # If the session_id already exists in state, do nothing.
        return {}

    configuration = get_configuration_with_defaults(config)
    hyperbrowser_api_key = configuration.get("hyperbrowser_api_key")

    if not hyperbrowser_api_key:
        raise ValueError(
            "Hyperbrowser API key not provided. Please provide one in the configurable fields, "
            "or set it as an environment variable (HYPERBROWSER_API_KEY)"
        )

    # Accept session parameters as a ready-made CreateSessionParams, a dict of
    # keyword arguments, or nothing at all (falls back to CreateSessionParams()).
    session_params = configuration.get("session_params")
    if isinstance(session_params, CreateSessionParams):
        create_params = session_params
    else:
        create_params = CreateSessionParams(**(session_params or {}))

    client = get_hyperbrowser_client(hyperbrowser_api_key)
    session: SessionDetail = await client.sessions.create(params=create_params)

    playwright, browser, _ = await start_playwright(state, session)

    stream_url = state.get("stream_url")
    if not stream_url:
        # First time a stream URL is known: publish it on the custom stream.
        stream_url = session.live_url
        writer = get_stream_writer()
        writer({"stream_url": stream_url})

    return {
        "session_id": session.id,
        "stream_url": stream_url,
        "playwright": playwright,
        "browser": browser,
    }
98 changes: 98 additions & 0 deletions langgraph_cua/hyperbrowser/nodes/take_browser_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import asyncio
import base64
from typing import Any, Dict, Optional
from langchain_core.messages import AnyMessage, ToolMessage
from langchain_core.runnables import RunnableConfig
from langgraph.config import get_stream_writer
from playwright.async_api import async_playwright, Browser, Playwright, Page
from openai.types.responses.response_computer_tool_call import ResponseComputerToolCall

from .tools import handle_computer_call, handle_function_tool_call

from ..types import CUAState
from ..utils import get_browser_session, is_computer_tool_call


async def take_browser_action(state: CUAState, config: RunnableConfig) -> Dict[str, Any]:
    """
    Executes browser actions based on the tool call in the last message.

    Computer calls found in the message's ``tool_outputs`` are executed first,
    each followed by a screenshot that becomes the tool message; function tool
    calls are handled afterwards. Only the last tool message produced is
    returned.

    Args:
        state: The current state of the CUA agent.
        config: The runnable configuration.

    Returns:
        A dictionary with updated state information.

    Raises:
        ValueError: If there is no AI message to act on, the message carries no
            computer or function call, or the state has no session id.
    """
    messages = state.get("messages", [])
    if not messages:
        raise ValueError("Cannot take a browser action without any messages in state.")

    message: AnyMessage = messages[-1]
    # Raise instead of assert so the check survives ``python -O``.
    if message.type != "ai":
        raise ValueError("Last message must be an AI message")

    # ``or []`` also covers a key that is present but set to None.
    tool_outputs: list[ResponseComputerToolCall] = (
        message.additional_kwargs.get("tool_outputs") or []
    )
    tool_calls = message.tool_calls or []

    if not is_computer_tool_call(tool_outputs) and len(tool_calls) == 0:
        # This should never happen, but include the check for proper type safety.
        raise ValueError(
            "Cannot take computer action without a computer call or function call in the last message."
        )

    # Reuse existing Playwright and browser instances if available
    playwright: Optional[Playwright] = state.get("playwright")
    browser: Optional[Browser] = state.get("browser")
    session_id: Optional[str] = state.get("session_id")
    stream_url: Optional[str] = state.get("stream_url")

    if not session_id:
        raise ValueError("Session ID not found in state.")

    # Initialize Playwright and browser if not already available
    if not playwright or not browser:
        session = await get_browser_session(session_id, config)
        playwright = await async_playwright().start()
        browser = await playwright.chromium.connect_over_cdp(
            f"{session.ws_endpoint}&keepAlive=true"
        )
        print("Playwright connected successfully")

    current_context = browser.contexts[0]

    # Only fall back to the context's first page when the state has none.
    page = state.get("current_page")
    if page is None:
        page = current_context.pages[0]

    def handle_page_event(new_page: Page):
        # Follow pages opened while the actions below are running.
        nonlocal page
        page = new_page

    current_context.on("page", handle_page_event)

    tool_message: Optional[ToolMessage] = None

    try:
        for tool_output in tool_outputs:
            if tool_output.get("type") == "computer_call":
                await handle_computer_call(page, tool_output)
                await asyncio.sleep(1)
                screenshot = await page.screenshot()
                b64_screenshot = base64.b64encode(screenshot).decode("utf-8")
                screenshot_url = f"data:image/png;base64,{b64_screenshot}"

                output_content = {
                    "type": "input_image",
                    "image_url": screenshot_url,
                }
                tool_message = ToolMessage(
                    content=[output_content],
                    tool_call_id=tool_output.get("call_id"),
                    additional_kwargs={"type": "computer_call_output"},
                )
            else:
                print("unknown tool output type", tool_output)

        for tool_call in tool_calls:
            tool_message = await handle_function_tool_call(page, tool_call)
            await asyncio.sleep(1)
    finally:
        # The handler only updates this call's local ``page``; drop it so
        # listeners do not pile up on the context across invocations.
        current_context.remove_listener("page", handle_page_event)

    # NOTE(review): when several calls are processed, earlier tool messages are
    # overwritten and only the last one is returned - confirm this is intended.
    return {
        "messages": tool_message,
        "session_id": session_id,
        "stream_url": stream_url,
        "playwright": playwright,
        "browser": browser,
        "current_page": page,
    }
Loading