9
9
from .manager import LlmSystemManager
10
10
import asyncio
11
11
12
+ from typing import override
13
+
12
14
13
15
class FiberPool :
14
16
"""
@@ -32,65 +34,105 @@ def __init__(
32
34
self .sysman : LlmSystemManager = sysman
33
35
self .name : str = name
34
36
35
- # Name mangle to make outside access harder.
36
- self .__fiber_pool : list [sf .Fiber ] = []
37
- self .__workers : list [sf .Worker ] = []
37
+ self ._fiber_pool : list [sf .Fiber ] = []
38
+ self ._workers : list [sf .Worker ] = []
38
39
# Keep track of how many extra fibers were created
39
40
# during runtime if `resizable` is set to True.
40
- self .__extra_fibers : int = 0
41
- self .__index_queue = asyncio .Queue ()
41
+ self ._extra_fibers : int = 0
42
+ self ._index_queue = asyncio .Queue ()
43
+
44
+ self ._initialize_pool ()
42
45
43
- self .__initialize_pool ()
46
+ def resize (self ):
47
+ new_worker = self .sysman .ls .create_worker (
48
+ f"{ self .name } -new-worker-{ self ._extra_fibers } "
49
+ )
50
+ self ._workers .append (new_worker )
51
+ fiber = self .sysman .ls .create_fiber (new_worker )
52
+ self ._fiber_pool .append (fiber )
53
+ self ._extra_fibers += 1
54
+
55
+ return [self .size () - 1 , fiber ]
44
56
45
57
async def get (self ) -> tuple [int , sf .Fiber ]:
46
58
try :
47
- idx = self .__index_queue .get_nowait ()
59
+ idx = self ._index_queue .get_nowait ()
48
60
return (
49
61
idx ,
50
- self .__fiber_pool [idx ],
62
+ self ._fiber_pool [idx ],
51
63
)
52
64
except asyncio .QueueEmpty :
53
65
if self .resizable :
54
66
# Resize the fiber pool by adding a new fiber.
55
- devices = self .sysman .ls .devices
56
- num_devices = len (devices )
57
- new_worker = self .sysman .ls .create_worker (
58
- f"{ self .name } -new-worker-{ self .__extra_fibers } "
59
- )
60
- self .__workers .append (new_worker )
61
-
62
- fiber = self .sysman .ls .create_fiber (
63
- new_worker , devices = [devices [self .size () % num_devices ]]
64
- )
65
- self .__fiber_pool .append (fiber )
66
- self .__extra_fibers += 1
67
- return [self .size () - 1 , fiber ]
68
-
69
- available_index = await self .__index_queue .get ()
70
- return (available_index , self .__fiber_pool [available_index ])
67
+ return self .resize ()
68
+
69
+ available_index = await self ._index_queue .get ()
70
+ return (available_index , self ._fiber_pool [available_index ])
71
71
72
72
def pool (self ) -> list [sf .Fiber ]:
73
- return self .__fiber_pool
73
+ return self ._fiber_pool
74
74
75
- def __initialize_pool (self ):
76
- devices = self .sysman .ls .devices
77
- num_devices = len (devices )
75
+ def _initialize_pool (self ):
78
76
for idx in range (self .init_size ):
79
77
worker = self .sysman .ls .create_worker (f"{ self .name } -init-worker-{ idx } " )
80
- self .__workers .append (worker )
81
-
82
- fiber = self .sysman .ls .create_fiber (
83
- worker , devices = [devices [idx % num_devices ]]
84
- )
85
- self .__fiber_pool .append (fiber )
78
+ self ._workers .append (worker )
79
+ fiber = self .sysman .ls .create_fiber (worker )
80
+ self ._fiber_pool .append (fiber )
86
81
assert idx < self .size ()
87
- self .__index_queue .put_nowait (idx )
82
+ self ._index_queue .put_nowait (idx )
88
83
89
84
def return_fiber (self , indices : int | list [int ]):
90
85
if not isinstance (indices , list ):
91
86
indices = [indices ]
92
87
for idx in indices :
93
- self .__index_queue .put_nowait (idx )
88
+ self ._index_queue .put_nowait (idx )
94
89
95
90
def size (self ) -> int :
96
- return len (self .__fiber_pool )
91
+ return len (self ._fiber_pool )
92
+
93
+
94
+ class DisaggregatedFiberPool (FiberPool ):
95
+ def __init__ (
96
+ self ,
97
+ sysman : LlmSystemManager ,
98
+ init_size : int ,
99
+ resizable : bool = True ,
100
+ name : str = "default-disagg-fiber-pool" ,
101
+ ):
102
+ super ().__init__ (
103
+ sysman = sysman ,
104
+ init_size = init_size ,
105
+ resizable = resizable ,
106
+ name = name ,
107
+ )
108
+
109
+ @override
110
+ def resize (self ):
111
+ devices = self .sysman .ls .devices
112
+ num_devices = len (devices )
113
+ new_worker = self .sysman .ls .create_worker (
114
+ f"{ self .name } -new-worker-{ self ._extra_fibers } "
115
+ )
116
+ self ._workers .append (new_worker )
117
+
118
+ fiber = self .sysman .ls .create_fiber (
119
+ new_worker , devices = [devices [self .size () % num_devices ]]
120
+ )
121
+ self ._fiber_pool .append (fiber )
122
+ self ._extra_fibers += 1
123
+ return [self .size () - 1 , fiber ]
124
+
125
+ @override
126
+ def _initialize_pool (self ):
127
+ devices = self .sysman .ls .devices
128
+ num_devices = len (devices )
129
+ for idx in range (self .init_size ):
130
+ worker = self .sysman .ls .create_worker (f"{ self .name } -init-worker-{ idx } " )
131
+ self ._workers .append (worker )
132
+
133
+ fiber = self .sysman .ls .create_fiber (
134
+ worker , devices = [devices [idx % num_devices ]]
135
+ )
136
+ self ._fiber_pool .append (fiber )
137
+ assert idx < self .size ()
138
+ self ._index_queue .put_nowait (idx )
0 commit comments