Commit 711e9e9

Merge branch 'main' into orb
2 parents 90f608a + e7214ae commit 711e9e9

76 files changed, +2205 -1127 lines changed

docs/source/tutorials/scaling.rst

Lines changed: 35 additions & 91 deletions
@@ -11,6 +11,8 @@ The scaling system consists of two main components:
 1. **Scaling Controller**: A policy that monitors task queues and worker availability to make scaling decisions.
 2. **Worker Adapter**: A component that handles the actual creation and destruction of worker groups (e.g., starting containers, launching processes).

+The Scaling Controller runs within the Scheduler and communicates with Worker Adapters via Cap'n Proto messages. Worker Adapters connect to the Scheduler and receive scaling commands directly.
+
 The scaling policy is configured via the ``policy_content`` setting in the scheduler configuration:

 .. code:: bash
@@ -72,8 +74,7 @@ This policy is straightforward and works well for homogeneous workloads where al
 .. code:: bash

     scaler_scheduler tcp://127.0.0.1:8516 \
-        --policy-content "allocate=even_load; scaling=vanilla" \
-        --adapter-webhook-urls "http://localhost:8080/webhook"
+        --policy-content "allocate=even_load; scaling=vanilla"


 Capability Scaling (``capability``)
@@ -88,6 +89,7 @@ The capability scaling controller is designed for heterogeneous workloads where
 * Scales worker groups per capability set independently
 * Ensures tasks are matched to workers that can handle them
 * Prevents scaling down the last worker group capable of handling pending tasks
+* Prevents thrashing by checking if scale-down would immediately trigger scale-up

 **How It Works:**

@@ -107,8 +109,7 @@ The capability scaling controller is designed for heterogeneous workloads where
 .. code:: bash

     scaler_scheduler tcp://127.0.0.1:8516 \
-        --policy-content "allocate=capability; scaling=capability" \
-        --adapter-webhook-urls "http://localhost:8080/webhook"
+        --policy-content "allocate=capability; scaling=capability"

 **Example Scenario:**

@@ -138,127 +139,69 @@ With the capability scaling policy:
 3. Idle GPU workers can be shut down without affecting CPU task processing.


-**Worker Adapter Integration:**
-
-The capability scaling controller communicates with the worker adapter via HTTP webhooks. When requesting a new worker group, it includes the required capabilities:
-
-.. code:: json
-
-    {
-        "action": "start_worker_group",
-        "capabilities": {"gpu": 1}
-    }
-
-The worker adapter should provision workers with the requested capabilities and return:
-
-.. code:: json
-
-    {
-        "worker_group_id": "group-abc123",
-        "worker_ids": ["worker-1", "worker-2"],
-        "capabilities": {"gpu": 1}
-    }
-
-
 Fixed Elastic Scaling (``fixed_elastic``)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

-The fixed elastic scaling controller supports hybrid scaling with two worker adapters:
+The fixed elastic scaling controller supports hybrid scaling with multiple worker adapters:

-* **Primary Adapter**: Limited number of worker groups (e.g., on-premise resources)
-* **Secondary Adapter**: Overflow capacity (e.g., cloud burst)
+* **Primary Adapter**: A single worker group (identified by ``max_worker_groups == 1``) that starts once and never shuts down
+* **Secondary Adapter**: Elastic capacity (``max_worker_groups > 1``) that scales based on demand

-This is useful for scenarios where you have a fixed pool of dedicated resources but want to burst to cloud resources during peak demand.
+This is useful for scenarios where you have a fixed pool of dedicated resources but want to burst to additional resources during peak demand.

 .. code:: bash

     scaler_scheduler tcp://127.0.0.1:8516 \
-        --policy-content "allocate=even_load; scaling=fixed_elastic" \
-        --adapter-webhook-urls "http://localhost:8080/primary" "http://localhost:8081/secondary"
+        --policy-content "allocate=even_load; scaling=fixed_elastic"

 **Behavior:**

-* New worker groups are created from the primary adapter until its limit is reached
-* Once primary is at capacity, new groups are created from the secondary adapter
-* When scaling down, secondary adapter groups are shut down first
+* The primary adapter's worker group is started once and never shut down
+* Secondary adapter groups are created when demand exceeds primary capacity
+* When scaling down, only secondary adapter groups are shut down


 Worker Adapter Protocol
 -----------------------

-Scaling controllers communicate with worker adapters via HTTP POST requests to a webhook URL. The adapter must implement the following actions:
-
-**Get Adapter Info:**
-
-Request:
-
-.. code:: json
-
-    {"action": "get_worker_adapter_info"}
-
-Response:
-
-.. code:: json
-
-    {
-        "max_worker_groups": 10
-    }
-
-**Start Worker Group:**
-
-Request:
-
-.. code:: json
-
-    {
-        "action": "start_worker_group",
-        "capabilities": {"gpu": 1}
-    }
-
-Response (success - HTTP 200):
-
-.. code:: json
-
-    {
-        "worker_group_id": "group-abc123",
-        "worker_ids": ["worker-1", "worker-2"],
-        "capabilities": {"gpu": 1}
-    }
-
-Response (capacity exceeded - HTTP 429):
+Scaling controllers, running within the scheduler process, communicate with worker adapters using Cap'n Proto messages through the connection that worker adapters use to communicate with the scheduler. The protocol uses the following message types:

-.. code:: json
+**WorkerAdapterHeartbeat (Adapter -> Scheduler):**

-    {"error": "Capacity exceeded"}
+Worker adapters periodically send heartbeats to the scheduler containing their capacity information:

-**Shutdown Worker Group:**
+* ``max_worker_groups``: Maximum number of worker groups this adapter can manage
+* ``workers_per_group``: Number of workers in each group
+* ``capabilities``: Default capabilities for workers from this adapter

-Request:
+**WorkerAdapterCommand (Scheduler -> Adapter):**

-.. code:: json
+The scheduler sends commands to worker adapters:

-    {
-        "action": "shutdown_worker_group",
-        "worker_group_id": "group-abc123"
-    }
+* ``StartWorkerGroup``: Request to start a new worker group

-Response (success - HTTP 200):
+  * ``worker_group_id``: Empty for new groups (adapter assigns ID)
+  * ``capabilities``: Required capabilities for the worker group

-.. code:: json
+* ``ShutdownWorkerGroup``: Request to shut down an existing worker group

-    {"status": "shutdown"}
+  * ``worker_group_id``: ID of the group to shut down

-Response (not found - HTTP 404):
+**WorkerAdapterCommandResponse (Adapter -> Scheduler):**

-.. code:: json
+Worker adapters respond to commands with status and details:

-    {"error": "Worker group not found"}
+* ``worker_group_id``: ID of the affected worker group
+* ``command``: The command type this response is for
+* ``status``: Result status (``Success``, ``WorkerGroupTooMuch``, ``WorkerGroupIDNotFound``)
+* ``worker_ids``: List of worker IDs in the group (for start commands)
+* ``capabilities``: Actual capabilities of the started workers


 Example Worker Adapter
 ----------------------

-Here is an example of a simple worker adapter using the ECS (Amazon Elastic Container Service) integration:
+Here is an example of a worker adapter using the ECS (Amazon Elastic Container Service) integration:

 .. literalinclude:: ../../../src/scaler/worker_adapter/ecs.py
     :language: python
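The command/response exchange described in the Worker Adapter Protocol text of this hunk can be modelled in a few lines. This is only a sketch under stated assumptions: the real messages are Cap'n Proto structures sent over the scheduler connection, whereas `Status`, `CommandResponse` and `ToyAdapter` below are hypothetical in-memory stand-ins that reuse the documented field and status names. Treating `WorkerGroupTooMuch` as the "adapter is at `max_worker_groups`" case is an inference from the removed HTTP 429 response, not something the diff states.

```python
import uuid
from dataclasses import dataclass, field
from enum import Enum


class Status(Enum):
    # Status names taken from the documented WorkerAdapterCommandResponse.
    SUCCESS = "Success"
    WORKER_GROUP_TOO_MUCH = "WorkerGroupTooMuch"
    WORKER_GROUP_ID_NOT_FOUND = "WorkerGroupIDNotFound"


@dataclass
class CommandResponse:
    # Hypothetical Python mirror of the documented response fields.
    worker_group_id: str
    command: str
    status: Status
    worker_ids: list = field(default_factory=list)
    capabilities: dict = field(default_factory=dict)


class ToyAdapter:
    """In-memory stand-in for a worker adapter (no networking, no Cap'n Proto)."""

    def __init__(self, max_worker_groups, workers_per_group, capabilities):
        self.max_worker_groups = max_worker_groups
        self.workers_per_group = workers_per_group
        self.capabilities = capabilities
        self.groups = {}  # worker_group_id -> list of worker IDs

    def heartbeat(self):
        # Capacity information the adapter would periodically report.
        return {
            "max_worker_groups": self.max_worker_groups,
            "workers_per_group": self.workers_per_group,
            "capabilities": self.capabilities,
        }

    def start_worker_group(self, capabilities):
        # Assumption: exceeding max_worker_groups maps to WorkerGroupTooMuch.
        if len(self.groups) >= self.max_worker_groups:
            return CommandResponse("", "StartWorkerGroup", Status.WORKER_GROUP_TOO_MUCH)
        group_id = f"group-{uuid.uuid4().hex[:8]}"  # the adapter assigns the ID
        worker_ids = [f"{group_id}-worker-{i}" for i in range(self.workers_per_group)]
        self.groups[group_id] = worker_ids
        return CommandResponse(
            group_id, "StartWorkerGroup", Status.SUCCESS, worker_ids, dict(capabilities)
        )

    def shutdown_worker_group(self, worker_group_id):
        if worker_group_id not in self.groups:
            return CommandResponse(
                worker_group_id, "ShutdownWorkerGroup", Status.WORKER_GROUP_ID_NOT_FOUND
            )
        del self.groups[worker_group_id]
        return CommandResponse(worker_group_id, "ShutdownWorkerGroup", Status.SUCCESS)
```

A scheduler-side caller would send `StartWorkerGroup` with an empty `worker_group_id`, read the assigned ID from the response, and pass that ID back in a later `ShutdownWorkerGroup`.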
@@ -276,3 +219,4 @@ Tips

 3. **Monitor scaling events**: Use Scaler's monitoring tools (``scaler_top``) to observe scaling behavior and tune policies.

+4. **Worker Adapter Placement**: Run worker adapters on machines that can provision the required resources (e.g., run the ECS adapter where it has AWS credentials, run the native adapter on the target machine).
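The ``fixed_elastic`` behaviour documented earlier in this file (primary adapter identified by ``max_worker_groups == 1``, started once and never shut down; only secondary groups scaled down) can be sketched as two selection functions. The names `AdapterState`, `pick_adapter_for_start` and `pick_group_for_shutdown` are hypothetical, and the sketch deliberately leaves out how demand is measured, since the diff does not describe it.

```python
from dataclasses import dataclass, field


@dataclass
class AdapterState:
    # Hypothetical bookkeeping record; max_worker_groups comes from the heartbeat.
    name: str
    max_worker_groups: int
    running_groups: list = field(default_factory=list)

    @property
    def is_primary(self):
        # Documented rule: the primary adapter reports max_worker_groups == 1.
        return self.max_worker_groups == 1


def pick_adapter_for_start(adapters):
    """Choose where the next worker group should start, or None if all are full."""
    # The primary group is started once, before any elastic capacity is used.
    for adapter in adapters:
        if adapter.is_primary and not adapter.running_groups:
            return adapter
    # Otherwise use a secondary adapter that still has room.
    for adapter in adapters:
        if not adapter.is_primary and len(adapter.running_groups) < adapter.max_worker_groups:
            return adapter
    return None


def pick_group_for_shutdown(adapters):
    """Choose a (adapter, group_id) pair to shut down; never a primary group."""
    for adapter in adapters:
        if not adapter.is_primary and adapter.running_groups:
            return adapter, adapter.running_groups[-1]
    return None
```

With one primary and one secondary adapter, the first start goes to the primary, later starts go to the secondary, and a scale-down request returns `None` while only the primary group is running.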

src/cpp/scaler/utility/pymod/indexed_queue.cpp

Lines changed: 1 addition & 6 deletions
@@ -77,13 +77,8 @@ static PyObject* PyIndexedQueueRemove(PyIndexedQueue* self, PyObject* args)
     Py_RETURN_NONE;
 }

-static int PyIndexedQueueContains(PyObject* self, PyObject* args)
+static int PyIndexedQueueContains(PyObject* self, PyObject* item)
 {
-    PyObject* item {};
-    if (!PyArg_Parse(args, "O", &item)) {
-        return -1;
-    }
-
     return ((PyIndexedQueue*)self)->queue.contains(OwnedPyObject<>::fromBorrowed(item));
 }

src/cpp/scaler/utility/pymod/one_to_many_dict.cpp

Lines changed: 3 additions & 13 deletions
@@ -255,24 +255,13 @@ static PyObject* PyOneToManyDictItems(PyOneToManyDict* self, PyObject* args)
     return itemList.take();
 }

-// called when using the 'in' operator (__contains__)
-static PyObject* PyOneToManyDictContains(PyOneToManyDict* self, PyObject* args)
+static int PyOneToManyDictContains(PyObject* self, PyObject* key)
 {
-    PyObject* key {};
-    if (!PyArg_ParseTuple(args, "O", &key)) {
-        return nullptr;  // Invalid arguments
-    }
-
-    if (self->dict.hasKey(OwnedPyObject<>::fromBorrowed(key))) {
-        Py_RETURN_TRUE;
-    } else {
-        Py_RETURN_FALSE;
-    }
+    return ((PyOneToManyDict*)self)->dict.hasKey(OwnedPyObject<>::fromBorrowed(key));
 }

 // Define the methods for the OneToManyDict Python class
 static PyMethodDef PyOneToManyDictMethods[] = {
-    {"__contains__", (PyCFunction)PyOneToManyDictContains, METH_VARARGS, "__contains__ method"},
     {"keys", (PyCFunction)PyOneToManyDictKeys, METH_VARARGS, "Get Keys from the dictionary"},
     {"values", (PyCFunction)PyOneToManyDictValues, METH_VARARGS, "Get Values from the dictionary"},
     {"items", (PyCFunction)PyOneToManyDictItems, METH_VARARGS, "Get Items from the dictionary"},
@@ -310,6 +299,7 @@ static PyType_Slot PyOneToManyDictSlots[] = {
     {Py_tp_new, (void*)PyOneToManyDictNew},
     {Py_tp_methods, PyOneToManyDictMethods},
     {Py_tp_iter, (void*)PyOneToManyDictIteratorIter},
+    {Py_sq_contains, (void*)PyOneToManyDictContains},
     {0, nullptr},
 };

src/cpp/scaler/utility/pymod/stable_priority_queue.cpp

Lines changed: 6 additions & 0 deletions
@@ -132,12 +132,18 @@ static Py_ssize_t PyStablePriorityQueueSize(PyObject* self)
     return ((PyStablePriorityQueue*)self)->queue.size();
 }

+static int PyStablePriorityQueueContains(PyObject* self, PyObject* item)
+{
+    return ((PyStablePriorityQueue*)self)->queue._locator.count(OwnedPyObject<>::fromBorrowed(item)) > 0;
+}
+
 static PyType_Slot PyStablePriorityQueueSlots[] = {
     {Py_tp_new, (void*)PyStablePriorityQueueNew},
     {Py_tp_init, (void*)PyStablePriorityQueueInit},
     {Py_tp_dealloc, (void*)PyStablePriorityQueueDealloc},
     {Py_tp_methods, (void*)PyStablePriorityQueueMethods},
     {Py_sq_length, (void*)PyStablePriorityQueueSize},
+    {Py_sq_contains, (void*)PyStablePriorityQueueContains},
     {Py_tp_doc, (void*)"StablePriorityQueue"},
     {0, nullptr},
 };
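The three pymod changes above converge on the same shape: a containment function that receives the probed object directly and returns an `int`, registered through `Py_sq_contains`. Seen from Python, that slot is what the `in` operator dispatches to. The class below is a hypothetical pure-Python stand-in, not the C++ binding, meant only to show the calling convention the slot mirrors: one argument, the item itself, with a truthy or falsy result.

```python
class IndexedQueueModel:
    """Pure-Python stand-in (hypothetical) for a C type that fills Py_sq_contains."""

    def __init__(self):
        self._items = {}  # dict keys give O(1) membership and keep insertion order

    def put(self, item):
        self._items[item] = None

    def __contains__(self, item):
        # `item` is the object on the left of `in`, passed as-is (not an args tuple),
        # which is why the C functions above no longer call PyArg_Parse*.
        return item in self._items


queue = IndexedQueueModel()
queue.put(("task", 1))
print(("task", 1) in queue)  # True
print("missing" in queue)    # False
```

Because the probed object arrives unwrapped, a tuple such as `("task", 1)` is tested as a single item rather than being unpacked as an argument list.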
Lines changed: 4 additions & 12 deletions
@@ -1,25 +1,17 @@
 add_library(uv_ymq_objs OBJECT
-    accept_server.h
-    accept_server.cpp
-
     address.h
     address.cpp

     binder_socket.h
     binder_socket.cpp

-    connect_client.h
-    connect_client.cpp
-
     connector_socket.h
     connector_socket.cpp

-    event_loop_thread.h
-    event_loop_thread.cpp
-
     io_context.h
     io_context.cpp
-
-    message_connection.h
-    message_connection.cpp
 )
+
+add_subdirectory(internal)
+add_subdirectory(future)
+add_subdirectory(sync)

src/cpp/scaler/uv_ymq/address.cpp

Lines changed: 23 additions & 8 deletions
@@ -1,6 +1,7 @@
 #include "scaler/uv_ymq/address.h"

 #include <cassert>
+#include <utility>

 namespace scaler {
 namespace uv_ymq {
@@ -72,21 +73,35 @@ const scaler::wrapper::uv::SocketAddress& Address::asTCP() const noexcept

 const std::string& Address::asIPC() const noexcept
 {
-    assert(type() == Type::TCP);
+    assert(type() == Type::IPC);
     return std::get<std::string>(_value);
 }

-std::expected<Address, scaler::ymq::Error> Address::fromString(const std::string& address) noexcept
+std::expected<std::string, scaler::ymq::Error> Address::toString() const noexcept
 {
-    static constexpr std::string_view tcpPrefix = "tcp://";
-    static constexpr std::string_view ipcPrefix = "ipc://";
+    switch (type()) {
+        case Type::TCP: {
+            auto tcpAddrStr = asTCP().toString();
+            if (!tcpAddrStr.has_value()) {
+                return std::unexpected {scaler::ymq::Error {
+                    scaler::ymq::Error::ErrorCode::InvalidAddressFormat, "Failed to convert TCP address to string"}};
+            }
+
+            return std::string(_tcpPrefix) + tcpAddrStr.value();
+        }
+        case Type::IPC: return std::string(_ipcPrefix) + asIPC();
+        default: std::unreachable();
+    };
+}

-    if (address.starts_with(tcpPrefix)) {
-        return details::fromTCPString(address.substr(tcpPrefix.size()));
+std::expected<Address, scaler::ymq::Error> Address::fromString(const std::string& address) noexcept
+{
+    if (address.starts_with(_tcpPrefix)) {
+        return details::fromTCPString(address.substr(_tcpPrefix.size()));
     }

-    if (address.starts_with(ipcPrefix)) {
-        return Address(address.substr(ipcPrefix.size()));
+    if (address.starts_with(_ipcPrefix)) {
+        return Address(address.substr(_ipcPrefix.size()));
     }

     return std::unexpected {scaler::ymq::Error {

src/cpp/scaler/uv_ymq/address.h

Lines changed: 5 additions & 0 deletions
@@ -34,6 +34,8 @@ class Address {

     const std::string& asIPC() const noexcept;

+    std::expected<std::string, scaler::ymq::Error> toString() const noexcept;
+
     // Try to parse a string to an Address instance.
     //
     // Example of string values are:
@@ -45,6 +47,9 @@ class Address {
     static std::expected<Address, scaler::ymq::Error> fromString(const std::string& address) noexcept;

 private:
+    static constexpr std::string_view _tcpPrefix = "tcp://";
+    static constexpr std::string_view _ipcPrefix = "ipc://";
+
     std::variant<scaler::wrapper::uv::SocketAddress, std::string> _value;
 };
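Moving the two prefixes into shared class constants lets `toString()` and `fromString()` stay inverses of each other. The round trip can be illustrated with a small Python model. `AddressModel` and its methods are hypothetical names, and the sketch makes two simplifying assumptions: it raises `ValueError` where the C++ code returns `std::unexpected`, and it keeps the TCP part as an opaque string instead of validating it the way `details::fromTCPString` presumably does.

```python
from dataclasses import dataclass

TCP_PREFIX = "tcp://"
IPC_PREFIX = "ipc://"


@dataclass(frozen=True)
class AddressModel:
    kind: str   # "tcp" or "ipc"
    value: str  # text after the prefix, stored without validation (assumption)

    @classmethod
    def from_string(cls, text):
        # Same prefix constants are used for parsing and for formatting.
        if text.startswith(TCP_PREFIX):
            return cls("tcp", text[len(TCP_PREFIX):])
        if text.startswith(IPC_PREFIX):
            return cls("ipc", text[len(IPC_PREFIX):])
        raise ValueError(f"unsupported address format: {text!r}")

    def to_string(self):
        prefix = TCP_PREFIX if self.kind == "tcp" else IPC_PREFIX
        return prefix + self.value


for text in ("tcp://127.0.0.1:8516", "ipc:///tmp/scaler.sock"):
    # Parsing and then formatting gives back the original string.
    assert AddressModel.from_string(text).to_string() == text
```

The example socket path `/tmp/scaler.sock` is made up for illustration; the TCP address is the one used in the documentation hunks of this commit.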
