-
Notifications
You must be signed in to change notification settings - Fork 175
Expand file tree
/
Copy pathregistry.py
More file actions
96 lines (84 loc) · 3.52 KB
/
registry.py
File metadata and controls
96 lines (84 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from __future__ import annotations
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from ai.backend.common.auth import PublicKey, SecretKey
from ai.backend.common.clients.valkey_client.valkey_image.client import ValkeyImageClient
from ai.backend.common.clients.valkey_client.valkey_live.client import ValkeyLiveClient
from ai.backend.common.clients.valkey_client.valkey_stat.client import ValkeyStatClient
from ai.backend.common.dependencies import NonMonitorableDependencyProvider
from ai.backend.common.events.dispatcher import EventProducer
from ai.backend.common.events.hub.hub import EventHub
from ai.backend.common.plugin.hook import HookPluginContext
from ai.backend.manager.agent_cache import AgentRPCCache
from ai.backend.manager.clients.agent import AgentClientPool
from ai.backend.manager.config.provider import ManagerConfigProvider
from ai.backend.manager.models.storage import StorageSessionManager
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine
from ai.backend.manager.plugin.network import NetworkPluginContext
from ai.backend.manager.registry import AgentRegistry
from ai.backend.manager.repositories.user.repository import UserRepository
from ai.backend.manager.sokovan.scheduling_controller.scheduling_controller import (
SchedulingController,
)
@dataclass
class AgentRegistryInput:
"""Input required for agent registry setup."""
config_provider: ManagerConfigProvider
db: ExtendedAsyncSAEngine
agent_cache: AgentRPCCache
agent_client_pool: AgentClientPool
valkey_stat: ValkeyStatClient
valkey_live: ValkeyLiveClient
valkey_image: ValkeyImageClient
event_producer: EventProducer
event_hub: EventHub
storage_manager: StorageSessionManager
hook_plugin_ctx: HookPluginContext
network_plugin_ctx: NetworkPluginContext
scheduling_controller: SchedulingController
user_repository: UserRepository
debug: bool
manager_public_key: PublicKey
manager_secret_key: SecretKey
class AgentRegistryDependency(
NonMonitorableDependencyProvider[AgentRegistryInput, AgentRegistry],
):
"""Provides AgentRegistry lifecycle management."""
@property
def stage_name(self) -> str:
return "agent-registry"
@asynccontextmanager
async def provide(self, setup_input: AgentRegistryInput) -> AsyncIterator[AgentRegistry]:
"""Initialize and provide an agent registry.
Creates the AgentRegistry, calls init() for startup,
and calls shutdown() during cleanup.
Args:
setup_input: Input containing all registry dependencies
Yields:
Initialized AgentRegistry
"""
registry = AgentRegistry(
setup_input.config_provider,
setup_input.db,
setup_input.agent_cache,
setup_input.agent_client_pool,
setup_input.valkey_stat,
setup_input.valkey_live,
setup_input.valkey_image,
setup_input.event_producer,
setup_input.event_hub,
setup_input.storage_manager,
setup_input.hook_plugin_ctx,
setup_input.network_plugin_ctx,
setup_input.scheduling_controller,
setup_input.user_repository,
debug=setup_input.debug,
manager_public_key=setup_input.manager_public_key,
manager_secret_key=setup_input.manager_secret_key,
)
await registry.init()
try:
yield registry
finally:
await registry.shutdown()