1414
1515import re
1616from enum import Enum
17+ from types import GeneratorType
1718from typing import (
1819 TYPE_CHECKING ,
1920 Any ,
2021 Callable ,
2122 Dict ,
2223 Generator ,
24+ Iterable ,
2325 List ,
2426 Literal ,
2527 Optional ,
3133
3234if TYPE_CHECKING :
3335 from camel .agents import ChatAgent
34- from camel .agents .chat_agent import StreamingChatAgentResponse
3536import uuid
3637
3738from camel .logger import get_logger
@@ -409,6 +410,7 @@ def decompose(
409410 agent : "ChatAgent" ,
410411 prompt : Optional [str ] = None ,
411412 task_parser : Callable [[str , str ], List ["Task" ]] = parse_response ,
413+ stream_callback : Optional [Callable [[str ], None ]] = None ,
412414 ) -> Union [List ["Task" ], Generator [List ["Task" ], None , None ]]:
413415 r"""Decompose a task to a list of sub-tasks. Automatically detects
414416 streaming or non-streaming based on agent configuration.
@@ -441,15 +443,27 @@ def decompose(
441443 # Auto-detect streaming based on response type
442444 from camel .agents .chat_agent import StreamingChatAgentResponse
443445
444- if isinstance (response , StreamingChatAgentResponse ):
445- return self ._decompose_streaming (response , task_parser )
446- else :
447- return self ._decompose_non_streaming (response , task_parser )
446+ is_streaming = isinstance (
447+ response , StreamingChatAgentResponse
448+ ) or isinstance (response , GeneratorType )
449+ if (
450+ not is_streaming
451+ and hasattr (response , "__iter__" )
452+ and not hasattr (response , "msg" )
453+ ):
454+ is_streaming = True
455+
456+ if is_streaming :
457+ return self ._decompose_streaming (
458+ response , task_parser , stream_callback = stream_callback
459+ )
460+ return self ._decompose_non_streaming (response , task_parser )
448461
449462 def _decompose_streaming (
450463 self ,
451- response : "StreamingChatAgentResponse" ,
464+ response : Iterable ,
452465 task_parser : Callable [[str , str ], List ["Task" ]],
466+ stream_callback : Optional [Callable [[str ], None ]] = None ,
453467 ) -> Generator [List ["Task" ], None , None ]:
454468 r"""Handle streaming response for task decomposition.
455469
@@ -466,6 +480,14 @@ def _decompose_streaming(
466480 # Process streaming response
467481 for chunk in response :
468482 accumulated_content = chunk .msg .content
483+ if stream_callback :
484+ try :
485+ stream_callback (accumulated_content )
486+ except Exception :
487+ logger .warning (
488+ "stream_callback failed during decomposition" ,
489+ exc_info = True ,
490+ )
469491
470492 # Try to parse partial tasks from accumulated content
471493 try :
0 commit comments