Skip to content

Commit 0c013f4

Browse files
feat: add task stream_callback (#3446)
Co-authored-by: Waleed Alzarooni <[email protected]>
1 parent 77b48c7 commit 0c013f4

File tree

2 files changed

+52
-8
lines changed

2 files changed

+52
-8
lines changed

camel/societies/workforce/workforce.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from .workforce_metrics import WorkforceMetrics
4242

4343
if TYPE_CHECKING:
44+
from camel.responses import ChatAgentResponse
4445
from camel.utils.context_utils import ContextUtility, WorkflowSummary
4546

4647
from colorama import Fore
@@ -1316,13 +1317,20 @@ def _cleanup_task_tracking(self, task_id: str) -> None:
13161317
del self._assignees[task_id]
13171318

13181319
def _decompose_task(
1319-
self, task: Task
1320+
self,
1321+
task: Task,
1322+
stream_callback: Optional[
1323+
Callable[["ChatAgentResponse"], None]
1324+
] = None,
13201325
) -> Union[List[Task], Generator[List[Task], None, None]]:
13211326
r"""Decompose the task into subtasks. This method will also set the
13221327
relationship between the task and its subtasks.
13231328
13241329
Args:
13251330
task (Task): The task to decompose.
1331+
stream_callback (Callable[[ChatAgentResponse], None], optional): A
1332+
callback function that receives each chunk (ChatAgentResponse)
1333+
during streaming decomposition.
13261334
13271335
Returns:
13281336
Union[List[Task], Generator[List[Task], None, None]]:
@@ -1341,7 +1349,9 @@ def _decompose_task(
13411349
)
13421350
)
13431351
self.task_agent.reset()
1344-
result = task.decompose(self.task_agent, decompose_prompt)
1352+
result = task.decompose(
1353+
self.task_agent, decompose_prompt, stream_callback=stream_callback
1354+
)
13451355

13461356
# Handle both streaming and non-streaming results
13471357
if isinstance(result, Generator):

camel/tasks/task.py

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414

1515
import re
1616
from enum import Enum
17+
from types import GeneratorType
1718
from typing import (
1819
TYPE_CHECKING,
1920
Any,
2021
Callable,
2122
Dict,
2223
Generator,
24+
Iterable,
2325
List,
2426
Literal,
2527
Optional,
@@ -31,7 +33,7 @@
3133

3234
if TYPE_CHECKING:
3335
from camel.agents import ChatAgent
34-
from camel.agents.chat_agent import StreamingChatAgentResponse
36+
from camel.responses import ChatAgentResponse
3537
import uuid
3638

3739
from camel.logger import get_logger
@@ -409,6 +411,9 @@ def decompose(
409411
agent: "ChatAgent",
410412
prompt: Optional[str] = None,
411413
task_parser: Callable[[str, str], List["Task"]] = parse_response,
414+
stream_callback: Optional[
415+
Callable[["ChatAgentResponse"], None]
416+
] = None,
412417
) -> Union[List["Task"], Generator[List["Task"], None, None]]:
413418
r"""Decompose a task to a list of sub-tasks. Automatically detects
414419
streaming or non-streaming based on agent configuration.
@@ -420,6 +425,10 @@ def decompose(
420425
task_parser (Callable[[str, str], List[Task]], optional): A
421426
function to extract Task from response. If not provided,
422427
the default parse_response will be used.
428+
stream_callback (Callable[[ChatAgentResponse], None], optional): A
429+
callback function that receives each chunk (ChatAgentResponse)
430+
during streaming. This allows tracking the decomposition
431+
progress in real-time.
423432
424433
Returns:
425434
Union[List[Task], Generator[List[Task], None, None]]: If agent is
@@ -441,21 +450,38 @@ def decompose(
441450
# Auto-detect streaming based on response type
442451
from camel.agents.chat_agent import StreamingChatAgentResponse
443452

444-
if isinstance(response, StreamingChatAgentResponse):
445-
return self._decompose_streaming(response, task_parser)
446-
else:
447-
return self._decompose_non_streaming(response, task_parser)
453+
is_streaming = isinstance(
454+
response, StreamingChatAgentResponse
455+
) or isinstance(response, GeneratorType)
456+
if (
457+
not is_streaming
458+
and hasattr(response, "__iter__")
459+
and not hasattr(response, "msg")
460+
):
461+
is_streaming = True
462+
463+
if is_streaming:
464+
return self._decompose_streaming(
465+
response, task_parser, stream_callback=stream_callback
466+
)
467+
return self._decompose_non_streaming(response, task_parser)
448468

449469
def _decompose_streaming(
450470
self,
451-
response: "StreamingChatAgentResponse",
471+
response: Iterable,
452472
task_parser: Callable[[str, str], List["Task"]],
473+
stream_callback: Optional[
474+
Callable[["ChatAgentResponse"], None]
475+
] = None,
453476
) -> Generator[List["Task"], None, None]:
454477
r"""Handle streaming response for task decomposition.
455478
456479
Args:
457480
response: Streaming response from agent
458481
task_parser: Function to parse tasks from response
482+
stream_callback (Callable[[ChatAgentResponse], None], optional): A
483+
callback function that receives each chunk (ChatAgentResponse)
484+
during streaming.
459485
460486
Yields:
461487
List[Task]: New tasks as they are parsed from streaming response
@@ -466,6 +492,14 @@ def _decompose_streaming(
466492
# Process streaming response
467493
for chunk in response:
468494
accumulated_content = chunk.msg.content
495+
if stream_callback:
496+
try:
497+
stream_callback(chunk)
498+
except Exception:
499+
logger.warning(
500+
"stream_callback failed during decomposition",
501+
exc_info=True,
502+
)
469503

470504
# Try to parse partial tasks from accumulated content
471505
try:

0 commit comments

Comments
 (0)