Skip to content

TypeError: Object of type AsyncCallbackManagerForToolRun is not JSON serializable #30441

Open
@sei-li-miidas

Description

@sei-li-miidas

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangChain rather than my code.
  • The bug is not resolved by updating to the latest stable version of LangChain (or the specific integration package).

Example Code

# -*- coding: utf-8 -*-
"""langgraph-agents-with-openai.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1S-hf5LsZciW0eTcu3hNUJ0yiQ-6DX_3P
"""

pip install python-dotenv langgraph langchain_core langchain_openai langgraph-checkpoint langgraph-checkpoint-sqlite asyncio

import os
import sys
import json
import re
import pprint
from dotenv import load_dotenv

import warnings

warnings.filterwarnings("ignore")
import logging

# Set basic configs
log_level = os.environ.get("LOG_LEVEL", "INFO").strip().upper()
logging.basicConfig(format="[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(log_level)

pprint.PrettyPrinter(indent=2, width=100)

_ = load_dotenv(".env")

from langchain_openai import ChatOpenAI

model = ChatOpenAI(
                model="gpt-4o",
                temperature=1,
                top_p=0.999,
                # model_kwargs={"top_k": 250},
            )

from langchain_core.tools import tool
from langchain_core.tools import StructuredTool
import asyncio
from typing import Any

def save_user_name(**arguments: dict[str, Any],) -> str:
    """Save user name

    Args:
        user_name: user name
    """
    print(arguments)
    user_name = arguments.get("user_name")

    print(f"User name saved: {user_name}")
    return f"User name saved: {user_name}"

async def sleep(
        **arguments: dict[str, Any],
    ) -> tuple[str | list[Any] | None]:
    """Sleep for a while

    Args:
        seconds: How long to sleep
    """
    seconds = int(arguments["seconds"])

    print(f"Sleep: {seconds} seconds")
    await asyncio.sleep(seconds)
    return "good"

my_tool = [
    StructuredTool(
        name="save_user_name",
        description="Save user name",
        args_schema={
            "type": "object",
            "required": ["user_name"],
            "properties": {"user_name": {"type": "string", "description": "User name"}},
        },
        coroutine=save_user_name,
        func=save_user_name,
        # response_format="content_and_artifact",
        response_format="content",
    ),
    StructuredTool(
        name="sleep",
        description="Sleep for a while",
        args_schema={
            "type": "object",
            "required": ["seconds"],
            "properties": {"seconds": {"type": "number", "description": "How long to sleep"}},
        },
        coroutine=sleep,
        func=sleep,
        # response_format="content_and_artifact",
        response_format="content",
    ),
]

from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]

from langgraph.graph import StateGraph, END
from langchain_core.messages import SystemMessage, ToolMessage, HumanMessage

class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges(
            "llm", self.exists_action, {True: "action", False: END}
        )
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call(self, state: AgentState):
        messages = state["messages"]
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {"messages": [message]}

    def exists_action(self, state: AgentState):
        result = state["messages"][-1]
        return len(result.tool_calls) > 0

    async def take_action(self, state: AgentState):
        tool_calls = state["messages"][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = await self.tools[t["name"]].ainvoke(t["args"])
            results.append(
                ToolMessage(tool_call_id=t["id"], name=t["name"], content=str(result))
            )
        print(f"Back to the model! {result}")
        return {"messages": results}

prompt = """Help user with his/her requests"""

"""## Streaming tokens"""

async def chat(agent, message: str, thread_id: str = None):
    messages = [HumanMessage(content=message)]
    thread = {"configurable": {"thread_id": thread_id}}
    index = None
    async for event in agent.graph.astream_events({"messages": messages}, thread):
        # print(event)
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                yield content
            # if content:
            #     # Empty content in the context of Amazon Bedrock means
            #     # that the model is asking for a tool to be invoked.
            #     print(content)
            #     content = content[0]
            #     if index is None:
            #         index = content["index"]
            #     elif index != content["index"]:
            #         index = content["index"]
            #         yield "\n"
            #     if "text" in content:
            #         yield content["text"]

# from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver

# # If you are using a newer version of LangGraph, the package was separated:
# # !pip install langgraph-checkpoint-sqlite

# from langgraph.checkpoint.memory import MemorySaver
# from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

import random

thread = {"configurable": {"thread_id": random.random()}}

async with AsyncSqliteSaver.from_conn_string(":memory:") as memory:
    abot = Agent(model, my_tool, system=prompt, checkpointer=memory)

    # async for chunk in chat("こんにちは", thread_id=thread):
    #     print(chunk, end="")

    _input = "Hello!"
    while True:
        async for chunk in chat(abot, _input, thread_id=thread):
            print(chunk, end="")

        _input = input("question:")
        if _input == "exit":
          break

Error Message and Stack Trace (if applicable)

Hello! How can I assist you today?question:sleep for 2 seconds
Calling: {'name': 'sleep', 'args': {'seconds': 2}, 'id': 'call_1FEWHqyefCj1okwFldVUpHx0', 'type': 'tool_call'}
Sleep: 2 seconds
Back to the model! good
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
[<ipython-input-55-06253ef4b913>](https://localhost:8080/#) in <cell line: 1>()
     20     _input = "Hello!"
     21     while True:
---> 22         async for chunk in chat(abot, _input, thread_id=thread):
     23             print(chunk, end="")
     24 

35 frames
[/usr/lib/python3.11/json/encoder.py](https://localhost:8080/#) in default(self, o)
    178 
    179         """
--> 180         raise TypeError(f'Object of type {o.__class__.__name__} '
    181                         f'is not JSON serializable')
    182 

TypeError: Object of type AsyncCallbackManagerForToolRun is not JSON serializable

Description

When an async tool using coroutine of StructuredTool is called, error TypeError: Object of type AsyncCallbackManagerForToolRun is not JSON serializable always happens.

System Info

google colab

Metadata

Metadata

Assignees

No one assigned

    Labels

    🤖:bugRelated to a bug, vulnerability, unexpected error with an existing feature

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions