-
Notifications
You must be signed in to change notification settings - Fork 279
/
Copy pathpiping.py
46 lines (34 loc) · 1.06 KB
/
piping.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import asyncio
import sys
import traceback
from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError
async def main() -> None:
    """Show how events flow between two emitters while piped and after unpiping.

    Two emitters are created: one with namespace ``["app"]`` and one with
    namespace ``["app", "llm"]``. Each gets a ``"*.*"`` listener that prints
    the path of every event it receives and whether the event's source is
    that same emitter. The second emitter is then piped into the first, one
    event is emitted on each, the callable returned by ``pipe`` is invoked,
    and one more event is emitted on each so the two outputs can be compared.
    """
    first: Emitter = Emitter(namespace=["app"])

    def report_to_first(data, event) -> None:
        # 'isDirect' tells whether the event was emitted by `first` itself.
        print(f"'first' has retrieved the following event '{event.path}', isDirect: {event.source == first}")

    first.match("*.*", report_to_first)

    second: Emitter = Emitter(namespace=["app", "llm"])

    def report_to_second(data, event) -> None:
        # 'isDirect' tells whether the event was emitted by `second` itself.
        print(f"'second' has retrieved the following event '{event.path}', isDirect: {event.source == second}")

    second.match("*.*", report_to_second)

    # Propagate all events from the 'second' emitter to the 'first' emitter.
    # The returned callable is invoked later; presumably it removes the pipe.
    detach = second.pipe(first)

    # While piped: one event per emitter, in order first -> second.
    for emitter, event_name in ((first, "a"), (second, "b")):
        await emitter.emit(event_name, {})

    print("Unpipe")
    detach()

    # After unpiping: emit again on both emitters, in the same order.
    for emitter, event_name in ((first, "c"), (second, "d")):
        await emitter.emit(event_name, {})
if __name__ == "__main__":
    # Script entry point: drive the async demo to completion.
    try:
        asyncio.run(main())
    except FrameworkError as err:
        # Print the full traceback first, then exit using the framework's
        # own explanation of the error as the exit argument.
        traceback.print_exc()
        explanation = err.explain()
        sys.exit(explanation)