Skip to content

Commit 51be9f1

Browse files
author
alcholiclg
committed
test and fix problems
1 parent 2e2c73f commit 51be9f1

4 files changed

Lines changed: 124 additions & 29 deletions

File tree

ms_agent/cron/executor.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,18 @@ def __init__(
4141
):
4242
self._default_timeout = default_timeout
4343
self._semaphore = semaphore or asyncio.Semaphore(5)
44-
self._per_job_locks: dict[str, asyncio.Lock] = {}
44+
self._per_job_semas: dict[str, asyncio.Semaphore] = {}
4545
self._active_agents: dict[str, Any] = {}
4646
self._output_dir = output_dir
4747
self._session_dir = session_dir
4848

4949
async def execute(self, job: CronJobSpec, config: Any) -> ExecutionResult:
    """Run ``job`` under the per-job and the global concurrency limits.

    A semaphore is created lazily for each ``job.id`` (sized from
    ``job.concurrency``, minimum 1) and cached in ``self._per_job_semas``.
    The size is fixed when the semaphore is first created; later changes to
    ``job.concurrency`` do not resize it.

    Args:
        job: The job spec; ``job.id`` and ``job.concurrency`` are read here.
        config: Passed through unchanged to ``_do_execute``.

    Returns:
        Whatever ``self._do_execute(job, config)`` returns.
    """
    job_sema = self._per_job_semas.get(job.id)
    if job_sema is None:
        # ``or 1`` tolerates a missing/zero concurrency value instead of
        # letting ``max(1, None)`` raise TypeError.
        job_sema = asyncio.Semaphore(max(1, job.concurrency or 1))
        self._per_job_semas[job.id] = job_sema

    # Acquire the per-job slot BEFORE the global one.  With the opposite
    # order, runs queued behind a busy job sit on global slots while they
    # wait, and enough of them starve every other job.  All callers take
    # the two semaphores in this same order, so no lock cycle is possible.
    async with job_sema:
        async with self._semaphore:
            return await self._do_execute(job, config)
5457

5558
async def _do_execute(self, job: CronJobSpec, config: Any) -> ExecutionResult:
@@ -108,7 +111,7 @@ async def _do_execute(self, job: CronJobSpec, config: Any) -> ExecutionResult:
108111

109112
def _build_engine(self, job: CronJobSpec, config: Any) -> Any:
110113
"""Build an agent or workflow engine based on job spec."""
111-
from ms_agent.config import Config
114+
load_cache = getattr(config, 'load_cache', False)
112115

113116
if job.workflow:
114117
from ms_agent.workflow.loader import WorkflowLoader
@@ -123,12 +126,14 @@ def _build_engine(self, job: CronJobSpec, config: Any) -> Any:
123126
config_dir_or_id=job.project,
124127
config=config,
125128
trust_remote_code=job.trust_remote_code,
129+
load_cache=load_cache,
126130
)
127131
else:
128132
from ms_agent.agent.loader import AgentLoader
129133
return AgentLoader.build(
130134
config=config,
131135
trust_remote_code=job.trust_remote_code,
136+
load_cache=load_cache,
132137
)
133138

134139
def _extract_output(self, messages: Any) -> str:

ms_agent/cron/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ async def _on_tick(self) -> None:
108108
for job, state in jobs_and_states:
109109
if not job.enabled:
110110
continue
111-
if state.status == 'paused':
111+
if state.status in ('paused', 'running', 'completed'):
112112
continue
113113
next_ms = _iso_to_ms(state.next_run_at)
114114
if next_ms is not None and next_ms <= now:

ms_agent/cron/service.py

Lines changed: 99 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ def __init__(self, workspace: Union[str, Path, None] = None):
103103
tick_interval=DEFAULT_TICK_INTERVAL,
104104
)
105105
self._running = False
106+
self._background_tasks: set[asyncio.Task] = set()
106107

107108
self.on_job_complete: List[Callable[[CronJobSpec, ExecutionResult], Awaitable[None]]] = []
108109
self.on_job_start: List[Callable[[CronJobSpec], Awaitable[None]]] = []
@@ -119,31 +120,69 @@ def manager(self) -> JobManager:
119120

120121
async def start(self) -> None:
    """Import declarative jobs, write the PID file and start the scheduler.

    ``_running`` is set before the startup steps run.  If any step raises,
    the flag is cleared again, and the PID file is removed when it had
    already been written, so a failed start does not leave the service
    reporting itself as running.  The original exception is re-raised.
    """
    self._running = True
    pid_written = False
    try:
        self._manager.repo.import_declarative()
        self._pid_manager.write_pid()
        pid_written = True
        await self._scheduler.start()
    except BaseException:
        # BaseException so that cancellation during scheduler start-up is
        # rolled back as well.
        self._running = False
        if pid_written:
            self._pid_manager.remove_pid()
        raise
124126

125-
async def stop(self) -> None:
127+
async def stop(self, force: bool = False, timeout: float = 30) -> None:
    """Stop the cron service.

    Args:
        force: If True, cancel all in-flight jobs immediately.
            If False, wait up to `timeout` seconds for them to finish,
            then cancel any that remain.
        timeout: Seconds to wait for graceful drain (ignored if force=True).
    """
    self._running = False
    self._scheduler.stop()

    try:
        # Snapshot: done-callbacks discard tasks from the live set while
        # we are draining it.
        tasks = set(self._background_tasks)
        if tasks:
            pending = tasks
            if not force:
                _, pending = await asyncio.wait(tasks, timeout=timeout)
            for task in pending:
                task.cancel()
            # Gather the whole snapshot, not just the cancelled tasks, so
            # exceptions from tasks that already finished are retrieved
            # instead of surfacing later as "never retrieved" warnings.
            await asyncio.gather(*tasks, return_exceptions=True)
    finally:
        # Remove the PID file even if the drain itself is interrupted.
        self._pid_manager.remove_pid()
129153

130154
def is_running(self) -> bool:
    """Return the current value of the service's ``_running`` flag."""
    running = self._running
    return running
132156

133157
async def run_forever(self) -> None:
    """Run the scheduler loop until interrupted.

    SIGTERM → graceful stop (wait up to 30s for in-flight jobs).
    SIGINT  → force stop (cancel all immediately).

    If this coroutine is cancelled while waiting, the service is stopped
    with ``force=True`` before the cancellation propagates.  Signal
    handlers installed here are removed again once the service has
    stopped.
    """
    await self.start()
    stop_event = asyncio.Event()
    self._force_stop = False

    def _graceful() -> None:
        stop_event.set()

    def _force() -> None:
        self._force_stop = True
        stop_event.set()

    # We are inside a coroutine, so the running loop is the right one.
    loop = asyncio.get_running_loop()
    installed = []
    for sig, handler in ((signal.SIGTERM, _graceful), (signal.SIGINT, _force)):
        try:
            loop.add_signal_handler(sig, handler)
        except (NotImplementedError, RuntimeError):
            # Best effort: the loop may not support signal handlers here
            # (e.g. unsupported platform or not the main thread).
            continue
        installed.append(sig)

    try:
        await stop_event.wait()
    except asyncio.CancelledError:
        # Being cancelled leaves no time for a graceful drain.
        self._force_stop = True
        raise
    finally:
        try:
            await self.stop(force=self._force_stop)
        finally:
            for sig in installed:
                loop.remove_signal_handler(sig)
147186

148187
# === Job CRUD (delegates to manager) ===
149188

@@ -210,9 +249,18 @@ def stop_daemon(self) -> bool:
210249
# === Scheduler Callbacks ===
211250

212251
async def _on_due_jobs(self, due: List[Tuple[CronJobSpec, CronJobState]]) -> None:
213-
"""Called by scheduler when jobs are due."""
214-
tasks = [self._execute_job(job, state) for job, state in due]
215-
await asyncio.gather(*tasks, return_exceptions=True)
252+
"""Called by scheduler when jobs are due.
253+
254+
Fire-and-forget: spawn tasks but do NOT await them, so the scheduler
255+
tick loop can re-arm immediately and pick up newly due jobs.
256+
"""
257+
for job, state in due:
258+
task = asyncio.create_task(
259+
self._execute_job(job, state),
260+
name=f'cron-{job.id}',
261+
)
262+
self._background_tasks.add(task)
263+
task.add_done_callback(self._background_tasks.discard)
216264

217265
async def _execute_job(self, job: CronJobSpec, state: CronJobState) -> None:
218266
self._manager.mark_running(job.id)
@@ -225,6 +273,13 @@ async def _execute_job(self, job: CronJobSpec, state: CronJobState) -> None:
225273

226274
config = self._build_config(job)
227275
result = await self._executor.execute(job, config)
276+
277+
retries_left = job.max_retries
278+
while not result.success and retries_left > 0:
279+
retries_left -= 1
280+
config = self._build_config(job)
281+
result = await self._executor.execute(job, config)
282+
228283
self._manager.record_result(job, result)
229284

230285
for cb in self.on_job_complete:
@@ -247,11 +302,13 @@ async def _execute_job(self, job: CronJobSpec, state: CronJobState) -> None:
247302
def _build_config(self, job: CronJobSpec) -> Any:
248303
"""Build DictConfig for agent execution.
249304
250-
Cron jobs run non-interactively, so we force:
251-
- stream=False (executor expects List[Message], not AsyncGenerator)
252-
- callbacks=[] (no input_callback; stdin is unavailable)
253-
- max_chat_round capped (allows tool-calling rounds, prevents runaway)
254-
- output/session paths under the cron workspace
305+
Config inheritance chain (later overrides earlier):
306+
1. Project config (from job.project via Config.from_task)
307+
2. Job-level overrides (from job.overrides dict)
308+
3. Cron-mandatory overrides (stream=False, no interactive callbacks)
309+
310+
The project's max_chat_round is respected; a default of 50 is used
311+
only when no project config sets it.
255312
"""
256313
from omegaconf import OmegaConf
257314

@@ -268,10 +325,20 @@ def _build_config(self, job: CronJobSpec) -> Any:
268325
config = OmegaConf.merge(config, OmegaConf.create(job.overrides))
269326

270327
OmegaConf.update(config, 'generation_config.stream', False, merge=True)
271-
OmegaConf.update(config, 'callbacks', [], merge=False)
272-
current_rounds = getattr(config, 'max_chat_round', None)
273-
if current_rounds is None or current_rounds > 10:
274-
OmegaConf.update(config, 'max_chat_round', 10, merge=True)
328+
329+
existing_cbs = getattr(config, 'callbacks', None)
330+
if existing_cbs:
331+
safe_cbs = [
332+
cb for cb in existing_cbs
333+
if cb != 'input_callback' and not str(cb).endswith('input_callback')
334+
]
335+
OmegaConf.update(config, 'callbacks', safe_cbs, merge=False)
336+
else:
337+
OmegaConf.update(config, 'callbacks', [], merge=False)
338+
339+
if getattr(config, 'max_chat_round', None) is None:
340+
OmegaConf.update(config, 'max_chat_round', 50, merge=True)
341+
275342
OmegaConf.update(
276343
config, 'session_log.dir',
277344
str(self._workspace / 'sessions'), merge=True
@@ -281,6 +348,11 @@ def _build_config(self, job: CronJobSpec) -> Any:
281348
str(self._workspace / 'output' / job.id), merge=True
282349
)
283350

351+
if job.session_mode == 'persistent':
352+
OmegaConf.update(config, 'load_cache', True, merge=True)
353+
OmegaConf.update(config, 'save_history', True, merge=True)
354+
OmegaConf.update(config, 'tag', f'cron-{job.id}', merge=True)
355+
284356
return config
285357

286358
# === Manual Tick ===
@@ -305,6 +377,13 @@ async def run_job_now(self, job_id: str) -> Optional[ExecutionResult]:
305377

306378
config = self._build_config(job)
307379
result = await self._executor.execute(job, config)
380+
381+
retries_left = job.max_retries
382+
while not result.success and retries_left > 0:
383+
retries_left -= 1
384+
config = self._build_config(job)
385+
result = await self._executor.execute(job, config)
386+
308387
self._manager.record_result(job, result)
309388

310389
for cb in self.on_job_complete:

ms_agent/tools/cron_tool.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
"""CronTool: agent-facing tool for managing cron jobs."""
22
from __future__ import annotations
33

4+
import asyncio
45
import json
56
import os
67
from pathlib import Path
78
from typing import Any, Dict, List
89

910
from ms_agent.cron.executor import is_in_cron_context
10-
from ms_agent.cron.manager import JobManager
1111
from ms_agent.llm.utils import Tool
1212
from ms_agent.tools.base import ToolBase
1313

@@ -32,7 +32,9 @@ def __init__(self, config, **kwargs):
3232
'MS_AGENT_CRON_WORKSPACE',
3333
os.path.expanduser('~/.ms_agent/cron'),
3434
)
35-
self._manager = JobManager(Path(workspace))
35+
from ms_agent.cron.service import CronService
36+
self._service = CronService(workspace=workspace)
37+
self._manager = self._service.manager
3638

3739
async def connect(self) -> None:
3840
pass
@@ -145,8 +147,17 @@ async def _action_run(self, args: dict) -> str:
145147
job_id = args.get('job_id')
146148
if not job_id:
147149
return _json_dumps({'error': '"job_id" is required.'})
148-
ok = self._manager.trigger_job(job_id)
149-
return _json_dumps({'status': 'triggered' if ok else 'failed', 'job_id': job_id})
150+
result = await self._service.run_job_now(job_id)
151+
if result is None:
152+
return _json_dumps({'error': f'Job {job_id} not found.'})
153+
return _json_dumps({
154+
'status': 'completed' if result.success else 'failed',
155+
'job_id': job_id,
156+
'success': result.success,
157+
'duration_ms': result.duration_ms,
158+
'output_preview': result.output[:500] if result.output else '',
159+
'error': result.error,
160+
})
150161

151162
async def _action_remove(self, args: dict) -> str:
152163
job_id = args.get('job_id')

0 commit comments

Comments
 (0)