Skip to content

Commit 23f3a6c

Browse files
authored
Merge branch 'main' into fix-unique
2 parents 54d68aa + 021a037 commit 23f3a6c

File tree

3 files changed

+153
-29
lines changed

3 files changed

+153
-29
lines changed

python/hyperon/atoms.py

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,35 @@ class MettaError(Exception):
311311
, but we don't want to output Python error stack."""
312312
pass
313313

314+
def unwrap_args(atoms):
315+
args = []
316+
kwargs = {}
317+
for a in atoms:
318+
if isinstance(a, ExpressionAtom):
319+
ch = a.get_children()
320+
if len(ch) > 0 and repr(ch[0]) == "Kwargs":
321+
for c in ch[1:]:
322+
try:
323+
kwarg = c.get_children()
324+
assert len(kwarg) == 2
325+
except:
326+
raise RuntimeError(f"Incorrect kwarg format {kwarg}")
327+
try:
328+
kwargs[get_string_value(kwarg[0])] = kwarg[1].get_object().content
329+
except:
330+
raise NoReduceError()
331+
continue
332+
try:
333+
args.append(a.get_object().content)
334+
except:
335+
# NOTE:
336+
# Currently, applying grounded operations to pure atoms is not reduced.
337+
# If we want, we can raise an exception, or form an error expression instead,
338+
# so a MeTTa program can catch and analyze it.
339+
# raise RuntimeError("Grounded operation " + self.name + " with unwrap=True expects only grounded arguments")
340+
raise NoReduceError()
341+
return args, kwargs
342+
314343
class OperationObject(GroundedObject):
315344
"""
316345
An OperationObject represents an operation as a grounded object, allowing for more
@@ -385,32 +414,7 @@ def execute(self, *atoms, res_typ=AtomType.UNDEFINED):
385414
"""
386415
# type-check?
387416
if self.unwrap:
388-
args = []
389-
kwargs = {}
390-
for a in atoms:
391-
if isinstance(a, ExpressionAtom):
392-
ch = a.get_children()
393-
if len(ch) > 0 and repr(ch[0]) == "Kwargs":
394-
for c in ch[1:]:
395-
try:
396-
kwarg = c.get_children()
397-
assert len(kwarg) == 2
398-
except:
399-
raise RuntimeError(f"Incorrect kwarg format {kwarg}")
400-
try:
401-
kwargs[get_string_value(kwarg[0])] = kwarg[1].get_object().content
402-
except:
403-
raise NoReduceError()
404-
continue
405-
try:
406-
args.append(a.get_object().content)
407-
except:
408-
# NOTE:
409-
# Currently, applying grounded operations to pure atoms is not reduced.
410-
# If we want, we can raise an exception, or form an error expression instead,
411-
# so a MeTTa program can catch and analyze it.
412-
# raise RuntimeError("Grounded operation " + self.name + " with unwrap=True expects only grounded arguments")
413-
raise NoReduceError()
417+
args, kwargs = unwrap_args(atoms)
414418
try:
415419
result = self.op(*args, **kwargs)
416420
except MettaError as e:
@@ -422,7 +426,9 @@ def execute(self, *atoms, res_typ=AtomType.UNDEFINED):
422426
return [ValueAtom(result, res_typ)]
423427
else:
424428
result = self.op(*atoms)
425-
if not isinstance(result, list):
429+
try:
430+
iter(result)
431+
except TypeError:
426432
raise RuntimeError("Grounded operation `" + self.name + "` should return list")
427433
return result
428434

python/hyperon/exts/agents/agent_base.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,42 @@
44
'''
55
This is very preliminary and incomplete PoC version.
66
However, it is put to exts, because metta-motto depends on it.
7+
Reagrding threading:
8+
- Generic threading for metta can be introduced with
9+
parallel and sequential composition, for-comprehension, etc.
10+
Agents could be built on top of this functionality. However,
11+
this piece of code was driven by metta-motto demands.
12+
- Two main cases for agents are:
13+
-- Immediate call with inputs to get outputs
14+
-- Asynchronous events and responses
15+
Supporting both cases in one implementation is more convenient,
16+
because both of them can be needed simultaneously in certain
17+
domains (e.g. metta-motto)
18+
- Implementation can be quite different.
19+
-- Agents could be started explicitly
20+
-- They could inherint from StreamMethod
21+
-- Other methods could be called directly without StreamMethod wrapper
22+
All these nuances are to be fleshed out
723
'''
824

25+
import threading
26+
from queue import Queue
27+
class StreamMethod(threading.Thread):
28+
def __init__(self, method, args):
29+
super().__init__() #daemon=True
30+
self._result = Queue()
31+
self.method = method
32+
self.args = args
33+
def run(self):
34+
for r in self.method(*self.args):
35+
self._result.put(r)
36+
def __iter__(self):
37+
return self
38+
def __next__(self):
39+
if self._result.empty() and not self.is_alive():
40+
raise StopIteration
41+
return self._result.get()
42+
943
class AgentObject:
1044

1145
'''
@@ -58,6 +92,9 @@ def _try_unwrap(self, val):
5892
return repr(val)
5993

6094
def __init__(self, path=None, atoms={}, include_paths=None, code=None):
95+
if path is None and code is None:
96+
# purely Python agent
97+
return
6198
# The first argument is either path or code when called from MeTTa
6299
if isinstance(path, ExpressionAtom):# and path != E():
63100
code = path
@@ -106,16 +143,28 @@ def __call__(self, atom):
106143
)
107144
return self._metta.evaluate_atom(atom)
108145

146+
def is_daemon(self):
147+
return hasattr(self, 'daemon') and self.daemon is True
148+
109149
def __metta_call__(self, *args):
150+
call = True
110151
method = self.__call__
111152
if len(args) > 0 and isinstance(args[0], SymbolAtom):
112153
n = args[0].get_name()
113154
if n[0] == '.' and hasattr(self, n[1:]):
114155
method = getattr(self, n[1:])
115156
args = args[1:]
157+
call = False
116158
if self._unwrap:
117-
return OperationObject(f"{method}", method).execute(*args)
118-
return method(*args)
159+
method = OperationObject(f"{method}", method).execute
160+
st = StreamMethod(method, args)
161+
st.start()
162+
# We don't return the stream here; otherwise it will be consumed immediately.
163+
# If the agent itself would be StreamMethod, its results could be accessbile.
164+
# Here, they are lost (TODO?).
165+
if call and self.is_daemon():
166+
return [E()]
167+
return st
119168

120169

121170
@register_atoms(pass_metta=True)

python/hyperon/exts/agents/tests/test_agents.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from hyperon import MeTTa
22
from hyperon.exts.agents import AgentObject
3+
from queue import Queue
4+
from time import sleep
35

46
class MyAgent(AgentObject):
57
'''
@@ -20,3 +22,70 @@ def __call__(self, a, b):
2022
! (&agent 7 8)
2123
! (&agent .subs 4)
2224
'''))
25+
26+
# =================================
27+
28+
class Agent1(AgentObject):
29+
def __call__(self):
30+
for x in range(10):
31+
yield x
32+
sleep(1)
33+
class Agent2(AgentObject):
34+
def __call__(self, xs):
35+
for x in xs:
36+
yield x*2
37+
class Agent3(AgentObject):
38+
def __call__(self, xs):
39+
for x in xs:
40+
print("Result: ", x)
41+
42+
m = MeTTa()
43+
m.register_atom('new-agent-1', Agent1.agent_creator_atom())
44+
m.register_atom('new-agent-2', Agent2.agent_creator_atom())
45+
m.register_atom('new-agent-3', Agent3.agent_creator_atom())
46+
print(m.run('''
47+
! ((new-agent-3) ((new-agent-2) ((new-agent-1))))
48+
'''))
49+
50+
# =================================
51+
52+
class Agnt(AgentObject):
53+
def __init__(self):
54+
self.messages = Queue()
55+
self.running = False
56+
self.output = Queue()
57+
self.daemon = True
58+
def __call__(self):
59+
self.running = True
60+
cnt = 0
61+
while self.running:
62+
if self.messages.empty():
63+
self.output.put(f"Waiting {cnt}")
64+
sleep(2)
65+
cnt += 1
66+
else:
67+
m = self.messages.get()
68+
self.output.put(m[::-1])
69+
def stop(self):
70+
self.running = False
71+
def input(self, msg):
72+
self.messages.put(msg)
73+
def response(self):
74+
if self.output.empty():
75+
return None
76+
return self.output.get()
77+
78+
m = MeTTa()
79+
m.register_atom('agnt', Agnt.agent_creator_atom())
80+
print(m.run('''
81+
! (bind! &a1 (agnt))
82+
! (&a1)
83+
! (println! "Agent is running")
84+
! ((py-atom time.sleep) 1)
85+
! ("First response:" (&a1 .response))
86+
! (&a1 .input "Hello")
87+
! (println! "Agent is receiving messages")
88+
! ((py-atom time.sleep) 2)
89+
! ("Second response:" (&a1 .response))
90+
! (&a1 .stop)
91+
'''))

0 commit comments

Comments
 (0)