forked from LangQi99/NeoFish
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscheduled_task.py
More file actions
127 lines (102 loc) · 3.68 KB
/
scheduled_task.py
File metadata and controls
127 lines (102 loc) · 3.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
"""
scheduled_task.py — Data classes for the NeoFish scheduled task system.
All inter-component data passing uses typed dataclasses instead of
naked dictionaries, giving IDE autocompletion, runtime type safety,
and a single source of truth for field definitions.
"""
from __future__ import annotations
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional
# ── Task Configuration (persisted to JSON) ─────────────────────────────
@dataclass
class ScheduledTask:
    """A user-created recurring task configuration.

    Persisted to ``data/scheduled_tasks.json``. Created by the
    ``schedule_task`` agent tool and consumed by SchedulerService.
    """

    # Unique identifier; ``new()`` fills it with a random UUID4 string.
    task_id: str
    # Schedule expression. Not validated here — presumably standard cron
    # syntax; confirm against SchedulerService.
    cron_expr: str
    # Short human-readable label for the task.
    description: str
    # Prompt text stored with the task (presumably what the agent is asked
    # to do on each trigger — confirm against the consumer).
    prompt: str
    # Identify the conversation the task originated from.
    source_session_id: str
    source_chat_id: str
    source_platform: str
    # Debug flag carried along with the task; off by default.
    debug: bool = False
    # Whether the task is active; on by default. NOTE(review): presumably
    # the scheduler skips disabled tasks — nothing in this module enforces it.
    enabled: bool = True
    # Creation time as returned by ``time.time()`` (seconds since the epoch).
    created_at: float = field(default_factory=time.time)
    # Most recent run time; stays ``None`` until something outside this
    # class records a run.
    last_run_at: Optional[float] = None
    # Outcome of the most recent run; ``None`` until one is recorded.
    last_status: Optional[str] = None  # "success" | "failed" | "error"

    @classmethod
    def new(
        cls,
        cron_expr: str,
        description: str,
        prompt: str,
        source_session_id: str,
        source_chat_id: str,
        source_platform: str,
        debug: bool = False,
    ) -> "ScheduledTask":
        """Build a task with a freshly generated ``task_id``.

        The id is ``str(uuid.uuid4())``. Fields not listed as parameters
        (``enabled``, ``created_at``, ``last_run_at``, ``last_status``)
        take their dataclass defaults.

        Args:
            cron_expr: Schedule expression stored on the task.
            description: Short label for the task.
            prompt: Prompt text stored on the task.
            source_session_id: Session the task originated from.
            source_chat_id: Chat the task originated from.
            source_platform: Platform the task originated from.
            debug: Debug flag stored on the task.

        Returns:
            The newly constructed ``ScheduledTask``.
        """
        return cls(
            task_id=str(uuid.uuid4()),
            cron_expr=cron_expr,
            description=description,
            prompt=prompt,
            source_session_id=source_session_id,
            source_chat_id=source_chat_id,
            source_platform=source_platform,
            debug=debug,
        )
# ── Queue Message (SchedulerService → BotSession) ──────────────────────
@dataclass
class TaskTrigger:
    """A triggered task injected into the BotSession queue.

    This replaces the ad-hoc dict that was previously constructed in
    ``SchedulerService._trigger()``. Every field is always present as an
    attribute (only ``debug`` has a default, ``False``), so there are no
    ``.get()`` calls with fallback defaults on the receiving side.
    """

    # Identifier of the task that fired.
    task_id: str
    # Short human-readable label of the task.
    description: str
    # Prompt text carried by the task.
    prompt: str
    # Identify the conversation the task originated from.
    source_session_id: str
    source_chat_id: str
    source_platform: str
    # Debug flag carried over from the task; off unless set.
    debug: bool = False

    @classmethod
    def from_task(cls, task: "ScheduledTask") -> "TaskTrigger":
        """Create a trigger from a ScheduledTask configuration.

        Copies ``task_id``, ``description``, ``prompt``, the three
        ``source_*`` fields and ``debug`` from *task*; scheduling and
        bookkeeping fields are not carried over.

        Args:
            task: The task configuration to copy from.

        Returns:
            A new ``TaskTrigger`` mirroring the copied fields.
        """
        return cls(
            task_id=task.task_id,
            description=task.description,
            prompt=task.prompt,
            source_session_id=task.source_session_id,
            source_chat_id=task.source_chat_id,
            source_platform=task.source_platform,
            debug=task.debug,
        )
# ── Execution Result (BotSession → callback) ───────────────────────────
@dataclass
class BotTaskResult:
    """The outcome of a single scheduled task execution."""

    # Identifier of the task that was executed.
    task_id: str
    # Short human-readable label of the task.
    description: str
    # Outcome marker; ``is_success`` compares it against "success".
    status: str  # "success" | "failed" | "error"
    # Free-text summary of the run.
    summary: str
    # Strings attached to the result (presumably paths of files produced
    # by the run — confirm against the producer). A fresh empty list per
    # instance.
    files: list[str] = field(default_factory=list)
    # Error text, if any; ``None`` by default. Spelled ``Optional[...]``
    # to match the other optional fields declared in this module.
    error: Optional[str] = None

    @property
    def is_success(self) -> bool:
        """Return ``True`` exactly when ``status`` equals ``"success"``."""
        return self.status == "success"
# ── Source Context (platform adapter → agent tools) ────────────────────
@dataclass
class SourceMeta:
    """Identifies the originating conversation for tool context.

    Passed through ``run_agent_loop`` → ``_create_tool_registry`` so
    that ``schedule_task`` / ``list_scheduled_tasks`` /
    ``cancel_scheduled_task`` handlers know which session owns the task.
    """

    # Session the request came from.
    session_id: str
    # Chat the request came from.
    chat_id: str
    # Platform the request came from.
    platform: str