Skip to content

Commit 728ea31

Browse files
committed
Fix mypy errors, restore web_config, and update WorkerDisconnectNotification handling
- Restore missing web_config in NativeWorkerAdapterConfig
- Fix type incompatibility in Cluster for per_worker_capabilities
- Ensure async worker group operations are properly awaited in scaling policies and adapters
- Update protocol documentation and implementation for WorkerDisconnectNotification (WDN)
- Reformat modified files with black and isort
1 parent 0ac3fb6 commit 728ea31

File tree

6 files changed

+20
-20
lines changed

6 files changed

+20
-20
lines changed

src/scaler/config/section/native_worker_adapter.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
@dataclasses.dataclass
1313
class NativeWorkerAdapterConfig(ConfigClass):
14-
web_config: WebConfig
1514
worker_adapter_config: WorkerAdapterConfig
1615
worker_config: WorkerConfig = dataclasses.field(default_factory=WorkerConfig)
1716
logging_config: LoggingConfig = dataclasses.field(default_factory=LoggingConfig)

src/scaler/protocol/worker.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,11 +227,11 @@ consider the worker dead.
227227

228228
When a `DisconnectRequest` message is sent, the worker should disconnect from the scheduler.
229229

230-
### WorkerDisconnectNotification `WD` (optional)
230+
### WorkerDisconnectNotification `WDN` (optional)
231231

232232
| message_type | worker |
233233
|:------------:|:-------:|
234-
| b"WD" | X bytes |
234+
| b"WDN" | X bytes |
235235

236236
* worker: Worker ID
237237

src/scaler/scheduler/controllers/mixins.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
TaskCancel,
1414
TaskCancelConfirm,
1515
TaskResult,
16+
WorkerDisconnectNotification,
1617
WorkerHeartbeat,
1718
)
1819
from scaler.utility.identifiers import ClientID, ObjectID, TaskID, WorkerID
@@ -175,6 +176,10 @@ async def on_client_shutdown(self, client_id: ClientID):
175176
async def on_disconnect(self, worker_id: WorkerID, request: DisconnectRequest):
176177
raise NotImplementedError()
177178

179+
@abc.abstractmethod
180+
async def on_disconnect_notification(self, worker_id: WorkerID, notification: WorkerDisconnectNotification):
181+
raise NotImplementedError()
182+
178183
@abc.abstractmethod
179184
def has_available_worker(self) -> bool:
180185
raise NotImplementedError()

src/scaler/scheduler/controllers/worker_controller.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
StateWorker,
1212
Task,
1313
TaskCancel,
14+
WorkerDisconnectNotification,
1415
WorkerHeartbeat,
1516
WorkerHeartbeatEcho,
1617
)
@@ -80,6 +81,9 @@ async def on_disconnect(self, worker_id: WorkerID, request: DisconnectRequest):
8081
await self.__disconnect_worker(request.worker)
8182
await self._binder.send(worker_id, DisconnectResponse.new_msg(request.worker))
8283

84+
async def on_disconnect_notification(self, worker_id: WorkerID, notification: WorkerDisconnectNotification):
85+
await self.__disconnect_worker(notification.worker)
86+
8387
async def routine(self):
8488
await self.__clean_workers()
8589

src/scaler/scheduler/scheduler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
TaskCancelConfirm,
2525
TaskLog,
2626
TaskResult,
27+
WorkerDisconnectNotification,
2728
WorkerHeartbeat,
2829
)
2930
from scaler.protocol.python.mixins import Message
@@ -205,6 +206,10 @@ async def on_receive_message(self, source: bytes, message: Message):
205206
await self._worker_controller.on_disconnect(WorkerID(source), message)
206207
return
207208

209+
if isinstance(message, WorkerDisconnectNotification):
210+
await self._worker_controller.on_disconnect_notification(WorkerID(source), message)
211+
return
212+
208213
# =====================================================================================
209214
# object manager
210215
if isinstance(message, ObjectInstruction):

src/scaler/worker/worker.py

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,27 +10,22 @@
1010
import zmq.asyncio
1111

1212
from scaler.config.defaults import PROFILING_INTERVAL_SECONDS
13-
from scaler.config.types.network_backend import NetworkBackend
1413
from scaler.config.types.object_storage_server import ObjectStorageAddressConfig
1514
from scaler.config.types.zmq import ZMQConfig, ZMQType
1615
from scaler.io.async_binder import ZMQAsyncBinder
1716
from scaler.io.mixins import AsyncBinder, AsyncConnector, AsyncObjectStorageConnector
18-
from scaler.io.utility import (
19-
create_async_connector,
20-
create_async_object_storage_connector,
21-
get_scaler_network_backend_from_env,
22-
)
17+
from scaler.io.utility import create_async_connector, create_async_object_storage_connector
2318
from scaler.io.ymq import ymq
2419
from scaler.protocol.python.message import (
2520
ClientDisconnect,
26-
DisconnectRequest,
2721
DisconnectResponse,
2822
ObjectInstruction,
2923
ProcessorInitialized,
3024
Task,
3125
TaskCancel,
3226
TaskLog,
3327
TaskResult,
28+
WorkerDisconnectNotification,
3429
WorkerHeartbeatEcho,
3530
)
3631
from scaler.protocol.python.mixins import Message
@@ -260,8 +255,7 @@ async def __get_loops(self):
260255
except Exception as e:
261256
logging.exception(f"{self.identity!r}: failed with unhandled exception:\n{e}")
262257

263-
if get_scaler_network_backend_from_env() == NetworkBackend.tcp_zmq:
264-
await self._connector_external.send(DisconnectRequest.new_msg(self.identity))
258+
await self._connector_external.send(WorkerDisconnectNotification.new_msg(self.identity))
265259

266260
self._connector_external.destroy()
267261
self._processor_manager.destroy("quit")
@@ -275,14 +269,7 @@ def __run_forever(self):
275269
self._loop.run_until_complete(self._task)
276270

277271
def __register_signal(self):
278-
backend = get_scaler_network_backend_from_env()
279-
if backend == NetworkBackend.tcp_zmq:
280-
self._loop.add_signal_handler(signal.SIGINT, self.__destroy)
281-
elif backend == NetworkBackend.ymq:
282-
self._loop.add_signal_handler(signal.SIGINT, lambda: asyncio.ensure_future(self.__graceful_shutdown()))
283-
284-
async def __graceful_shutdown(self):
285-
await self._connector_external.send(DisconnectRequest.new_msg(self.identity))
272+
self._loop.add_signal_handler(signal.SIGINT, self.__destroy)
286273

287274
def __destroy(self):
288275
self._task.cancel()

0 commit comments

Comments
 (0)