Skip to content

Latest commit

 

History

History
316 lines (222 loc) · 7.97 KB

emitter.md

File metadata and controls

316 lines (222 loc) · 7.97 KB

👀 Emitter (Observability)

Table of Contents


Overview

The Emitter is a powerful event management and observability tool that allows you to track, monitor, and react to events happening within your AI agents and workflows.

This flexible event-driven mechanism provides the ability to:

  • Observe system events
  • Debug agent behaviors
  • Log and track agent interactions
  • Implement custom event handling

Note

Location within the framework: beeai_framework/emitter.

Basic usage

The following example demonstrates how the Emitter feature works.

import asyncio
import json
import sys
import traceback

from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError


async def main() -> None:
    """Listen to every event on the root emitter, emit two events, then stop listening."""

    def log_event(data, event):
        # Report the event's path together with its JSON-serialized payload.
        print(f"Received event '{event.path}' with data {json.dumps(data)}")

    # Work with the shared root emitter (creating your own is possible too).
    root_emitter = Emitter.root()

    # Keep the function returned by match() so the listener can be removed again.
    unsubscribe = root_emitter.match("*.*", log_event)

    for event_name in ("start", "end"):
        await root_emitter.emit(event_name, {"id": 123})

    unsubscribe()


if __name__ == "__main__":
    # Script entry point: drive the async example to completion.
    try:
        asyncio.run(main())
    except FrameworkError as e:
        # Print the full traceback first, then terminate the process,
        # handing whatever e.explain() returns to sys.exit().
        traceback.print_exc()
        sys.exit(e.explain())

Source: examples/emitter/base.py

Note

You can create your own emitter by instantiating the Emitter class, but typically it's better to use or fork the root one.

Key features

Event matching

Event matching allows you to:

  • Listen to specific event types
  • Use wildcard matching
  • Handle nested events
import asyncio
import re
import sys
import traceback

from beeai_framework.adapters.ollama import OllamaChatModel
from beeai_framework.backend import ChatModel
from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError


async def main() -> None:
    """Show the available matcher kinds: exact name, wildcard, filter function and regex."""

    def announce(suffix):
        """Return a listener that prints the event payload followed by ``suffix``."""

        def listener(data, event):
            print(data, suffix)

        return listener

    def created_by_chat_model(event):
        """Filter function: accept events whose creator is a ChatModel."""
        return isinstance(event.creator, ChatModel)

    app_emitter = Emitter.root().child(namespace=["app"])
    chat_model = OllamaChatModel()

    # Exact event name (strictly typed).
    app_emitter.on("update", announce(": on update"))

    # "*" covers events emitted directly on this instance, not nested ones.
    app_emitter.match("*", announce(": match all instance"))

    # "*.*" covers all events, including nested ones. Keep the returned
    # function so this listener can be removed at the end.
    remove_root_listener = Emitter.root().match("*.*", announce(": match all nested"))

    # A filter function can be used as the matcher.
    chat_model.emitter.match(created_by_chat_model, announce(": match ChatModel"))

    # A compiled regular expression can be used as the matcher as well.
    watsonx_pattern = re.compile(r"watsonx")
    app_emitter.match(watsonx_pattern, announce(": match regex"))

    await app_emitter.emit("update", "update")
    await Emitter.root().emit("root", "root")
    await chat_model.emitter.emit("model", "model")

    # Calling the function returned by match() removes that listener from the emitter.
    remove_root_listener()


if __name__ == "__main__":
    # Script entry point: drive the async example to completion.
    try:
        asyncio.run(main())
    except FrameworkError as e:
        # Print the full traceback first, then terminate the process,
        # handing whatever e.explain() returns to sys.exit().
        traceback.print_exc()
        sys.exit(e.explain())

Source: examples/emitter/matchers.py

Event piping

Event piping enables:

  • Transferring events between emitters
  • Transforming events in transit
  • Creating complex event workflows
import asyncio
import sys
import traceback

from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError


async def main() -> None:
    """Pipe one emitter into another and emit on both, before and after removing the pipe."""
    first: Emitter = Emitter(namespace=["app"])

    def on_first_event(data, event):
        # isDirect reports whether the event's source is the 'first' emitter itself.
        is_direct = event.source == first
        print(f"'first' has retrieved the following event '{event.path}', isDirect: {is_direct}")

    first.match("*.*", on_first_event)

    second: Emitter = Emitter(namespace=["app", "llm"])

    def on_second_event(data, event):
        # isDirect reports whether the event's source is the 'second' emitter itself.
        is_direct = event.source == second
        print(f"'second' has retrieved the following event '{event.path}', isDirect: {is_direct}")

    second.match("*.*", on_second_event)

    # Forward every event of 'second' to 'first'; keep the returned callable
    # so the link can be taken down again.
    unpipe = second.pipe(first)

    for source, event_name in ((first, "a"), (second, "b")):
        await source.emit(event_name, {})

    print("Unpipe")
    unpipe()

    # Emit once more on both emitters now that unpipe() has been called.
    for source, event_name in ((first, "c"), (second, "d")):
        await source.emit(event_name, {})


if __name__ == "__main__":
    # Script entry point: drive the async example to completion.
    try:
        asyncio.run(main())
    except FrameworkError as e:
        # Print the full traceback first, then terminate the process,
        # handing whatever e.explain() returns to sys.exit().
        traceback.print_exc()
        sys.exit(e.explain())

Source: examples/emitter/piping.py


Framework usage

Typically, you consume out-of-the-box modules that use the Emitter concept on your behalf.

Agent usage

Integrate emitters with agents to:

  • Track agent decision-making
  • Log agent interactions
  • Debug agent behaviors
import asyncio
import sys
import traceback

from beeai_framework.adapters.ollama import OllamaChatModel
from beeai_framework.agents.react import ReActAgent
from beeai_framework.errors import FrameworkError
from beeai_framework.memory import UnconstrainedMemory


async def main() -> None:
    """Attach listeners to an agent at the instance level and at the level of a single run."""

    def ignore_event(data, event):
        # Deliberately does nothing; it only demonstrates instance-level matching.
        return None

    def log_run_event(data, event):
        print(f"RUN LOG: received event '{event.path}'")

    def attach_run_logger(emitter):
        # Receives the emitter handed over by observe() and listens to all of its events.
        return emitter.match("*.*", log_run_event)

    llm = OllamaChatModel("llama3.1")
    memory = UnconstrainedMemory()
    agent = ReActAgent(llm=llm, memory=memory, tools=[])

    # Instance level: the listener is registered on the agent's own emitter.
    agent.emitter.match("*.*", ignore_event)

    # Execution (run) level: the listener is registered through observe() on this one run.
    run = agent.run("Hello agent!")
    await run.observe(attach_run_logger)


if __name__ == "__main__":
    # Script entry point: drive the async example to completion.
    try:
        asyncio.run(main())
    except FrameworkError as e:
        # Print the full traceback first, then terminate the process,
        # handing whatever e.explain() returns to sys.exit().
        traceback.print_exc()
        sys.exit(e.explain())

Source: examples/emitter/agentMatchers.py

Note

The observe method is also supported on Tools and Backend.

Tip

See the events documentation for more information on standard emitter events.


Advanced usage

Advanced techniques include:

  • Custom event handlers
  • Complex event filtering
  • Performance optimization
import asyncio
import sys
import traceback

from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError


async def main() -> None:
    """Create a fully configured child emitter and handle its "start" and "update" events."""

    def on_start(data, event):
        print(f"Received '{event.name}' event with id '{data['id']}'")

    def on_update(data, event):
        print(f"Received '{event.name}' with id '{data['id']}' and data '{data['data']}'")

    # Derive a child of the root emitter, spelling out every available option.
    demo_emitter = Emitter.root().child(
        namespace=["bee", "demo"],
        creator={},  # usually a class
        context={},  # user-defined data, propagated to the event's context property
        group_id=None,  # optional id that groups related events (propagated to the event's groupId property)
        trace=None,  # identifies what emitted what and in which context (used internally by framework components)
    )

    # Register one handler per event name.
    demo_emitter.on("start", on_start)
    demo_emitter.on("update", on_update)

    await demo_emitter.emit("start", {"id": 123})
    await demo_emitter.emit("update", {"id": 123, "data": "Hello Bee!"})


if __name__ == "__main__":
    # Script entry point: drive the async example to completion.
    try:
        asyncio.run(main())
    except FrameworkError as e:
        # Print the full traceback first, then terminate the process,
        # handing whatever e.explain() returns to sys.exit().
        traceback.print_exc()
        sys.exit(e.explain())

Source: examples/emitter/advanced.py


Examples

  • All emitter examples can be found here.