Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions python/aibrix/aibrix/metadata/api/v1/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

from typing import Optional

import redis.asyncio as redis
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel, Field, field_validator

from aibrix.logger import init_logger
from aibrix.metadata.store import MetadataStore

logger = init_logger(__name__)
router = APIRouter()
Expand Down Expand Up @@ -54,27 +54,27 @@ class UserResponse(BaseModel):


def _gen_key(name: str) -> str:
"""Generate Redis key for user.
"""Generate storage key for user.

Args:
name: User name

Returns:
Redis key in format: aibrix-users/{name}
Storage key in format: aibrix-users/{name}
"""
return f"aibrix-users/{name}"


async def _get_redis_client(request: Request) -> redis.Redis:
"""Get Redis client from app state.
def _get_metadata_store(request: Request) -> MetadataStore:
"""Get metadata store from app state.

Args:
request: FastAPI request object

Returns:
Redis client instance
MetadataStore instance
"""
return request.app.state.redis_client
return request.app.state.metadata_store


@router.post("/CreateUser")
Expand All @@ -90,17 +90,16 @@ async def create_user(request: Request, user: User) -> UserResponse:
Returns:
UserResponse with creation status
"""
redis_client = await _get_redis_client(request)
store = _get_metadata_store(request)
key = _gen_key(user.name)

# Check if user exists
exists = await redis_client.exists(key)
if exists:
if await store.exists(key):
logger.info(f"User already exists: {user.name}")
return UserResponse(message=f"User: {user.name} exists", user=user)

# Store user as JSON
await redis_client.set(key, user.model_dump_json())
await store.set(key, user.model_dump_json())

logger.info(f"Created user: {user.name}, rpm={user.rpm}, tpm={user.tpm}")
return UserResponse(message=f"Created User: {user.name}", user=user)
Expand All @@ -120,10 +119,10 @@ async def read_user(request: Request, user: User) -> UserResponse:
Raises:
HTTPException: 404 if user not found
"""
redis_client = await _get_redis_client(request)
store = _get_metadata_store(request)
key = _gen_key(user.name)

data = await redis_client.get(key)
data = await store.get(key)
if not data:
logger.warning(f"User not found: {user.name}")
raise HTTPException(status_code=404, detail="user does not exist")
Expand All @@ -149,17 +148,16 @@ async def update_user(request: Request, user: User) -> UserResponse:
Raises:
HTTPException: 404 if user not found
"""
redis_client = await _get_redis_client(request)
store = _get_metadata_store(request)
key = _gen_key(user.name)

# Check if user exists
exists = await redis_client.exists(key)
if not exists:
if not await store.exists(key):
logger.warning(f"Cannot update non-existent user: {user.name}")
raise HTTPException(status_code=404, detail=f"User: {user.name} does not exist")

# Update user
await redis_client.set(key, user.model_dump_json())
await store.set(key, user.model_dump_json())

logger.info(f"Updated user: {user.name}, rpm={user.rpm}, tpm={user.tpm}")
return UserResponse(message=f"Updated User: {user.name}", user=user)
Expand All @@ -179,17 +177,16 @@ async def delete_user(request: Request, user: User) -> UserResponse:
Raises:
HTTPException: 404 if user not found
"""
redis_client = await _get_redis_client(request)
store = _get_metadata_store(request)
key = _gen_key(user.name)

# Check if user exists
exists = await redis_client.exists(key)
if not exists:
if not await store.exists(key):
logger.warning(f"Cannot delete non-existent user: {user.name}")
raise HTTPException(status_code=404, detail=f"User: {user.name} does not exist")

# Delete user
await redis_client.delete(key)
await store.delete(key)

logger.info(f"Deleted user: {user.name}")
return UserResponse(message=f"Deleted User: {user.name}", user=user)
25 changes: 17 additions & 8 deletions python/aibrix/aibrix/metadata/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from aibrix.metadata.cache import JobCache
from aibrix.metadata.core import HTTPXClientWrapper, KopfOperatorWrapper
from aibrix.metadata.setting import settings
from aibrix.metadata.store import RedisMetadataStore
from aibrix.storage import create_storage

logger = init_logger(__name__)
Expand All @@ -44,13 +45,16 @@ async def liveness_check():

@router.get("/readyz")
async def readiness_check(request: Request):
# Check if Redis is ready
# Check if metadata store is ready
try:
if hasattr(request.app.state, "redis_client"):
if hasattr(request.app.state, "metadata_store"):
await request.app.state.metadata_store.ping()
# Backward compatibility: check redis_client if metadata_store not set
elif hasattr(request.app.state, "redis_client"):
await request.app.state.redis_client.ping()
Comment on lines +51 to 54
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readiness_check awaits metadata_store.ping() but ignores its boolean result. With the current RedisMetadataStore.ping() implementation returning False on failure (instead of raising), /readyz can incorrectly report "ready" even when the backend is unhealthy. Consider checking the returned value and returning 503 when it is falsy (or change ping() to raise on failure and keep this handler exception-based).

Suggested change
await request.app.state.metadata_store.ping()
# Backward compatibility: check redis_client if metadata_store not set
elif hasattr(request.app.state, "redis_client"):
await request.app.state.redis_client.ping()
ping_ok = await request.app.state.metadata_store.ping()
if not ping_ok:
logger.error("Metadata store ping returned a falsy result.")
return JSONResponse(
content={"status": "not ready", "error": "metadata store unavailable"},
status_code=503,
)
# Backward compatibility: check redis_client if metadata_store not set
elif hasattr(request.app.state, "redis_client"):
ping_ok = await request.app.state.redis_client.ping()
if not ping_ok:
logger.error("Redis client ping returned a falsy result.")
return JSONResponse(
content={"status": "not ready", "error": "redis client unavailable"},
status_code=503,
)

Copilot uses AI. Check for mistakes.
return JSONResponse(content={"status": "ready"}, status_code=200)
except Exception as e:
logger.error(f"Redis health check failed: {e}")
logger.error(f"Metadata store health check failed: {e}")
return JSONResponse(
content={"status": "not ready", "error": str(e)}, status_code=503
)
Expand Down Expand Up @@ -87,16 +91,19 @@ async def lifespan(app: FastAPI):
# Code executed on startup
logger.info("Initializing FastAPI app...")

# Initialize Redis client
app.state.redis_client = redis.Redis(
# Initialize metadata store (abstraction over Redis)
metadata_store = RedisMetadataStore(
host=envs.STORAGE_REDIS_HOST or "localhost",
port=envs.STORAGE_REDIS_PORT,
db=envs.STORAGE_REDIS_DB,
password=envs.STORAGE_REDIS_PASSWORD,
decode_responses=False,
)
app.state.metadata_store = metadata_store
# Backward compatibility: expose underlying Redis client for components
# that haven't migrated to the MetadataStore interface yet
app.state.redis_client = metadata_store.client
Comment on lines +94 to +104
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lifespan() unconditionally creates and assigns a new RedisMetadataStore to app.state.metadata_store (and resets app.state.redis_client). This makes it hard to inject a mock/in-memory store for tests or alternative bootstrapping code, because any pre-set metadata_store will be overwritten when lifespan runs. Consider initializing the store only if it isn't already present (or allowing an override via args/env), so callers/tests can provide their own store instance.

Copilot uses AI. Check for mistakes.
logger.info(
f"Redis client initialized: {envs.STORAGE_REDIS_HOST}:{envs.STORAGE_REDIS_PORT}"
f"Metadata store initialized: {envs.STORAGE_REDIS_HOST}:{envs.STORAGE_REDIS_PORT}"
)

if hasattr(app.state, "httpx_client_wrapper"):
Expand All @@ -115,7 +122,9 @@ async def lifespan(app: FastAPI):
app.state.kopf_operator_wrapper.stop()
if hasattr(app.state, "httpx_client_wrapper"):
await app.state.httpx_client_wrapper.stop()
if hasattr(app.state, "redis_client"):
if hasattr(app.state, "metadata_store"):
await app.state.metadata_store.close()
elif hasattr(app.state, "redis_client"):
await app.state.redis_client.aclose() # type: ignore[attr-defined]
logger.info("Redis client closed")

Expand Down
159 changes: 159 additions & 0 deletions python/aibrix/aibrix/metadata/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Copyright 2024 The Aibrix Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Metadata store abstraction layer.

Provides a clean interface for metadata key-value operations
(e.g., user CRUD) instead of directly referencing a Redis client.
This allows for easier testing, backend swapping, and separation of concerns.
"""

from abc import ABC, abstractmethod
from typing import Optional

import redis.asyncio as redis

from aibrix.logger import init_logger

logger = init_logger(__name__)


class MetadataStore(ABC):
"""Abstract base class for metadata key-value storage.

This interface defines the operations needed by the metadata service
for storing and retrieving structured data (users, configs, etc.).
Unlike the storage.BaseStorage which is designed for file/object storage,
this interface is tailored for simple key-value metadata operations.
"""

@abstractmethod
async def get(self, key: str) -> Optional[bytes]:
"""Get value by key.

Args:
key: The key to look up.

Returns:
The value as bytes if found, None otherwise.
"""
...

@abstractmethod
async def set(self, key: str, value: str | bytes) -> bool:
"""Set a key-value pair.

Args:
key: The key to store.
value: The value to store (string or bytes).

Returns:
True if the operation succeeded.
"""
...

@abstractmethod
async def exists(self, key: str) -> bool:
"""Check if a key exists.

Args:
key: The key to check.

Returns:
True if the key exists, False otherwise.
"""
...

@abstractmethod
async def delete(self, key: str) -> bool:
"""Delete a key.

Args:
key: The key to delete.

Returns:
True if the key was deleted, False if it didn't exist.
"""
...

@abstractmethod
async def ping(self) -> bool:
"""Check if the store backend is reachable.

Returns:
True if the backend is healthy.
"""
...

@abstractmethod
async def close(self) -> None:
"""Close the connection to the store backend."""
...


class RedisMetadataStore(MetadataStore):
"""Redis-backed implementation of the metadata store."""

def __init__(
self,
host: str = "localhost",
port: int = 6379,
db: int = 0,
password: Optional[str] = None,
):
self._client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
decode_responses=False,
)
logger.info(f"Redis metadata store initialized: {host}:{port}")

@property
def client(self) -> redis.Redis:
"""Expose underlying Redis client for backward compatibility.

This property allows existing code that directly accesses the
Redis client to continue working during the migration period.
New code should use the MetadataStore interface methods instead.
"""
return self._client

async def get(self, key: str) -> Optional[bytes]:
data = await self._client.get(key)
return data

async def set(self, key: str, value: str | bytes) -> bool:
result = await self._client.set(key, value)
return bool(result)

async def exists(self, key: str) -> bool:
result = await self._client.exists(key)
return bool(result)

async def delete(self, key: str) -> bool:
result = await self._client.delete(key)
return bool(result)

async def ping(self) -> bool:
try:
return await self._client.ping()
except Exception:
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RedisMetadataStore.ping() swallows all exceptions and returns False without logging. This makes failures harder to diagnose and (combined with callers that don't check the return value) can silently mask outages. Consider either (a) logging the exception and returning False, with callers explicitly checking the result, or (b) letting exceptions propagate and treating failures via exception handling.

Suggested change
except Exception:
except Exception:
logger.exception("Redis metadata store ping failed")

Copilot uses AI. Check for mistakes.
return False

async def close(self) -> None:
await self._client.aclose()
logger.info("Redis metadata store closed")
Loading
Loading