-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathinterpreter.py
More file actions
365 lines (294 loc) · 9.72 KB
/
interpreter.py
File metadata and controls
365 lines (294 loc) · 9.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable
from typing import Optional
from typing_extensions import Self
from ghoshell_moss.core.concepts.command import CommandTask, CommandToken
from ghoshell_moss.message import Message
from .channel import ChannelMeta
__all__ = [
"CommandTaskCallback",
"CommandTaskParseError",
"CommandTaskParserElement",
"CommandTokenCallback",
"CommandTokenParser",
"Interpreter",
]
CommandTokenCallback = Callable[[CommandToken | None], None]
CommandTaskCallback = Callable[[CommandTask | None], None]
class CommandTaskParseError(Exception):
pass
class CommandTokenParser(ABC):
"""
parse from string stream into command tokens
"""
@abstractmethod
def with_callback(self, *callbacks: CommandTokenCallback) -> None:
"""
send command token to callback method
"""
pass
@abstractmethod
def is_done(self) -> bool:
"""weather this parser is done parsing."""
pass
@abstractmethod
def start(self) -> None:
"""start this parser"""
pass
@abstractmethod
def feed(self, delta: str) -> None:
"""feed this parser with the stream delta"""
pass
@abstractmethod
def commit(self) -> None:
"""notify the parser that the stream is done"""
pass
@abstractmethod
def close(self) -> None:
"""
stop the parser and clear the resources.
"""
pass
@abstractmethod
def buffer(self) -> str:
"""
return the buffered stream content
"""
pass
@abstractmethod
def parsed(self) -> Iterable[CommandToken]:
"""返回已经生成的 command token"""
pass
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
example for how to use parser manually
"""
if exc_val is None:
# ending is needed if parse success
self.commit()
self.close()
class CommandTaskParserElement(ABC):
"""
CommandTaskElement works like AST but in realtime.
It accepts command token from a stream, and generate command task concurrently.
The keypoint is, the command tokens are organized in the recursive pattern,
that one command can embrace many children command within it, and handle them by its own means,
just like a function call other functions inside it.
So we need an Element Tree to parse the tokens into command tasks, and send the tasks immediately
"""
depth: int
current: Optional[CommandTask] = None
"""the current command task of this element, created by `start` type command token"""
children: dict[str, "CommandTaskParserElement"]
"""the children element of this element"""
@abstractmethod
def with_callback(self, callback: CommandTaskCallback) -> None:
"""设置一个 callback, 替换默认的 callback. 通常不需要使用."""
pass
@abstractmethod
def on_token(self, token: CommandToken | None) -> None:
"""
接受一个 command token
:param token: 如果为 None, 表示 command token 流已经结束.
"""
pass
@abstractmethod
def is_end(self) -> bool:
"""是否解析已经完成了."""
pass
@abstractmethod
def destroy(self) -> None:
"""手动清理数据结构, 加快垃圾回收, 避免内存泄漏"""
pass
class Interpreter(ABC):
"""
命令解释器, 从一个文本流中解析 command token.
同时将流式的 command token 解析为流式的 command task, 然后回调给执行器.
The Command Interpreter that parse the LLM-generated streaming tokens into Command Tokens,
and send the compiled command tasks into the shell executor.
Consider it a one-time command parser + command executor
"""
id: str
"""each time stream interpretation has a unique id"""
@abstractmethod
def meta_system_prompt(self) -> str:
"""
给大模型使用 MOSS 的元规则. interpreter 可以定义不同的规则.
"""
pass
@abstractmethod
def channels(self) -> dict[str, ChannelMeta]:
pass
@abstractmethod
def moss_instruction(self) -> str:
"""
当前 interpreter 状态下, moss 的完整使用提示. 用于呈现给大模型.
"""
pass
@abstractmethod
def context_messages(self, *, channel_names: list[str] | None = None) -> list[Message]:
"""
返回 interpreter 的关联上下文.
"""
pass
@abstractmethod
def feed(self, delta: str) -> None:
"""
向 interpreter 提交文本片段, 会自动触发其它流程.
example:
async with interpreter:
async for item in async_iterable_texts:
interpreter.feed(item)
"""
pass
@abstractmethod
def commit(self) -> None:
"""
commit the inputs
"""
pass
@abstractmethod
def with_callback(self, *callbacks: CommandTaskCallback) -> None:
pass
@abstractmethod
def parser(self) -> CommandTokenParser:
"""
interpreter 持有的 Token 解析器. 将文本输入解析成 command token, 同时将 command token 解析成 command task.
example:
with interpreter.parser() as parser:
async for item in async_iterable_texts:
paser.feed(item)
注意 Parser 是同步阻塞的, 因此正确的做法是使用 interpreter 自带的 feed 函数实现非阻塞.
通常 parser 运行在独立的线程池中.
"""
pass
@abstractmethod
def root_task_element(self) -> CommandTaskParserElement:
"""
当前 Interpreter 做树形 Command Token 解析时使用的 Element 对象. debug 用.
通常运行在独立的线程池中.
"""
pass
@abstractmethod
def parsed_tokens(self) -> Iterable[CommandToken]:
"""
已经解析生成的 tokens.
"""
pass
@abstractmethod
def parsed_tasks(self) -> dict[str, CommandTask]:
"""
已经解析生成的 tasks.
"""
pass
@abstractmethod
def outputted(self) -> Iterable[str]:
"""已经对外输出的文本内容."""
pass
@abstractmethod
async def results(self) -> dict[str, str]:
"""
将所有已经执行完的 task 的 result 作为有序的字符串字典输出
知道第一个运行失败的.
其中返回值为 None 或空字符串的不会展示.
todo: 这是一个 alpha 版为了方便快速实现 react 做的临时机制. 不是正式机制.
:return: key is the task name and attrs, value is the result or error of the command
if command task return None, ignore the result of it.
"""
pass
@abstractmethod
def executed(self) -> list[CommandTask]:
"""
返回已经被执行的 tasks.
"""
pass
def executed_tokens(self) -> str:
"""
返回当前已经执行完毕的 tokens.
"""
tokens = []
for task in self.executed():
tokens.append(task.tokens)
return "".join(tokens)
@abstractmethod
def inputted(self) -> str:
"""
返回已经完成输入的文本内容. 必须通过 feed 输入.
"""
pass
@abstractmethod
async def start(self) -> None:
"""
启动解释过程.
start the interpretation, allowed to push the tokens.
"""
pass
@abstractmethod
async def stop(self) -> None:
"""
中断解释过程. 有可能由其它的并行任务来触发, 触发后 feed 不会抛出异常.
stop the interpretation and cancel all the running tasks.
"""
pass
@abstractmethod
def is_stopped(self) -> bool:
"""
判断解释过程是否还在执行中.
"""
pass
@abstractmethod
def is_running(self) -> bool:
"""
是否正在运行中: start -> end 中间.
"""
pass
@abstractmethod
def is_interrupted(self) -> bool:
"""
解释过程是否被中断.
"""
pass
async def __aenter__(self) -> Self:
"""
example to use the interpreter:
async with interpreter as itp:
# the interpreter started
async for item in async_iterable_texts:
# 判断是否被中断. 如果被中断可以 break.
if not itp.is_stopped():
itp.feed(item)
await itp.wait_until_done()
result = itp.results()
"""
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.stop()
@abstractmethod
async def wait_parse_done(self, timeout: float | None = None) -> None:
"""
等待解释过程完成. 完成有两种情况:
1. 输入已经完备.
2. 被中断.
wait until the interpretation of command tasks are done (finish, failed or cancelled).
:return: True if the interpretation is fully finished.
"""
pass
@abstractmethod
async def wait_execution_done(
self, timeout: float | None = None, *, throw: bool = False, cancel_on_exception: bool = True
) -> dict[str, CommandTask]:
"""
等待所有的 task 被执行完毕.
如果这些 task 没有被任何方式执行, 将会导致持续的阻塞.
"""
pass
@abstractmethod
def __del__(self) -> None:
"""
为了防止内存泄漏, 增加一个手动清空的方法.
"""
pass