Implement WorkerDisconnectNotification (WDN) Protocol#497

Open
magniloquency wants to merge 12 commits into finos:main from magniloquency:feature/worker-disconnect-notification
Conversation


@magniloquency magniloquency commented Jan 9, 2026

Closes #465

Overview

This PR introduces the WorkerDisconnectNotification (WDN) message to the Scaler protocol. This allows workers to proactively notify the scheduler when they are shutting down gracefully, enabling immediate state cleanup on the scheduler side without waiting for timeouts or heartbeat failures.

Changes

1. Protocol Definition

  • Cap'n Proto: Added WorkerDisconnectNotification struct to message.capnp.
  • Python Protocol: Implemented the WorkerDisconnectNotification class in src/scaler/protocol/python/message.py and registered it.
  • Documentation: Updated src/scaler/protocol/worker.md to include the new WDN message specification.
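As a rough illustration of the message described above (the field name and constructor are assumptions; the real class is generated against the Cap'n Proto schema in message.capnp and implemented in src/scaler/protocol/python/message.py), the Python side might look like:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerDisconnectNotification:
    """Hypothetical sketch of the WDN message: a worker announces a graceful
    shutdown so the scheduler can clean up its state immediately instead of
    waiting for a heartbeat timeout."""

    worker_identity: bytes  # assumed field name

    @classmethod
    def new_msg(cls, worker_identity: bytes) -> "WorkerDisconnectNotification":
        # mirrors the new_msg constructor convention quoted elsewhere in this
        # thread (e.g. DisconnectRequest.new_msg(self.identity))
        return cls(worker_identity=worker_identity)


msg = WorkerDisconnectNotification.new_msg(b"worker-1")
print(msg.worker_identity)
```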

2. Worker Implementation

  • Graceful Shutdown: Updated the Worker class to send a WorkerDisconnectNotification message to the scheduler at the end of its run method, ensuring the scheduler is informed before the process terminates.
  • Refactoring: Simplified signal handling by removing the separate __graceful_shutdown async handler in favor of a unified shutdown sequence that ensures the notification is dispatched.
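The shutdown sequence described above can be sketched as follows (the connector is a stand-in and the message shape is an assumption; the point is that the notification is dispatched at the end of `run()` rather than from a separate signal handler):

```python
import asyncio


class _StubConnector:
    """Stand-in for the worker's external connector (assumption)."""

    def __init__(self):
        self.sent = []

    async def send(self, message):
        self.sent.append(message)


class Worker:
    """Minimal sketch of the unified shutdown sequence: the WDN is sent in a
    finally block so the scheduler is informed before the process exits."""

    def __init__(self, identity: bytes, connector):
        self.identity = identity
        self._connector_external = connector

    async def run(self):
        try:
            pass  # the main worker loop would run here
        finally:
            # notify the scheduler before terminating; it is fire-and-forget,
            # so no response is awaited
            await self._connector_external.send(
                ("WorkerDisconnectNotification", self.identity)
            )


connector = _StubConnector()
asyncio.run(Worker(b"worker-1", connector).run())
print(connector.sent)
```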

3. Scheduler Implementation

  • Handling WDN: Updated Scheduler.py to recognize the WorkerDisconnectNotification message and route it to the appropriate controller.
  • Controller Logic: Implemented on_disconnect_notification in WorkerController (and its mixin) to process the notification and trigger the immediate disconnection and cleanup of the worker state.
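A minimal sketch of the routing and cleanup described above (class internals and the message representation are assumptions; only `on_disconnect_notification` is a name taken from this PR):

```python
class WorkerController:
    def __init__(self):
        # worker identity -> state; contents are illustrative only
        self._workers = {b"worker-1": "alive", b"worker-2": "alive"}

    def on_disconnect_notification(self, worker_identity: bytes):
        # drop the worker immediately instead of waiting for a
        # heartbeat failure or timeout
        self._workers.pop(worker_identity, None)


class Scheduler:
    def __init__(self, worker_controller):
        self._worker_controller = worker_controller

    def on_receive(self, message_type: str, payload: bytes):
        # route WDN to the worker controller; other message types elided
        if message_type == "WorkerDisconnectNotification":
            self._worker_controller.on_disconnect_notification(payload)


controller = WorkerController()
Scheduler(controller).on_receive("WorkerDisconnectNotification", b"worker-1")
print(sorted(controller._workers))
```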

- Add WorkerDisconnectNotification struct to Cap'n Proto definition.
- Implement WorkerDisconnectNotification Python class and register it in PROTOCOL.
- Update worker protocol documentation.
@magniloquency magniloquency force-pushed the feature/worker-disconnect-notification branch 4 times, most recently from 728ea31 to 0cbfc00 (January 9, 2026 00:52)
- Update Scheduler and WorkerController to handle WorkerDisconnectNotification (WDN)
- Update Worker to send WDN upon graceful shutdown
- Update protocol documentation for WDN
- Apply formatting fixes (black/isort) to modified files
@magniloquency magniloquency force-pushed the feature/worker-disconnect-notification branch from 0cbfc00 to 99848a7 (January 9, 2026 00:56)
@magniloquency magniloquency changed the title from "Implement WorkerDisconnectNotification and fix Worker Adapter regressions" to "Implement WorkerDisconnectNotification (WDN) Protocol" (Jan 9, 2026)
@magniloquency magniloquency marked this pull request as ready for review January 9, 2026 01:10

gxuu commented Jan 9, 2026

Did you test the case where ymq is the network backend?

You can now switch the value of SCALER_NETWORK_BACKEND from NetworkBackend.tcp_zmq to ymq to test it.

```python
self._loop.add_signal_handler(signal.SIGINT, lambda: asyncio.ensure_future(self.__graceful_shutdown()))

async def __graceful_shutdown(self):
    await self._connector_external.send(DisconnectRequest.new_msg(self.identity))
```
Contributor:

Are we not sending WorkerDisconnectNotification when receiving signals?

Contributor Author:

I changed it back to sending the notification when the worker exits, as it's just a notification now and no response is expected. With YMQ I found that it sometimes gets a ConnectorSocketClosedByRemoteEnd exception, so I've added a check for that, but for some reason it still seems to hang occasionally (difficult to reproduce; it seems somewhat random). One time I also got a segmentation fault during the test_graph_fail test. But I'm not sure whether these are related to my changes?

Contributor @gxuu, Jan 13, 2026:
I cannot reproduce the hang or the SegFault. Please try to reproduce it on the main branch on your machine and give at least some traces.

> for some reason it seems to hang sometimes

It would be nice to have the test cases it hangs on.

> But I'm not sure if these are related to my changes?

Likely not, the SegFault is definitely not.

Contributor Author:

It seems like test_graph_fail and test_graph_error cause it to hang or segfault; I've seen both. It doesn't always happen, though; sometimes there's no error.

Contributor Author @magniloquency, Jan 14, 2026:

I just saw test_send_object hang. The hangs usually happen when exiting / quitting

Contributor Author @magniloquency, Jan 14, 2026:

I have attached gdb to the frozen process; here are the 10 most recent stack frames:

```
#0  0x00007fe6c1f1872d in syscall () from /usr/lib/libc.so.6
#1  0x00007fe6c0edf000 in std::__atomic_futex_unsigned_base::_M_futex_wait_until (this=<optimized out>, __addr=0x55f483ca0b60, __val=2147483648, __has_timeout=<optimized out>, __s=...,
    __ns=...) at /usr/src/debug/gcc/gcc/libstdc++-v3/src/c++11/futex.cc:122
#2  0x00007fe6bfd42b49 in std::__atomic_futex_unsigned<2147483648u>::_M_load_and_test_until (this=0x55f483ca0b60, __assumed=0, __operand=1, __equal=true,
    __mo=std::memory_order::acquire, __has_timeout=false, __s=std::chrono::duration = { 0s }, __ns=std::chrono::duration = { 0ns }) at /usr/include/c++/15.2.1/bits/atomic_futex.h:111
#3  0x00007fe6bfd41636 in std::__atomic_futex_unsigned<2147483648u>::_M_load_and_test (this=0x55f483ca0b60, __assumed=0, __operand=1, __equal=true, __mo=std::memory_order::acquire)
    at /usr/include/c++/15.2.1/bits/atomic_futex.h:160
#4  0x00007fe6bfd3f2a0 in std::__atomic_futex_unsigned<2147483648u>::_M_load_when_equal (this=0x55f483ca0b60, __val=1, __mo=std::memory_order::acquire)
    at /usr/include/c++/15.2.1/bits/atomic_futex.h:218
#5  std::__future_base::_State_baseV2::wait (this=0x55f483ca0b50) at /usr/include/c++/15.2.1/future:362
#6  0x00007fe6bfd41512 in std::__basic_future<void>::wait (this=0x7ffcabc784d0) at /usr/include/c++/15.2.1/future:725
#7  0x00007fe6bfd3de02 in scaler::ymq::IOContext::removeIOSocket (this=0x55f47e3d9630, socket=std::shared_ptr<scaler::ymq::IOSocket> (empty) = {...})
    at /home/xxx/work/opengris-scaler/src/cpp/scaler/ymq/io_context.cpp:94
#8  0x00007fe6bfce63a1 in PyIOSocket_dealloc (self=0x7fe6abe72f40) at /home/xxx/work/opengris-scaler/src/cpp/scaler/ymq/pymod_ymq/io_socket.h:40
#9  0x00007fe6c22fb85d in _Py_DECREF (op=<optimized out>) at ./Include/object.h:500
#10 _Py_XDECREF (op=<optimized out>) at ./Include/object.h:567
```

It seems like IOContext::removeIOSocket() is hanging for some reason
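The pattern in frames #5-#7 (a dealloc path blocking in a future's wait) can be illustrated with a Python analogy; this is not the actual C++ code, just the same failure mode: the caller waits on a future that only another thread can fulfil, so if that thread never runs (e.g. it already exited during shutdown), the wait blocks forever. A timeout makes the hang observable:

```python
import concurrent.futures
import threading

# A future that the (now absent) IO thread was supposed to complete.
future = concurrent.futures.Future()


def io_thread_has_exited():
    # the thread that was supposed to call future.set_result() does nothing
    pass


t = threading.Thread(target=io_thread_has_exited)
t.start()
t.join()

try:
    # std::future::wait has no timeout, so the C++ equivalent blocks forever;
    # here a timeout is used to demonstrate the condition deterministically
    future.result(timeout=0.1)
    print("completed")
except concurrent.futures.TimeoutError:
    print("would hang without a timeout")
```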

Contributor Author:

maybe #445 is related? total guess

Contributor:

> maybe #445 is related? total guess

Likely not. In fact that issue addressed a possible cause of hanging.

Inspecting the log, I see

```
#7  0x00007fe6bfd3de02 in scaler::ymq::IOContext::removeIOSocket (this=0x55f47e3d9630, socket=std::shared_ptr<scaler::ymq::IOSocket> (empty) = {...})
```

which is not valid, because the assumption is that the argument must be a non-empty shared_ptr.

Contributor:

I ran test_send_object 500 times and it didn't fail. Is there more info you can share?

Contributor:

@magniloquency please resolve this and open an issue about what's left (if you are handling it in the misc-fixes branch then there's no need for an issue).

@magniloquency magniloquency force-pushed the feature/worker-disconnect-notification branch 2 times, most recently from 71f0b7e to 832597e (January 13, 2026 00:35)
@magniloquency magniloquency force-pushed the feature/worker-disconnect-notification branch from 832597e to 0a84c8f (January 13, 2026 00:35)


Development

Successfully merging this pull request may close these issues.

Workers do not acknowledge cancel request

3 participants