Skip to content

Commit b4ebe01

Browse files
committed
Remove Cluster class, replace with FixedNativeWorkerAdapter
Cluster was a multiprocessing.Process subclass that wrapped FixedNativeWorkerAdapter in a subprocess with an asyncio event loop solely to handle signals. This intermediate process layer is removed; workers are now direct children of SchedulerClusterCombo. - Delete Cluster, ClusterConfig, and the cluster entry point module - Redirect scaler_cluster and run_cluster.py to the fixed native worker adapter entry point (with --num-of-workers alias for compat) - Add SIGINT/SIGTERM handling to the fixed native adapter entry point - Update SchedulerClusterCombo, tests, and examples to use FixedNativeWorkerAdapter directly - Update ECS adapter to use scaler_cluster with updated parameters (--max-workers replaces --num-of-workers/--worker-names; worker IDs are no longer pre-announced since workers self-assign UUIDs)
1 parent f199192 commit b4ebe01

File tree

15 files changed

+191
-361
lines changed

15 files changed

+191
-361
lines changed

examples/task_capabilities.py

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66

77
import math
88

9-
from scaler import Client, Cluster
9+
from scaler import Client
1010
from scaler.cluster.combo import SchedulerClusterCombo
1111
from scaler.config.common.logging import LoggingConfig
1212
from scaler.config.common.worker import WorkerConfig
13-
from scaler.config.section.cluster import ClusterConfig
13+
from scaler.config.common.worker_adapter import WorkerAdapterConfig
14+
from scaler.config.section.fixed_native_worker_adapter import FixedNativeWorkerAdapterConfig
1415
from scaler.config.section.scheduler import PolicyConfig
15-
from scaler.config.types.worker import WorkerCapabilities, WorkerNames
16+
from scaler.config.types.worker import WorkerCapabilities
17+
from scaler.worker_manager_adapter.baremetal.fixed_native import FixedNativeWorkerAdapter
1618

1719

1820
def gpu_task(x: float) -> float:
@@ -36,34 +38,35 @@ def main():
3638
)
3739

3840
# Adds an additional worker with GPU support
39-
base_cluster = cluster._cluster
40-
regular_cluster = Cluster(
41-
config=ClusterConfig(
42-
scheduler_address=base_cluster._address,
43-
object_storage_address=None,
41+
base_adapter = cluster._worker_adapter
42+
gpu_adapter = FixedNativeWorkerAdapter(
43+
FixedNativeWorkerAdapterConfig(
44+
worker_adapter_config=WorkerAdapterConfig(
45+
scheduler_address=base_adapter._address,
46+
object_storage_address=None,
47+
max_workers=1,
48+
),
4449
preload=None,
45-
worker_names=WorkerNames(["gpu_worker"]),
46-
num_of_workers=1,
47-
event_loop=base_cluster._event_loop,
50+
event_loop=base_adapter._event_loop,
4851
worker_io_threads=1,
4952
worker_config=WorkerConfig(
5053
per_worker_capabilities=WorkerCapabilities({"gpu": -1}),
51-
per_worker_task_queue_size=base_cluster._per_worker_task_queue_size,
52-
heartbeat_interval_seconds=base_cluster._heartbeat_interval_seconds,
53-
task_timeout_seconds=base_cluster._task_timeout_seconds,
54-
death_timeout_seconds=base_cluster._death_timeout_seconds,
55-
garbage_collect_interval_seconds=base_cluster._garbage_collect_interval_seconds,
56-
trim_memory_threshold_bytes=base_cluster._trim_memory_threshold_bytes,
57-
hard_processor_suspend=base_cluster._hard_processor_suspend,
54+
per_worker_task_queue_size=base_adapter._task_queue_size,
55+
heartbeat_interval_seconds=base_adapter._heartbeat_interval_seconds,
56+
task_timeout_seconds=base_adapter._task_timeout_seconds,
57+
death_timeout_seconds=base_adapter._death_timeout_seconds,
58+
garbage_collect_interval_seconds=base_adapter._garbage_collect_interval_seconds,
59+
trim_memory_threshold_bytes=base_adapter._trim_memory_threshold_bytes,
60+
hard_processor_suspend=base_adapter._hard_processor_suspend,
5861
),
5962
logging_config=LoggingConfig(
60-
paths=base_cluster._logging_paths,
61-
level=base_cluster._logging_level,
62-
config_file=base_cluster._logging_config_file,
63+
paths=base_adapter._logging_paths,
64+
level=base_adapter._logging_level,
65+
config_file=base_adapter._logging_config_file,
6366
),
6467
)
6568
)
66-
regular_cluster.start()
69+
gpu_adapter.start()
6770

6871
with Client(address=cluster.get_address()) as client:
6972
print("Submitting tasks...")
@@ -82,6 +85,7 @@ def main():
8285
gpu_future.result()
8386
cpu_future.result()
8487

88+
gpu_adapter.shutdown()
8589
cluster.shutdown()
8690

8791

src/run_cluster.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from scaler.entry_points.cluster import main
1+
from scaler.entry_points.worker_manager_baremetal_fixed_native import main
22

33
if __name__ == "__main__":
44
main()

src/scaler/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
from .about import __version__
22
from .client.client import Client, ScalerFuture
33
from .client.serializer.mixins import Serializer
4-
from .cluster.cluster import Cluster
54
from .cluster.combo import SchedulerClusterCombo
65
from .cluster.scheduler import Scheduler
76

87
assert isinstance(__version__, str)
98
assert isinstance(Client, type)
109
assert isinstance(ScalerFuture, type)
1110
assert isinstance(Scheduler, type)
12-
assert isinstance(Cluster, type)
1311
assert isinstance(SchedulerClusterCombo, type)
1412
assert isinstance(Serializer, type)

src/scaler/cluster/cluster.py

Lines changed: 0 additions & 117 deletions
This file was deleted.

src/scaler/cluster/combo.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import logging
2-
import socket
32
from typing import Dict, Optional, Tuple
43

5-
from scaler.cluster.cluster import Cluster
64
from scaler.cluster.object_storage_server import ObjectStorageServerProcess
75
from scaler.cluster.scheduler import SchedulerProcess
86
from scaler.config.common.logging import LoggingConfig
97
from scaler.config.common.worker import WorkerConfig
8+
from scaler.config.common.worker_adapter import WorkerAdapterConfig
109
from scaler.config.defaults import (
1110
DEFAULT_CLIENT_TIMEOUT_SECONDS,
1211
DEFAULT_GARBAGE_COLLECT_INTERVAL_SECONDS,
@@ -25,12 +24,13 @@
2524
DEFAULT_WORKER_DEATH_TIMEOUT,
2625
DEFAULT_WORKER_TIMEOUT_SECONDS,
2726
)
28-
from scaler.config.section.cluster import ClusterConfig
27+
from scaler.config.section.fixed_native_worker_adapter import FixedNativeWorkerAdapterConfig
2928
from scaler.config.section.scheduler import PolicyConfig
3029
from scaler.config.types.object_storage_server import ObjectStorageAddressConfig
31-
from scaler.config.types.worker import WorkerCapabilities, WorkerNames
30+
from scaler.config.types.worker import WorkerCapabilities
3231
from scaler.config.types.zmq import ZMQConfig
3332
from scaler.utility.network_util import get_available_tcp_port
33+
from scaler.worker_manager_adapter.baremetal.fixed_native import FixedNativeWorkerAdapter
3434

3535

3636
class SchedulerClusterCombo:
@@ -89,13 +89,14 @@ def __init__(
8989
self._object_storage.start()
9090
self._object_storage.wait_until_ready() # object storage should be ready before starting the cluster
9191

92-
self._cluster = Cluster(
93-
config=ClusterConfig(
94-
scheduler_address=self._address,
95-
object_storage_address=self._object_storage_address,
92+
self._worker_adapter = FixedNativeWorkerAdapter(
93+
FixedNativeWorkerAdapterConfig(
94+
worker_adapter_config=WorkerAdapterConfig(
95+
scheduler_address=self._address,
96+
object_storage_address=self._object_storage_address,
97+
max_workers=n_workers,
98+
),
9699
preload=None,
97-
worker_names=WorkerNames([f"{socket.gethostname().split('.')[0]}" for _ in range(n_workers)]),
98-
num_of_workers=n_workers,
99100
event_loop=event_loop,
100101
worker_io_threads=worker_io_threads,
101102
worker_config=WorkerConfig(
@@ -131,7 +132,7 @@ def __init__(
131132
policy=scaler_policy,
132133
)
133134

134-
self._cluster.start()
135+
self._worker_adapter.start()
135136
self._scheduler.start()
136137
logging.info(f"{self.__get_prefix()} started")
137138

@@ -143,9 +144,8 @@ def shutdown(self):
143144
self._shutdown_called = True
144145

145146
logging.info(f"{self.__get_prefix()} shutdown")
146-
self._cluster.terminate()
147+
self._worker_adapter.shutdown()
147148
self._scheduler.terminate()
148-
self._cluster.join()
149149
self._scheduler.join()
150150

151151
# object storage should terminate after the cluster and scheduler.

src/scaler/config/section/cluster.py

Lines changed: 0 additions & 66 deletions
This file was deleted.

src/scaler/config/section/fixed_native_worker_adapter.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import argparse
12
import dataclasses
23
from typing import Optional
34

@@ -25,6 +26,11 @@ class FixedNativeWorkerAdapterConfig(ConfigClass):
2526
metadata=dict(short="-wit", help="set the number of io threads for io backend per worker"),
2627
)
2728

29+
@classmethod
30+
def configure_parser(cls, parser) -> None:
31+
super().configure_parser(parser)
32+
parser.add_argument("--num-of-workers", dest="max_workers", type=int, help=argparse.SUPPRESS)
33+
2834
def __post_init__(self) -> None:
2935
if self.worker_io_threads <= 0:
3036
raise ValueError("worker_io_threads must be a positive integer.")

src/scaler/entry_points/cluster.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,3 @@
1-
from scaler.cluster.cluster import Cluster
2-
from scaler.config.section.cluster import ClusterConfig
3-
from scaler.utility.event_loop import register_event_loop
1+
from scaler.entry_points.worker_manager_baremetal_fixed_native import main
42

5-
6-
def main():
7-
cluster_config = ClusterConfig.parse("Scaler Standalone Compute Cluster", "cluster")
8-
register_event_loop(cluster_config.event_loop)
9-
cluster = Cluster(cluster_config)
10-
cluster.run()
3+
__all__ = ["main"]

0 commit comments

Comments
 (0)