Skip to content

Commit e06983a

Browse files
committed
refactor: Clean up workforce.py documentation and improve task import structure
1 parent e06085f commit e06983a

File tree

4 files changed

+218
-6
lines changed

4 files changed

+218
-6
lines changed

camel/societies/workforce/workforce.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,7 @@ class Workforce(BaseNode):
250250
- `AUTO_DECOMPOSE`: automatic task decomposition with per-task
251251
assignment (original behavior).
252252
- `AUTO_DECOMPOSE_GROUPED`: automatic decomposition with task
253-
groups / worker groups (coarse routing, reduced coordinator
254-
load).
253+
groups (coarse routing, reduced coordinator load).
255254
- `PIPELINE`: predefined pipeline mode, simple retry, failed
256255
tasks may still feed into dependents.
257256
(default: :obj:`WorkforceMode.AUTO_DECOMPOSE`)

camel/tasks/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@
1212
# limitations under the License.
1313
# ========= Copyright 2023-2025 @ CAMEL-AI.org. All Rights Reserved. =========
1414
from .task import Task, TaskGroup, TaskManager
15-
from .task_prompt import TASK_DECOMPOSE_PROMPT, TASK_DECOMPOSE_GROUPED_PROMPT, TASK_EVOLVE_PROMPT
15+
from .task_prompt import (
16+
TASK_DECOMPOSE_GROUPED_PROMPT,
17+
TASK_DECOMPOSE_PROMPT,
18+
TASK_EVOLVE_PROMPT,
19+
)
1620

1721
__all__ = [
1822
"TASK_DECOMPOSE_PROMPT",

test/tasks/test_task.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,9 @@ def test_parse_response_grouped_requires_task_group():
192192
with pytest.raises(ValueError) as exc_info:
193193
parse_response_grouped(bad_response, task_id="0")
194194

195-
assert "<tasks> must contain at least one <task_group>" in str(exc_info.value)
195+
assert "<tasks> must contain at least one <task_group>" in str(
196+
exc_info.value
197+
)
196198

197199

198200
@pytest.fixture

test/workforce/test_workforce.py

Lines changed: 209 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,14 @@
2121
from camel.agents import ChatAgent
2222
from camel.models import ModelFactory
2323
from camel.societies.workforce.task_channel import TaskChannel
24-
from camel.societies.workforce.workforce import Workforce
25-
from camel.tasks.task import Task, TaskState
24+
from camel.societies.workforce.utils import (
25+
TaskAssignment,
26+
TaskAssignResult,
27+
TaskGroupAssignment,
28+
TaskGroupAssignResult,
29+
)
30+
from camel.societies.workforce.workforce import Workforce, WorkforceMode
31+
from camel.tasks.task import Task, TaskGroup, TaskState
2632
from camel.types import ModelPlatformType, ModelType
2733

2834

@@ -491,3 +497,204 @@ async def mock_coroutine():
491497
workforce._state = WorkforceState.PAUSED
492498
workforce._loop = None
493499
workforce._start_child_node_when_paused(mock_coroutine())
500+
501+
502+
def _build_grouped_tasks():
503+
"""Helper to build two task groups with two tasks each."""
504+
group1 = TaskGroup(content="Group 1", id="g1")
505+
group2 = TaskGroup(content="Group 2", id="g2")
506+
507+
g1_t1 = Task(content="G1 Task 1", id="g1-1")
508+
g1_t2 = Task(content="G1 Task 2", id="g1-2")
509+
g2_t1 = Task(content="G2 Task 1", id="g2-1")
510+
g2_t2 = Task(content="G2 Task 2", id="g2-2")
511+
512+
group1.add_task(g1_t1)
513+
group1.add_task(g1_t2)
514+
group2.add_task(g2_t1)
515+
group2.add_task(g2_t2)
516+
517+
tasks = [g1_t1, g1_t2, g2_t1, g2_t2]
518+
return tasks, group1, group2
519+
520+
521+
def test_expand_group_to_task_dependencies_any_mode_parallelization():
522+
r"""In AUTO_DECOMPOSE_GROUPED, ANY dependency should parallelize tasks
523+
between two groups when group sizes match."""
524+
workforce = Workforce(description="Grouped Workforce Test")
525+
526+
tasks, group1, group2 = _build_grouped_tasks()
527+
528+
group_assign_result = TaskGroupAssignResult(
529+
assignments=[
530+
TaskGroupAssignment(
531+
task_group_id=group1.id,
532+
assignee_id="worker-A",
533+
dependencies=[],
534+
),
535+
TaskGroupAssignment(
536+
task_group_id=group2.id,
537+
assignee_id="worker-B",
538+
dependencies=[[group1.id, "ANY"]],
539+
),
540+
]
541+
)
542+
543+
result = workforce._expand_group_to_task_dependencies(
544+
tasks, group_assign_result
545+
)
546+
547+
# We should have one assignment per task
548+
assert len(result.assignments) == 4
549+
by_task = {a.task_id: a for a in result.assignments}
550+
551+
# Group 1 tasks have no dependencies and are assigned to worker-A
552+
assert by_task["g1-1"].assignee_id == "worker-A"
553+
assert by_task["g1-2"].assignee_id == "worker-A"
554+
assert by_task["g1-1"].dependencies == []
555+
assert by_task["g1-2"].dependencies == []
556+
557+
# Group 2 tasks are parallelized: each depends on the corresponding
558+
# task from group 1 and assigned to worker-B
559+
assert by_task["g2-1"].assignee_id == "worker-B"
560+
assert by_task["g2-2"].assignee_id == "worker-B"
561+
assert by_task["g2-1"].dependencies == ["g1-1"]
562+
assert by_task["g2-2"].dependencies == ["g1-2"]
563+
564+
565+
def test_expand_group_to_task_dependencies_all_mode_aggregates():
566+
r"""In AUTO_DECOMPOSE_GROUPED, ALL dependency should expand to all tasks
567+
in the upstream group."""
568+
workforce = Workforce(description="Grouped Workforce Test")
569+
570+
tasks, group1, group2 = _build_grouped_tasks()
571+
572+
group_assign_result = TaskGroupAssignResult(
573+
assignments=[
574+
TaskGroupAssignment(
575+
task_group_id=group1.id,
576+
assignee_id="worker-A",
577+
dependencies=[],
578+
),
579+
TaskGroupAssignment(
580+
task_group_id=group2.id,
581+
assignee_id="worker-B",
582+
dependencies=[[group1.id, "ALL"]],
583+
),
584+
]
585+
)
586+
587+
result = workforce._expand_group_to_task_dependencies(
588+
tasks, group_assign_result
589+
)
590+
591+
by_task = {a.task_id: a for a in result.assignments}
592+
593+
# Group 2 tasks should depend on all tasks in group 1
594+
expected_deps = {task.id for task in group1.get_tasks()}
595+
assert set(by_task["g2-1"].dependencies) == expected_deps
596+
assert set(by_task["g2-2"].dependencies) == expected_deps
597+
598+
599+
def test_expand_group_to_task_dependencies_unknown_group_raises():
600+
r"""Unknown task_group_id in TaskGroupAssignResult should raise ValueError."""
601+
workforce = Workforce(description="Grouped Workforce Test")
602+
603+
tasks, _, _ = _build_grouped_tasks()
604+
605+
group_assign_result = TaskGroupAssignResult(
606+
assignments=[
607+
TaskGroupAssignment(
608+
task_group_id="nonexistent-group",
609+
assignee_id="worker-X",
610+
dependencies=[],
611+
)
612+
]
613+
)
614+
615+
with pytest.raises(ValueError) as exc_info:
616+
workforce._expand_group_to_task_dependencies(
617+
tasks, group_assign_result
618+
)
619+
620+
assert "Unknown task_group_id: nonexistent-group" in str(exc_info.value)
621+
622+
623+
def test_decompose_task_uses_grouped_mode(monkeypatch):
624+
r"""AUTO_DECOMPOSE_GROUPED mode should use Task.decompose_grouped."""
625+
workforce = Workforce(description="Grouped Decompose Test")
626+
workforce.set_mode(WorkforceMode.AUTO_DECOMPOSE_GROUPED)
627+
628+
task = Task(content="Main grouped task", id="0")
629+
fake_subtasks = [Task(content="Subtask 1", id="0.1")]
630+
631+
with (
632+
patch.object(
633+
Task, "decompose_grouped", return_value=fake_subtasks
634+
) as mock_grouped,
635+
patch.object(Task, "decompose") as mock_decompose,
636+
):
637+
result = workforce._decompose_task(task)
638+
639+
# Should call grouped decomposition but not the regular one
640+
mock_grouped.assert_called_once()
641+
mock_decompose.assert_not_called()
642+
assert result == fake_subtasks
643+
644+
645+
@pytest.mark.asyncio
646+
async def test_find_assignee_uses_grouped_assignment_in_grouped_mode(
647+
monkeypatch,
648+
):
649+
r"""In AUTO_DECOMPOSE_GROUPED mode, _find_assignee should use the
650+
grouped assignment path and update task dependencies."""
651+
workforce = Workforce(description="Grouped Assignment Test")
652+
workforce.set_mode(WorkforceMode.AUTO_DECOMPOSE_GROUPED)
653+
654+
# Avoid readiness wait loop by returning non-empty valid workers
655+
monkeypatch.setattr(
656+
workforce, "_get_valid_worker_ids", lambda: {"worker-1"}
657+
)
658+
659+
tasks = [
660+
Task(content="Task 1", id="t1"),
661+
Task(content="Task 2", id="t2"),
662+
]
663+
664+
grouped_result = TaskAssignResult(
665+
assignments=[
666+
TaskAssignment(
667+
task_id="t1", assignee_id="worker-1", dependencies=[]
668+
),
669+
TaskAssignment(
670+
task_id="t2", assignee_id="worker-1", dependencies=["t1"]
671+
),
672+
]
673+
)
674+
675+
with (
676+
patch.object(
677+
workforce,
678+
"_call_coordinator_for_assignment_grouped",
679+
return_value=grouped_result,
680+
) as mock_grouped,
681+
patch.object(
682+
workforce, "_call_coordinator_for_assignment"
683+
) as mock_plain,
684+
):
685+
assign_result = await workforce._find_assignee(tasks)
686+
687+
# Should use grouped assignment path only
688+
mock_grouped.assert_called_once()
689+
mock_plain.assert_not_called()
690+
691+
# Result should be exactly the grouped_result we returned
692+
assert isinstance(assign_result, TaskAssignResult)
693+
assert [a.task_id for a in assign_result.assignments] == ["t1", "t2"]
694+
695+
# _find_assignee should also update Task.dependencies from assignments
696+
t1 = next(t for t in tasks if t.id == "t1")
697+
t2 = next(t for t in tasks if t.id == "t2")
698+
699+
assert t1.dependencies == []
700+
assert t2.dependencies == [t1]

0 commit comments

Comments
 (0)