Skip to content

Getting as async generator object instead of data being to be received as an output through streamable-Http #784

Open
@koteswaran2186

Description

@koteswaran2186

Hi ,

I am working on getting a functionality using mcp tool where i need pass the raw data at client and retrieve it as chunk with configurable Streamable-http. Below is the code for my poc:-

server.py

import asyncio
from typing import AsyncGenerator
from fastmcp import FastMCP

mcp = FastMCP(name="MyServer")

@mcp.tool()
async def generateReport(params: dict = None) -> AsyncGenerator[str, None]:
"""Generate a report with streaming response using yield.

This processes the raw_data and returns it in chunks for streaming.
"""

# Convert the raw data to string format
raw_data_str = str(params)

# Define chunk size
chunk_size = 100

# Stream the content in chunks with index
chunk_index = 0
for i in range(0, len(raw_data_str), chunk_size):
    chunk = raw_data_str[i:i + chunk_size]
    yield f"chunk {chunk_index}: {chunk}\n"
    chunk_index += 1
    # Add delay to simulate processing time
    await asyncio.sleep(0.05)

if name == "main":
mcp.run(transport="streamable-http",
host="127.0.0.1",
port=8000)

democlient.py:-

import asyncio
from fastmcp import Client # type: ignore
from fastmcp.client.transports import StreamableHttpTransport # type: ignore

async def example():
transport = StreamableHttpTransport("http://127.0.0.1:8000/mcp/")
async with Client(transport=transport) as client:
await client.ping()
print("Ping is successful !")

    raw_data = {
        "title": "Sales Performance Report",
        "period": "Q4 2024",
        "metrics": ["revenue", "growth", "customer_satisfaction"],
        "department": ["Sales", "Marketing", "Customer Service"],
        "format": "detailed_analysis"
    }
    
    print(f"\nCalling generateReport with streaming response:")
    print(f"Parameters: {raw_data}")
    print("\nStreaming response...")
    print("-" * 50)
    
    try:
        # Call the streaming tool
        report_stream = await client.call_tool("generateReport", {"params": raw_data})
        
        print("Tool call successful!")
        print(f"Response type: {type(report_stream)}")
        
        # Handle different response types
        if hasattr(report_stream, '__aiter__'):
            print("Detected async iterator - processing stream...")
            chunk_count = 0
            full_output = ""
            async for chunk in report_stream:
                chunk_count += 1
                full_output += str(chunk)
                # Print chunk with better formatting
                print(f"Received: {str(chunk).strip()}")
                
            print(f"\nTotal chunks received: {chunk_count}")
            print(f"Total output length: {len(full_output)} characters")
            
        elif hasattr(report_stream, 'content') and hasattr(report_stream.content, '__aiter__'):
            print("Detected content stream - processing...")
            chunk_count = 0
            full_output = ""
            async for chunk in report_stream.content:
                chunk_count += 1
                full_output += str(chunk)
                print(f"Received: {str(chunk).strip()}")
                
            print(f"\nTotal chunks received: {chunk_count}")
            print(f"Total output length: {len(full_output)} characters")
            
        elif isinstance(report_stream, (list, tuple)):
            print("Received list/tuple response:")
            for i, item in enumerate(report_stream):
                if hasattr(item, 'text'):
                    print(f"Item {i}: {item.text}")
                else:
                    print(f"Item {i}: {str(item)}")
                    
        else:
            # Handle non-streaming response
            response_str = str(report_stream)
            print(f"Received non-streaming response: {len(response_str)} characters")
            print(f"Content: {response_str}")
                
    except Exception as e:
        print(f"Error during streaming: {e}")
        print(f"Error type: {type(e)}")

async def render_to_output():
"""UI Client like function to fetch and render the streamed response."""
transport = StreamableHttpTransport("http://127.0.0.1:8000/mcp/")
async with Client(transport=transport) as client:
print("\n" + "="*60)
print("RENDERING STREAMED RESPONSE TO OUTPUT")
print("="*60)

    # More comprehensive request data
    raw_request_data = {
        "title": "Comprehensive Business Analysis Report",
        "report_type": "comprehensive_analysis",
        "data_sources": ["sales_db", "customer_feedback", "market_research"],
        "time_range": {
            "start": "2024-01-01",
            "end": "2024-12-31"
        },
        "metrics": ["revenue", "customer_acquisition", "market_share", "efficiency"],
        "department": ["Sales", "Marketing", "Operations", "Finance"],
        "include_charts": True,
        "detail_level": "high",
        "output_format": "markdown",
        "format": "detailed_analysis"
    }
    
    print(f"Request parameters: {raw_request_data}")
    print("\nStreaming output:")
    print("-" * 40)
    
    output = ""
    chunk_count = 0
    
    try:
        # Call the streaming tool
        report_stream = await client.call_tool("generateReport", {"params": raw_request_data})
        
        print("Tool call successful!")
        print(f"Response type: {type(report_stream)}")
        
        # Handle different response types
        if hasattr(report_stream, '__aiter__'):
            print("Detected async iterator - processing stream...")
            async for chunk in report_stream:
                chunk_count += 1
                output += str(chunk)
                
                # Print the actual chunk content
                print(f"Received: {str(chunk).strip()}")
                    
        elif hasattr(report_stream, 'content') and hasattr(report_stream.content, '__aiter__'):
            print("Detected content stream - processing...")
            async for chunk in report_stream.content:
                chunk_count += 1
                output += str(chunk)
                
                print(f"Received: {str(chunk).strip()}")
                    
        elif isinstance(report_stream, (list, tuple)):
            print("Received list/tuple response:")
            for i, item in enumerate(report_stream):
                if hasattr(item, 'text'):
                    print(f"Item {i}: {item.text}")
                    output += item.text
                else:
                    print(f"Item {i}: {str(item)}")
                    output += str(item)
                    
        else:
            # Handle non-streaming response
            if isinstance(report_stream, str):
                output = report_stream
            else:
                output = str(report_stream)
            print(f"Non-streaming output received: {len(output)} characters")
            print(f"Content: {output}")
            
    except Exception as e:
        print(f"Error during streaming: {e}")
        print(f"Error type: {type(e)}")
    
    # Final output summary
    print("\n" + "-" * 40)
    print(f"FINAL SUMMARY:")
    print(f"Total chunks: {chunk_count}")
    print(f"Total output length: {len(output)} characters")
    
    if output:
        preview_len = 200
        print(f"\nOutput preview (first {preview_len} chars):")
        print("'" + output[:preview_len] + "'" + ("..." if len(output) > preview_len else ""))

if name == "main":
asyncio.run(example())
asyncio.run(render_to_output())

Getting output as below instead of raw data as incremental text chunks

Received non-streaming response: [TextContext(type="text", text="<async_generator object generateReport" at 0x0001C5809504480", annotations= None]"

Any help would be highly appreciated?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions