Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/adk-py/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ dependencies = [
"jsonpatch>=1.33",
]

[project.optional-dependencies]
memoryhub = ["memoryhub>=0.7.0"]

[dependency-groups]
dev = [
"beeai-framework[duckduckgo,wikipedia]>=0.1.76",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
from .form import *
from .llm import *
from .mcp import *
from .memoryhub import *
from .platform import *
165 changes: 165 additions & 0 deletions apps/adk-py/src/kagenti_adk/a2a/extensions/services/memoryhub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Copyright 2026 © IBM Corp.
# SPDX-License-Identifier: Apache-2.0


from __future__ import annotations

import os
from types import NoneType
from typing import TYPE_CHECKING, Any, Self

import pydantic
from a2a.server.agent_execution.context import RequestContext
from a2a.types import Message as A2AMessage
from pydantic import SecretStr
from typing_extensions import override

from kagenti_adk.a2a.extensions.base import (
DEFAULT_DEMAND_NAME,
BaseExtensionClient,
BaseExtensionServer,
BaseExtensionSpec,
)
from kagenti_adk.util.pydantic import REVEAL_SECRETS, SecureBaseModel, redact_str

__all__ = [
"MemoryHubDemand",
"MemoryHubExtensionClient",
"MemoryHubExtensionMetadata",
"MemoryHubExtensionParams",
"MemoryHubExtensionServer",
"MemoryHubExtensionSpec",
"MemoryHubFulfillment",
]

if TYPE_CHECKING:
from kagenti_adk.server.context import RunContext


class MemoryHubFulfillment(SecureBaseModel):
"""Connection details the client provides for a MemoryHub instance."""

url: str
"""
Base URL of the MemoryHub MCP endpoint, e.g.
``https://memory-hub-mcp.example.com/mcp/``.
"""

api_key: SecretStr | None = None
"""
Static API key for the dev/testing path. Mutually exclusive with the
OAuth fields below.
"""

auth_url: str | None = None
"""
OAuth 2.1 token endpoint. Required together with ``client_id`` and
``client_secret`` for the OAuth path.
"""

client_id: str | None = None
"""
OAuth 2.1 client identifier.
"""

client_secret: SecretStr | None = None
"""
OAuth 2.1 client secret.
"""

@pydantic.field_serializer("url")
def _redact_url(self, v: str, info) -> str:
return redact_str(v, info)


class MemoryHubDemand(pydantic.BaseModel):
"""A request from the agent for a MemoryHub fulfillment."""

description: str | None = None
"""
Short description of how the memory store will be used. Intended to be
shown in the UI alongside a connection picker.
"""


class MemoryHubExtensionParams(pydantic.BaseModel):
memoryhub_demands: dict[str, MemoryHubDemand]
"""MemoryHub connections that the agent requires the client to provide."""


class MemoryHubExtensionMetadata(pydantic.BaseModel):
memoryhub_fulfillments: dict[str, MemoryHubFulfillment] = {}
"""Connection details corresponding to the agent's demands."""


class MemoryHubExtensionSpec(BaseExtensionSpec[MemoryHubExtensionParams, MemoryHubExtensionMetadata]):
URI: str = "https://a2a-extensions.adk.kagenti.dev/services/memoryhub/v1"

@classmethod
def single_demand(
cls,
name: str = DEFAULT_DEMAND_NAME,
description: str | None = None,
default: MemoryHubFulfillment | None = None,
) -> Self:
return cls(
params=MemoryHubExtensionParams(
memoryhub_demands={name: MemoryHubDemand(description=description)}
),
default=(
MemoryHubExtensionMetadata(memoryhub_fulfillments={name: default}) if default else None
),
)


class MemoryHubExtensionServer(BaseExtensionServer[MemoryHubExtensionSpec, MemoryHubExtensionMetadata]):
@override
def handle_incoming_message(self, message: A2AMessage, run_context: RunContext, request_context: RequestContext):
super().handle_incoming_message(message, run_context, request_context)

if not self._metadata_from_client or not self._metadata_from_client.memoryhub_fulfillments:
fulfillment = _memoryhub_fulfillment_from_env()
if fulfillment:
self._metadata_from_client = MemoryHubExtensionMetadata(
memoryhub_fulfillments={"default": fulfillment}
)


class MemoryHubExtensionClient(BaseExtensionClient[MemoryHubExtensionSpec, NoneType]):
def fulfillment_metadata(
self, *, memoryhub_fulfillments: dict[str, MemoryHubFulfillment]
) -> dict[str, Any]:
return {
self.spec.URI: MemoryHubExtensionMetadata(
memoryhub_fulfillments=memoryhub_fulfillments
).model_dump(mode="json", context={REVEAL_SECRETS: True})
}


def _memoryhub_fulfillment_from_env() -> MemoryHubFulfillment | None:
"""Build a default MemoryHub fulfillment from environment variables.

Reads ``MEMORYHUB_URL`` (required), and either ``MEMORYHUB_API_KEY``
(dev path) or ``MEMORYHUB_AUTH_URL`` + ``MEMORYHUB_CLIENT_ID`` +
``MEMORYHUB_CLIENT_SECRET`` (OAuth 2.1 path). Returns None if no URL
is set or no usable credential is available.
"""
url = os.environ.get("MEMORYHUB_URL")
if not url:
return None

api_key = os.environ.get("MEMORYHUB_API_KEY")
auth_url = os.environ.get("MEMORYHUB_AUTH_URL")
client_id = os.environ.get("MEMORYHUB_CLIENT_ID")
client_secret = os.environ.get("MEMORYHUB_CLIENT_SECRET")

if api_key:
return MemoryHubFulfillment(url=url, api_key=SecretStr(api_key))
if auth_url and client_id and client_secret:
return MemoryHubFulfillment(
url=url,
auth_url=auth_url,
client_id=client_id,
client_secret=SecretStr(client_secret),
)
return None
5 changes: 3 additions & 2 deletions apps/adk-py/src/kagenti_adk/server/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ def __init__(
def __call__(
self, message: Message, context: RunContext, request_context: RequestContext
) -> AbstractAsyncContextManager[Dependency]:
instance = self._dependency_callable(message, context, request_context)

@asynccontextmanager
async def lifespan() -> AsyncGenerator[Dependency]:
instance = self._dependency_callable(message, context, request_context)
if inspect.isawaitable(instance):
instance = await instance
if self.extension or hasattr(instance, "lifespan"):
async with instance.lifespan():
yield instance
Expand Down
22 changes: 22 additions & 0 deletions apps/adk-py/src/kagenti_adk/server/store/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2026 © IBM Corp.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations


class MemoryRejectionError(RuntimeError):
"""Raised when the memory store refused to record a memory.

Backends that run a pre-storage pipeline (deduplication, contradiction
detection, policy/curator rules) may reject a write. The ``reason``
attribute carries the backend's explanation when one is provided.
"""

def __init__(self, reason: str | None = None):
msg = (
f"Memory store rejected the memory: {reason}"
if reason
else "Memory store rejected the memory"
)
super().__init__(msg)
self.reason = reason
140 changes: 140 additions & 0 deletions apps/adk-py/src/kagenti_adk/server/store/memory_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Copyright 2026 © IBM Corp.
# SPDX-License-Identifier: Apache-2.0

"""Long-term governed memory store abstraction for AI agents.

This module defines the MemoryStore protocol — a complement to ContextStore
that handles durable, cross-session knowledge rather than per-context
conversation replay. ContextStore answers "what was said in this conversation";
MemoryStore answers "what does this agent know across all conversations."

The protocol is backend-agnostic. The MemoryHub implementation in
memoryhub_memory_store.py is one concrete backend; others (Redis, SQLite,
in-memory for testing) can implement the same interface.
"""

from __future__ import annotations

import abc
from typing import Protocol

from pydantic import BaseModel, Field

__all__ = [
"MemoryResult",
"MemoryStore",
"MemoryStoreInstance",
]


class MemoryResult(BaseModel):
"""A single memory returned from search or read.

Field semantics are intentionally backend-agnostic. Concrete backends
map their own concepts onto these fields; the MemoryHub backend's
mapping is documented inline as a worked example.
"""

memory_id: str = Field(
description="Backend-assigned identifier for the memory."
)
content: str = Field(
description="The memory's payload. May be a stub for search results."
)
scope: str = Field(
description=(
"Visibility/governance domain. Backend-defined; in MemoryHub: "
"one of user/project/campaign/organizational/enterprise."
)
)
weight: float = Field(
default=0.7,
description=(
"Priority/curation signal in the range 0.0-1.0. Backends may use "
"it for ranking or ignore it."
),
)
relevance_score: float | None = Field(
default=None,
description=(
"Search relevance score returned by the backend; None for "
"non-search results."
),
)


class MemoryStoreInstance(Protocol):
"""Operations on governed memory, scoped to a context.

Each method maps to a standard memory lifecycle operation.
Implementations should raise backend-specific errors for
authorization failures or validation issues.

Common keyword arguments share semantics across all methods:

- ``scope``: Visibility/governance domain. Backend-defined; in
MemoryHub: one of user/project/campaign/organizational/enterprise.
- ``weight``: Priority/curation signal in the range 0.0-1.0. Backends
may use it for ranking or ignore it.
- ``tags``: Free-form tags for grouping/filtering. Backend-defined
semantics; in MemoryHub: "domains" attached to a memory.
- ``project_id``: Optional grouping within a memory store;
backend-defined semantics. In MemoryHub: a project with member-based
access control. NOT a tenancy boundary — tenancy is established by
the backend's auth credentials.
"""

async def search(
self,
query: str,
*,
scope: str | None = None,
project_id: str | None = None,
max_results: int = 10,
) -> list[MemoryResult]:
"""Search for memories matching ``query``.

``scope`` and ``project_id`` filter results; their semantics are
backend-defined. See the class docstring for the cross-method
conventions.
"""
...

async def create(
self,
content: str,
*,
scope: str = "user",
weight: float = 0.7,
tags: list[str] | None = None,
project_id: str | None = None,
) -> str:
"""Create a new memory. Returns the new memory_id.

``scope``, ``weight``, ``tags`` and ``project_id`` follow the
cross-method conventions documented on the class.

Implementations may raise to signal that the backend rejected the write.
"""
...
Comment thread
JanPokorny marked this conversation as resolved.

async def read(self, memory_id: str) -> MemoryResult | None: ...

async def update(self, memory_id: str, content: str) -> None: ...

async def delete(self, memory_id: str) -> None: ...


class MemoryStore(abc.ABC):
"""Factory that creates MemoryStoreInstance objects per context.

Mirrors the ContextStore pattern: the factory holds connection config,
create() returns a per-context instance.
"""

@property
def required_extensions(self) -> set[str]:
return set()

@abc.abstractmethod
async def create(self, context_id: str) -> MemoryStoreInstance: ...
Loading