Skip to content

Commit d585eac

Browse files
committed
update
1 parent e6f6105 commit d585eac

File tree

2 files changed

+90
-56
lines changed

2 files changed

+90
-56
lines changed

camel/societies/workforce/worker.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def __init__(
4545
) -> None:
4646
super().__init__(description, node_id=node_id)
4747
self._active_task_ids: Set[str] = set()
48+
self._running_tasks: Set[asyncio.Task] = set()
4849

4950
def __repr__(self):
5051
return f"Worker node {self.node_id} ({self.description})"
@@ -113,15 +114,12 @@ async def _listen_to_channel(self):
113114
self._running = True
114115
logger.info(f"{self} started.")
115116

116-
# Keep track of running task coroutines
117-
running_tasks: Set[asyncio.Task] = set()
118-
119117
while self._running:
120118
try:
121119
# Clean up completed tasks
122-
completed_tasks = [t for t in running_tasks if t.done()]
120+
completed_tasks = [t for t in self._running_tasks if t.done()]
123121
for completed_task in completed_tasks:
124-
running_tasks.remove(completed_task)
122+
self._running_tasks.discard(completed_task)
125123
# Check for exceptions in completed tasks
126124
try:
127125
await completed_task
@@ -138,11 +136,11 @@ async def _listen_to_channel(self):
138136
task_coroutine = asyncio.create_task(
139137
self._process_single_task(task)
140138
)
141-
running_tasks.add(task_coroutine)
139+
self._running_tasks.add(task_coroutine)
142140

143141
except asyncio.TimeoutError:
144142
# No tasks available, continue loop
145-
if not running_tasks:
143+
if not self._running_tasks:
146144
# No tasks running and none available, short sleep
147145
await asyncio.sleep(0.1)
148146
continue
@@ -155,12 +153,14 @@ async def _listen_to_channel(self):
155153
continue
156154

157155
# Wait for all remaining tasks to complete when stopping
158-
if running_tasks:
156+
if self._running_tasks:
159157
logger.info(
160-
f"{self} stopping, waiting for {len(running_tasks)} "
158+
f"{self} stopping, waiting for {len(self._running_tasks)} "
161159
f"tasks to complete..."
162160
)
163-
await asyncio.gather(*running_tasks, return_exceptions=True)
161+
await asyncio.gather(
162+
*self._running_tasks, return_exceptions=True
163+
)
164164

165165
logger.info(f"{self} stopped.")
166166

@@ -173,4 +173,8 @@ async def start(self):
173173
def stop(self):
174174
r"""Stop the worker."""
175175
self._running = False
176+
# Cancel any in-flight task coroutines
177+
for task in list(self._running_tasks):
178+
if not task.done():
179+
task.cancel()
176180
return

camel/societies/workforce/workforce.py

Lines changed: 76 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1754,6 +1754,56 @@ def stop_gracefully(self) -> None:
17541754
f"(event-loop not yet started)."
17551755
)
17561756

1757+
async def _async_stop_immediately(self) -> None:
1758+
r"""Force-stop the workforce without waiting for in-flight tasks."""
1759+
self._stop_requested = True
1760+
self._pause_event.set()
1761+
logger.warning(f"Workforce {self.node_id} force stop requested.")
1762+
1763+
# Remove pending tasks and clear channel postings to avoid new work
1764+
self._pending_tasks.clear()
1765+
if self._channel:
1766+
try:
1767+
in_flight = await self._channel.get_in_flight_tasks(
1768+
self.node_id
1769+
)
1770+
for task in in_flight:
1771+
await self._channel.remove_task(task.id)
1772+
except Exception as e:
1773+
logger.error(
1774+
f"Failed to clear in-flight tasks during force stop: {e}",
1775+
exc_info=True,
1776+
)
1777+
self._in_flight_tasks = 0
1778+
1779+
# Stop child nodes and cancel their listening tasks immediately
1780+
for child in self._children:
1781+
if child._running:
1782+
child.stop()
1783+
for task in self._child_listening_tasks:
1784+
if not task.done():
1785+
task.cancel()
1786+
1787+
self._running = False
1788+
self._state = WorkforceState.STOPPED
1789+
1790+
def stop_immediately(self) -> None:
1791+
r"""Force-stop without waiting for current tasks to finish."""
1792+
if self._loop and not self._loop.is_closed():
1793+
self._submit_coro_to_loop(self._async_stop_immediately())
1794+
else:
1795+
# No running loop; best-effort synchronous cleanup
1796+
self._stop_requested = True
1797+
self._pause_event.set()
1798+
self._pending_tasks.clear()
1799+
self._in_flight_tasks = 0
1800+
self._running = False
1801+
self._state = WorkforceState.STOPPED
1802+
logger.warning(
1803+
f"Workforce {self.node_id} force stopped "
1804+
f"(event-loop not yet started)."
1805+
)
1806+
17571807
async def _async_skip_gracefully(self) -> None:
17581808
r"""Async implementation of skip_gracefully to run on the event
17591809
loop.
@@ -4990,54 +5040,34 @@ async def start(self) -> None:
49905040

49915041
@check_if_running(True)
49925042
def stop(self) -> None:
4993-
r"""Stop all the child nodes under it. The node itself will be stopped
4994-
by its parent node.
4995-
"""
4996-
# Stop all child nodes first
4997-
for child in self._children:
4998-
if child._running:
4999-
child.stop()
5043+
r"""Forcefully stop the workforce and its children immediately.
50005044
5001-
# Cancel child listening tasks
5002-
if self._child_listening_tasks:
5003-
try:
5004-
loop = asyncio.get_running_loop()
5005-
if loop and not loop.is_closed():
5006-
# Create graceful cleanup task
5007-
async def cleanup():
5008-
await asyncio.sleep(0.1) # Brief grace period
5009-
for task in self._child_listening_tasks:
5010-
if not task.done():
5011-
task.cancel()
5012-
5013-
# Handle both asyncio.Task and concurrent.futures.
5014-
# Future
5015-
awaitables = []
5016-
for task in self._child_listening_tasks:
5017-
if isinstance(task, concurrent.futures.Future):
5018-
# Convert Future to awaitable
5019-
awaitables.append(asyncio.wrap_future(task))
5020-
else:
5021-
# Already an asyncio.Task
5022-
awaitables.append(task)
5023-
5024-
await asyncio.gather(
5025-
*awaitables,
5026-
return_exceptions=True,
5027-
)
5028-
5029-
self._cleanup_task = loop.create_task(cleanup())
5030-
else:
5031-
# No active loop, cancel immediately
5032-
for task in self._child_listening_tasks:
5033-
task.cancel()
5034-
except (RuntimeError, Exception) as e:
5035-
# Fallback: cancel immediately
5036-
logger.debug(f"Exception during task cleanup: {e}")
5037-
for task in self._child_listening_tasks:
5045+
This is now an immediate stop (was previously a graceful lifecycle
5046+
cleanup). It cancels child listeners, clears pending/in-flight tasks,
5047+
and sets state to STOPPED without waiting for active work to finish.
5048+
"""
5049+
if self._loop and not self._loop.is_closed():
5050+
self._submit_coro_to_loop(self._async_stop_immediately())
5051+
else:
5052+
# No running loop; perform synchronous best-effort force stop
5053+
self._stop_requested = True
5054+
self._pause_event.set()
5055+
self._pending_tasks.clear()
5056+
self._in_flight_tasks = 0
5057+
# Stop children
5058+
for child in self._children:
5059+
if child._running:
5060+
child.stop()
5061+
# Cancel listening tasks
5062+
for task in self._child_listening_tasks:
5063+
if not task.done():
50385064
task.cancel()
5039-
5040-
self._running = False
5065+
self._running = False
5066+
self._state = WorkforceState.STOPPED
5067+
logger.warning(
5068+
f"Workforce {self.node_id} force stopped "
5069+
f"(event-loop not yet started)."
5070+
)
50415071

50425072
def clone(self, with_memory: bool = False) -> 'Workforce':
50435073
r"""Creates a new instance of Workforce with the same configuration.

0 commit comments

Comments
 (0)