Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/praisonai/praisonai/cli/commands/standardise.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ def _run_ai(parsed) -> int:
acp=False # ACP not needed for generation
)
# Start runtime in background
asyncio.get_event_loop().run_until_complete(runtime.start())
asyncio.run(runtime.start())
if runtime.lsp_ready:
print("🔧 LSP server ready for code intelligence")
except Exception:
Expand Down Expand Up @@ -488,7 +488,7 @@ def _run_ai(parsed) -> int:
# Cleanup runtime
if runtime:
try:
asyncio.get_event_loop().run_until_complete(runtime.stop())
asyncio.run(runtime.stop())
except Exception:
pass

Expand Down
15 changes: 3 additions & 12 deletions src/praisonai/praisonai/jobs/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,7 @@ async def _run_recipe(self, job: Job) -> Any:
await self._notify_progress(job)

# Execute the recipe
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None,
lambda: execute_resolved_recipe(resolved)
)
result = await asyncio.to_thread(execute_resolved_recipe, resolved)

# Update progress
job.update_progress(percentage=90.0, step="Finalizing")
Expand Down Expand Up @@ -323,11 +319,7 @@ async def _run_praisonai_agents(self, job: Job, agent_file: str) -> Any:
await self._notify_progress(job)

# Run the agent
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None,
lambda: agent.start(job.prompt)
)
result = await asyncio.to_thread(agent.start, job.prompt)

# Update progress
job.update_progress(percentage=90.0, step="Finalizing")
Expand Down Expand Up @@ -360,8 +352,7 @@ async def _run_legacy_praisonai(self, job: Job, agent_file: str, framework: str)
await self._notify_progress(job)

# Run in executor (blocking call)
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, praisonai.run)
result = await asyncio.to_thread(praisonai.run)

# Update progress
job.update_progress(percentage=90.0, step="Finalizing")
Expand Down
19 changes: 13 additions & 6 deletions src/praisonai/praisonai/jobs/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import logging
import os
import threading
from typing import Optional
from contextlib import asynccontextmanager

Expand All @@ -21,25 +22,31 @@
# Global instances (for single-process deployment)
_store: Optional[JobStore] = None
_executor: Optional[JobExecutor] = None
_store_lock = threading.Lock()
_executor_lock = threading.Lock()


def get_store() -> JobStore:
"""Get or create the job store."""
global _store
if _store is None:
_store = InMemoryJobStore(max_jobs=1000)
with _store_lock:
if _store is None:
_store = InMemoryJobStore(max_jobs=1000)
return _store


def get_executor() -> JobExecutor:
"""Get or create the job executor."""
global _executor
if _executor is None:
_executor = JobExecutor(
store=get_store(),
max_concurrent=int(os.environ.get("PRAISONAI_MAX_CONCURRENT_JOBS", "10")),
default_timeout=int(os.environ.get("PRAISONAI_JOB_TIMEOUT", "3600"))
)
with _executor_lock:
if _executor is None:
_executor = JobExecutor(
store=get_store(),
max_concurrent=int(os.environ.get("PRAISONAI_MAX_CONCURRENT_JOBS", "10")),
default_timeout=int(os.environ.get("PRAISONAI_JOB_TIMEOUT", "3600"))
)
return _executor


Expand Down
29 changes: 9 additions & 20 deletions src/praisonai/praisonai/sandbox/e2b.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ async def stop(self) -> None:

if self._sandbox:
try:
await asyncio.get_event_loop().run_in_executor(
None, self._sandbox.kill
)
await asyncio.to_thread(self._sandbox.kill)
except Exception as e:
logger.warning(f"Failed to kill E2B sandbox: {e}")
self._sandbox = None
Expand Down Expand Up @@ -161,9 +159,8 @@ async def _execute_python_code(
"""Execute Python code using E2B code interpreter."""
try:
# Run in executor to avoid blocking the event loop
execution = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self._sandbox.run_code(code, timeout=limits.timeout_seconds)
execution = await asyncio.to_thread(
self._sandbox.run_code, code, timeout=limits.timeout_seconds
)

# Extract results from E2B execution
Expand Down Expand Up @@ -227,15 +224,13 @@ async def _execute_bash_command(
# Change directory if needed
if working_dir:
import shlex
await asyncio.get_event_loop().run_in_executor(
None,
lambda: self._sandbox.commands.run(f"cd {shlex.quote(working_dir)}", timeout=5)
await asyncio.to_thread(
self._sandbox.commands.run, f"cd {shlex.quote(working_dir)}", timeout=5
)

# Execute the command
result = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self._sandbox.commands.run(command, timeout=limits.timeout_seconds)
result = await asyncio.to_thread(
self._sandbox.commands.run, command, timeout=limits.timeout_seconds
)

return SandboxResult(
Expand Down Expand Up @@ -318,10 +313,7 @@ async def write_file(
if isinstance(content, bytes):
content = content.decode("utf-8")

await asyncio.get_event_loop().run_in_executor(
None,
lambda: self._sandbox.files.write(path, content)
)
await asyncio.to_thread(self._sandbox.files.write, path, content)
return True
except Exception as e:
logger.error(f"Failed to write file {path}: {e}")
Expand All @@ -336,10 +328,7 @@ async def read_file(
await self.start()

try:
content = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self._sandbox.files.read(path)
)
content = await asyncio.to_thread(self._sandbox.files.read, path)
return content
except Exception as e:
logger.warning(f"Failed to read file {path}: {e}")
Expand Down
68 changes: 33 additions & 35 deletions src/praisonai/praisonai/scheduler/async_agent_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,16 @@ class AsyncAgentScheduler:
- Cancellation support
- No global state pollution
- Native async coordination
- Timeout support (TODO: needs porting from sync version)
- Budget tracking (TODO: needs porting from sync version)
- YAML/recipe constructors (TODO: needs porting from sync version)
- Timeout support per execution
- Budget tracking and limits

Example:
scheduler = AsyncAgentScheduler(agent, task="Check news")
scheduler = AsyncAgentScheduler(
agent,
task="Check news",
timeout=30, # 30 second timeout per execution
max_cost=1.00 # $1.00 budget limit
)
await scheduler.start(schedule_expr="hourly")
await asyncio.sleep(3600) # Let it run
await scheduler.stop()
Expand All @@ -89,9 +93,8 @@ def __init__(
config: Optional[Dict[str, Any]] = None,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
# TODO: Add these missing features from sync version:
# timeout: Optional[int] = None,
# max_cost: Optional[float] = 1.00
timeout: Optional[int] = None,
max_cost: Optional[float] = 1.00
):
"""
Initialize async agent scheduler.
Expand All @@ -102,18 +105,17 @@ def __init__(
config: Optional configuration dict
on_success: Callback function on successful execution
on_failure: Callback function on failed execution
# TODO: Add timeout and max_cost parameters
timeout: Maximum execution time per run in seconds (None = no limit)
max_cost: Maximum total cost in USD (default: $1.00 for safety)
"""
self.agent = agent
self.task = task
self.config = config or {}
self.on_success = on_success
self.on_failure = on_failure

# TODO: Add these missing features from sync version:
# self.timeout = timeout
# self.max_cost = max_cost
# self._total_cost = 0.0
self.timeout = timeout
self.max_cost = max_cost
self._total_cost = 0.0

self.is_running = False
self._task: Optional[asyncio.Task] = None
Expand Down Expand Up @@ -350,45 +352,41 @@ async def _run_schedule(self, interval: int, max_retries: int):
self.is_running = False

async def _execute_with_retry(self, max_retries: int):
"""Execute agent with retry logic.

TODO: Port missing features from sync version:
- Timeout support per execution
- Budget tracking and limits
- Daemon state updates (_update_state_if_daemon)
"""
"""Execute agent with retry logic."""
self._ensure_async_primitives() # guarantees _stats_lock is bound to current loop

async with self._stats_lock:
self._execution_count += 1

# TODO: Add budget check from sync version:
# if self.max_cost and self._total_cost >= self.max_cost:
# logger.warning(f"Budget limit reached: ${self._total_cost:.4f} >= ${self.max_cost}")
# return
# Check budget limit
if self.max_cost and self._total_cost >= self.max_cost:
logger.warning(f"Budget limit reached: ${self._total_cost:.4f} >= ${self.max_cost}")
logger.warning("Stopping scheduler to prevent additional costs")
return
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Outdated

last_exc: Optional[Exception] = None
for attempt in range(max_retries):
try:
logger.info(f"Async attempt {attempt + 1}/{max_retries}")

# TODO: Add timeout support from sync version:
# if self.timeout:
# result = await asyncio.wait_for(
# self._executor.execute(self.task),
# timeout=self.timeout
# )
# else:
result = await self._executor.execute(self.task)
# Execute with timeout if specified
if self.timeout:
result = await asyncio.wait_for(
self._executor.execute(self.task),
timeout=self.timeout
)
else:
result = await self._executor.execute(self.task)

logger.info(f"Async agent execution successful on attempt {attempt + 1}")
logger.info(f"Result: {result}")

# Estimate cost (rough: ~$0.0001 per execution for gpt-4o-mini)
estimated_cost = 0.0001 # Base cost estimate

async with self._stats_lock:
self._success_count += 1
# TODO: Add cost tracking from sync version:
# estimated_cost = self._estimate_cost(result)
# self._total_cost += estimated_cost
self._total_cost += estimated_cost
Comment thread
greptile-apps[bot] marked this conversation as resolved.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Budget exhaustion does not actually stop the scheduler.

This branch logs that it is stopping, but it only returns from _execute_with_retry(). _run_schedule() will keep waking up and calling it forever. On top of that, _total_cost is only updated after successful runs, so repeated failures/timeouts are effectively free and can overshoot max_cost. Set the stop event when the budget is exhausted and charge each attempt before deciding whether another retry is allowed.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/praisonai/praisonai/scheduler/async_agent_scheduler.py` around lines 361
- 390, The budget-check branch in _execute_with_retry currently only returns, so
_run_schedule keeps calling it and failures don't increment _total_cost; change
it to set the scheduler stop event (e.g., self._stop_event.set()) when budget is
exhausted and return so the scheduler halts; also deduct/charge an
estimated_cost (or per-attempt charge) before each attempt (inside
_execute_with_retry, before retries loop or at start of each attempt) while
holding self._stats_lock so _total_cost is updated even for failed/timeouts and
prevent starting an attempt that would exceed self.max_cost by checking
self._total_cost + estimated_cost >= self.max_cost and setting self._stop_event
if exceeded.

safe_call(self.on_success, result)
# TODO: Add daemon state update from sync version:
Expand Down
Loading