Skip to content
Merged
13 changes: 13 additions & 0 deletions camel/societies/workforce/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class WorkforceEventBase(BaseModel):
"task_created",
"task_assigned",
"task_started",
"task_updated",
"task_completed",
"task_failed",
"worker_created",
Expand Down Expand Up @@ -100,6 +101,17 @@ class TaskStartedEvent(WorkforceEventBase):
worker_id: str


class TaskUpdatedEvent(WorkforceEventBase):
event_type: Literal["task_updated"] = "task_updated"
task_id: str
worker_id: Optional[str] = None
update_type: Literal["replan", "reassign", "manual"]
old_value: Optional[str] = None
new_value: Optional[str] = None
parent_task_id: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None


class TaskCompletedEvent(WorkforceEventBase):
event_type: Literal["task_completed"] = "task_completed"
task_id: str
Expand Down Expand Up @@ -136,6 +148,7 @@ class QueueStatusEvent(WorkforceEventBase):
TaskCreatedEvent,
TaskAssignedEvent,
TaskStartedEvent,
TaskUpdatedEvent,
TaskCompletedEvent,
TaskFailedEvent,
WorkerCreatedEvent,
Expand Down
57 changes: 55 additions & 2 deletions camel/societies/workforce/workforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import asyncio
import concurrent.futures
import copy
import json
import os
import time
Expand Down Expand Up @@ -102,6 +103,7 @@
TaskDecomposedEvent,
TaskFailedEvent,
TaskStartedEvent,
TaskUpdatedEvent,
WorkerCreatedEvent,
)

Expand Down Expand Up @@ -1678,8 +1680,30 @@ async def _apply_recovery_strategy(
elif strategy == RecoveryStrategy.REPLAN:
# Modify the task content and retry
if recovery_decision.modified_task_content:
task.content = recovery_decision.modified_task_content
logger.info(f"Task {task.id} content modified for replan")
old_content = copy.deepcopy(task.content)
new_content = recovery_decision.modified_task_content

task.content = new_content
logger.info(
f"Task {task.id} content modified for replan, "
f"new_content: {new_content}"
)

task_updated_event = TaskUpdatedEvent(
task_id=task.id,
parent_task_id=task.parent.id if task.parent else None,
worker_id=task.assigned_worker_id,
update_type="replan",
old_value=old_content,
new_value=new_content,
metadata={
"quality_score": recovery_decision.quality_score,
"reasoning": recovery_decision.reasoning,
"issues": recovery_decision.issues,
},
)
for cb in self._callbacks:
cb.log_task_updated(task_updated_event)

# Repost the modified task
if task.id in self._assignees:
Expand Down Expand Up @@ -1734,6 +1758,22 @@ async def _apply_recovery_strategy(
f"{new_worker}"
)

task_updated_event = TaskUpdatedEvent(
task_id=task.id,
parent_task_id=task.parent.id if task.parent else None,
worker_id=task.assigned_worker_id,
update_type="reassign",
old_value=old_worker,
new_value=new_worker,
metadata={
"quality_score": recovery_decision.quality_score,
"reasoning": recovery_decision.reasoning,
"issues": recovery_decision.issues,
},
)
for cb in self._callbacks:
cb.log_task_updated(task_updated_event)

elif strategy == RecoveryStrategy.DECOMPOSE:
# Decompose the task into subtasks
reason = (
Expand Down Expand Up @@ -2073,8 +2113,21 @@ def modify_task_content(self, task_id: str, new_content: str) -> bool:

for task in self._pending_tasks:
if task.id == task_id:
old_content = copy.deepcopy(task.content)
task.content = new_content
logger.info(f"Task {task_id} content modified.")

task_updated_event = TaskUpdatedEvent(
task_id=task.id,
parent_task_id=task.parent.id if task.parent else None,
worker_id=task.assigned_worker_id,
update_type="manual",
old_value=old_content,
new_value=new_content,
)
for cb in self._callbacks:
cb.log_task_updated(task_updated_event)

return True
logger.warning(f"Task {task_id} not found in pending tasks.")
return False
Expand Down
5 changes: 5 additions & 0 deletions camel/societies/workforce/workforce_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
TaskDecomposedEvent,
TaskFailedEvent,
TaskStartedEvent,
TaskUpdatedEvent,
WorkerCreatedEvent,
WorkerDeletedEvent,
)
Expand Down Expand Up @@ -82,6 +83,10 @@ def log_task_assigned(self, event: TaskAssignedEvent) -> None:
def log_task_started(self, event: TaskStartedEvent) -> None:
pass

@abstractmethod
def log_task_updated(self, event: TaskUpdatedEvent) -> None:
pass

@abstractmethod
def log_task_completed(self, event: TaskCompletedEvent) -> None:
pass
Expand Down
16 changes: 16 additions & 0 deletions camel/societies/workforce/workforce_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
TaskDecomposedEvent,
TaskFailedEvent,
TaskStartedEvent,
TaskUpdatedEvent,
WorkerCreatedEvent,
WorkerDeletedEvent,
)
Expand Down Expand Up @@ -157,6 +158,21 @@ def log_task_started(
if event.task_id in self._task_hierarchy:
self._task_hierarchy[event.task_id]['status'] = 'processing'

def log_task_updated(self, event: TaskUpdatedEvent) -> None:
r"""Logs updates made to a task."""
self._log_event(
event_type=event.event_type,
task_id=event.task_id,
worker_id=event.worker_id,
update_type=event.update_type,
old_value=event.old_value,
new_value=event.new_value,
parent_task_id=event.parent_task_id,
metadata=event.metadata or {},
)
if event.task_id in self._task_hierarchy:
self._task_hierarchy[event.task_id]['status'] = 'updated'

def log_task_completed(self, event: TaskCompletedEvent) -> None:
r"""Logs the successful completion of a task."""
self._log_event(
Expand Down
6 changes: 6 additions & 0 deletions examples/workforce/workforce_callbacks_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ def log_task_started(self, event: TaskStartedEvent) -> None:
f"worker={event.worker_id}"
)

def log_task_updated(self, event: TaskStartedEvent) -> None:
print(
f"[PrintCallback] task_updated: task={event.task_id}, "
f"worker={event.worker_id}"
)

def log_task_completed(self, event: TaskCompletedEvent) -> None:
print(
f"[PrintCallback] task_completed: task={event.task_id}, "
Expand Down
7 changes: 7 additions & 0 deletions test/workforce/test_workforce_callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
TaskDecomposedEvent,
TaskFailedEvent,
TaskStartedEvent,
TaskUpdatedEvent,
WorkerCreatedEvent,
WorkerDeletedEvent,
WorkforceEvent,
Expand Down Expand Up @@ -61,6 +62,9 @@ def log_task_assigned(self, event: TaskAssignedEvent) -> None:
def log_task_started(self, event: TaskStartedEvent) -> None:
self.events.append(event)

def log_task_updated(self, event: TaskUpdatedEvent) -> None:
self.events.append(event)

def log_task_completed(self, event: TaskCompletedEvent) -> None:
self.events.append(event)

Expand Down Expand Up @@ -121,6 +125,9 @@ def log_task_assigned(self, event: TaskAssignedEvent) -> None:
def log_task_started(self, event: TaskStartedEvent) -> None:
self.events.append(event)

def log_task_updated(self, event: TaskUpdatedEvent) -> None:
self.events.append(event)

def log_task_completed(self, event: TaskCompletedEvent) -> None:
self.events.append(event)

Expand Down