Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/scaler/protocol/capnp/message.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ struct DisconnectResponse {
worker @0 :Data;
}

# One-way notice sent by a worker when it shuts down. Unlike
# DisconnectRequest, the scheduler does not send a response to this
# message; it only removes the worker from its bookkeeping.
struct WorkerDisconnectNotification {
worker @0 :Data;
}

struct ClientDisconnect {
disconnectType @0 :DisconnectType;

Expand Down Expand Up @@ -214,5 +218,6 @@ struct Message {

informationRequest @23 :InformationRequest;
informationResponse @24 :InformationResponse;
workerDisconnectNotification @25 :WorkerDisconnectNotification;
}
}
14 changes: 14 additions & 0 deletions src/scaler/protocol/python/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,19 @@ def new_msg(worker: WorkerID) -> "DisconnectResponse":
return DisconnectResponse(_message.DisconnectResponse(worker=bytes(worker)))


class WorkerDisconnectNotification(Message):
    """One-way notice from a worker that it is disconnecting.

    Unlike ``DisconnectRequest``/``DisconnectResponse``, the scheduler
    sends no reply to this message — it is fire-and-forget.
    """

    def __init__(self, msg):
        super().__init__(msg)

    @property
    def worker(self) -> WorkerID:
        """Identity of the worker that is disconnecting."""
        raw_worker = self._msg.worker
        return WorkerID(raw_worker)

    @staticmethod
    def new_msg(worker: WorkerID) -> "WorkerDisconnectNotification":
        """Build a notification wrapping the capnp message for ``worker``."""
        inner = _message.WorkerDisconnectNotification(worker=bytes(worker))
        return WorkerDisconnectNotification(inner)


class ClientDisconnect(Message):
class DisconnectType(enum.Enum):
Disconnect = _message.ClientDisconnect.DisconnectType.disconnect
Expand Down Expand Up @@ -735,6 +748,7 @@ def workers(self) -> Dict[WorkerID, WorkerHeartbeat]:
"workerHeartbeatEcho": WorkerHeartbeatEcho,
"disconnectRequest": DisconnectRequest,
"disconnectResponse": DisconnectResponse,
"workerDisconnectNotification": WorkerDisconnectNotification,
"stateClient": StateClient,
"stateObject": StateObject,
"stateBalanceAdvice": StateBalanceAdvice,
Expand Down
5 changes: 5 additions & 0 deletions src/scaler/scheduler/controllers/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
TaskCancel,
TaskCancelConfirm,
TaskResult,
WorkerDisconnectNotification,
WorkerHeartbeat,
)
from scaler.utility.identifiers import ClientID, ObjectID, TaskID, WorkerID
Expand Down Expand Up @@ -175,6 +176,10 @@ async def on_client_shutdown(self, client_id: ClientID):
async def on_disconnect(self, worker_id: WorkerID, request: DisconnectRequest):
raise NotImplementedError()

@abc.abstractmethod
async def on_disconnect_notification(self, worker_id: WorkerID, notification: WorkerDisconnectNotification):
    """Handle a one-way disconnect notification from ``worker_id``.

    Unlike ``on_disconnect``, implementations send no response back to
    the worker — the message is fire-and-forget.
    """
    raise NotImplementedError()

@abc.abstractmethod
def has_available_worker(self) -> bool:
raise NotImplementedError()
Expand Down
4 changes: 4 additions & 0 deletions src/scaler/scheduler/controllers/worker_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
StateWorker,
Task,
TaskCancel,
WorkerDisconnectNotification,
WorkerHeartbeat,
WorkerHeartbeatEcho,
)
Expand Down Expand Up @@ -80,6 +81,9 @@ async def on_disconnect(self, worker_id: WorkerID, request: DisconnectRequest):
await self.__disconnect_worker(request.worker)
await self._binder.send(worker_id, DisconnectResponse.new_msg(request.worker))

async def on_disconnect_notification(self, worker_id: WorkerID, notification: WorkerDisconnectNotification):
    """Remove the notifying worker from scheduler state.

    No reply is sent to the worker (contrast with ``on_disconnect``,
    which answers with a ``DisconnectResponse``).
    """
    departing_worker = notification.worker
    await self.__disconnect_worker(departing_worker)

async def routine(self):
await self.__clean_workers()

Expand Down
5 changes: 5 additions & 0 deletions src/scaler/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
TaskCancelConfirm,
TaskLog,
TaskResult,
WorkerDisconnectNotification,
WorkerHeartbeat,
)
from scaler.protocol.python.mixins import Message
Expand Down Expand Up @@ -205,6 +206,10 @@ async def on_receive_message(self, source: bytes, message: Message):
await self._worker_controller.on_disconnect(WorkerID(source), message)
return

if isinstance(message, WorkerDisconnectNotification):
await self._worker_controller.on_disconnect_notification(WorkerID(source), message)
return

# =====================================================================================
# object manager
if isinstance(message, ObjectInstruction):
Expand Down
33 changes: 14 additions & 19 deletions src/scaler/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,22 @@
import zmq.asyncio

from scaler.config.defaults import PROFILING_INTERVAL_SECONDS
from scaler.config.types.network_backend import NetworkBackend
from scaler.config.types.object_storage_server import ObjectStorageAddressConfig
from scaler.config.types.zmq import ZMQConfig, ZMQType
from scaler.io.async_binder import ZMQAsyncBinder
from scaler.io.mixins import AsyncBinder, AsyncConnector, AsyncObjectStorageConnector
from scaler.io.utility import (
create_async_connector,
create_async_object_storage_connector,
get_scaler_network_backend_from_env,
)
from scaler.io.utility import create_async_connector, create_async_object_storage_connector
from scaler.io.ymq import ymq
from scaler.protocol.python.message import (
ClientDisconnect,
DisconnectRequest,
DisconnectResponse,
ObjectInstruction,
ProcessorInitialized,
Task,
TaskCancel,
TaskLog,
TaskResult,
WorkerDisconnectNotification,
WorkerHeartbeatEcho,
)
from scaler.protocol.python.mixins import Message
Expand Down Expand Up @@ -269,8 +264,16 @@ async def __get_loops(self):
except Exception as e:
logging.exception(f"{self.identity!r}: failed with unhandled exception:\n{e}")

if get_scaler_network_backend_from_env() == NetworkBackend.tcp_zmq:
await self._connector_external.send(DisconnectRequest.new_msg(self.identity))
try:
await self._connector_external.send(WorkerDisconnectNotification.new_msg(self.identity))
except ymq.YMQException as e:

# this means that the scheduler shut down before we could send our notification
# we don't consider this to be an error
if e.code == ymq.ErrorCode.ConnectorSocketClosedByRemoteEnd:
pass
else:
raise

self._connector_external.destroy()
self._processor_manager.destroy("quit")
Expand All @@ -281,16 +284,8 @@ async def __get_loops(self):
logging.info(f"{self.identity!r}: quit")

def __register_signal(self):
backend = get_scaler_network_backend_from_env()
if backend == NetworkBackend.tcp_zmq:
self._loop.add_signal_handler(signal.SIGINT, self.__destroy)
self._loop.add_signal_handler(signal.SIGTERM, self.__destroy)
elif backend == NetworkBackend.ymq:
self._loop.add_signal_handler(signal.SIGINT, lambda: asyncio.ensure_future(self.__graceful_shutdown()))
self._loop.add_signal_handler(signal.SIGTERM, lambda: asyncio.ensure_future(self.__graceful_shutdown()))

async def __graceful_shutdown(self):
await self._connector_external.send(DisconnectRequest.new_msg(self.identity))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we not sending WorkerDisconnectNotification when receiving signals?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it back to sending when the worker exits as it's just a notification now and no response is expected. For YMQ I found that sometimes it gets a ConnectorSocketClosedByRemoteEnd exception, so I've just added a check for that, but for some reason it seems to hang sometimes (difficult to reproduce, seems kind of random). One time I also got a segmentation fault during the test_graph_fail test. But I'm not sure if these are related to my changes?

Copy link
Contributor

@gxuu gxuu Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot reproduce the hang and the SegFault. Please try to reproduce on main branch on your machine and give at least some traces.

for some reasons it seems to hang sometimes

Would be nice to have test cases it hangs on.

But I'm not sure if these are related to my changes?

Likely not, the SegFault is definitely not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like test_graph_fail and test_graph_error cause it to hang or segfault, I've seen both, but it doesn't always happen, sometimes there's no error

Copy link
Contributor Author

@magniloquency magniloquency Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just saw test_send_object hang. The hangs usually happen when exiting / quitting

Copy link
Contributor Author

@magniloquency magniloquency Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have attached gdb to the frozen process, here's the 10 most recent stack frames:

#0  0x00007fe6c1f1872d in syscall () from /usr/lib/libc.so.6
#1  0x00007fe6c0edf000 in std::__atomic_futex_unsigned_base::_M_futex_wait_until (this=<optimized out>, __addr=0x55f483ca0b60, __val=2147483648, __has_timeout=<optimized out>, __s=...,
    __ns=...) at /usr/src/debug/gcc/gcc/libstdc++-v3/src/c++11/futex.cc:122
#2  0x00007fe6bfd42b49 in std::__atomic_futex_unsigned<2147483648u>::_M_load_and_test_until (this=0x55f483ca0b60, __assumed=0, __operand=1, __equal=true,
    __mo=std::memory_order::acquire, __has_timeout=false, __s=std::chrono::duration = { 0s }, __ns=std::chrono::duration = { 0ns }) at /usr/include/c++/15.2.1/bits/atomic_futex.h:111
#3  0x00007fe6bfd41636 in std::__atomic_futex_unsigned<2147483648u>::_M_load_and_test (this=0x55f483ca0b60, __assumed=0, __operand=1, __equal=true, __mo=std::memory_order::acquire)
    at /usr/include/c++/15.2.1/bits/atomic_futex.h:160
#4  0x00007fe6bfd3f2a0 in std::__atomic_futex_unsigned<2147483648u>::_M_load_when_equal (this=0x55f483ca0b60, __val=1, __mo=std::memory_order::acquire)
    at /usr/include/c++/15.2.1/bits/atomic_futex.h:218
#5  std::__future_base::_State_baseV2::wait (this=0x55f483ca0b50) at /usr/include/c++/15.2.1/future:362
#6  0x00007fe6bfd41512 in std::__basic_future<void>::wait (this=0x7ffcabc784d0) at /usr/include/c++/15.2.1/future:725
#7  0x00007fe6bfd3de02 in scaler::ymq::IOContext::removeIOSocket (this=0x55f47e3d9630, socket=std::shared_ptr<scaler::ymq::IOSocket> (empty) = {...})
    at /home/xxx/work/opengris-scaler/src/cpp/scaler/ymq/io_context.cpp:94
#8  0x00007fe6bfce63a1 in PyIOSocket_dealloc (self=0x7fe6abe72f40) at /home/xxx/work/opengris-scaler/src/cpp/scaler/ymq/pymod_ymq/io_socket.h:40
#9  0x00007fe6c22fb85d in _Py_DECREF (op=<optimized out>) at ./Include/object.h:500
#10 _Py_XDECREF (op=<optimized out>) at ./Include/object.h:567

It seems like IOContext::removeIOSocket() is hanging for some reason

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe #445 is related? total guess

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe #445 is related? total guess

Likely not. In fact that issue addressed a possible cause of hanging.

Inspecting the log, I see

#7 0x00007fe6bfd3de02 in scaler::ymq::IOContext::removeIOSocket (this=0x55f47e3d9630, socket=std::shared_ptr<scaler::ymq::IOSocket> (empty) = {...})

which is not valid because assumption is that the argument must represent a shared_ptr that's not empty.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran test_send_object 500 times and it didn't fail. More info you can share?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@magniloquency please resolve this and make issue about what's left (if you are doing it in misc fixes branch then there's no need for issue).

self._loop.add_signal_handler(signal.SIGINT, self.__destroy)
self._loop.add_signal_handler(signal.SIGTERM, self.__destroy)

def __destroy(self):
    # Cancel the worker's main task; registered as the SIGINT/SIGTERM
    # handler in __register_signal.
    self._task.cancel()
Loading