Skip to content

Commit da23060

Browse files
authored
chore: update_workforce_stop_logic (#3447)
1 parent d873494 commit da23060

File tree

2 files changed

+139
-58
lines changed

2 files changed

+139
-58
lines changed

camel/societies/workforce/worker.py

Lines changed: 23 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ def __init__(
4545
) -> None:
4646
super().__init__(description, node_id=node_id)
4747
self._active_task_ids: Set[str] = set()
48+
self._running_tasks: Set[asyncio.Task] = set()
4849

4950
def __repr__(self):
    r"""Return a short label built from the node id and description."""
    return f"Worker node {self.node_id} ({self.description})"
@@ -113,15 +114,12 @@ async def _listen_to_channel(self):
113114
self._running = True
114115
logger.info(f"{self} started.")
115116

116-
# Keep track of running task coroutines
117-
running_tasks: Set[asyncio.Task] = set()
118-
119117
while self._running:
120118
try:
121119
# Clean up completed tasks
122-
completed_tasks = [t for t in running_tasks if t.done()]
120+
completed_tasks = [t for t in self._running_tasks if t.done()]
123121
for completed_task in completed_tasks:
124-
running_tasks.remove(completed_task)
122+
self._running_tasks.discard(completed_task)
125123
# Check for exceptions in completed tasks
126124
try:
127125
await completed_task
@@ -138,11 +136,11 @@ async def _listen_to_channel(self):
138136
task_coroutine = asyncio.create_task(
139137
self._process_single_task(task)
140138
)
141-
running_tasks.add(task_coroutine)
139+
self._running_tasks.add(task_coroutine)
142140

143141
except asyncio.TimeoutError:
144142
# No tasks available, continue loop
145-
if not running_tasks:
143+
if not self._running_tasks:
146144
# No tasks running and none available, short sleep
147145
await asyncio.sleep(0.1)
148146
continue
@@ -155,12 +153,12 @@ async def _listen_to_channel(self):
155153
continue
156154

157155
# Wait for all remaining tasks to complete when stopping
158-
if running_tasks:
156+
if self._running_tasks:
159157
logger.info(
160-
f"{self} stopping, waiting for {len(running_tasks)} "
158+
f"{self} stopping, waiting for {len(self._running_tasks)} "
161159
f"tasks to complete..."
162160
)
163-
await asyncio.gather(*running_tasks, return_exceptions=True)
161+
await asyncio.gather(*self._running_tasks, return_exceptions=True)
164162

165163
logger.info(f"{self} stopped.")
166164

@@ -171,6 +169,20 @@ async def start(self):
171169

172170
@check_if_running(True)
def stop(self):
    r"""Forcefully stop the worker.

    Every task tracked in ``self._running_tasks`` that has not finished
    is cancelled, the tracking set is emptied, and ``self._running`` is
    set to ``False`` so the ``while self._running`` listen loop can exit.
    """
    # Interrupt ongoing work first. Walk a snapshot so the set is only
    # mutated by the explicit ``clear()`` below.
    for job in tuple(self._running_tasks):
        if job.done():
            continue
        job.cancel()

    # Cancellation has been requested for everything still pending, so
    # nothing needs to be tracked any longer.
    self._running_tasks.clear()

    # Lower the flag that keeps the listen loop alive.
    self._running = False

camel/societies/workforce/workforce.py

Lines changed: 116 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -1802,6 +1802,95 @@ def stop_gracefully(self) -> None:
18021802
f"(event-loop not yet started)."
18031803
)
18041804

1805+
async def _async_stop_immediately(self) -> None:
    r"""Force-stop the workforce without waiting for in-flight tasks.

    Sets the stop-request flag, releases the pause event, drops all
    pending tasks, removes this node's in-flight tasks from the channel
    on a best-effort basis, stops every running child, cancels the child
    listening tasks that are not done, and marks the workforce as
    ``WorkforceState.STOPPED``.

    Note: This method does not wait for child nodes to fully stop.
    Child nodes will receive stop signals but may still be cleaning up
    when this method returns.
    """
    # Guard against redundant calls
    if not self._running and self._state == WorkforceState.STOPPED:
        logger.debug(
            f"Workforce {self.node_id} already stopped, skipping."
        )
        return

    self._stop_requested = True
    # Release anything blocked on the pause event so it can observe the
    # stop request.
    self._pause_event.set()
    logger.info(f"Workforce {self.node_id} force stop requested.")

    # Drop queued work so nothing new gets posted.
    self._pending_tasks.clear()

    if self._channel:
        # Best-effort channel cleanup: a failure to list or to remove a
        # single task must not prevent the remaining tasks from being
        # removed, nor abort the rest of the shutdown.
        try:
            in_flight = await self._channel.get_in_flight_tasks(
                self.node_id
            )
        except Exception as e:
            logger.error(
                f"Failed to clear in-flight tasks during force stop: {e}",
                exc_info=True,
            )
            in_flight = []

        for task in in_flight:
            try:
                await self._channel.remove_task(task.id)
            except Exception as e:
                logger.error(
                    f"Failed to remove in-flight task {task.id} "
                    f"during force stop: {e}",
                    exc_info=True,
                )

        # The counter is reset whether or not cleanup fully succeeded so
        # the stopped workforce does not report stale in-flight work.
        self._in_flight_tasks = 0

    # Stop child nodes and cancel their listening tasks immediately
    for child in self._children:
        if child._running:
            child.stop()
    for listening_task in self._child_listening_tasks:
        if not listening_task.done():
            listening_task.cancel()

    self._running = False
    self._state = WorkforceState.STOPPED
1855+
def stop_immediately(self) -> None:
    r"""Force-stop without waiting for current tasks to finish.

    When an event loop is attached and not closed, the asynchronous
    force-stop coroutine is submitted to it. Otherwise the same steps
    are performed synchronously on a best-effort basis: flags are set,
    pending work is dropped, running children are stopped, unfinished
    listening tasks are cancelled, and the state becomes STOPPED.

    Note: This method does not wait for child nodes to fully stop.
    Child nodes will receive stop signals but may still be cleaning up
    when this method returns.
    """
    # Nothing to do if a previous call already finished the job.
    already_stopped = (
        not self._running and self._state == WorkforceState.STOPPED
    )
    if already_stopped:
        logger.debug(
            f"Workforce {self.node_id} already stopped, skipping."
        )
        return

    # Preferred path: let the event loop run the async implementation.
    loop = self._loop
    if loop and not loop.is_closed():
        self._submit_coro_to_loop(self._async_stop_immediately())
        return

    # No usable loop; best-effort synchronous cleanup.
    self._stop_requested = True
    self._pause_event.set()
    self._pending_tasks.clear()
    self._in_flight_tasks = 0

    # Stop every child that is still running.
    for node in (c for c in self._children if c._running):
        node.stop()

    # Cancel listeners that have not finished yet.
    for listener in (
        t for t in self._child_listening_tasks if not t.done()
    ):
        listener.cancel()

    self._running = False
    self._state = WorkforceState.STOPPED
    logger.info(
        f"Workforce {self.node_id} force stopped "
        f"(event-loop not yet started)."
    )
1893+
18051894
async def _async_skip_gracefully(self) -> None:
18061895
r"""Async implementation of skip_gracefully to run on the event
18071896
loop.
@@ -5059,54 +5148,34 @@ async def start(self) -> None:
50595148

50605149
@check_if_running(True)
def stop(self) -> None:
    r"""Forcefully stop the workforce and its children immediately.

    This is now an immediate stop (was previously a graceful lifecycle
    cleanup). It cancels child listeners, clears pending/in-flight tasks,
    and sets state to STOPPED without waiting for active work to finish.

    The work is delegated to :meth:`stop_immediately`, which submits the
    asynchronous force-stop to the event loop when one is available and
    otherwise performs the same steps synchronously. Delegating keeps a
    single copy of the force-stop sequence instead of a duplicate here.
    """
    self.stop_immediately()
51105179

51115180
def clone(self, with_memory: bool = False) -> 'Workforce':
51125181
r"""Creates a new instance of Workforce with the same configuration.

0 commit comments

Comments
 (0)