diff --git a/camel/benchmarks/browsecomp.py b/camel/benchmarks/browsecomp.py index f35ca701b1..5b4e69d7ed 100644 --- a/camel/benchmarks/browsecomp.py +++ b/camel/benchmarks/browsecomp.py @@ -11,7 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= - +import asyncio import base64 import hashlib import json @@ -585,15 +585,19 @@ def process_benchmark_row(row: Dict[str, Any]) -> Dict[str, Any]: input_message = QUERY_TEMPLATE.format(question=problem) if isinstance(pipeline_template, (ChatAgent)): - pipeline = pipeline_template.clone() # type: ignore[assignment] + chat_pipeline = pipeline_template.clone() - response_text = pipeline.step( + response_text = chat_pipeline.step( input_message, response_format=QueryResponse ) elif isinstance(pipeline_template, Workforce): - pipeline = pipeline_template.clone() # type: ignore[assignment] + workforce_pipeline = asyncio.run( + pipeline_template.clone_async() + ) task = Task(content=input_message, id="0") - task = pipeline.process_task(task) # type: ignore[attr-defined] + task = asyncio.run( + workforce_pipeline.process_task_async(task) + ) # type: ignore[attr-defined] if task_json_formatter: formatter_in_process = task_json_formatter.clone() else: @@ -607,16 +611,16 @@ def process_benchmark_row(row: Dict[str, Any]) -> Dict[str, Any]: elif isinstance(pipeline_template, RolePlaying): # RolePlaying is different. - pipeline = pipeline_template.clone( # type: ignore[assignment] + rp_pipeline = pipeline_template.clone( task_prompt=input_message ) n = 0 - input_msg = pipeline.init_chat() # type: ignore[attr-defined] + input_msg = rp_pipeline.init_chat() chat_history = [] while n < chat_turn_limit: n += 1 - assistant_response, user_response = pipeline.step( + assistant_response, user_response = rp_pipeline.step( input_msg ) if assistant_response.terminated: # type: ignore[union-attr] diff --git a/camel/societies/workforce/workforce.py b/camel/societies/workforce/workforce.py index 8ff82bda37..2087bf3ce0 100644 --- a/camel/societies/workforce/workforce.py +++ b/camel/societies/workforce/workforce.py @@ -340,6 +340,9 @@ def __init__( self.snapshot_interval: float = 30.0 # Shared memory UUID tracking to prevent re-sharing duplicates self._shared_memory_uuids: Set[str] = set() + # Defer initial worker-created callbacks until an event loop is + # available in async context. + self._pending_worker_created: Deque[BaseNode] = deque(self._children) self._initialize_callbacks(callbacks) # Set up coordinator agent with default system message @@ -533,10 +536,7 @@ def _initialize_callbacks( "WorkforceLogger addition." ) - for child in self._children: - self._notify_worker_created(child) - - def _notify_worker_created( + async def _notify_worker_created( self, worker_node: BaseNode, *, @@ -552,7 +552,19 @@ def _notify_worker_created( metadata=metadata, ) for cb in self._callbacks: - cb.log_worker_created(event) + await cb.log_worker_created(event) + + async def _flush_initial_worker_created_callbacks(self) -> None: + r"""Flush pending worker-created callbacks that were queued during + initialization before an event loop was available.""" + if not self._pending_worker_created: + return + + pending = list(self._pending_worker_created) + self._pending_worker_created.clear() + + for child in pending: + await self._notify_worker_created(child) def _get_or_create_shared_context_utility( self, @@ -723,7 +735,7 @@ def set_mode(self, mode: WorkforceMode) -> Workforce: >>> workforce.process_task(task) >>> >>> # Reset to original mode - >>> workforce.reset() # Automatically resets to initial mode + >>> await workforce.reset() # Automatically resets to initial mode >>> # Or manually switch mode >>> workforce.set_mode(WorkforceMode.AUTO_DECOMPOSE) >>> workforce.process_task(another_task) @@ -1669,7 +1681,7 @@ async def _apply_recovery_strategy( subtask_ids=[st.id for st in subtasks], ) for cb in self._callbacks: - cb.log_task_decomposed(task_decomposed_event) + await cb.log_task_decomposed(task_decomposed_event) for subtask in subtasks: task_created_event = TaskCreatedEvent( task_id=subtask.id, @@ -1679,7 +1691,7 @@ async def _apply_recovery_strategy( metadata=subtask.additional_info, ) for cb in self._callbacks: - cb.log_task_created(task_created_event) + await cb.log_task_created(task_created_event) # Insert subtasks at the head of the queue self._pending_tasks.extendleft(reversed(subtasks)) @@ -2288,7 +2300,7 @@ async def handle_decompose_append_task( return [task] if reset and self._state != WorkforceState.RUNNING: - self.reset() + await self.reset_async() logger.info("Workforce reset before handling task.") # Focus on the new task @@ -2302,7 +2314,7 @@ async def handle_decompose_append_task( metadata=task.additional_info, ) for cb in self._callbacks: - cb.log_task_created(task_created_event) + await cb.log_task_created(task_created_event) # The agent tend to be overconfident on the whole task, so we # decompose the task into subtasks first @@ -2323,7 +2335,7 @@ async def handle_decompose_append_task( subtask_ids=[st.id for st in subtasks], ) for cb in self._callbacks: - cb.log_task_decomposed(task_decomposed_event) + await cb.log_task_decomposed(task_decomposed_event) for subtask in subtasks: task_created_event = TaskCreatedEvent( task_id=subtask.id, @@ -2333,7 +2345,7 @@ async def handle_decompose_append_task( metadata=subtask.additional_info, ) for cb in self._callbacks: - cb.log_task_created(task_created_event) + await cb.log_task_created(task_created_event) if subtasks: # _pending_tasks will contain both undecomposed @@ -2361,6 +2373,9 @@ async def process_task_async( Returns: Task: The updated task. """ + # Emit worker-created callbacks lazily once an event loop is present. + await self._flush_initial_worker_created_callbacks() + # Delegate to intervention pipeline when requested to keep # backward-compat. if interactive: @@ -2412,7 +2427,7 @@ async def _process_task_with_pipeline(self, task: Task) -> Task: metadata=task.additional_info, ) for cb in self._callbacks: - cb.log_task_created(task_created_event) + await cb.log_task_created(task_created_event) task.state = TaskState.FAILED self.set_channel(TaskChannel()) @@ -2709,7 +2724,7 @@ def _start_child_node_when_paused( # Close the coroutine to prevent RuntimeWarning start_coroutine.close() - def add_single_agent_worker( + async def add_single_agent_worker_async( self, description: str, worker: ChatAgent, @@ -2719,6 +2734,8 @@ def add_single_agent_worker( r"""Add a worker node to the workforce that uses a single agent. Can be called when workforce is paused to dynamically add workers. + This is the async version of add_single_agent_worker. + Args: description (str): Description of the worker node. worker (ChatAgent): The agent to be added. @@ -2765,13 +2782,68 @@ def add_single_agent_worker( # If workforce is paused, start the worker's listening task self._start_child_node_when_paused(worker_node.start()) - self._notify_worker_created( + await self._notify_worker_created( worker_node, worker_type='SingleAgentWorker', ) return self - def add_role_playing_worker( + def add_single_agent_worker( + self, + description: str, + worker: ChatAgent, + pool_max_size: int = DEFAULT_WORKER_POOL_SIZE, + enable_workflow_memory: bool = False, + ) -> Workforce: + r"""Add a worker node to the workforce that uses a single agent. + + .. deprecated:: + This synchronous method is deprecated and will be removed in a + future version. Use :meth:`add_single_agent_worker_async` instead. + + Args: + description (str): Description of the worker node. + worker (ChatAgent): The agent to be added. + pool_max_size (int): Maximum size of the agent pool. + (default: :obj:`10`) + enable_workflow_memory (bool): Whether to enable workflow memory + accumulation. Set to True if you plan to call + save_workflow_memories(). (default: :obj:`False`) + + Returns: + Workforce: The workforce node itself. + """ + import asyncio + import warnings + + warnings.warn( + "add_single_agent_worker() is deprecated and will be removed in a " + "future version. Use " + "'await workforce.add_single_agent_worker_async()' instead.", + DeprecationWarning, + stacklevel=2, + ) + + try: + asyncio.get_running_loop() + raise RuntimeError( + "Cannot call sync add_single_agent_worker() from " + "async context. Use " + "'await workforce.add_single_agent_worker_async()' instead." + ) + except RuntimeError as e: + if "no running event loop" not in str(e).lower(): + raise + return asyncio.run( + self.add_single_agent_worker_async( + description=description, + worker=worker, + pool_max_size=pool_max_size, + enable_workflow_memory=enable_workflow_memory, + ) + ) + + async def add_role_playing_worker_async( self, description: str, assistant_role_name: str, @@ -2784,6 +2856,8 @@ def add_role_playing_worker( r"""Add a worker node to the workforce that uses `RolePlaying` system. Can be called when workforce is paused to dynamically add workers. + This is the async version of add_role_playing_worker. + Args: description (str): Description of the node. assistant_role_name (str): The role name of the assistant agent. @@ -2842,12 +2916,79 @@ def add_role_playing_worker( # If workforce is paused, start the worker's listening task self._start_child_node_when_paused(worker_node.start()) - self._notify_worker_created( + await self._notify_worker_created( worker_node, worker_type='RolePlayingWorker', ) return self + def add_role_playing_worker( + self, + description: str, + assistant_role_name: str, + user_role_name: str, + assistant_agent_kwargs: Optional[Dict] = None, + user_agent_kwargs: Optional[Dict] = None, + summarize_agent_kwargs: Optional[Dict] = None, + chat_turn_limit: int = 3, + ) -> Workforce: + r"""Add a worker node to the workforce that uses `RolePlaying` system. + + .. deprecated:: + This synchronous method is deprecated and will be removed in a + future version. Use :meth:`add_role_playing_worker_async` instead. + + Args: + description (str): Description of the node. + assistant_role_name (str): The role name of the assistant agent. + user_role_name (str): The role name of the user agent. + assistant_agent_kwargs (Optional[Dict]): The keyword arguments to + initialize the assistant agent in the role playing. + (default: :obj:`None`) + user_agent_kwargs (Optional[Dict]): The keyword arguments to + initialize the user agent in the role playing. + (default: :obj:`None`) + summarize_agent_kwargs (Optional[Dict]): The keyword arguments to + initialize the summarize agent. (default: :obj:`None`) + chat_turn_limit (int): The maximum number of chat turns in the + role playing. (default: :obj:`3`) + + Returns: + Workforce: The workforce node itself. + """ + import asyncio + import warnings + + warnings.warn( + "add_role_playing_worker() is deprecated and will be removed in a " + "future version. Use " + "'await workforce.add_role_playing_worker_async()' instead.", + DeprecationWarning, + stacklevel=2, + ) + + try: + asyncio.get_running_loop() + raise RuntimeError( + "Cannot call sync add_role_playing_worker() from " + "async context. Use " + "'await workforce.add_role_playing_worker_async()' instead." + ) + except RuntimeError as e: + if "no running event loop" not in str(e).lower(): + raise + return asyncio.run( + self.add_role_playing_worker_async( + description=description, + assistant_role_name=assistant_role_name, + user_role_name=user_role_name, + assistant_agent_kwargs=assistant_agent_kwargs, + user_agent_kwargs=user_agent_kwargs, + summarize_agent_kwargs=summarize_agent_kwargs, + chat_turn_limit=chat_turn_limit, + ) + ) + def add_workforce(self, workforce: Workforce) -> Workforce: r"""Add a workforce node to the workforce. Can be called when workforce is paused to dynamically add workers. @@ -2884,9 +3025,11 @@ async def _async_reset(self) -> None: self._pause_event.set() @check_if_running(False) - def reset(self) -> None: + async def reset_async(self) -> None: r"""Reset the workforce and all the child nodes under it. Can only be called when the workforce is not running. + + This is the async version of reset. """ super().reset() self._task = None @@ -2919,9 +3062,7 @@ def reset(self) -> None: if self._loop and not self._loop.is_closed(): # If we have a loop, use it to set the event safely try: - asyncio.run_coroutine_threadsafe( - self._async_reset(), self._loop - ).result() + await self._async_reset() except RuntimeError as e: logger.warning(f"Failed to reset via existing loop: {e}") # Fallback to direct event manipulation @@ -2932,7 +3073,37 @@ def reset(self) -> None: for cb in self._callbacks: if isinstance(cb, WorkforceMetrics): - cb.reset_task_data() + await cb.reset_task_data() + + @check_if_running(False) + def reset(self) -> None: + r"""Reset the workforce and all the child nodes under it. Can only + be called when the workforce is not running. + + .. deprecated:: + This synchronous method is deprecated and will be removed in a + future version. Use :meth:`reset_async` instead. + """ + import asyncio + import warnings + + warnings.warn( + "reset() is deprecated and will be removed in a future version. " + "Use 'await workforce.reset_async()' instead.", + DeprecationWarning, + stacklevel=2, + ) + + try: + asyncio.get_running_loop() + raise RuntimeError( + "Cannot call sync reset() from async context. " + "Use 'await workforce.reset_async()' instead." + ) + except RuntimeError as e: + if "no running event loop" not in str(e).lower(): + raise + asyncio.run(self.reset_async()) def save_workflow_memories( self, @@ -3800,7 +3971,7 @@ async def _post_task(self, task: Task, assignee_id: str) -> None: task_id=task.id, worker_id=assignee_id ) for cb in self._callbacks: - cb.log_task_started(task_started_event) + await cb.log_task_started(task_started_event) try: await self._channel.post_task(task, self.node_id, assignee_id) @@ -3947,7 +4118,7 @@ async def _create_worker_node_for_task(self, task: Task) -> Worker: self._children.append(new_node) - self._notify_worker_created( + await self._notify_worker_created( new_node, worker_type='SingleAgentWorker', role=new_node_conf.role, @@ -4085,7 +4256,7 @@ async def _post_ready_tasks(self) -> None: for cb in self._callbacks: # queue_time_seconds can be derived by logger if task # creation time is logged - cb.log_task_assigned(task_assigned_event) + await cb.log_task_assigned(task_assigned_event) # Step 2: Iterate through all pending tasks and post those that are # ready @@ -4243,7 +4414,7 @@ async def _post_ready_tasks(self) -> None: }, ) for cb in self._callbacks: - cb.log_task_failed(task_failed_event) + await cb.log_task_failed(task_failed_event) self._completed_tasks.append(task) self._cleanup_task_tracking(task.id) @@ -4306,7 +4477,7 @@ async def _handle_failed_task(self, task: Task) -> bool: }, ) for cb in self._callbacks: - cb.log_task_failed(task_failed_event) + await cb.log_task_failed(task_failed_event) # Check for immediate halt conditions after max retries. if task.failure_count >= MAX_TASK_RETRIES: @@ -4493,7 +4664,7 @@ async def _handle_completed_task(self, task: Task) -> None: metadata={'current_state': task.state.value}, ) for cb in self._callbacks: - cb.log_task_completed(task_completed_event) + await cb.log_task_completed(task_completed_event) # Find and remove the completed task from pending tasks tasks_list = list(self._pending_tasks) @@ -4609,9 +4780,11 @@ async def _graceful_shutdown(self, failed_task: Task) -> None: # Wait for the full timeout period await asyncio.sleep(self.graceful_shutdown_timeout) - def get_workforce_log_tree(self) -> str: + async def get_workforce_log_tree_async(self) -> str: r"""Returns an ASCII tree representation of the task hierarchy and worker status. + + This is the async version of get_workforce_log_tree. """ metrics_cb: List[WorkforceMetrics] = [ cb for cb in self._callbacks if isinstance(cb, WorkforceMetrics) @@ -4619,21 +4792,86 @@ def get_workforce_log_tree(self) -> str: if len(metrics_cb) == 0: return "Metrics Callback not initialized." else: - return metrics_cb[0].get_ascii_tree_representation() + return await metrics_cb[0].get_ascii_tree_representation() - def get_workforce_kpis(self) -> Dict[str, Any]: - r"""Returns a dictionary of key performance indicators.""" + def get_workforce_log_tree(self) -> str: + r"""Returns an ASCII tree representation of the task hierarchy and + worker status. + + .. deprecated:: + This synchronous method is deprecated and will be removed in a + future version. Use :meth:`get_workforce_log_tree_async` instead. + """ + import asyncio + import warnings + + warnings.warn( + "get_workforce_log_tree() is deprecated and will be removed in a " + "future version. Use " + "'await workforce.get_workforce_log_tree_async()' instead.", + DeprecationWarning, + stacklevel=2, + ) + + try: + asyncio.get_running_loop() + raise RuntimeError( + "Cannot call sync get_workforce_log_tree() from async " + "context. Use " + "'await workforce.get_workforce_log_tree_async()' instead." + ) + except RuntimeError as e: + if "no running event loop" not in str(e).lower(): + raise + return asyncio.run(self.get_workforce_log_tree_async()) + + async def get_workforce_kpis_async(self) -> Dict[str, Any]: + r"""Returns a dictionary of key performance indicators. + + This is the async version of get_workforce_kpis. + """ metrics_cb: List[WorkforceMetrics] = [ cb for cb in self._callbacks if isinstance(cb, WorkforceMetrics) ] if len(metrics_cb) == 0: return {"error": "Metrics Callback not initialized."} else: - return metrics_cb[0].get_kpis() + return await metrics_cb[0].get_kpis() - def dump_workforce_logs(self, file_path: str) -> None: + def get_workforce_kpis(self) -> Dict[str, Any]: + r"""Returns a dictionary of key performance indicators. + + .. deprecated:: + This synchronous method is deprecated and will be removed in a + future version. Use :meth:`get_workforce_kpis_async` instead. + """ + import asyncio + import warnings + + warnings.warn( + "get_workforce_kpis() is deprecated and will be removed in a " + "future version. Use 'await workforce.get_workforce_kpis_async()' " + "instead.", + DeprecationWarning, + stacklevel=2, + ) + + try: + asyncio.get_running_loop() + raise RuntimeError( + "Cannot call sync get_workforce_kpis() from async context. " + "Use 'await workforce.get_workforce_kpis_async()' instead." + ) + except RuntimeError as e: + if "no running event loop" not in str(e).lower(): + raise + return asyncio.run(self.get_workforce_kpis_async()) + + async def dump_workforce_logs_async(self, file_path: str) -> None: r"""Dumps all collected logs to a JSON file. + This is the async version of dump_workforce_logs. + Args: file_path (str): The path to the JSON file. """ @@ -4643,10 +4881,42 @@ def dump_workforce_logs(self, file_path: str) -> None: if len(metrics_cb) == 0: print("Logger not initialized. Cannot dump logs.") return - metrics_cb[0].dump_to_json(file_path) + await metrics_cb[0].dump_to_json(file_path) # Use logger.info or print, consistent with existing style logger.info(f"Workforce logs dumped to {file_path}") + def dump_workforce_logs(self, file_path: str) -> None: + r"""Dumps all collected logs to a JSON file. + + .. deprecated:: + This synchronous method is deprecated and will be removed in a + future version. Use :meth:`dump_workforce_logs_async` instead. + + Args: + file_path (str): The path to the JSON file. + """ + import asyncio + import warnings + + warnings.warn( + "dump_workforce_logs() is deprecated and will be removed in a " + "future version. Use " + "'await workforce.dump_workforce_logs_async()' instead.", + DeprecationWarning, + stacklevel=2, + ) + + try: + asyncio.get_running_loop() + raise RuntimeError( + "Cannot call sync dump_workforce_logs() from async context. " + "Use 'await workforce.dump_workforce_logs_async()' instead." + ) + except RuntimeError as e: + if "no running event loop" not in str(e).lower(): + raise + asyncio.run(self.dump_workforce_logs_async(file_path)) + async def _handle_skip_task(self) -> bool: r"""Handle skip request by marking pending and in-flight tasks as completed. @@ -5119,7 +5389,7 @@ async def _listen_to_channel(self) -> None: logger.info("All tasks completed.") all_tasks_completed_event = AllTasksCompletedEvent() for cb in self._callbacks: - cb.log_all_tasks_completed(all_tasks_completed_event) + await cb.log_all_tasks_completed(all_tasks_completed_event) # shut down the whole workforce tree self.stop() @@ -5187,9 +5457,11 @@ def stop(self) -> None: f"(event-loop not yet started)." ) - def clone(self, with_memory: bool = False) -> 'Workforce': + async def clone_async(self, with_memory: bool = False) -> 'Workforce': r"""Creates a new instance of Workforce with the same configuration. + This is the async version of clone. + Args: with_memory (bool, optional): Whether to copy the memory (conversation history) to the new instance. If True, the new @@ -5219,13 +5491,13 @@ def clone(self, with_memory: bool = False) -> 'Workforce': for child in self._children: if isinstance(child, SingleAgentWorker): cloned_worker = child.worker.clone(with_memory) - new_instance.add_single_agent_worker( + await new_instance.add_single_agent_worker_async( child.description, cloned_worker, pool_max_size=10, ) elif isinstance(child, RolePlayingWorker): - new_instance.add_role_playing_worker( + await new_instance.add_role_playing_worker_async( child.description, child.assistant_role_name, child.user_role_name, @@ -5235,13 +5507,53 @@ def clone(self, with_memory: bool = False) -> 'Workforce': child.chat_turn_limit, ) elif isinstance(child, Workforce): - new_instance.add_workforce(child.clone(with_memory)) + new_instance.add_workforce( + await child.clone_async(with_memory) + ) else: logger.warning(f"{type(child)} is not being cloned.") continue return new_instance + def clone(self, with_memory: bool = False) -> 'Workforce': + r"""Creates a new instance of Workforce with the same configuration. + + .. deprecated:: + This synchronous method is deprecated and will be removed in a + future version. Use :meth:`clone_async` instead. + + Args: + with_memory (bool, optional): Whether to copy the memory + (conversation history) to the new instance. If True, the new + instance will have the same conversation history. If False, + the new instance will have a fresh memory. + (default: :obj:`False`) + + Returns: + Workforce: A new instance of Workforce with the same configuration. + """ + import asyncio + import warnings + + warnings.warn( + "clone() is deprecated and will be removed in a future version. " + "Use 'await workforce.clone_async()' instead.", + DeprecationWarning, + stacklevel=2, + ) + + try: + asyncio.get_running_loop() + raise RuntimeError( + "Cannot call sync clone() from async context. " + "Use 'await workforce.clone_async()' instead." + ) + except RuntimeError as e: + if "no running event loop" not in str(e).lower(): + raise + return asyncio.run(self.clone_async(with_memory)) + @dependencies_required("mcp") def to_mcp( self, @@ -5350,7 +5662,7 @@ async def process_task( } # Reset tool - def reset(): + async def reset(): r"""Reset the workforce to its initial state. Clears all pending tasks, resets all child nodes, and returns @@ -5366,7 +5678,7 @@ def reset(): >>> print(result["message"]) # "Workforce reset successfully" """ try: - workforce_instance.reset() + await workforce_instance.reset_async() return { "status": "success", "message": "Workforce reset successfully", @@ -5470,7 +5782,7 @@ def get_children_info(): return children_info # Add single agent worker - def add_single_agent_worker( + async def add_single_agent_worker( description, system_message=None, role_name="Assistant", @@ -5534,7 +5846,9 @@ def add_single_agent_worker( "message": str(e), } - workforce_instance.add_single_agent_worker(description, agent) + await workforce_instance.add_single_agent_worker_async( + description, agent + ) return { "status": "success", @@ -5545,7 +5859,7 @@ def add_single_agent_worker( return {"status": "error", "message": str(e)} # Add role playing worker - def add_role_playing_worker( + async def add_role_playing_worker( description, assistant_role_name, user_role_name, @@ -5602,7 +5916,7 @@ def add_role_playing_worker( "message": "Cannot add workers while workforce is running", # noqa: E501 } - workforce_instance.add_role_playing_worker( + await workforce_instance.add_role_playing_worker_async( description=description, assistant_role_name=assistant_role_name, user_role_name=user_role_name, diff --git a/camel/societies/workforce/workforce_callback.py b/camel/societies/workforce/workforce_callback.py index 9dee1d559a..5ed43dfd6a 100644 --- a/camel/societies/workforce/workforce_callback.py +++ b/camel/societies/workforce/workforce_callback.py @@ -35,40 +35,42 @@ class WorkforceCallback(ABC): """ @abstractmethod - def log_task_created( + async def log_task_created( self, event: TaskCreatedEvent, ) -> None: pass @abstractmethod - def log_task_decomposed(self, event: TaskDecomposedEvent) -> None: + async def log_task_decomposed(self, event: TaskDecomposedEvent) -> None: pass @abstractmethod - def log_task_assigned(self, event: TaskAssignedEvent) -> None: + async def log_task_assigned(self, event: TaskAssignedEvent) -> None: pass @abstractmethod - def log_task_started(self, event: TaskStartedEvent) -> None: + async def log_task_started(self, event: TaskStartedEvent) -> None: pass @abstractmethod - def log_task_completed(self, event: TaskCompletedEvent) -> None: + async def log_task_completed(self, event: TaskCompletedEvent) -> None: pass @abstractmethod - def log_task_failed(self, event: TaskFailedEvent) -> None: + async def log_task_failed(self, event: TaskFailedEvent) -> None: pass @abstractmethod - def log_worker_created(self, event: WorkerCreatedEvent) -> None: + async def log_worker_created(self, event: WorkerCreatedEvent) -> None: pass @abstractmethod - def log_worker_deleted(self, event: WorkerDeletedEvent) -> None: + async def log_worker_deleted(self, event: WorkerDeletedEvent) -> None: pass @abstractmethod - def log_all_tasks_completed(self, event: AllTasksCompletedEvent) -> None: + async def log_all_tasks_completed( + self, event: AllTasksCompletedEvent + ) -> None: pass diff --git a/camel/societies/workforce/workforce_logger.py b/camel/societies/workforce/workforce_logger.py index b04c134288..7bfe7c21a2 100644 --- a/camel/societies/workforce/workforce_logger.py +++ b/camel/societies/workforce/workforce_logger.py @@ -67,7 +67,7 @@ def _log_event(self, event_type: str, **kwargs: Any) -> None: if event_type == 'worker_created': self._initial_worker_logs.append(log_entry) - def log_task_created( + async def log_task_created( self, event: TaskCreatedEvent, ) -> None: @@ -96,7 +96,7 @@ def log_task_created( event.task_id ) - def log_task_decomposed( + async def log_task_decomposed( self, event: TaskDecomposedEvent, ) -> None: @@ -110,7 +110,7 @@ def log_task_decomposed( if event.parent_task_id in self._task_hierarchy: self._task_hierarchy[event.parent_task_id]['status'] = "decomposed" - def log_task_assigned( + async def log_task_assigned( self, event: TaskAssignedEvent, ) -> None: @@ -137,7 +137,7 @@ def log_task_assigned( ) self._worker_information[event.worker_id]['status'] = 'busy' - def log_task_started( + async def log_task_started( self, event: TaskStartedEvent, ) -> None: @@ -151,7 +151,7 @@ def log_task_started( if event.task_id in self._task_hierarchy: self._task_hierarchy[event.task_id]['status'] = 'processing' - def log_task_completed(self, event: TaskCompletedEvent) -> None: + async def log_task_completed(self, event: TaskCompletedEvent) -> None: r"""Logs the successful completion of a task.""" self._log_event( event_type=event.event_type, @@ -185,7 +185,7 @@ def log_task_completed(self, event: TaskCompletedEvent) -> None: + 1 ) - def log_task_failed( + async def log_task_failed( self, event: TaskFailedEvent, ) -> None: @@ -211,7 +211,7 @@ def log_task_failed( + 1 ) - def log_worker_created( + async def log_worker_created( self, event: WorkerCreatedEvent, ) -> None: @@ -233,7 +233,7 @@ def log_worker_created( **(event.metadata or {}), } - def log_worker_deleted( + async def log_worker_deleted( self, event: WorkerDeletedEvent, ) -> None: @@ -248,7 +248,7 @@ def log_worker_deleted( self._worker_information[event.worker_id]['status'] = 'deleted' # Or del self._worker_information[worker_id] - def log_queue_status( + async def log_queue_status( self, event: QueueStatusEvent, ) -> None: @@ -261,10 +261,12 @@ def log_queue_status( metadata=event.metadata or {}, ) - def log_all_tasks_completed(self, event: AllTasksCompletedEvent) -> None: + async def log_all_tasks_completed( + self, event: AllTasksCompletedEvent + ) -> None: pass - def reset_task_data(self) -> None: + async def reset_task_data(self) -> None: r"""Resets logs and data related to tasks, preserving worker information. """ @@ -283,7 +285,7 @@ def reset_task_data(self) -> None: f"{self.workforce_id}" ) - def dump_to_json(self, file_path: str) -> None: + async def dump_to_json(self, file_path: str) -> None: r"""Dumps all log entries to a JSON file. Args: @@ -447,7 +449,7 @@ def _get_task_tree_string( ) return tree_str - def get_ascii_tree_representation(self) -> str: + async def get_ascii_tree_representation(self) -> str: r"""Generates an ASCII tree representation of the current task hierarchy and worker status. """ @@ -480,7 +482,7 @@ def get_ascii_tree_representation(self) -> str: ) return output_str - def get_kpis(self) -> Dict[str, Any]: + async def get_kpis(self) -> Dict[str, Any]: r"""Calculates and returns key performance indicators from the logs.""" kpis: Dict[str, Any] = { 'total_tasks_created': 0, diff --git a/camel/societies/workforce/workforce_metrics.py b/camel/societies/workforce/workforce_metrics.py index 06a7735c67..10a34715ad 100644 --- a/camel/societies/workforce/workforce_metrics.py +++ b/camel/societies/workforce/workforce_metrics.py @@ -17,17 +17,17 @@ class WorkforceMetrics(ABC): @abstractmethod - def reset_task_data(self) -> None: + async def reset_task_data(self) -> None: pass @abstractmethod - def dump_to_json(self, file_path: str) -> None: + async def dump_to_json(self, file_path: str) -> None: pass @abstractmethod - def get_ascii_tree_representation(self) -> str: + async def get_ascii_tree_representation(self) -> str: pass @abstractmethod - def get_kpis(self) -> Dict[str, Any]: + async def get_kpis(self) -> Dict[str, Any]: pass diff --git a/examples/benchmarks/browsecomp_workforce.py b/examples/benchmarks/browsecomp_workforce.py index 07de756bed..0382f08588 100644 --- a/examples/benchmarks/browsecomp_workforce.py +++ b/examples/benchmarks/browsecomp_workforce.py @@ -12,6 +12,8 @@ # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +import asyncio + from camel.agents.chat_agent import ChatAgent from camel.benchmarks.browsecomp import BrowseCompBenchmark from camel.messages.base import BaseMessage @@ -19,7 +21,8 @@ from camel.societies.workforce.workforce import Workforce from camel.types.enums import ModelPlatformType, ModelType -if __name__ == '__main__': + +async def main(): # Configure the model model_config = { "model_platform": ModelPlatformType.DEFAULT, @@ -66,13 +69,13 @@ new_worker_agent=new_worker_agent, ) - # Add workers to the workforce - workforce.add_single_agent_worker( - description="Web content researcher", worker=web_researcher_agent + await workforce.add_single_agent_worker_async( + description="Web content researcher", + worker=web_researcher_agent, ) # Add a role-playing worker for complex queries - workforce.add_role_playing_worker( + await workforce.add_role_playing_worker_async( description="Collaborative research team", assistant_role_name="Research Assistant", user_role_name="Research Lead", @@ -99,3 +102,7 @@ benchmark.validate(grader=grader_agent) print("Benchmark completed. Results saved to report_workforce.html") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/debug/eigent.py b/examples/debug/eigent.py index 9fdd92e5ef..1b8c404d0d 100644 --- a/examples/debug/eigent.py +++ b/examples/debug/eigent.py @@ -725,22 +725,25 @@ async def main(): use_structured_output_handler=False, ) - workforce.add_single_agent_worker( + await workforce.add_single_agent_worker_async( "Search Agent: Can search the web, extract webpage content, " "simulate browser actions, and provide relevant information to " "solve the given task.", worker=search_agent, - ).add_single_agent_worker( + ) + await workforce.add_single_agent_worker_async( "Developer Agent: A skilled coding assistant that can write and " "execute code, run terminal commands, and verify solutions to " "complete tasks.", worker=developer_agent, - ).add_single_agent_worker( + ) + await workforce.add_single_agent_worker_async( "Document Agent: A document processing assistant for creating, " "modifying, and managing various document formats, including " "presentations.", worker=document_agent, - ).add_single_agent_worker( + ) + await workforce.add_single_agent_worker_async( "Multi-Modal Agent: A multi-modal processing assistant for " "analyzing, and generating media content like audio and images.", worker=multi_modal_agent, @@ -763,16 +766,16 @@ async def main(): # Test WorkforceLogger features print("\n--- Workforce Log Tree ---") - print(workforce.get_workforce_log_tree()) + print(await workforce.get_workforce_log_tree_async()) print("\n--- Workforce KPIs ---") - kpis = workforce.get_workforce_kpis() + kpis = await workforce.get_workforce_kpis_async() for key, value in kpis.items(): print(f"{key}: {value}") log_file_path = "eigent_logs.json" print(f"\n--- Dumping Workforce Logs to {log_file_path} ---") - workforce.dump_workforce_logs(log_file_path) + await workforce.dump_workforce_logs_async(log_file_path) print(f"Logs dumped. Please check the file: {log_file_path}") diff --git a/examples/toolkits/async_browser_toolkit.py b/examples/toolkits/async_browser_toolkit.py index 89ecbc1174..a3fc07507a 100644 --- a/examples/toolkits/async_browser_toolkit.py +++ b/examples/toolkits/async_browser_toolkit.py @@ -111,4 +111,4 @@ 'Minecraft: Switch Edition - Nintendo Switch'. The price is $35.97, and it has a rating of 4.8 stars based on 1,525 ratings. ========================================================================== -""" +""" \ No newline at end of file diff --git a/examples/toolkits/message_agent_toolkit.py b/examples/toolkits/message_agent_toolkit.py index 0009805222..ef765b62c2 100644 --- a/examples/toolkits/message_agent_toolkit.py +++ b/examples/toolkits/message_agent_toolkit.py @@ -20,7 +20,7 @@ from camel.toolkits import AgentCommunicationToolkit -def create_messaging_workforce(): +async def create_messaging_workforce(): r"""Create a workforce where agents can communicate with each other.""" # 1. Initialize the message toolkit @@ -76,9 +76,9 @@ def create_messaging_workforce(): # 5. Create workforce and add workers with matching names workforce = Workforce("Content Creation Team") - workforce.add_single_agent_worker("Researcher", researcher) - workforce.add_single_agent_worker("Writer", writer) - workforce.add_single_agent_worker("Reviewer", reviewer) + await workforce.add_single_agent_worker_async("Researcher", researcher) + await workforce.add_single_agent_worker_async("Writer", writer) + await workforce.add_single_agent_worker_async("Reviewer", reviewer) return workforce, msg_toolkit @@ -87,7 +87,7 @@ async def demonstrate_coordination(): r"""Show how agents coordinate using messaging during task execution.""" print("šŸ—ļø Setting up messaging-enabled workforce...") - workforce, msg_toolkit = create_messaging_workforce() + workforce, msg_toolkit = await create_messaging_workforce() print("šŸ“‹ Available agents:") print(msg_toolkit.list_available_agents()) diff --git a/examples/workforce/eigent.py b/examples/workforce/eigent.py index 6dee6f8151..67cdc34624 100644 --- a/examples/workforce/eigent.py +++ b/examples/workforce/eigent.py @@ -1069,24 +1069,27 @@ async def main(): task_timeout_seconds=900.0, ) - workforce.add_single_agent_worker( + await workforce.add_single_agent_worker_async( "Search Agent: An expert web researcher that can browse websites, " "perform searches, and extract information to support other agents.", worker=search_agent, - ).add_single_agent_worker( + ) + await workforce.add_single_agent_worker_async( "Developer Agent: A master-level coding assistant with a powerful " "terminal. It can write and execute code, manage files, automate " "desktop tasks, and deploy web applications to solve complex " "technical challenges.", worker=developer_agent, - ).add_single_agent_worker( + ) + await workforce.add_single_agent_worker_async( "Document Agent: A document processing assistant skilled in creating " "and modifying a wide range of file formats. It can generate " "text-based files (Markdown, JSON, YAML, HTML), office documents " "(Word, PDF), presentations (PowerPoint), and data files " "(Excel, CSV).", worker=document_agent, - ).add_single_agent_worker( + ) + await workforce.add_single_agent_worker_async( "Multi-Modal Agent: A specialist in media processing. It can " "analyze images and audio, transcribe speech, download videos, and " "generate new images from text prompts.", @@ -1109,16 +1112,16 @@ async def main(): # Test WorkforceLogger features print("\n--- Workforce Log Tree ---") - print(workforce.get_workforce_log_tree()) + print(await workforce.get_workforce_log_tree_async()) print("\n--- Workforce KPIs ---") - kpis = workforce.get_workforce_kpis() + kpis = await workforce.get_workforce_kpis_async() for key, value in kpis.items(): print(f"{key}: {value}") log_file_path = "eigent_logs.json" print(f"\n--- Dumping Workforce Logs to {log_file_path} ---") - workforce.dump_workforce_logs(log_file_path) + await workforce.dump_workforce_logs_async(log_file_path) print(f"Logs dumped. Please check the file: {log_file_path}") diff --git a/examples/workforce/hackathon_judges.py b/examples/workforce/hackathon_judges.py index 7d43b04ef0..31c6cccb80 100644 --- a/examples/workforce/hackathon_judges.py +++ b/examples/workforce/hackathon_judges.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +import asyncio import textwrap from camel.agents import ChatAgent @@ -57,7 +58,7 @@ def make_judge( return agent -def main(): +async def main(): proj_content = textwrap.dedent( """\ Project name: CAMEL-Powered Adaptive Learning Assistant @@ -228,48 +229,52 @@ def main(): id="0", ) - workforce.add_single_agent_worker( + await workforce.add_single_agent_worker_async( 'Visionary Veronica (Judge), a venture capitalist who is ' 'obsessed with how projects can be scaled into "unicorn" companies', worker=vc_agent, - ).add_single_agent_worker( + ) + await workforce.add_single_agent_worker_async( 'Critical John (Judge), an experienced engineer and a' ' perfectionist.', worker=eng_agent, - ).add_single_agent_worker( + ) + await workforce.add_single_agent_worker_async( 'Innovator Iris (Judge), a well-known AI startup founder who' ' is always looking for the "next big thing" in AI.', worker=founder_agent, - ).add_single_agent_worker( + ) + await workforce.add_single_agent_worker_async( 'Friendly Frankie (Judge), a contributor to the CAMEL-AI ' 'project and is always excited to see how people are using it.', worker=contributor_agent, - ).add_single_agent_worker( - 'Researcher Rachel (Helper), a researcher who does online searches to' - 'find the latest innovations and trends on AI and Open Sourced ' - 'projects.', + ) + await workforce.add_single_agent_worker_async( + 'Researcher Rachel (Helper), a researcher who does online ' + 'searches to find the latest innovations and trends on AI and Open ' + 'Sourced projects.', worker=researcher_agent, ) - workforce.process_task(task) + await workforce.process_task_async(task) # Test WorkforceLogger features print("\n--- Workforce Log Tree ---") - print(workforce.get_workforce_log_tree()) + print(await workforce.get_workforce_log_tree_async()) print("\n--- Workforce KPIs ---") - kpis = workforce.get_workforce_kpis() + kpis = await workforce.get_workforce_kpis_async() for key, value in kpis.items(): print(f"{key}: {value}") log_file_path = "hackathon_judges_logs.json" print(f"\n--- Dumping Workforce Logs to {log_file_path} ---") - workforce.dump_workforce_logs(log_file_path) + await workforce.dump_workforce_logs_async(log_file_path) print(f"Logs dumped. Please check the file: {log_file_path}") if __name__ == "__main__": - main() + asyncio.run(main()) ''' =============================================================================== diff --git a/examples/workforce/human_in_the_loop/simple_hotkey_demo.py b/examples/workforce/human_in_the_loop/simple_hotkey_demo.py index 7ac97cd938..46da21f0c3 100644 --- a/examples/workforce/human_in_the_loop/simple_hotkey_demo.py +++ b/examples/workforce/human_in_the_loop/simple_hotkey_demo.py @@ -25,7 +25,7 @@ from camel.types import ModelPlatformType, ModelType -def create_simple_workforce(): +async def create_simple_workforce(): r"""Create a simple workforce for demonstration.""" thinking_toolkit = ThinkingToolkit() file_write_toolkit = FileWriteToolkit() @@ -60,8 +60,10 @@ def create_simple_workforce(): ) workforce = Workforce('Simple Demo Workforce') - workforce.add_single_agent_worker("Poet Agent", poet_agent) - workforce.add_single_agent_worker("File Write Agent", file_write_agent) + await workforce.add_single_agent_worker_async("Poet Agent", poet_agent) + await workforce.add_single_agent_worker_async( + "File Write Agent", file_write_agent + ) return workforce @@ -138,11 +140,11 @@ def simple_intervention_menu(workforce: Workforce): print("\nšŸ”„ Enter 4 to resume or 5 to stop") -def main(): +async def main(): r"""Main demo function.""" # Create workforce and task - workforce = create_simple_workforce() + workforce = await create_simple_workforce() task = Task( content=( "write a poem about the sun and the moon" "then write the md file" @@ -224,4 +226,4 @@ def run_workforce(): if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/examples/workforce/multi_turn_conversation.py b/examples/workforce/multi_turn_conversation.py index 61032df224..7fd8386ad8 100644 --- a/examples/workforce/multi_turn_conversation.py +++ b/examples/workforce/multi_turn_conversation.py @@ -31,6 +31,8 @@ Turn 3: "Which one would you recommend based on our discussion?" """ +import asyncio + from camel.agents.chat_agent import ChatAgent from camel.messages.base import BaseMessage from camel.models import ModelFactory @@ -38,94 +40,103 @@ from camel.tasks.task import Task from camel.types import ModelPlatformType, ModelType -# Set up agents -research_agent = ChatAgent( - system_message=BaseMessage.make_assistant_message( - role_name="Researcher", - content="You are a research specialist who gathers and " - "analyzes information. You focus on finding facts and " - "providing detailed context.", - ), - model=ModelFactory.create( - model_platform=ModelPlatformType.DEFAULT, - model_type=ModelType.DEFAULT, - ), -) - -writer_agent = ChatAgent( - system_message=BaseMessage.make_assistant_message( - role_name="Writer", - content="You are a professional writer who creates clear, " - "concise responses. You synthesize information into " - "well-structured answers.", - ), - model=ModelFactory.create( - model_platform=ModelPlatformType.DEFAULT, - model_type=ModelType.DEFAULT, - ), -) - -# Create workforce with 2 agents -workforce = Workforce('Multi-turn Assistant Team') -workforce.add_single_agent_worker( - "A researcher who gathers and analyzes information", worker=research_agent -).add_single_agent_worker( - "A writer who synthesizes information into clear responses", - worker=writer_agent, -) - -# Initialize conversation history for all rounds -conversation_history = [] -turn_number = 0 - -print("=== Multi-turn Workforce Conversation ===") -print("Type your task/question (or 'quit' to exit)") -print("=" * 50) - -# Multi-turn conversation loop -while True: - turn_number += 1 - print(f"\n--- Turn {turn_number} ---") - - # Get user input from terminal - user_input = input("You: ").strip() - - if user_input.lower() in ['quit', 'exit', 'q']: - print("Exiting conversation. Goodbye!") - break - - if not user_input: - print("Please enter a valid task or question.") - continue - - # Build task content with context from previous rounds - history_context = "" - for i in range(len(conversation_history)): - item = conversation_history[i] - history_context += f"Round {i+1}:\n" - history_context += f"Task: {item['task']}\n" - history_context += f"Result: {item['result']}\n\n" - - task_content = ( - f"{history_context}{'='*50}\nNew task: {user_input}" - if history_context - else user_input + +async def main(): + # Set up agents + research_agent = ChatAgent( + system_message=BaseMessage.make_assistant_message( + role_name="Researcher", + content="You are a research specialist who gathers and " + "analyzes information. You focus on finding facts and " + "providing detailed context.", + ), + model=ModelFactory.create( + model_platform=ModelPlatformType.DEFAULT, + model_type=ModelType.DEFAULT, + ), ) - # Create and process task - task = Task(content=task_content, id=str(turn_number)) - task_result = workforce.process_task(task) + writer_agent = ChatAgent( + system_message=BaseMessage.make_assistant_message( + role_name="Writer", + content="You are a professional writer who creates clear, " + "concise responses. You synthesize information into " + "well-structured answers.", + ), + model=ModelFactory.create( + model_platform=ModelPlatformType.DEFAULT, + model_type=ModelType.DEFAULT, + ), + ) + + # Create workforce with 2 agents + workforce = Workforce('Multi-turn Assistant Team') + + await workforce.add_single_agent_worker_async( + "A researcher who gathers and analyzes information", + worker=research_agent, + ) + await workforce.add_single_agent_worker_async( + "A writer who synthesizes information into clear responses", + worker=writer_agent, + ) + + # Initialize conversation history for all rounds + conversation_history = [] + turn_number = 0 + + print("=== Multi-turn Workforce Conversation ===") + print("Type your task/question (or 'quit' to exit)") + print("=" * 50) + + # Multi-turn conversation loop + while True: + turn_number += 1 + print(f"\n--- Turn {turn_number} ---") + + # Get user input from terminal + user_input = input("You: ").strip() + + if user_input.lower() in ['quit', 'exit', 'q']: + print("Exiting conversation. Goodbye!") + break + + if not user_input: + print("Please enter a valid task or question.") + continue + + # Build task content with context from previous rounds + history_context = "" + for i in range(len(conversation_history)): + item = conversation_history[i] + history_context += f"Round {i+1}:\n" + history_context += f"Task: {item['task']}\n" + history_context += f"Result: {item['result']}\n\n" + + task_content = ( + f"{history_context}{'='*50}\nNew task: {user_input}" + if history_context + else user_input + ) + + # Create and process task + task = Task(content=task_content, id=str(turn_number)) + task_result = await workforce.process_task_async(task) + + # Display response + print(f"\nAssistant: {task_result.result}") + + # Store all information from this round for future context + round_info = { + 'task': user_input, + 'result': task_result.result, + } - # Display response - print(f"\nAssistant: {task_result.result}") + conversation_history.append(round_info) - # Store all information from this round for future context - round_info = { - 'task': user_input, - 'result': task_result.result, - } + print("\n--- Conversation Complete ---") + print(f"Total turns: {turn_number}") - conversation_history.append(round_info) -print("\n--- Conversation Complete ---") -print(f"Total turns: {turn_number}") +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/workforce/multiple_single_agents.py b/examples/workforce/multiple_single_agents.py index a84f045aa8..54ce69b11e 100644 --- a/examples/workforce/multiple_single_agents.py +++ b/examples/workforce/multiple_single_agents.py @@ -11,6 +11,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +import asyncio + from camel.agents.chat_agent import ChatAgent from camel.messages.base import BaseMessage from camel.models import ModelFactory @@ -20,7 +22,7 @@ from camel.types import ModelPlatformType, ModelType -def main(): +async def main(): # 1. Set up a Research agent with search tools search_agent = ChatAgent( system_message=BaseMessage.make_assistant_message( @@ -68,12 +70,15 @@ def main(): graceful_shutdown_timeout=30.0, ) - workforce.add_single_agent_worker( + await workforce.add_single_agent_worker_async( "A researcher who can search online for information.", worker=search_agent, - ).add_single_agent_worker( - "An analyst who can process research findings.", worker=analyst_agent - ).add_single_agent_worker( + ) + await workforce.add_single_agent_worker_async( + "An analyst who can process research findings.", + worker=analyst_agent, + ) + await workforce.add_single_agent_worker_async( "A writer who can create a final report from the analysis.", worker=writer_agent, ) @@ -91,22 +96,22 @@ def main(): id='0', ) - workforce.process_task(human_task) + await workforce.process_task_async(human_task) # Test WorkforceLogger features print("\n--- Workforce Log Tree ---") - print(workforce.get_workforce_log_tree()) + print(await workforce.get_workforce_log_tree_async()) print("\n--- Workforce KPIs ---") - kpis = workforce.get_workforce_kpis() + kpis = await workforce.get_workforce_kpis_async() for key, value in kpis.items(): print(f"{key}: {value}") log_file_path = "multiple_single_agents_logs.json" print(f"\n--- Dumping Workforce Logs to {log_file_path} ---") - workforce.dump_workforce_logs(log_file_path) + await workforce.dump_workforce_logs_async(log_file_path) print(f"Logs dumped. Please check the file: {log_file_path}") if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/examples/workforce/pipeline_workflow_example.py b/examples/workforce/pipeline_workflow_example.py index 439aea5d88..a990515156 100644 --- a/examples/workforce/pipeline_workflow_example.py +++ b/examples/workforce/pipeline_workflow_example.py @@ -134,7 +134,7 @@ async def example_1_literature_analysis_pipeline(): summary_agent = ChatAgent( system_message=summary_system_message, model=model, tools=[] ) - workforce.add_single_agent_worker( + await workforce.add_single_agent_worker_async( f"Summary Specialist {i+1}", summary_agent ) @@ -151,8 +151,12 @@ async def example_1_literature_analysis_pipeline(): system_message=synthesis_system_message, model=model, tools=[] ) - workforce.add_single_agent_worker("Literature Researcher", search_agent) - workforce.add_single_agent_worker("Research Synthesizer", synthesis_agent) + await workforce.add_single_agent_worker_async( + "Literature Researcher", search_agent + ) + await workforce.add_single_agent_worker_async( + "Research Synthesizer", synthesis_agent + ) # Build literature analysis pipeline workforce.pipeline_add( diff --git a/examples/workforce/role_playing_with_agents.py b/examples/workforce/role_playing_with_agents.py index a813034348..29e9cce98d 100644 --- a/examples/workforce/role_playing_with_agents.py +++ b/examples/workforce/role_playing_with_agents.py @@ -13,6 +13,8 @@ # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +import asyncio + from camel.agents.chat_agent import ChatAgent from camel.messages.base import BaseMessage from camel.models import ModelFactory @@ -22,7 +24,7 @@ from camel.types import ModelPlatformType, ModelType -def main(): +async def main(): guide_sysmsg = BaseMessage.make_assistant_message( role_name="tour guide", content="You have to lead everyone to have fun", @@ -61,7 +63,8 @@ def main(): ) workforce = Workforce('a travel group') - workforce.add_role_playing_worker( + await workforce.add_single_agent_worker_async('planner', planner_agent) + await workforce.add_single_agent_worker_async( description='research Group', assistant_role_name=assistant_role_name, user_role_name=user_role_name, @@ -69,30 +72,29 @@ def main(): user_agent_kwargs=user_agent_kwargs, summarize_agent_kwargs={}, chat_turn_limit=1, - ).add_single_agent_worker( - 'tour guide', guide_agent - ).add_single_agent_worker('planner', planner_agent) + ) + await workforce.add_single_agent_worker_async('tour guide', guide_agent) human_task = Task( content="research history of Paris and plan a tour.", id='0', ) - workforce.process_task(human_task) + await workforce.process_task_async(human_task) # Test WorkforceLogger features print("\n--- Workforce Log Tree ---") - print(workforce.get_workforce_log_tree()) + print(await workforce.get_workforce_log_tree_async()) print("\n--- Workforce KPIs ---") - kpis = workforce.get_workforce_kpis() + kpis = await workforce.get_workforce_kpis_async() for key, value in kpis.items(): print(f"{key}: {value}") log_file_path = "role_playing_with_agents_logs.json" print(f"\n--- Dumping Workforce Logs to {log_file_path} ---") - workforce.dump_workforce_logs(log_file_path) + await workforce.dump_workforce_logs_async(log_file_path) print(f"Logs dumped. Please check the file: {log_file_path}") if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/examples/workforce/simple_workforce_mcp.py b/examples/workforce/simple_workforce_mcp.py index 3403a46d94..2a4c19a3e7 100644 --- a/examples/workforce/simple_workforce_mcp.py +++ b/examples/workforce/simple_workforce_mcp.py @@ -27,12 +27,14 @@ - Call add_single_agent_worker to add new workers """ +import asyncio + from camel.agents import ChatAgent from camel.messages import BaseMessage from camel.societies.workforce import Workforce -def main(): +async def main(): """Create and run a simple workforce MCP server.""" # Create a basic workforce @@ -44,7 +46,7 @@ def main(): content="You are a helpful analyst who can process various tasks.", ) analyst = ChatAgent(system_message=analyst_msg) - workforce.add_single_agent_worker( + await workforce.add_single_agent_worker_async( description="General purpose analyst", worker=analyst ) @@ -61,10 +63,10 @@ def main(): try: # This will start the server and block - mcp_server.run() + await mcp_server.run_stdio_async() except KeyboardInterrupt: print("\nServer stopped.") if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/examples/workforce/workforce_callbacks_example.py b/examples/workforce/workforce_callbacks_example.py index 01fdeda2a4..05a7294463 100644 --- a/examples/workforce/workforce_callbacks_example.py +++ b/examples/workforce/workforce_callbacks_example.py @@ -38,6 +38,7 @@ ) from camel.societies.workforce.workforce import Workforce from camel.societies.workforce.workforce_callback import WorkforceCallback +from camel.tasks import Task from camel.types import ModelPlatformType, ModelType logger = get_logger(__name__) @@ -46,55 +47,57 @@ class PrintCallback(WorkforceCallback): r"""Simple callback printing events to logs to observe ordering.""" - def log_task_created(self, event: TaskCreatedEvent) -> None: + async def log_task_created(self, event: TaskCreatedEvent) -> None: print( f"[PrintCallback] task_created: id={event.task_id}, " f"desc={event.description!r}, parent={event.parent_task_id}" ) - def log_task_decomposed(self, event: TaskDecomposedEvent) -> None: + async def log_task_decomposed(self, event: TaskDecomposedEvent) -> None: print( f"[PrintCallback] task_decomposed: parent={event.parent_task_id}," f" subtasks={event.subtask_ids}" ) - def log_task_assigned(self, event: TaskAssignedEvent) -> None: + async def log_task_assigned(self, event: TaskAssignedEvent) -> None: print( f"[PrintCallback] task_assigned: task={event.task_id}, " f"worker={event.worker_id}" ) - def log_task_started(self, event: TaskStartedEvent) -> None: + async def log_task_started(self, event: TaskStartedEvent) -> None: print( f"[PrintCallback] task_started: task={event.task_id}, " f"worker={event.worker_id}" ) - def log_task_completed(self, event: TaskCompletedEvent) -> None: + async def log_task_completed(self, event: TaskCompletedEvent) -> None: print( f"[PrintCallback] task_completed: task={event.task_id}, " f"worker={event.worker_id}, took={event.processing_time_seconds}s" ) - def log_task_failed(self, event: TaskFailedEvent) -> None: + async def log_task_failed(self, event: TaskFailedEvent) -> None: logger.warning( f"[PrintCallback] task_failed: task={event.task_id}, " f"err={event.error_message}" ) - def log_worker_created(self, event: WorkerCreatedEvent) -> None: + async def log_worker_created(self, event: WorkerCreatedEvent) -> None: print( f"[PrintCallback] worker_created: id={event.worker_id}, " f"type={event.worker_type}, role={event.role}" ) - def log_worker_deleted(self, event: WorkerDeletedEvent) -> None: + async def log_worker_deleted(self, event: WorkerDeletedEvent) -> None: print( f"[PrintCallback] worker_deleted: id={event.worker_id}, " f"reason={event.reason}" ) - def log_all_tasks_completed(self, event: AllTasksCompletedEvent) -> None: + async def log_all_tasks_completed( + self, event: AllTasksCompletedEvent + ) -> None: print("[PrintCallback] all_tasks_completed") @@ -125,23 +128,18 @@ async def run_demo() -> None: teacher = build_teacher_agent() student = build_student_agent() - workforce.add_single_agent_worker("Teacher Worker", teacher) - workforce.add_single_agent_worker("Student Worker", student) - workforce.add_main_task( - "The teacher set an exam question and had the students answer it." + await workforce.add_single_agent_worker_async("Teacher Worker", teacher) + await workforce.add_single_agent_worker_async("Student Worker", student) + await workforce.process_task_async( + Task( + content="The teacher set an exam question and had the " + "students answer it." + ) ) - # Start Workforce and wait for completion (timeout to avoid hanging) - wf_task = asyncio.create_task(workforce.start()) - try: - await asyncio.wait_for(wf_task, timeout=30.0) - except asyncio.TimeoutError: - logger.warning("Workforce run timed out; stopping...") - workforce.stop() - # Read KPIs and a simple "tree" - print(f"KPIs: {workforce.get_workforce_kpis()}") - print(f"Tree: {workforce.get_workforce_log_tree()}") + print(f"KPIs: {await workforce.get_workforce_log_tree_async()}") + print(f"Tree: {await workforce.get_workforce_log_tree()}") if __name__ == "__main__": diff --git a/examples/workforce/workforce_multitask_with_skip.py b/examples/workforce/workforce_multitask_with_skip.py index fff9b31f81..cd7da946c8 100644 --- a/examples/workforce/workforce_multitask_with_skip.py +++ b/examples/workforce/workforce_multitask_with_skip.py @@ -79,7 +79,7 @@ async def interactive_skip_demo(): model_type=ModelType.DEFAULT, ) worker = ChatAgent(model=model) - workforce.add_single_agent_worker("Interactive Worker", worker) + await workforce.add_single_agent_worker_async("Interactive Worker", worker) # Add multiple tasks logger.info("Adding tasks...") diff --git a/examples/workforce/workforce_shared_memory_validation.py b/examples/workforce/workforce_shared_memory_validation.py index 945a223817..346b224ab2 100644 --- a/examples/workforce/workforce_shared_memory_validation.py +++ b/examples/workforce/workforce_shared_memory_validation.py @@ -11,6 +11,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +import asyncio + from camel.agents import ChatAgent from camel.messages import BaseMessage from camel.models import ModelFactory @@ -32,7 +34,7 @@ def create_test_agent(role_name: str, unique_fact: str, model): ) -def main(): +async def main(): r"""Run the validation test for shared memory functionality.""" print("=== Workforce Shared Memory Validation Test ===\n") @@ -70,9 +72,13 @@ def main(): "Charlie", "Charlie knows the deadline is Friday", model ) - workforce.add_single_agent_worker("Alice the Coder", agent_alice) - workforce.add_single_agent_worker("Bob the Manager", agent_bob) - workforce.add_single_agent_worker("Charlie the Designer", agent_charlie) + await workforce.add_single_agent_worker_async( + "Alice the Coder", agent_alice + ) + await workforce.add_single_agent_worker_async("Bob the Manager", agent_bob) + await workforce.add_single_agent_worker_async( + "Charlie the Designer", agent_charlie + ) print("āœ“ Created workforce with 3 agents") print(" - Alice knows: secret code BLUE42") @@ -322,11 +328,13 @@ def test_agent_knowledge( "Charlie", "Charlie knows the deadline is Friday", model ) - workforce_no_memory.add_single_agent_worker( + await workforce_no_memory.add_single_agent_worker_async( "Alice the Coder", agent_alice_2 ) - workforce_no_memory.add_single_agent_worker("Bob the Manager", agent_bob_2) - workforce_no_memory.add_single_agent_worker( + await workforce_no_memory.add_single_agent_worker_async( + "Bob the Manager", agent_bob_2 + ) + await workforce_no_memory.add_single_agent_worker_async( "Charlie the Designer", agent_charlie_2 ) @@ -401,7 +409,7 @@ def test_agent_knowledge( if __name__ == "__main__": - main() + asyncio.run(main()) """ diff --git a/examples/workforce/workforce_workflow_memory_example.py b/examples/workforce/workforce_workflow_memory_example.py index fa6f05b7d9..62e41f1f8c 100644 --- a/examples/workforce/workforce_workflow_memory_example.py +++ b/examples/workforce/workforce_workflow_memory_example.py @@ -100,7 +100,7 @@ async def demonstrate_first_session(): # Add math agent with math tools math_agent = create_math_agent() - workforce.add_single_agent_worker( + await workforce.add_single_agent_worker_async( description="math_expert", worker=math_agent, enable_workflow_memory=True, # Enable to save workflows later @@ -108,7 +108,7 @@ async def demonstrate_first_session(): # Add writer agent without tools writer_agent = create_writer_agent() - workforce.add_single_agent_worker( + await workforce.add_single_agent_worker_async( description="content_writer", worker=writer_agent, enable_workflow_memory=True, # Enable to save workflows later @@ -154,7 +154,7 @@ async def demonstrate_second_session(): # Add workers with same descriptive names as before math_agent = create_math_agent() - workforce.add_single_agent_worker( + await workforce.add_single_agent_worker_async( description="math_expert", # Same description = loads matching # workflow worker=math_agent, @@ -162,7 +162,7 @@ async def demonstrate_second_session(): ) writer_agent = create_writer_agent() - workforce.add_single_agent_worker( + await workforce.add_single_agent_worker_async( description="content_writer", # Same description = loads # matching workflow worker=writer_agent, diff --git a/test/workforce/test_workforce.py b/test/workforce/test_workforce.py index c36dd3f8e1..4d535ec43a 100644 --- a/test/workforce/test_workforce.py +++ b/test/workforce/test_workforce.py @@ -336,7 +336,8 @@ def test_workforce_initialization(mock_model, share_memory): assert workforce.graceful_shutdown_timeout == 2.0 -def test_shared_memory_operations( +@pytest.mark.asyncio +async def test_shared_memory_operations( mock_model, mock_agent, sample_shared_memory ): r"""Test shared memory collection and synchronization""" @@ -353,7 +354,7 @@ def test_shared_memory_operations( share_memory=True, ) - workforce.add_single_agent_worker("TestAgent", mock_agent) + await workforce.add_single_agent_worker_async("TestAgent", mock_agent) # Test memory collection and synchronization with patch.object( @@ -366,7 +367,8 @@ def test_shared_memory_operations( mock_share.assert_called_once_with(sample_shared_memory) -def test_cross_agent_memory_access(mock_model, sample_shared_memory): +@pytest.mark.asyncio +async def test_cross_agent_memory_access(mock_model, sample_shared_memory): r"""Test cross-agent information access after memory sync""" # Create custom agents for the workforce coordinator_agent = ChatAgent( @@ -400,8 +402,8 @@ def test_cross_agent_memory_access(mock_model, sample_shared_memory): bob_response.msgs = [MagicMock(content="I know room 314 and code BLUE42")] agent_bob.step.return_value = bob_response - workforce.add_single_agent_worker("Alice", agent_alice) - workforce.add_single_agent_worker("Bob", agent_bob) + await workforce.add_single_agent_worker_async("Alice", agent_alice) + await workforce.add_single_agent_worker_async("Bob", agent_bob) # Simulate memory sync with patch.object( @@ -420,20 +422,21 @@ def test_cross_agent_memory_access(mock_model, sample_shared_memory): assert "blue42" in content and "314" in content -def test_dynamic_worker_addition(): +@pytest.mark.asyncio +async def test_dynamic_worker_addition(): r"""Test adding workers dynamically during different workforce states""" workforce = Workforce(description="Dynamic Test Workforce") from camel.societies.workforce.workforce import WorkforceState # Test 1: Add worker in IDLE state (should work) agent1 = ChatAgent("You are a test agent.") - workforce.add_single_agent_worker("Test Worker 1", agent1) + await workforce.add_single_agent_worker_async("Test Worker 1", agent1) assert len(workforce._children) == 1 # Test 2: Add worker in PAUSED state (should work) workforce._state = WorkforceState.PAUSED agent2 = ChatAgent("You are another test agent.") - workforce.add_single_agent_worker("Test Worker 2", agent2) + await workforce.add_single_agent_worker_async("Test Worker 2", agent2) assert len(workforce._children) == 2 # Test 3: Try to add worker in RUNNING state (should fail) @@ -443,7 +446,7 @@ def test_dynamic_worker_addition(): with pytest.raises( RuntimeError, match="Cannot add workers while workforce is running" ): - workforce.add_single_agent_worker("Should Fail", agent3) + await workforce.add_single_agent_worker_async("Should Fail", agent3) assert len(workforce._children) == 2 # No new worker added @@ -458,11 +461,11 @@ async def test_dynamic_worker_types(): # Add SingleAgentWorker agent = ChatAgent("You are a specialist.") - workforce.add_single_agent_worker("Specialist", agent) + await workforce.add_single_agent_worker_async("Specialist", agent) assert len(workforce._children) == 1 # Add RolePlayingWorker - workforce.add_role_playing_worker( + await workforce.add_role_playing_worker_async( description="Analysis Team", assistant_role_name="Analyst", user_role_name="Expert", diff --git a/test/workforce/test_workforce_callbacks.py b/test/workforce/test_workforce_callbacks.py index 58b4398805..6f27a88ec2 100644 --- a/test/workforce/test_workforce_callbacks.py +++ b/test/workforce/test_workforce_callbacks.py @@ -18,6 +18,8 @@ from camel.agents import ChatAgent from camel.messages import BaseMessage from camel.models import ModelFactory +from camel.societies.workforce import SingleAgentWorker +from camel.societies.workforce.base import BaseNode from camel.societies.workforce.events import ( AllTasksCompletedEvent, TaskAssignedEvent, @@ -45,33 +47,35 @@ def __init__(self) -> None: self.events: list[WorkforceEvent] = [] # Task events - def log_task_created(self, event: TaskCreatedEvent) -> None: + async def log_task_created(self, event: TaskCreatedEvent) -> None: self.events.append(event) - def log_task_decomposed(self, event: TaskDecomposedEvent) -> None: + async def log_task_decomposed(self, event: TaskDecomposedEvent) -> None: self.events.append(event) - def log_task_assigned(self, event: TaskAssignedEvent) -> None: + async def log_task_assigned(self, event: TaskAssignedEvent) -> None: self.events.append(event) - def log_task_started(self, event: TaskStartedEvent) -> None: + async def log_task_started(self, event: TaskStartedEvent) -> None: self.events.append(event) - def log_task_completed(self, event: TaskCompletedEvent) -> None: + async def log_task_completed(self, event: TaskCompletedEvent) -> None: self.events.append(event) - def log_task_failed(self, event: TaskFailedEvent) -> None: + async def log_task_failed(self, event: TaskFailedEvent) -> None: self.events.append(event) # Worker events - def log_worker_created(self, event: WorkerCreatedEvent) -> None: + async def log_worker_created(self, event: WorkerCreatedEvent) -> None: self.events.append(event) - def log_worker_deleted(self, event: WorkerDeletedEvent) -> None: + async def log_worker_deleted(self, event: WorkerDeletedEvent) -> None: self.events.append(event) # Terminal event - def log_all_tasks_completed(self, event: AllTasksCompletedEvent) -> None: + async def log_all_tasks_completed( + self, event: AllTasksCompletedEvent + ) -> None: self.events.append(event) @@ -80,53 +84,54 @@ class _MetricsCallback(WorkforceCallback, WorkforceMetrics): def __init__(self) -> None: self.events: list[WorkforceEvent] = [] - self.reset_task_data() self.dump_to_json_called = False self.get_ascii_tree_called = False self.get_kpis_called = False # WorkforceMetrics interface - def reset_task_data(self) -> None: + async def reset_task_data(self) -> None: self.dump_to_json_called = False self.get_ascii_tree_called = False self.get_kpis_called = False - def dump_to_json(self, file_path: str) -> None: + async def dump_to_json(self, file_path: str) -> None: self.dump_to_json_called = True - def get_ascii_tree_representation(self) -> str: + async def get_ascii_tree_representation(self) -> str: self.get_ascii_tree_called = True return "Stub ASCII Tree" - def get_kpis(self) -> Dict[str, Any]: + async def get_kpis(self) -> Dict[str, Any]: self.get_kpis_called = True return {} - def log_task_created(self, event: TaskCreatedEvent) -> None: + async def log_task_created(self, event: TaskCreatedEvent) -> None: self.events.append(event) - def log_task_decomposed(self, event: TaskDecomposedEvent) -> None: + async def log_task_decomposed(self, event: TaskDecomposedEvent) -> None: self.events.append(event) - def log_task_assigned(self, event: TaskAssignedEvent) -> None: + async def log_task_assigned(self, event: TaskAssignedEvent) -> None: self.events.append(event) - def log_task_started(self, event: TaskStartedEvent) -> None: + async def log_task_started(self, event: TaskStartedEvent) -> None: self.events.append(event) - def log_task_completed(self, event: TaskCompletedEvent) -> None: + async def log_task_completed(self, event: TaskCompletedEvent) -> None: self.events.append(event) - def log_task_failed(self, event: TaskFailedEvent) -> None: + async def log_task_failed(self, event: TaskFailedEvent) -> None: self.events.append(event) - def log_worker_created(self, event: WorkerCreatedEvent) -> None: + async def log_worker_created(self, event: WorkerCreatedEvent) -> None: self.events.append(event) - def log_worker_deleted(self, event: WorkerDeletedEvent) -> None: + async def log_worker_deleted(self, event: WorkerDeletedEvent) -> None: self.events.append(event) - def log_all_tasks_completed(self, event: AllTasksCompletedEvent) -> None: + async def log_all_tasks_completed( + self, event: AllTasksCompletedEvent + ) -> None: self.events.append(event) @@ -139,7 +144,82 @@ def _build_stub_agent() -> ChatAgent: return ChatAgent(model=model) -def test_workforce_callback_registration_and_metrics_handling(): +def _build_persona_agent(role_name: str, content: str) -> ChatAgent: + """Construct a stub-backed ChatAgent with a system persona.""" + return ChatAgent( + system_message=BaseMessage.make_assistant_message( + role_name=role_name, + content=content, + ), + model=ModelFactory.create( + model_platform=ModelPlatformType.OPENAI, + model_type=ModelType.STUB, + ), + ) + + +def _build_worker_specs() -> list[tuple[str, str, ChatAgent]]: + """Build the standard trio of workers used across tests.""" + return [ + ( + "A researcher who can search online for information.", + "SearchWork", + _build_persona_agent( + "Research Specialist", + "You are a research specialist who excels at finding and " + "gathering information from the web.", + ), + ), + ( + "An analyst who can process research findings.", + "AnalystWorker", + _build_persona_agent( + "Business Analyst", + "You are an expert business analyst. Your job is " + "to analyze research findings, identify key insights, " + "opportunities, and challenges.", + ), + ), + ( + "A writer who can create a final report from the analysis.", + "WriterWorker", + _build_persona_agent( + "Report Writer", + "You are a professional report writer. You take " + "analytical insights and synthesize them into a clear, " + "concise, and well-structured final report.", + ), + ), + ] + + +async def _assert_metrics_callbacks( + workforce: Workforce, cb: _MetricsCallback +): + """Verify metrics callback toggles across reset cycles.""" + assert not cb.dump_to_json_called + assert not cb.get_ascii_tree_called + assert not cb.get_kpis_called + await workforce.dump_workforce_logs_async("foo.log") + assert cb.dump_to_json_called + + await workforce.reset_async() + assert not cb.dump_to_json_called + assert not cb.get_ascii_tree_called + assert not cb.get_kpis_called + await workforce.get_workforce_kpis_async() + assert cb.get_kpis_called + + await workforce.reset_async() + assert not cb.dump_to_json_called + assert not cb.get_ascii_tree_called + assert not cb.get_kpis_called + await workforce.get_workforce_log_tree_async() + assert cb.get_ascii_tree_called + + +@pytest.mark.asyncio +async def test_workforce_callback_registration_and_metrics_handling(): """Verify default logger addition and metrics-callback skip logic. - When no metrics callback is provided, WorkforceLogger is added. @@ -157,7 +237,7 @@ def test_workforce_callback_registration_and_metrics_handling(): # Add a worker and ensure our callback saw the event agent = _build_stub_agent() - wf1.add_single_agent_worker("UnitTest Worker", agent) + await wf1.add_single_agent_worker_async("UnitTest Worker", agent) assert any(isinstance(e, WorkerCreatedEvent) for e in cb.events) # 2) Metrics-capable callback present -> no default WorkforceLogger @@ -172,161 +252,48 @@ def test_workforce_callback_registration_and_metrics_handling(): Workforce("CB Test - Invalid", callbacks=[object()]) -def assert_event_sequence(events: list[str], min_worker_count: int): - """ - Validate that the given event sequence follows the expected logical order. - This version is flexible to handle: - - Task retries and dynamic worker creation - - Cases where tasks are not decomposed (e.g., when using stub models) - """ - idx = 0 - n = len(events) - - # 1. Expect at least min_worker_count WorkerCreatedEvent events first - initial_worker_count = 0 - while idx < n and events[idx] == "WorkerCreatedEvent": - initial_worker_count += 1 - idx += 1 - assert initial_worker_count >= min_worker_count, ( - f"Expected at least {min_worker_count} initial " - f"WorkerCreatedEvents, got {initial_worker_count}" - ) - - # 2. Expect one main TaskCreatedEvent - assert idx < n and events[idx] == "TaskCreatedEvent", ( - f"Event {idx} should be TaskCreatedEvent, got " - f"{events[idx] if idx < n else 'END'}" - ) - idx += 1 - - # 3. TaskDecomposedEvent may or may not be present - # (depends on coordinator behavior) - # If the coordinator can't parse stub responses, it may skip - # decomposition - has_decomposition = idx < n and events[idx] == "TaskDecomposedEvent" - if has_decomposition: - idx += 1 - - # 4. Count all event types in the remaining events - all_events = events[idx:] - task_assigned_count = all_events.count("TaskAssignedEvent") - task_started_count = all_events.count("TaskStartedEvent") - task_completed_count = all_events.count("TaskCompletedEvent") - all_tasks_completed_count = all_events.count("AllTasksCompletedEvent") - - # 5. Validate basic invariants - # At minimum, the main task should be assigned and processed - assert ( - task_assigned_count >= 1 - ), f"Expected at least 1 TaskAssignedEvent, got {task_assigned_count}" - assert ( - task_started_count >= 1 - ), f"Expected at least 1 TaskStartedEvent, got {task_started_count}" - assert ( - task_completed_count >= 1 - ), f"Expected at least 1 TaskCompletedEvent, got {task_completed_count}" - - # 6. Expect exactly one AllTasksCompletedEvent at the end - assert all_tasks_completed_count == 1, ( - f"Expected exactly 1 AllTasksCompletedEvent, got " - f"{all_tasks_completed_count}" - ) - assert ( - events[-1] == "AllTasksCompletedEvent" - ), "Last event should be AllTasksCompletedEvent" - - # 7. All events should be of expected types - allowed_events = { - "WorkerCreatedEvent", - "WorkerDeletedEvent", - "TaskCreatedEvent", - "TaskDecomposedEvent", - "TaskAssignedEvent", - "TaskStartedEvent", - "TaskCompletedEvent", - "TaskFailedEvent", - "AllTasksCompletedEvent", - } - for i, e in enumerate(events): - assert e in allowed_events, f"Unexpected event type at {i}: {e}" - - -def test_workforce_emits_expected_event_sequence(): - # Use STUB model to avoid real API calls and ensure fast, - # deterministic execution - search_agent = ChatAgent( - system_message=BaseMessage.make_assistant_message( - role_name="Research Specialist", - content="You are a research specialist who excels at finding and " - "gathering information from the web.", - ), - model=ModelFactory.create( - model_platform=ModelPlatformType.OPENAI, - model_type=ModelType.STUB, - ), - ) - - analyst_agent = ChatAgent( - system_message=BaseMessage.make_assistant_message( - role_name="Business Analyst", - content="You are an expert business analyst. Your job is " - "to analyze research findings, identify key insights, " - "opportunities, and challenges.", - ), - model=ModelFactory.create( - model_platform=ModelPlatformType.OPENAI, - model_type=ModelType.STUB, - ), - ) - - writer_agent = ChatAgent( - system_message=BaseMessage.make_assistant_message( - role_name="Report Writer", - content="You are a professional report writer. You take " - "analytical insights and synthesize them into a clear, " - "concise, and well-structured final report.", - ), - model=ModelFactory.create( - model_platform=ModelPlatformType.OPENAI, - model_type=ModelType.STUB, - ), - ) - +@pytest.mark.asyncio +@pytest.mark.parametrize( + "preconfigure_children", + [False, True], + ids=["add_workers_at_runtime", "preconfigure_children"], +) +async def test_workforce_emits_expected_events_for_worker_init_modes( + preconfigure_children: bool, +): + """Validate event ordering for both worker setup paths.""" cb = _MetricsCallback() - - # Use STUB models for coordinator and task agents to avoid real API calls - coordinator_agent = ChatAgent( - model=ModelFactory.create( - model_platform=ModelPlatformType.OPENAI, - model_type=ModelType.STUB, + coordinator_agent = _build_stub_agent() + task_agent = _build_stub_agent() + worker_specs = _build_worker_specs() + + if preconfigure_children: + children: list[BaseNode] = [ + SingleAgentWorker(description=child_desc, worker=agent) + for _, child_desc, agent in worker_specs + ] + workforce = Workforce( + 'Business Analysis Team', + graceful_shutdown_timeout=30.0, + callbacks=[cb], + coordinator_agent=coordinator_agent, + task_agent=task_agent, + children=children, ) - ) - task_agent = ChatAgent( - model=ModelFactory.create( - model_platform=ModelPlatformType.OPENAI, - model_type=ModelType.STUB, + else: + workforce = Workforce( + 'Business Analysis Team', + graceful_shutdown_timeout=30.0, + callbacks=[cb], + coordinator_agent=coordinator_agent, + task_agent=task_agent, ) - ) + for add_desc, _, agent in worker_specs: + await workforce.add_single_agent_worker_async( + add_desc, + worker=agent, + ) - workforce = Workforce( - 'Business Analysis Team', - graceful_shutdown_timeout=30.0, - callbacks=[cb], - coordinator_agent=coordinator_agent, - task_agent=task_agent, - ) - - workforce.add_single_agent_worker( - "A researcher who can search online for information.", - worker=search_agent, - ).add_single_agent_worker( - "An analyst who can process research findings.", worker=analyst_agent - ).add_single_agent_worker( - "A writer who can create a final report from the analysis.", - worker=writer_agent, - ) - - # Use a simpler task to ensure fast and deterministic execution human_task = Task( content=( "Create a simple report about electric scooters. " @@ -338,29 +305,20 @@ def test_workforce_emits_expected_event_sequence(): id='0', ) - workforce.process_task(human_task) + await workforce.process_task_async(human_task) - # test that the event sequence is as expected + expected_events = [ + "WorkerCreatedEvent", + "WorkerCreatedEvent", + "WorkerCreatedEvent", + "TaskCreatedEvent", + "WorkerCreatedEvent", + "TaskAssignedEvent", + "TaskStartedEvent", + "TaskCompletedEvent", + "AllTasksCompletedEvent", + ] actual_events = [e.__class__.__name__ for e in cb.events] - assert_event_sequence(actual_events, min_worker_count=3) - - # test that metrics callback methods work as expected - assert not cb.dump_to_json_called - assert not cb.get_ascii_tree_called - assert not cb.get_kpis_called - workforce.dump_workforce_logs("foo.log") - assert cb.dump_to_json_called + assert actual_events == expected_events - workforce.reset() - assert not cb.dump_to_json_called - assert not cb.get_ascii_tree_called - assert not cb.get_kpis_called - workforce.get_workforce_kpis() - assert cb.get_kpis_called - - workforce.reset() - assert not cb.dump_to_json_called - assert not cb.get_ascii_tree_called - assert not cb.get_kpis_called - workforce.get_workforce_log_tree() - assert cb.get_ascii_tree_called + await _assert_metrics_callbacks(workforce, cb) diff --git a/test/workforce/test_workforce_pipeline.py b/test/workforce/test_workforce_pipeline.py index 517fbe7855..76f9963139 100644 --- a/test/workforce/test_workforce_pipeline.py +++ b/test/workforce/test_workforce_pipeline.py @@ -245,8 +245,12 @@ async def test_pipeline_failed_task_continues_workflow(): success_worker = SuccessfulWorker("Success Worker") fail_worker = FailingWorker("Fail Worker") - workforce.add_single_agent_worker("Success Worker", success_worker.worker) - workforce.add_single_agent_worker("Fail Worker", fail_worker.worker) + await workforce.add_single_agent_worker_async( + "Success Worker", success_worker.worker + ) + await workforce.add_single_agent_worker_async( + "Fail Worker", fail_worker.worker + ) # Build pipeline where middle task will fail workforce.pipeline_add("Task 1").pipeline_add( @@ -295,7 +299,9 @@ async def test_pipeline_fork_with_one_branch_failing(): # Add workers success_worker = SuccessfulWorker("Success Worker") - workforce.add_single_agent_worker("Success Worker", success_worker.worker) + await workforce.add_single_agent_worker_async( + "Success Worker", success_worker.worker + ) # Build fork-join workforce.pipeline_add("Task A").pipeline_fork( @@ -426,7 +432,8 @@ def test_all_pipeline_tasks_successful_with_pending(): ), "Pipeline should not be marked successful with pending tasks" -def test_pipeline_reset_clears_state(): +@pytest.mark.asyncio +async def test_pipeline_reset_clears_state(): """Test that reset() clears pipeline state correctly.""" workforce = Workforce("Test Workforce") @@ -438,7 +445,7 @@ def test_pipeline_reset_clears_state(): assert workforce.mode == WorkforceMode.PIPELINE # Reset workforce - workforce.reset() + await workforce.reset_async() # Verify state was cleared assert len(workforce._pending_tasks) == 0