Skip to content

Commit 6b58e8a

Browse files
authored
enhance: update WorkforceLogger with log_message PR3485 (#3577)
1 parent 4d27028 commit 6b58e8a

File tree

3 files changed

+148
-65
lines changed

3 files changed

+148
-65
lines changed

camel/societies/workforce/workforce.py

Lines changed: 140 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
from camel.responses import ChatAgentResponse
4545
from camel.utils.context_utils import ContextUtility, WorkflowSummary
4646

47-
from colorama import Fore
4847

4948
from camel.agents import ChatAgent
5049
from camel.logger import get_logger
@@ -4373,11 +4372,18 @@ async def _handle_failed_task(self, task: Task) -> bool:
43734372
f"{task.failure_count}/{max_retries}): {detailed_error}"
43744373
)
43754374

4376-
print(
4377-
f"{Fore.RED}❌ Task {task.id} failed "
4378-
f"(attempt {task.failure_count}/{max_retries}): "
4379-
f"{failure_reason}{Fore.RESET}"
4380-
)
4375+
for cb in self._callbacks:
4376+
cb.log_message(
4377+
LogEvent(
4378+
message=(
4379+
f"❌ Task {task.id} failed "
4380+
f"(attempt {task.failure_count}/{max_retries}): "
4381+
f"{failure_reason}"
4382+
),
4383+
level="error",
4384+
color="red",
4385+
)
4386+
)
43814387

43824388
task_failed_event = TaskFailedEvent(
43834389
task_id=task.id,
@@ -4813,7 +4819,14 @@ def dump_workforce_logs(self, file_path: str) -> None:
48134819
cb for cb in self._callbacks if isinstance(cb, WorkforceMetrics)
48144820
]
48154821
if len(metrics_cb) == 0:
4816-
print("Logger not initialized. Cannot dump logs.")
4822+
for cb in self._callbacks:
4823+
cb.log_message(
4824+
LogEvent(
4825+
message="Logger not initialized. Cannot dump logs.",
4826+
level="warning",
4827+
color="yellow",
4828+
)
4829+
)
48174830
return
48184831
metrics_cb[0].dump_to_json(file_path)
48194832
# Use logger.info or print, consistent with existing style
@@ -5108,28 +5121,42 @@ async def _listen_to_channel(self) -> None:
51085121

51095122
# Do not halt if we have main tasks in queue
51105123
if len(self.get_main_task_queue()) > 0:
5111-
print(
5112-
f"{Fore.RED}Task {returned_task.id} has "
5113-
f"failed for "
5114-
f"{self.failure_handling_config.max_retries}"
5115-
f" "
5116-
f"times after insufficient results, "
5117-
f"skipping that task. Final error: "
5118-
f"{returned_task.result or 'Unknown err'}"
5119-
f"{Fore.RESET}"
5120-
)
5124+
cfg = self.failure_handling_config
5125+
max_r = cfg.max_retries
5126+
err = returned_task.result or 'Unknown err'
5127+
for cb in self._callbacks:
5128+
cb.log_message(
5129+
LogEvent(
5130+
message=(
5131+
f"Task {returned_task.id} has "
5132+
f"failed for {max_r} times "
5133+
f"after insufficient results, "
5134+
f"skipping that task. "
5135+
f"Final error: {err}"
5136+
),
5137+
level="error",
5138+
color="red",
5139+
)
5140+
)
51215141
self._skip_requested = True
51225142
continue
51235143

5124-
print(
5125-
f"{Fore.RED}Task {returned_task.id} has "
5126-
f"failed for "
5127-
f"{self.failure_handling_config.max_retries} "
5128-
f"times after insufficient results, halting "
5129-
f"the workforce. Final error: "
5130-
f"{returned_task.result or 'Unknown error'}"
5131-
f"{Fore.RESET}"
5132-
)
5144+
max_r = self.failure_handling_config.max_retries
5145+
err = returned_task.result or 'Unknown error'
5146+
for cb in self._callbacks:
5147+
cb.log_message(
5148+
LogEvent(
5149+
message=(
5150+
f"Task {returned_task.id} has "
5151+
f"failed for {max_r} times "
5152+
f"after insufficient results, "
5153+
f"halting the workforce. "
5154+
f"Final error: {err}"
5155+
),
5156+
level="error",
5157+
color="red",
5158+
)
5159+
)
51335160
await self._graceful_shutdown(returned_task)
51345161
break
51355162
except Exception as e:
@@ -5147,11 +5174,19 @@ async def _listen_to_channel(self) -> None:
51475174
self.failure_handling_config.enabled_strategies
51485175
== []
51495176
):
5150-
print(
5151-
f"{Fore.CYAN}Task {returned_task.id} "
5152-
f"completed (quality check skipped - "
5153-
f"no recovery strategies enabled).{Fore.RESET}"
5154-
)
5177+
for cb in self._callbacks:
5178+
cb.log_message(
5179+
LogEvent(
5180+
message=(
5181+
f"Task {returned_task.id} "
5182+
f"completed (quality check "
5183+
f"skipped - no recovery "
5184+
f"strategies)."
5185+
),
5186+
level="info",
5187+
color="cyan",
5188+
)
5189+
)
51555190
await self._handle_completed_task(returned_task)
51565191
continue
51575192

@@ -5176,31 +5211,48 @@ async def _listen_to_channel(self) -> None:
51765211
returned_task.failure_count
51775212
>= quality_retry_limit
51785213
):
5179-
print(
5180-
f"{Fore.YELLOW}Task {returned_task.id} "
5181-
f"completed with low quality score: "
5182-
f"{quality_eval.quality_score} "
5183-
f"(retry limit reached){Fore.RESET}"
5184-
)
5214+
score = quality_eval.quality_score
5215+
for cb in self._callbacks:
5216+
cb.log_message(
5217+
LogEvent(
5218+
message=(
5219+
f"Task {returned_task.id} "
5220+
f"completed with low quality "
5221+
f"score: {score} "
5222+
f"(retry limit reached)"
5223+
),
5224+
level="warning",
5225+
color="yellow",
5226+
)
5227+
)
51855228
await self._handle_completed_task(
51865229
returned_task
51875230
)
51885231
continue
51895232

5190-
# Print visual feedback for quality-failed tasks
5233+
# Log visual feedback for quality-failed tasks
51915234
# with recovery strategy
51925235
recovery_action = (
51935236
quality_eval.recovery_strategy.value
51945237
if quality_eval.recovery_strategy
51955238
else ""
51965239
)
5197-
print(
5198-
f"{Fore.YELLOW}⚠️ Task {returned_task.id} "
5199-
f"failed quality check (score: "
5200-
f"{quality_eval.quality_score}). "
5201-
f"Issues: {', '.join(quality_eval.issues)}. "
5202-
f"Recovery: {recovery_action}{Fore.RESET}"
5203-
)
5240+
score = quality_eval.quality_score
5241+
issues = ', '.join(quality_eval.issues)
5242+
for cb in self._callbacks:
5243+
cb.log_message(
5244+
LogEvent(
5245+
message=(
5246+
f"⚠️ Task {returned_task.id} "
5247+
f"failed quality check "
5248+
f"(score: {score}). "
5249+
f"Issues: {issues}. "
5250+
f"Recovery: {recovery_action}"
5251+
),
5252+
level="warning",
5253+
color="yellow",
5254+
)
5255+
)
52045256

52055257
# Mark as failed for recovery
52065258
returned_task.failure_count += 1
@@ -5252,11 +5304,19 @@ async def _listen_to_channel(self) -> None:
52525304
)
52535305
continue
52545306
else:
5255-
print(
5256-
f"{Fore.CYAN}Task {returned_task.id} "
5257-
f"completed successfully (quality score: "
5258-
f"{quality_eval.quality_score}).{Fore.RESET}"
5259-
)
5307+
score = quality_eval.quality_score
5308+
for cb in self._callbacks:
5309+
cb.log_message(
5310+
LogEvent(
5311+
message=(
5312+
f"Task {returned_task.id} "
5313+
f"completed successfully "
5314+
f"(quality score: {score})."
5315+
),
5316+
level="info",
5317+
color="cyan",
5318+
)
5319+
)
52605320
await self._handle_completed_task(returned_task)
52615321
elif returned_task.state == TaskState.FAILED:
52625322
try:
@@ -5266,24 +5326,39 @@ async def _listen_to_channel(self) -> None:
52665326

52675327
# Do not halt if we have main tasks in queue
52685328
if len(self.get_main_task_queue()) > 0:
5269-
print(
5270-
f"{Fore.RED}Task {returned_task.id} has "
5271-
f"failed for "
5272-
f"{self.failure_handling_config.max_retries} "
5273-
f"times, skipping that task. Final error: "
5274-
f"{returned_task.result or 'Unknown error'}"
5275-
f"{Fore.RESET}"
5276-
)
5329+
max_r = self.failure_handling_config.max_retries
5330+
err = returned_task.result or 'Unknown error'
5331+
for cb in self._callbacks:
5332+
cb.log_message(
5333+
LogEvent(
5334+
message=(
5335+
f"Task {returned_task.id} has "
5336+
f"failed for {max_r} times, "
5337+
f"skipping that task. "
5338+
f"Final error: {err}"
5339+
),
5340+
level="error",
5341+
color="red",
5342+
)
5343+
)
52775344
self._skip_requested = True
52785345
continue
52795346

5280-
print(
5281-
f"{Fore.RED}Task {returned_task.id} has failed "
5282-
f"for {self.failure_handling_config.max_retries} "
5283-
f"times, halting the workforce. Final error: "
5284-
f"{returned_task.result or 'Unknown error'}"
5285-
f"{Fore.RESET}"
5286-
)
5347+
max_r = self.failure_handling_config.max_retries
5348+
err = returned_task.result or 'Unknown error'
5349+
for cb in self._callbacks:
5350+
cb.log_message(
5351+
LogEvent(
5352+
message=(
5353+
f"Task {returned_task.id} has "
5354+
f"failed for {max_r} times, "
5355+
f"halting the workforce. "
5356+
f"Final error: {err}"
5357+
),
5358+
level="error",
5359+
color="red",
5360+
)
5361+
)
52875362
# Graceful shutdown instead of immediate break
52885363
await self._graceful_shutdown(returned_task)
52895364
break

camel/societies/workforce/workforce_callback.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class WorkforceCallback(ABC):
4242
"yellow": Fore.YELLOW,
4343
"red": Fore.RED,
4444
"green": Fore.GREEN,
45+
"blue": Fore.BLUE,
4546
"cyan": Fore.CYAN,
4647
"magenta": Fore.MAGENTA,
4748
"gray": Fore.LIGHTBLACK_EX,

test/workforce/test_workforce_callbacks.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from camel.models import ModelFactory
2121
from camel.societies.workforce.events import (
2222
AllTasksCompletedEvent,
23+
LogEvent,
2324
TaskAssignedEvent,
2425
TaskCompletedEvent,
2526
TaskCreatedEvent,
@@ -44,6 +45,9 @@ class _NonMetricsCallback(WorkforceCallback):
4445
def __init__(self) -> None:
4546
self.events: list[WorkforceEvent] = []
4647

48+
def log_message(self, event: LogEvent) -> None:
49+
pass
50+
4751
# Task events
4852
def log_task_created(self, event: TaskCreatedEvent) -> None:
4953
self.events.append(event)
@@ -85,6 +89,9 @@ def __init__(self) -> None:
8589
self.get_ascii_tree_called = False
8690
self.get_kpis_called = False
8791

92+
def log_message(self, event: LogEvent) -> None:
93+
pass
94+
8895
# WorkforceMetrics interface
8996
def reset_task_data(self) -> None:
9097
self.dump_to_json_called = False

0 commit comments

Comments
 (0)