
Commit 503ca16

Waterfall Engine and Scaling Policy (#579)
* waterfall scaling controller for priority-based scaling

  Introduces an "advance" policy engine with a waterfall scaling strategy that cascades worker scaling across prioritized adapters. Priority-1 adapters fill first; overflow goes to priority-2, then priority-3. Shutdown is reversed.

  - Add workerAdapterID (Data) to WorkerAdapterHeartbeat protocol
  - Extend ScalingController interface with worker_adapter_snapshots parameter
  - WorkerAdapterController builds cross-adapter snapshots from heartbeat state
  - Add WaterfallScalingController, AdvancePolicy, and supporting types/factories

* Replace WaterfallV1PolicyController with WaterfallV1Policy(ScalerPolicy)

  - Route waterfall through VanillaPolicyController like all other policies
  - Remove create_policy_controller; add WATERFALL_V1 case to create_policy
  - Remove default value for worker_manager_id in WorkerManagerHeartbeat.new_msg
  - Rename WaterfallRule.worker_manager_id to adapter_id_prefix for clarity

* Align waterfall terminology with spec: worker_type, max_task_concurrency

  - WaterfallRule.adapter_id_prefix -> worker_type (matches Worker Manager ID spec)
  - WaterfallRule.max_workers -> max_task_concurrency
  - Config format: priority,worker_type,max_task_concurrency
  - Update docstrings and comments to use worker_type terminology

* format

* Move test helper functions to bottom of file

* Move test helper functions to bottom of test_scaling.py

* Bump version

* Fix comment

Signed-off-by: gxu <georgexu420@163.com>
1 parent e4aa021 commit 503ca16

File tree

30 files changed: +1085 -88 lines changed

docs/source/tutorials/configuration.rst

Lines changed: 36 additions & 0 deletions
@@ -257,3 +257,39 @@ You can override any value from the TOML file by providing it as a command-line
     scaler_cluster tcp://127.0.0.1:6378 --config example_config.toml --num-of-workers 12

 The cluster will start with **12 workers**, but all other settings (like ``task_timeout_seconds``) will still be loaded from the ``[cluster]`` section of ``example_config.toml``.
+
+**Scenario 3: Waterfall Scaling Configuration**
+
+To use the ``waterfall_v1`` policy engine for priority-based scaling across multiple worker adapters, set ``policy_engine_type = "waterfall_v1"`` and provide rules in ``policy_content`` (one rule per line, ``#`` comments supported):
+
+**waterfall_config.toml**
+
+.. code-block:: toml
+
+    [scheduler]
+    object_storage_address = "tcp://127.0.0.1:6379"
+    monitor_address = "tcp://127.0.0.1:6380"
+    logging_level = "INFO"
+    policy_engine_type = "waterfall_v1"
+    policy_content = """
+    # priority, adapter_id_prefix, max_workers
+    1, NAT, 8
+    2, ECS, 50
+    """
+
+    [native_worker_adapter]
+    max_workers = 8
+
+    [ecs_worker_adapter]
+    max_workers = 50
+
+Then start the scheduler and worker adapters:
+
+.. code-block:: bash
+
+    scaler_scheduler tcp://127.0.0.1:8516 --config waterfall_config.toml &
+    scaler_worker_adapter_native tcp://127.0.0.1:8516 --config waterfall_config.toml &
+    scaler_worker_adapter_ecs tcp://127.0.0.1:8516 --config waterfall_config.toml &
+
+Local ``NAT`` workers will scale up first. When they reach capacity, ``ECS`` workers will begin scaling. On scale-down, ``ECS`` workers drain before local workers.
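The ``policy_content`` rule format above can be parsed with a few lines of Python. This is a minimal sketch for illustration only, not the PR's actual parser; ``parse_waterfall_rules`` and the ``WaterfallRule`` fields shown here are hypothetical names:

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class WaterfallRule:
    """Hypothetical stand-in for one waterfall tier; real fields may differ."""
    priority: int
    adapter_id_prefix: str
    max_workers: int


def parse_waterfall_rules(policy_content: str) -> List[WaterfallRule]:
    """Parse 'priority, adapter_id_prefix, max_workers' lines.

    Blank lines and lines starting with '#' are skipped; rules are
    returned sorted so the highest-priority tier (lowest number) is first.
    """
    rules = []
    for line in policy_content.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        priority, prefix, max_workers = (field.strip() for field in line.split(","))
        rules.append(WaterfallRule(int(priority), prefix, int(max_workers)))
    return sorted(rules, key=lambda rule: rule.priority)


rules = parse_waterfall_rules("""
# priority, adapter_id_prefix, max_workers
2, ECS, 50
1, NAT, 8
""")
print(rules[0].adapter_id_prefix)  # NAT (priority 1 sorts first)
```

Sorting by priority up front means the scaling loop can simply walk the list in order when cascading capacity.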

docs/source/tutorials/scaling.rst

Lines changed: 28 additions & 0 deletions
@@ -45,6 +45,8 @@ Scaler provides several built-in scaling policies:
      - Capability-aware scaling. Scales worker groups based on task-required capabilities (e.g., GPU, memory).
    * - ``fixed_elastic``
      - Hybrid scaling using primary and secondary worker managers with configurable limits.
+   * - ``waterfall_v1``
+     - Priority-based cascading across multiple worker managers. Higher-priority managers fill first; overflow goes to lower-priority managers.

 No Scaling (``no``)
@@ -161,6 +163,32 @@ This is useful for scenarios where you have a fixed pool of dedicated resources
 * When scaling down, only secondary manager groups are shut down

+Waterfall Scaling (``waterfall_v1``)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The waterfall scaling policy cascades worker scaling across prioritized worker managers. Higher-priority managers fill first; when they reach capacity, overflow goes to the next priority tier. When scaling down, the lowest-priority managers drain first.
+
+This is useful for hybrid deployments where you want to prefer cheaper or lower-latency resources (e.g., local bare-metal) and only burst to more expensive resources (e.g., cloud) when needed.
+
+**Configuration:**
+
+The waterfall policy uses ``policy_engine_type = "waterfall_v1"`` and a newline-separated rule format for ``policy_content``. Each rule is a comma-separated line with three fields: ``priority``, ``manager_id_prefix``, ``max_workers``. Lines starting with ``#`` are comments.
+
+.. code:: toml
+
+    [scheduler]
+    policy_engine_type = "waterfall_v1"
+    policy_content = """
+    # priority, manager_id_prefix, max_workers
+    # Use local workers first (cheap, low latency)
+    1, NAT, 8
+    # Overflow to ECS when local capacity is exhausted
+    2, ECS, 50
+    """
+
+Rules reference worker manager ID prefixes. At runtime, each worker manager generates a full ID like ``NAT|<pid>``; the prefix ``NAT`` matches any manager whose ID starts with ``NAT``. Multiple managers can share the same prefix and are governed by the same rule.
+
 Worker Manager Protocol
 -----------------------
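The cascade and prefix-matching behaviour described in the tutorial can be sketched in a few lines of Python. This is a toy illustration, not the PR's actual ``WaterfallScalingController``; ``waterfall_allocate`` and ``matches_rule`` are hypothetical names:

```python
from typing import Dict, List, Tuple


def waterfall_allocate(desired_total: int, tiers: List[Tuple[str, int]]) -> Dict[str, int]:
    """Fill tiers in priority order: each tier takes workers up to its cap;
    overflow cascades to the next tier. tiers = [(prefix, max_workers), ...]."""
    allocation: Dict[str, int] = {}
    remaining = desired_total
    for prefix, cap in tiers:
        take = min(remaining, cap)
        allocation[prefix] = take
        remaining -= take
    return allocation


def matches_rule(manager_id: bytes, prefix: str) -> bool:
    """A rule prefix like 'NAT' matches a full manager ID like b'NAT|1234'."""
    return manager_id.decode().startswith(prefix)


# 20 desired workers: NAT fills to its cap of 8, the overflow of 12 goes to ECS.
print(waterfall_allocate(20, [("NAT", 8), ("ECS", 50)]))  # {'NAT': 8, 'ECS': 12}
```

Scale-down falls out of the same computation: shrinking ``desired_total`` to 5 yields ``{'NAT': 5, 'ECS': 0}``, so the lowest-priority ``ECS`` tier drains before the local ``NAT`` tier.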

src/scaler/protocol/capnp/message.capnp

Lines changed: 1 addition & 0 deletions
@@ -89,6 +89,7 @@ struct WorkerManagerHeartbeat {
     maxWorkerGroups @0 :UInt32;
     workersPerGroup @1 :UInt32;
     capabilities @2 :List(CommonType.TaskCapability);
+    workerManagerID @3 :Data;
 }

 struct WorkerManagerHeartbeatEcho {

src/scaler/protocol/python/message.py

Lines changed: 6 additions & 1 deletion
@@ -365,9 +365,13 @@ def workers_per_group(self) -> int:
     def capabilities(self) -> Dict[str, int]:
         return {capability.name: capability.value for capability in self._msg.capabilities}

+    @property
+    def worker_manager_id(self) -> bytes:
+        return self._msg.workerManagerID
+
     @staticmethod
     def new_msg(
-        max_worker_groups: int, workers_per_group: int, capabilities: Dict[str, int]
+        max_worker_groups: int, workers_per_group: int, capabilities: Dict[str, int], worker_manager_id: bytes
     ) -> "WorkerManagerHeartbeat":
         return WorkerManagerHeartbeat(
             _message.WorkerManagerHeartbeat(
@@ -376,6 +380,7 @@ def new_msg(
                 capabilities=[
                     TaskCapability.new_msg(name, value).get_message() for name, value in capabilities.items()
                 ],
+                workerManagerID=worker_manager_id,
             )
         )
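With ``workerManagerID`` carried in every heartbeat, the scheduler can index state by manager rather than only seeing the current sender, which is what makes the cross-manager ``worker_manager_snapshots`` dict possible. A minimal self-contained sketch of that bookkeeping (``ManagerSnapshot`` and ``record_heartbeat`` are hypothetical stand-ins; the real ``WorkerManagerSnapshot`` fields may differ):

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class ManagerSnapshot:
    """Hypothetical stand-in for WorkerManagerSnapshot."""
    max_worker_groups: int
    workers_per_group: int


def record_heartbeat(
    snapshots: Dict[bytes, ManagerSnapshot], manager_id: bytes, max_groups: int, per_group: int
) -> None:
    # Keyed by the raw workerManagerID bytes from the heartbeat, so a
    # policy can see the state of every manager, not just the sender.
    snapshots[manager_id] = ManagerSnapshot(max_groups, per_group)


snapshots: Dict[bytes, ManagerSnapshot] = {}
record_heartbeat(snapshots, b"NAT|101", 8, 1)
record_heartbeat(snapshots, b"ECS|202", 50, 1)
print(sorted(snapshots))  # [b'ECS|202', b'NAT|101']
```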

src/scaler/scheduler/controllers/mixins.py

Lines changed: 6 additions & 1 deletion
@@ -19,7 +19,11 @@
     WorkerManagerHeartbeat,
 )
 from scaler.protocol.python.status import ScalingManagerStatus
-from scaler.scheduler.controllers.policies.simple_policy.scaling.types import WorkerGroupCapabilities, WorkerGroupState
+from scaler.scheduler.controllers.policies.simple_policy.scaling.types import (
+    WorkerGroupCapabilities,
+    WorkerGroupState,
+    WorkerManagerSnapshot,
+)
 from scaler.utility.identifiers import ClientID, ObjectID, TaskID, WorkerID
 from scaler.utility.mixins import Reporter

@@ -255,6 +259,7 @@ def get_scaling_commands(
         worker_manager_heartbeat: WorkerManagerHeartbeat,
         worker_groups: WorkerGroupState,
         worker_group_capabilities: WorkerGroupCapabilities,
+        worker_manager_snapshots: Dict[bytes, WorkerManagerSnapshot],
     ) -> List[WorkerManagerCommand]:
         """Pure function: state in, commands out. Commands are either all start or all shutdown, never mixed."""
         raise NotImplementedError()
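The docstring's contract ("pure function: state in, commands out; all start or all shutdown, never mixed") can be illustrated with a toy policy. The class name, parameters, and string command encoding below are hypothetical simplifications, not the project's API:

```python
from typing import List


class ToyScalingPolicy:
    """Toy illustration of the scaling-command contract: a pure function of
    scheduler state that returns either only start commands or only shutdown
    commands, never a mix of the two."""

    def get_scaling_commands(self, queued_tasks: int, idle_workers: int) -> List[str]:
        if queued_tasks > idle_workers:
            # Demand exceeds capacity: emit start commands only.
            return ["start"] * (queued_tasks - idle_workers)
        if idle_workers > queued_tasks:
            # Capacity exceeds demand: emit shutdown commands only.
            return ["shutdown"] * (idle_workers - queued_tasks)
        return []


policy = ToyScalingPolicy()
print(policy.get_scaling_commands(queued_tasks=3, idle_workers=1))  # ['start', 'start']
```

Keeping the method pure (no I/O, no mutation of scheduler state) is what makes policies like the waterfall controller straightforward to unit-test.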

src/scaler/scheduler/controllers/policies/library/utility.py

Lines changed: 5 additions & 0 deletions
@@ -10,4 +10,9 @@ def create_policy(policy_engine_type: str, policy_content: str) -> ScalerPolicy:

         return SimplePolicy(policy_content)

+    if engine_type == PolicyEngineType.WATERFALL_V1:
+        from scaler.scheduler.controllers.policies.waterfall_v1.waterfall_v1_policy import WaterfallV1Policy
+
+        return WaterfallV1Policy(policy_content)
+
     raise ValueError(f"Unknown policy_engine_type: {policy_engine_type}")

src/scaler/scheduler/controllers/policies/mixins.py

Lines changed: 6 additions & 1 deletion
@@ -3,7 +3,11 @@

 from scaler.protocol.python.message import InformationSnapshot, Task, WorkerManagerCommand, WorkerManagerHeartbeat
 from scaler.protocol.python.status import ScalingManagerStatus
-from scaler.scheduler.controllers.policies.simple_policy.scaling.types import WorkerGroupCapabilities, WorkerGroupState
+from scaler.scheduler.controllers.policies.simple_policy.scaling.types import (
+    WorkerGroupCapabilities,
+    WorkerGroupState,
+    WorkerManagerSnapshot,
+)
 from scaler.utility.identifiers import TaskID, WorkerID

@@ -51,6 +55,7 @@ def get_scaling_commands(
         worker_manager_heartbeat: WorkerManagerHeartbeat,
         worker_groups: WorkerGroupState,
         worker_group_capabilities: WorkerGroupCapabilities,
+        worker_manager_snapshots: Dict[bytes, WorkerManagerSnapshot],
     ) -> List[WorkerManagerCommand]:
         raise NotImplementedError()
5661

src/scaler/scheduler/controllers/policies/simple_policy/scaling/capability_scaling.py

Lines changed: 2 additions & 6 deletions
@@ -14,6 +14,7 @@
     WorkerGroupCapabilities,
     WorkerGroupID,
     WorkerGroupState,
+    WorkerManagerSnapshot,
 )
 from scaler.utility.identifiers import WorkerID

@@ -40,6 +41,7 @@ def get_scaling_commands(
         worker_manager_heartbeat: WorkerManagerHeartbeat,
         worker_groups: WorkerGroupState,
         worker_group_capabilities: WorkerGroupCapabilities,
+        worker_manager_snapshots: Dict[bytes, WorkerManagerSnapshot],
     ) -> List[WorkerManagerCommand]:
         # Derive worker_groups_by_capability from worker_groups + worker_group_capabilities
         worker_groups_by_capability = self._derive_worker_groups_by_capability(worker_groups, worker_group_capabilities)
@@ -176,7 +178,6 @@ def _get_shutdown_commands(
             if not worker_group_dict:
                 continue

-            # Find tasks that these workers can handle
             task_count = 0
             for task_capability_keys, tasks in tasks_by_capability.items():
                 if task_capability_keys <= capability_keys:
@@ -190,7 +191,6 @@ def _get_shutdown_commands(

             task_ratio = task_count / worker_count
             if task_ratio < self._lower_task_ratio:
-                # Find the worker group with the least queued tasks
                 worker_group_task_counts: Dict[WorkerGroupID, int] = {}
                 for worker_group_id, worker_ids in worker_group_dict.items():
                     total_queued = sum(
@@ -203,17 +203,13 @@ def _get_shutdown_commands(
                 if not worker_group_task_counts:
                     continue

-                # Select the worker group with the fewest queued tasks to shut down
                 least_busy_group_id = min(worker_group_task_counts, key=lambda gid: worker_group_task_counts[gid])

-                # Don't scale down if there are pending tasks and this would leave no capable workers
                 workers_in_group = len(worker_group_dict.get(least_busy_group_id, []))
                 remaining_worker_count = worker_count - workers_in_group
                 if task_count > 0 and remaining_worker_count == 0:
-                    # This is the last worker group that can handle these tasks - don't shut it down
                     continue
                 if remaining_worker_count > 0 and (task_count / remaining_worker_count) > self._upper_task_ratio:
-                    # Shutting down this group would cause task ratio to exceed upper threshold and scale-up again
                    continue

                 commands.append(

src/scaler/scheduler/controllers/policies/simple_policy/scaling/fixed_elastic.py

Lines changed: 2 additions & 0 deletions
@@ -13,6 +13,7 @@
     WorkerGroupCapabilities,
     WorkerGroupID,
     WorkerGroupState,
+    WorkerManagerSnapshot,
 )


@@ -42,6 +43,7 @@ def get_scaling_commands(
         worker_manager_heartbeat: WorkerManagerHeartbeat,
         worker_groups: WorkerGroupState,
         worker_group_capabilities: WorkerGroupCapabilities,
+        worker_manager_snapshots: Dict[bytes, WorkerManagerSnapshot],
     ) -> List[WorkerManagerCommand]:
         if not information_snapshot.workers:
             if information_snapshot.tasks:

src/scaler/scheduler/controllers/policies/simple_policy/scaling/mixins.py

Lines changed: 7 additions & 2 deletions
@@ -1,9 +1,13 @@
 import abc
-from typing import List
+from typing import Dict, List

 from scaler.protocol.python.message import InformationSnapshot, WorkerManagerCommand, WorkerManagerHeartbeat
 from scaler.protocol.python.status import ScalingManagerStatus
-from scaler.scheduler.controllers.policies.simple_policy.scaling.types import WorkerGroupCapabilities, WorkerGroupState
+from scaler.scheduler.controllers.policies.simple_policy.scaling.types import (
+    WorkerGroupCapabilities,
+    WorkerGroupState,
+    WorkerManagerSnapshot,
+)


 class ScalingPolicy:
@@ -21,6 +25,7 @@ def get_scaling_commands(
         worker_manager_heartbeat: WorkerManagerHeartbeat,
         worker_groups: WorkerGroupState,
         worker_group_capabilities: WorkerGroupCapabilities,
+        worker_manager_snapshots: Dict[bytes, WorkerManagerSnapshot],
     ) -> List[WorkerManagerCommand]:
         """
         Pure function: state in, commands out.
