From f7684ca4b9a26ecb89fcfa2281feed9a4fe6a0b8 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Thu, 13 Feb 2025 06:40:58 +0000 Subject: [PATCH 1/3] feat: Distribute model service containers across different GPU servers (WIP) --- .../backend/manager/scheduler/dispatcher.py | 75 +++++++++++++------ 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/src/ai/backend/manager/scheduler/dispatcher.py b/src/ai/backend/manager/scheduler/dispatcher.py index 63cfee21573..5e9849881b2 100644 --- a/src/ai/backend/manager/scheduler/dispatcher.py +++ b/src/ai/backend/manager/scheduler/dispatcher.py @@ -68,6 +68,7 @@ RedisConnectionInfo, ResourceSlot, SessionId, + SessionTypes, aobject, ) from ai.backend.logging import BraceStyleAdapter @@ -403,26 +404,61 @@ def _pipeline(r: Redis) -> RedisPipeline: raise asyncio.CancelledError() raise - async def _load_scheduler( - self, - db_sess: SASession, - sgroup_name: str, - ) -> tuple[AbstractScheduler, AbstractAgentSelector]: + async def _get_scaling_group_data( + self, db_sess: SASession, sgroup_name: str + ) -> tuple[str, ScalingGroupOpts]: query = sa.select(ScalingGroupRow.scheduler, ScalingGroupRow.scheduler_opts).where( ScalingGroupRow.name == sgroup_name ) result = await db_sess.execute(query) row = result.first() - scheduler_name = row.scheduler - sgroup_opts: ScalingGroupOpts = row.scheduler_opts + if row is None: + raise ValueError(f"Scaling group '{sgroup_name}' not found.") + return row.scheduler, row.scheduler_opts + + async def _load_scheduler(self, db_sess: SASession, sgroup_name: str) -> AbstractScheduler: + scheduler_name, sgroup_opts = await self._get_scaling_group_data(db_sess, sgroup_name) + + global_scheduler_opts = {} + if self.shared_config["plugins"]["scheduler"]: + global_scheduler_opts = self.shared_config["plugins"]["scheduler"].get( + scheduler_name, {} + ) + scheduler_config = {**global_scheduler_opts, **sgroup_opts.config} + + return load_scheduler(scheduler_name, sgroup_opts, scheduler_config) + + async def _load_agent_selector( + self, + db_sess: SASession, + sgroup_name: str, + pending_session: SessionRow, # TODO: id and session_type? + ) -> AbstractAgentSelector: + session_type = pending_session.session_type + + scheduler_name, sgroup_opts = await self._get_scaling_group_data(db_sess, sgroup_name) + match sgroup_opts.agent_selection_strategy: - # The names correspond to the entrypoint names (backendai_agentselector_v10). case AgentSelectionStrategy.LEGACY: agselector_name = "legacy" case AgentSelectionStrategy.ROUNDROBIN: agselector_name = "roundrobin" case AgentSelectionStrategy.CONCENTRATED: - agselector_name = "concentrated" + if session_type == SessionTypes.INFERENCE: + agselector_name = "roundrobin" + + # TODO Question: + # 이 시점에 이미 RoutingRow가 존재함... + # -> 이 방법으론 안 됨. + # result = await db_sess.execute(sa.select(RoutingRow).where(RoutingRow.session == pending_session.id)) + # routing_row = result.first() + # if routing_row is None: + # agselector_name = "concentrated" + # else: + # # 의도를 생각하면 여기에선 dispersed보다 roundrobin이 더 좋을 것 같긴 함. + # agselector_name = "roundrobin" + else: + agselector_name = "concentrated" case AgentSelectionStrategy.DISPERSED: agselector_name = "dispersed" case _ as unknown: @@ -430,13 +466,7 @@ async def _load_scheduler( f"Unknown agent selection strategy: {unknown!r}. Possible values: {[*AgentSelectionStrategy.__members__.keys()]}" ) - global_scheduler_opts = {} global_agselector_opts = {} - if self.shared_config["plugins"]["scheduler"]: - global_scheduler_opts = self.shared_config["plugins"]["scheduler"].get( - scheduler_name, {} - ) - scheduler_config = {**global_scheduler_opts, **sgroup_opts.config} if self.shared_config["plugins"]["agent-selector"]: global_agselector_opts = self.shared_config["plugins"]["agent-selector"].get( agselector_name, {} @@ -446,19 +476,13 @@ async def _load_scheduler( "agent-selection-resource-priority" ] - scheduler = load_scheduler( - scheduler_name, - sgroup_opts, - scheduler_config, - ) - agent_selector = load_agent_selector( + return load_agent_selector( agselector_name, sgroup_opts, agselector_config, agent_selection_resource_priority, self.shared_config, ) - return scheduler, agent_selector async def _schedule_in_sgroup( self, @@ -468,7 +492,8 @@ async def _schedule_in_sgroup( # Part 0: Load the scheduler and the agent selector. async with self.db.begin_readonly_session() as db_sess: - scheduler, agent_selector = await self._load_scheduler(db_sess, sgroup_name) + # 스케줄러 로드 -> pending session 리스트업 -> agent selector 로드 (pending session 타입에 따라 다른 agent selector 로드) + scheduler = await self._load_scheduler(db_sess, sgroup_name) existing_sessions, pending_sessions, cancelled_sessions = await _list_managed_sessions( db_sess, sgroup_name, scheduler.sgroup_opts.pending_timeout ) @@ -507,6 +532,10 @@ async def _schedule_in_sgroup( raise RuntimeError("should not reach here") pending_sess = pending_sessions.pop(picked_idx) log_fmt = "schedule(s:{}, prio:{}, type:{}, name:{}, ak:{}, cluster_mode:{}): " + + async with self.db.begin_readonly_session() as db_sess: + agent_selector = await self._load_agent_selector(db_sess, sgroup_name, pending_sess) + log_args = ( pending_sess.id, pending_sess.priority, From e91cb7ebfa8114bfca49a027656c009ceaaff6af Mon Sep 17 00:00:00 2001 From: jopemachine Date: Thu, 13 Feb 2025 06:49:50 +0000 Subject: [PATCH 2/3] fix: Remove temporary comment --- src/ai/backend/manager/scheduler/dispatcher.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/ai/backend/manager/scheduler/dispatcher.py b/src/ai/backend/manager/scheduler/dispatcher.py index 5e9849881b2..7cf6328f6c9 100644 --- a/src/ai/backend/manager/scheduler/dispatcher.py +++ b/src/ai/backend/manager/scheduler/dispatcher.py @@ -446,17 +446,6 @@ async def _load_agent_selector( case AgentSelectionStrategy.CONCENTRATED: if session_type == SessionTypes.INFERENCE: agselector_name = "roundrobin" - - # TODO Question: - # 이 시점에 이미 RoutingRow가 존재함... - # -> 이 방법으론 안 됨. - # result = await db_sess.execute(sa.select(RoutingRow).where(RoutingRow.session == pending_session.id)) - # routing_row = result.first() - # if routing_row is None: - # agselector_name = "concentrated" - # else: - # # 의도를 생각하면 여기에선 dispersed보다 roundrobin이 더 좋을 것 같긴 함. - # agselector_name = "roundrobin" else: agselector_name = "concentrated" case AgentSelectionStrategy.DISPERSED: From f9ad784a0886beaeef7642d15666ecf8fd27c8b2 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Thu, 13 Feb 2025 10:54:01 +0000 Subject: [PATCH 3/3] fix --- src/ai/backend/manager/scheduler/dispatcher.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/ai/backend/manager/scheduler/dispatcher.py b/src/ai/backend/manager/scheduler/dispatcher.py index 7cf6328f6c9..91ae7adbb00 100644 --- a/src/ai/backend/manager/scheduler/dispatcher.py +++ b/src/ai/backend/manager/scheduler/dispatcher.py @@ -436,7 +436,7 @@ async def _load_agent_selector( ) -> AbstractAgentSelector: session_type = pending_session.session_type - scheduler_name, sgroup_opts = await self._get_scaling_group_data(db_sess, sgroup_name) + _scheduler_name, sgroup_opts = await self._get_scaling_group_data(db_sess, sgroup_name) match sgroup_opts.agent_selection_strategy: case AgentSelectionStrategy.LEGACY: @@ -444,8 +444,10 @@ async def _load_agent_selector( case AgentSelectionStrategy.ROUNDROBIN: agselector_name = "roundrobin" case AgentSelectionStrategy.CONCENTRATED: + # TODO: If there are no services with the same model, it should operate as "concentrated". if session_type == SessionTypes.INFERENCE: - agselector_name = "roundrobin" + # TODO: Roundrobin? + agselector_name = "dispersed" else: agselector_name = "concentrated" case AgentSelectionStrategy.DISPERSED: