diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2926636..263cd3c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [1.0.3] - 2024-08-17
+
+### Added
+
+* `rosservice_call` tool for ROS1
+
+### Changed
+
+* Changed ROSA class methods from private to protected to allow easier overrides.
+* Updated ros1 `roslog` tools to handle multiple logging directories.
+* Upgrade dependencies:
+ * `langchain` to 0.2.13
+ * `langchain-community` to 0.2.12
+ * `langchain_core` to 0.2.32
+ * `langchain-openai` to 0.1.21
+
## [1.0.2] - 2024-08-14
### Changed
diff --git a/README.md b/README.md
index 1bf67b8..6e4d7c7 100644
--- a/README.md
+++ b/README.md
@@ -210,3 +210,10 @@ See our: [LICENSE](LICENSE)
Key points of contact are:
- [@RobRoyce](https://github.com/RobRoyce) ([email](mailto:01-laptop-voiced@icloud.com))
+
+---
+
+
+ ROSA: Robot Operating System Agent
+ Copyright (c) 2024. Jet Propulsion Laboratory. All rights reserved.
+
diff --git a/setup.py b/setup.py
index 327e39d..3d1a130 100644
--- a/setup.py
+++ b/setup.py
@@ -23,7 +23,7 @@
setup(
name="jpl-rosa",
- version="1.0.2",
+ version="1.0.3",
license="Apache 2.0",
description="ROSA: the Robot Operating System Agent",
long_description=long_description,
@@ -49,10 +49,10 @@
install_requires=[
"PyYAML==6.0.1",
"python-dotenv>=1.0.1",
- "langchain==0.2.7",
- "langchain-openai==0.1.14",
- "langchain-core==0.2.12",
- "langchain-community",
+ "langchain==0.2.13",
+ "langchain-community==0.2.12",
+ "langchain-core==0.2.32",
+ "langchain-openai==0.1.21",
"pydantic",
"pyinputplus",
"azure-identity",
diff --git a/src/rosa/rosa.py b/src/rosa/rosa.py
index 9e388d9..480ae9c 100644
--- a/src/rosa/rosa.py
+++ b/src/rosa/rosa.py
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
from typing import Literal, Union, Optional
from langchain.agents import AgentExecutor
@@ -61,7 +60,7 @@ def __init__(
verbose: bool = False,
blacklist: Optional[list] = None,
accumulate_chat_history: bool = True,
- show_token_usage: bool = False,
+ show_token_usage: bool = True,
):
self.__chat_history = []
self.__ros_version = ros_version
@@ -71,18 +70,26 @@ def __init__(
self.__show_token_usage = show_token_usage
self.__blacklist = blacklist if blacklist else []
self.__accumulate_chat_history = accumulate_chat_history
- self.__tools = self.__get_tools(
+ self.__tools = self._get_tools(
ros_version, packages=tool_packages, tools=tools, blacklist=self.__blacklist
)
- self.__prompts = self.__get_prompts(prompts)
+ self.__prompts = self._get_prompts(prompts)
self.__llm_with_tools = llm.bind_tools(self.__tools.get_tools())
- self.__agent = self.__get_agent()
- self.__executor = self.__get_executor(verbose=verbose)
+ self.__agent = self._get_agent()
+ self.__executor = self._get_executor(verbose=verbose)
+ self.__usage = None
+
+ @property
+ def chat_history(self):
+ return self.__chat_history
+
+ @property
+ def usage(self):
+ return self.__usage
def clear_chat(self):
"""Clear the chat history."""
self.__chat_history = []
- os.system("clear")
def invoke(self, query: str) -> str:
"""Invoke the agent with a user query."""
@@ -91,33 +98,22 @@ def invoke(self, query: str) -> str:
result = self.__executor.invoke(
{"input": query, "chat_history": self.__chat_history}
)
+ self.__usage = cb
if self.__show_token_usage:
- print(f"[bold]Prompt Tokens:[/bold] {cb.prompt_tokens}")
- print(f"[bold]Completion Tokens:[/bold] {cb.completion_tokens}")
- print(f"[bold]Total Cost (USD):[/bold] ${cb.total_cost}")
+ self._print_usage()
except Exception as e:
- if f"{e}".strip() == "":
- self.__record_chat_history(
- query,
- "An error with no description occurred. This is known to happen when multiple tools are used "
- "concurrently. Please try again.",
- )
- try:
- result = self.__executor.invoke(
- {
- "input": "Please try again.",
- "chat_history": self.__chat_history,
- }
- )
- except Exception as e:
- return "An error with no description occurred. This is known to happen when multiple tools are used concurrently. Please try again."
- else:
- return f"An error occurred: {e}"
-
- self.__record_chat_history(query, result["output"])
+ return f"An error occurred: {e}"
+
+ self._record_chat_history(query, result["output"])
return result["output"]
- def __get_executor(self, verbose: bool):
+ def _print_usage(self):
+ cb = self.__usage
+ print(f"[bold]Prompt Tokens:[/bold] {cb.prompt_tokens}")
+ print(f"[bold]Completion Tokens:[/bold] {cb.completion_tokens}")
+ print(f"[bold]Total Cost (USD):[/bold] ${cb.total_cost}")
+
+ def _get_executor(self, verbose: bool):
executor = AgentExecutor(
agent=self.__agent,
tools=self.__tools.get_tools(),
@@ -126,7 +122,7 @@ def __get_executor(self, verbose: bool):
)
return executor
- def __get_agent(self):
+ def _get_agent(self):
agent = (
{
"input": lambda x: x["input"],
@@ -141,7 +137,7 @@ def __get_agent(self):
)
return agent
- def __get_tools(
+ def _get_tools(
self,
ros_version: Literal[1, 2],
packages: Optional[list],
@@ -155,7 +151,7 @@ def __get_tools(
rosa_tools.add_packages(packages, blacklist=blacklist)
return rosa_tools
- def __get_prompts(self, robot_prompts: Optional[RobotSystemPrompts] = None):
+ def _get_prompts(self, robot_prompts: Optional[RobotSystemPrompts] = None):
prompts = system_prompts
if robot_prompts:
prompts.append(robot_prompts.as_message())
@@ -169,7 +165,7 @@ def __get_prompts(self, robot_prompts: Optional[RobotSystemPrompts] = None):
)
return template
- def __record_chat_history(self, query: str, response: str):
+ def _record_chat_history(self, query: str, response: str):
if self.__accumulate_chat_history:
self.__chat_history.extend(
[HumanMessage(content=query), AIMessage(content=response)]
diff --git a/src/rosa/tools/ros1.py b/src/rosa/tools/ros1.py
index 7f1166d..9ed7e2a 100644
--- a/src/rosa/tools/ros1.py
+++ b/src/rosa/tools/ros1.py
@@ -334,7 +334,7 @@ def rostopic_echo(
topic: str,
count: int,
return_echoes: bool = False,
- delay: float = 0.0,
+ delay: float = 1.0,
timeout: float = 1.0,
) -> dict:
"""
@@ -477,6 +477,21 @@ def rosservice_info(services: List[str]) -> dict:
return details
+@tool
+def rosservice_call(service: str, args: List[str]) -> dict:
+ """Calls a specific ROS service with the provided arguments.
+
+ :param service: The name of the ROS service to call.
+ :param args: A list of arguments to pass to the service.
+ """
+ print(f"Calling ROS service '{service}' with arguments: {args}")
+ try:
+ response = rosservice.call_service(service, args)
+ return response
+ except Exception as e:
+ return {"error": f"Failed to call service '{service}': {e}"}
+
+
@tool
def rosmsg_info(msg_type: List[str]) -> dict:
"""Returns details about a specific ROS message type.
@@ -658,38 +673,71 @@ def roslog_list(min_size: int = 2048, blacklist: Optional[List[str]] = None) ->
:param min_size: The minimum size of the log file in bytes to include in the list.
"""
- rospy.loginfo("Getting ROS log files")
- log_dir = f"{rospkg.get_log_dir()}/"
- logs = os.listdir(log_dir)
- # Filter out any log files that match any of the blacklist patterns
- logs = list(
- filter(
- lambda x: not any(regex.match(f".*{pattern}", x) for pattern in blacklist),
- logs,
- )
- )
+ logs = []
+ log_dirs = get_roslog_directories.invoke({})
- # Get the log file sizes, in bytes
- log_sizes = {}
- for log in logs:
- log_path = os.path.join(log_dir, log)
- size = os.path.getsize(log_path)
- if size >= min_size:
- log_sizes[log] = size
+ for _, log_dir in log_dirs.items():
+ if not log_dir:
+ continue
- # Sort the list by size (largest first)
- log_sizes = dict(sorted(log_sizes.items(), key=lambda item: item[1], reverse=True))
+ # Get all .log files in the directory
+ log_files = [
+ os.path.join(log_dir, f)
+ for f in os.listdir(log_dir)
+ if os.path.isfile(os.path.join(log_dir, f)) and f.endswith(".log")
+ ]
- return {
- "log_file_directory": log_dir,
- "logs_with_size_in_bytes": log_sizes,
- "notes": "Recommend only displaying the top N log files when you present this list to the user.",
- }
+ # Filter out blacklisted files
+ if blacklist:
+ log_files = list(
+ filter(
+ lambda x: not any(
+ regex.match(f".*{pattern}.*", x) for pattern in blacklist
+ ),
+ log_files,
+ )
+ )
+
+ # Filter out files that are too small
+ log_files = list(filter(lambda x: os.path.getsize(x) > min_size, log_files))
+
+ # Get the size of each log file in KB or MB if it's larger than 1 MB
+ log_files = [
+ {
+ f.replace(log_dir, ""): (
+ f"{round(os.path.getsize(f) / 1024, 2)} KB"
+ if os.path.getsize(f) < 1024 * 1024
+ else f"{round(os.path.getsize(f) / (1024 * 1024), 2)} MB"
+ ),
+ }
+ for f in log_files
+ ]
+
+ if len(log_files) > 0:
+ logs.append(
+ {
+ "directory": log_dir,
+ "total": len(log_files),
+ "files": log_files,
+ }
+ )
+
+ return dict(
+ total=len(logs),
+ logs=logs,
+ )
@tool
-def roslog_get_log_directory() -> str:
- """Returns the path to the ROS log directory."""
- rospy.loginfo("Getting ROS log directory")
- return f"{rospkg.get_log_dir()}/"
+def get_roslog_directories() -> dict:
+ """Returns any available ROS log directories."""
+ default_directory = rospkg.get_log_dir()
+ latest_directory = os.path.join(default_directory, "latest")
+ from_env = os.getenv("ROS_LOG_DIR")
+
+ return dict(
+ default=default_directory,
+ latest=latest_directory,
+ from_env=from_env,
+ )
diff --git a/src/turtle_agent/scripts/turtle_agent.py b/src/turtle_agent/scripts/turtle_agent.py
index 8d5956b..494bfa8 100755
--- a/src/turtle_agent/scripts/turtle_agent.py
+++ b/src/turtle_agent/scripts/turtle_agent.py
@@ -38,7 +38,7 @@ def cool_turtle_tool():
class TurtleAgent(ROSA):
def __init__(self, verbose: bool = True):
- self.__blacklist = ["master"]
+ self.__blacklist = ["master", "docker"]
self.__prompts = get_prompts()
self.__llm = get_llm()
@@ -70,9 +70,9 @@ def run(self):
if user_input == "exit":
break
elif user_input == "help":
- output = self.invoke(self.__get_help())
+ output = self.invoke(self.get_help())
elif user_input == "examples":
- examples = self.__examples()
+ examples = self.examples()
example = pyip.inputMenu(
choices=examples,
numbered=True,
@@ -87,8 +87,8 @@ def run(self):
output = self.invoke(user_input)
console.print(Markdown(output))
- def __get_help(self) -> str:
- examples = self.__examples()
+ def get_help(self) -> str:
+ examples = self.examples()
help_text = f"""
The user has typed --help. Please provide a CLI-style help message. Use the following
@@ -122,7 +122,7 @@ def __get_help(self) -> str:
"""
return help_text
- def __examples(self):
+ def examples(self):
return [
"Give me a ROS tutorial using the turtlesim.",
"Show me how to move the turtle forward.",