Skip to content

Commit 2fd826d

Browse files
authored
Merge pull request #852 from trueagi-io/event_agent
agent with events
2 parents 7a2c651 + 959e27d commit 2fd826d

File tree

13 files changed

+428
-159
lines changed

13 files changed

+428
-159
lines changed
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
from .agent_base import AgentObject
1+
from .agent_base import AgentObject, EventAgent
22
from .agent_base import agent_atoms
3-
from .agent_base import BaseListeningAgent
3+
Lines changed: 157 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from hyperon import *
22
from hyperon.ext import register_atoms
3+
import queue
34

45
'''
56
This is very preliminary and incomplete PoC version.
@@ -24,6 +25,7 @@
2425

2526
import threading
2627
from queue import Queue
28+
2729
class StreamMethod(threading.Thread):
2830
def __init__(self, method, args):
2931
super().__init__() #daemon=True
@@ -32,16 +34,33 @@ def __init__(self, method, args):
3234
self.args = args
3335

3436
def run(self):
35-
for r in self.method(*self.args):
36-
self._result.put(r)
37+
# FIXME? if we raise the exception here, the thread is not stopped
38+
# but should we put Error into the result?
39+
try:
40+
for r in self.method(*self.args):
41+
self._result.put(r)
42+
except Exception as e:
43+
self._result.put(E(S('Error'), ValueAtom(self.args), ValueAtom(e)))
3744

3845
def __iter__(self):
3946
return self
4047

4148
def __next__(self):
42-
if self._result.empty() and not self.is_alive():
43-
raise StopIteration
44-
return self._result.get()
49+
try:
50+
while self.is_alive():
51+
yield self._result.get_nowait()
52+
except queue.Empty:
53+
pass
54+
raise StopIteration
55+
56+
def _try_atom2str(val):
57+
if isinstance(val, GroundedAtom):
58+
val = val.get_object().content
59+
elif isinstance(val, SymbolAtom):
60+
val = val.get_name()
61+
if val is None or isinstance(val, str):
62+
return val
63+
return repr(val)
4564

4665

4766
class AgentObject:
@@ -60,16 +79,17 @@ class AgentObject:
6079
_name = None
6180

6281
@classmethod
63-
def get_agent_atom(cls, metta, *args, unwrap=True):
82+
def get_agent_atom(cls, metta, *args, unwrap=True, **kwargs):
6483
# metta and unwrap are not passed to __init__, because
6584
# they are needed only for __metta_call__, so children
6685
# classes do not need to pass them to super().__init__
86+
# TODO: catch exceptions and turn them into error messages?
6787
if unwrap:
6888
# a hacky way to unwrap args
69-
agent_atom = OperationObject("_", cls).execute(*args)[0]
89+
agent_atom = OperationObject("_", cls).execute(*args, **kwargs)[0]
7090
agent = agent_atom.get_object().content
7191
else:
72-
agent = cls(*args)
92+
agent = cls(*args, **kwargs)
7393
if metta is not None:
7494
if hasattr(agent, '_metta') and agent._metta is not None:
7595
raise RuntimeError(f"MeTTa is already defined for {agent}")
@@ -88,13 +108,6 @@ def agent_creator_atom(cls, metta=None, unwrap=True):
88108
def name(cls):
89109
return cls._name if cls._name is not None else str(cls)
90110

91-
def _try_unwrap(self, val):
92-
if val is None or isinstance(val, str):
93-
return val
94-
if isinstance(val, GroundedAtom):
95-
return str(val.get_object().content)
96-
return repr(val)
97-
98111
def __init__(self, path=None, atoms={}, include_paths=None, code=None):
99112
if path is None and code is None:
100113
# purely Python agent
@@ -103,12 +116,12 @@ def __init__(self, path=None, atoms={}, include_paths=None, code=None):
103116
if isinstance(path, ExpressionAtom):# and path != E():
104117
code = path
105118
elif path is not None:
106-
path = self._try_unwrap(path)
119+
path = _try_atom2str(path)
107120
with open(path, mode='r') as f:
108121
code = f.read()
109122
# _code can remain None if the agent uses parent runner (when called from MeTTa)
110123
self._code = code.get_children()[1] if isinstance(code, ExpressionAtom) else \
111-
self._try_unwrap(code)
124+
_try_atom2str(code)
112125
self._atoms = atoms
113126
self._include_paths = include_paths
114127
self._context_space = None
@@ -118,7 +131,11 @@ def _create_metta(self):
118131
if self._code is None:
119132
return None
120133
self._init_metta()
121-
self._load_code() # TODO: check that the result contains only units
134+
result = self._load_code()
135+
for r in result:
136+
if r != [E()]:
137+
# TODO: catch only errors? unwrap error messages?
138+
raise RuntimeError(r)
122139

123140
def _init_metta(self):
124141
### =========== Creating MeTTa runner ===========
@@ -137,8 +154,11 @@ def _init_metta(self):
137154
self._metta = metta
138155

139156
def _load_code(self):
140-
return self._metta.run(self._code) if isinstance(self._code, str) else \
157+
if isinstance(self._code, str):
158+
return self._metta.run(self._code)
159+
else:
141160
self._metta.space().add_atom(self._code)
161+
return []
142162

143163
def __call__(self, atom):
144164
if self._unwrap or self._metta is None:
@@ -152,80 +172,156 @@ def is_daemon(self):
152172

153173
def __metta_call__(self, *args):
154174
call = True
175+
unwrap = self._unwrap
155176
method = self.__call__
156177
if len(args) > 0 and isinstance(args[0], SymbolAtom):
157178
n = args[0].get_name()
158179
if n[0] == '.' and hasattr(self, n[1:]):
159180
method = getattr(self, n[1:])
160181
args = args[1:]
161182
call = False
162-
if self._unwrap:
183+
# FIXME? Python methods called via . are supposed to be purely Python
184+
unwrap = True
185+
if unwrap:
163186
method = OperationObject(f"{method}", method).execute
164-
st = StreamMethod(method, args)
165-
st.start()
166-
# We don't return the stream here; otherwise it will be consumed immediately.
167-
# If the agent itself would be StreamMethod, its results could be accessbile.
168-
# Here, they are lost (TODO?).
169187
if call and self.is_daemon():
188+
st = StreamMethod(method, args)
189+
st.start()
190+
# We don't return the stream here; otherwise it will be consumed immediately.
191+
# If the agent itself would be StreamMethod, its results could be accessbile.
192+
# Here, they are lost (TODO?).
170193
return [E()]
171-
return st
194+
# NOTE: previously, `StreamMethod` object was created always and returned here
195+
# instead of calling `method` directly. The idea was that agents could consume
196+
# a part of the stream from other agents they are calling, but this cannot work
197+
# without support from MeTTa, because StreamMethod also calls `method` (just in
198+
# a separate thread), thus, it should be MeTTa itself, which turns `match` results
199+
# into a stream. Thus, we return the result directly now
200+
return method(*args)
172201

173-
class BaseListeningAgent(AgentObject):
174-
def __init__(self, path=None, atoms={}, include_paths=None, code=None):
202+
class EventAgent(AgentObject):
203+
204+
StopEvent = object()
205+
206+
def __init__(self, path=None, atoms={}, include_paths=None, code=None, event_bus=None):
207+
if event_bus is not None:
208+
# EventAgent is not a daemon by default: although its `event_processor` runs in a thread
209+
# and should be stopped, other non-event methods are more convenient to call directly to
210+
# get their results back in a caller
211+
# self.daemon = True
212+
atoms = {**atoms}
213+
atoms['&event_bus'] = event_bus if isinstance(event_bus, Atom) else ValueAtom(event_bus)
214+
atoms['queue-subscription'] = OperationAtom('queue-subscription', self.queue_subscription, unwrap=False)
215+
atoms['has-event-bus'] = OperationAtom('has-event-bus',
216+
lambda: self.event_bus is not None and \
217+
hasattr(self.event_bus, "create_subscription"))
218+
self.event_bus = event_bus.get_object().value if isinstance(event_bus, GroundedAtom) else event_bus
175219
super().__init__(path, atoms, include_paths, code)
176-
self.messages = Queue()
220+
# Even if there is no event bus, events can be submitted by child class methods
221+
self.events = Queue()
177222
self.running = False
178-
self._output = Queue()
179-
self.lock = threading.RLock()
180-
self.said = False
223+
self.outputs = Queue()
181224

182-
def start(self, *args):
225+
def _init_metta(self):
226+
# NOTE: atm, there is no utility for the base agent to import `agents` by default,
227+
# but event agents in metta typically need it
228+
super()._init_metta()
229+
if self._metta is not None:
230+
self._metta.run("! (import! &self agents)")
231+
232+
def start(self, *args):
233+
if self.running:
234+
raise RuntimeError("Currently, EventAgent is supposed to be running in one thread")
183235
if not args:
184236
args = ()
185237
self.running = True
186-
st = StreamMethod(self.messages_processor, args)
238+
st = StreamMethod(self.event_processor, args)
187239
st.start()
188240

189-
def message_processor(self, message, *args):
190-
yield None
241+
# TODO? it's similar to `input` in former BaseListeningAgent, but
242+
# it uses event_id and func, which might be inconvenient...
243+
# either event_id or func can be made excessive, so we could
244+
# turn this function into a queued call to an agent method...
245+
def recv_queue_event(self, event_id, func, *args):
246+
# TODO? we can keep {event_id: func} dict in self
247+
# instead of having `func` as an argument here
248+
self.events.put((event_id, func, args))
191249

192-
def handle_event(self):
193-
pass
250+
def queue_subscription(self, event_id, func):
251+
'''
252+
Subscribing `recv_queue_event` to the given event channel
253+
to put incoming events to queue processed in a separate
254+
thread instead of subscribing `func` directly
255+
'''
256+
self.event_bus.create_subscription(_try_atom2str(event_id),
257+
lambda *args: self.recv_queue_event(event_id, func, *args))
258+
return [E()]
194259

195-
def messages_processor(self, *args):
260+
def event_processor(self, *args):
261+
# `*args` received on `start`
196262
while self.running:
197-
self.handle_event()
198-
if not self.messages.empty():
199-
self.clear_output()
200-
with self.lock:
201-
m = self.messages.get()
202-
self.said = False
203-
for resp in self.message_processor(m, *args):
204-
with self.lock:
205-
self._output.put(resp)
206-
return []
263+
# TODO? func can be a Python function?
264+
(event_id, func, args) = self.events.get()
265+
if event_id is self.StopEvent:
266+
break
267+
# Wrapping into ValueAtom if arg is not an atom yet
268+
resp = self._metta.evaluate_atom(E(func,
269+
*[a if isinstance(a, Atom) else ValueAtom(a) for a in args]))
270+
# TODO? do we need `outputs` here? we may want to publish `resp` to a certain channel
271+
# or let `func` to do this direclty...
272+
# ??? self.clear_outputs()
273+
for r in resp:
274+
self.outputs.put(r)
207275

208276
def stop(self):
277+
self.events.put((self.StopEvent, None, None))
209278
self.running = False
210-
return []
211279

212-
def input(self, msg):
213-
with self.lock:
214-
self.messages.put(msg)
215-
return []
280+
# TODO? choose the model of dealing with outputs... do we need them at all?
281+
def clear_outputs(self):
282+
try:
283+
while True:
284+
self.outputs.get_nowait()
285+
except queue.Empty:
286+
pass
216287

217288
def get_output(self):
218-
while not self._output.empty():
219-
with self.lock:
220-
self.said = True
221-
yield self._output.get()
289+
try:
290+
while True:
291+
yield self.outputs.get_nowait()
292+
except queue.Empty:
293+
pass
294+
295+
def subscribe_metta_func(metta: MeTTa, event_bus: GroundedAtom, event_id: Atom, func: Atom): #metta,
296+
event_bus = event_bus.get_object().content
297+
if not hasattr(event_bus, "create_subscription"):
298+
raise RuntimeError("Event bus should have create_subscription method")
299+
event_id = _try_atom2str(event_id)
300+
event_bus.create_subscription(event_id,
301+
lambda *args: metta.evaluate_atom(E(func, *[ValueAtom(a) for a in args])))
302+
return [E()]
222303

223-
def clear_output(self):
224-
with self.lock:
225-
self._output = Queue()
304+
# The function to be called from MeTTa
305+
def publish_event(event_bus: Atom, event_id: Atom, content: Atom):
306+
assert isinstance(event_bus, GroundedAtom), f"{event_bus} is not a grounded object"
307+
event_bus = event_bus.get_object().value
308+
event_id = _try_atom2str(event_id)
309+
# FIXME? We want to be able to pass Atoms as event content, but not
310+
# any event bus can support this... or should we always use wrappers,
311+
# which provide this support?
312+
event_bus.publish(event_id, content)
313+
return [E()]
226314

227315
@register_atoms(pass_metta=True)
228316
def agent_atoms(metta):
229317
return {
230318
r"create-agent": AgentObject.agent_creator_atom(unwrap=False),
319+
# We have to avoid generic agent_creator_atom here to pass ordered parameters
320+
r"event-agent": OperationAtom('event-agent',
321+
lambda path=None, event_bus=None: EventAgent.get_agent_atom(None, unwrap=False, path=path, event_bus=event_bus),
322+
unwrap=False),
323+
r"direct-subscription": OperationAtom('direct-subscription',
324+
lambda *args: subscribe_metta_func(metta, *args), unwrap=False),
325+
r"publish-event": OperationAtom('publish-event',
326+
publish_event, unwrap=False)
231327
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .basic_bus import event_atoms
2+
3+
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from collections import defaultdict
2+
import logging
3+
from hyperon import OperationAtom
4+
from hyperon.ext import register_atoms
5+
6+
class BasicEventBus:
7+
def __init__(self):
8+
self.subscriptions = defaultdict(list)
9+
self.is_running = True
10+
def terminate(self):
11+
self.is_running = False
12+
def create_subscription(self, topic, cb):
13+
self.subscriptions[topic].append(cb)
14+
def publish(self, topic, msg):
15+
for cb in self.subscriptions[topic]:
16+
cb(msg)
17+
def get_logger(self):
18+
return logging.getLogger("MockNode")
19+
20+
@register_atoms
21+
def event_atoms():
22+
return {
23+
r"basic-event-bus": OperationAtom('basic-event-bus', BasicEventBus)
24+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
!(import! &self agents)
2+
3+
; checking that we can create passive agents and call their functions
4+
; passive agents do nothing unless they are directly called
5+
6+
; basic agent from file
7+
!(assertEqual
8+
((create-agent agent.metta) (g 3))
9+
9)
10+
11+
; basic agent from expression
12+
!(assertEqual
13+
((create-agent (quote (= (f $x) (+ 1 $x)))) (f 10))
14+
11)
15+
16+
; event-agent has the same behavior when event_bus is not provided
17+
!(add-reduct &self
18+
(= (my-agent) (event-agent agent.metta)))
19+
; multiple consequent calls should work
20+
!(assertEqual
21+
((my-agent) (h 3 5))
22+
15)
23+
!(assertEqual
24+
((my-agent) (g 4))
25+
16)
26+
27+
; atm, we can access Python methods of the agent, although
28+
; they are all supposed to be unwrap=True
29+
; also, event-agent without event bus is not a daemon
30+
!(assertEqual
31+
((my-agent) .is_daemon)
32+
False)
33+

0 commit comments

Comments
 (0)