-
Notifications
You must be signed in to change notification settings - Fork 19
Implement WorkerDisconnectNotification (WDN) Protocol #497
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
0ac3fb6
99848a7
0a84c8f
475ccde
d2b5075
7734d4c
59bf30a
10d7354
157cede
562269b
db1c15d
877abea
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,27 +10,22 @@ | |
| import zmq.asyncio | ||
|
|
||
| from scaler.config.defaults import PROFILING_INTERVAL_SECONDS | ||
| from scaler.config.types.network_backend import NetworkBackend | ||
| from scaler.config.types.object_storage_server import ObjectStorageAddressConfig | ||
| from scaler.config.types.zmq import ZMQConfig, ZMQType | ||
| from scaler.io.async_binder import ZMQAsyncBinder | ||
| from scaler.io.mixins import AsyncBinder, AsyncConnector, AsyncObjectStorageConnector | ||
| from scaler.io.utility import ( | ||
| create_async_connector, | ||
| create_async_object_storage_connector, | ||
| get_scaler_network_backend_from_env, | ||
| ) | ||
| from scaler.io.utility import create_async_connector, create_async_object_storage_connector | ||
| from scaler.io.ymq import ymq | ||
| from scaler.protocol.python.message import ( | ||
| ClientDisconnect, | ||
| DisconnectRequest, | ||
| DisconnectResponse, | ||
| ObjectInstruction, | ||
| ProcessorInitialized, | ||
| Task, | ||
| TaskCancel, | ||
| TaskLog, | ||
| TaskResult, | ||
| WorkerDisconnectNotification, | ||
| WorkerHeartbeatEcho, | ||
| ) | ||
| from scaler.protocol.python.mixins import Message | ||
|
|
@@ -260,8 +255,7 @@ async def __get_loops(self): | |
| except Exception as e: | ||
| logging.exception(f"{self.identity!r}: failed with unhandled exception:\n{e}") | ||
|
|
||
| if get_scaler_network_backend_from_env() == NetworkBackend.tcp_zmq: | ||
| await self._connector_external.send(DisconnectRequest.new_msg(self.identity)) | ||
| await self._connector_external.send(WorkerDisconnectNotification.new_msg(self.identity)) | ||
|
|
||
| self._connector_external.destroy() | ||
| self._processor_manager.destroy("quit") | ||
|
|
@@ -275,14 +269,7 @@ def __run_forever(self): | |
| self._loop.run_until_complete(self._task) | ||
|
|
||
| def __register_signal(self): | ||
| backend = get_scaler_network_backend_from_env() | ||
| if backend == NetworkBackend.tcp_zmq: | ||
| self._loop.add_signal_handler(signal.SIGINT, self.__destroy) | ||
| elif backend == NetworkBackend.ymq: | ||
| self._loop.add_signal_handler(signal.SIGINT, lambda: asyncio.ensure_future(self.__graceful_shutdown())) | ||
|
|
||
| async def __graceful_shutdown(self): | ||
| await self._connector_external.send(DisconnectRequest.new_msg(self.identity)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we not sending
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed it back to sending when the worker exits as it's just a notification now and no response is expected. For YMQ I found that sometimes it gets a
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I cannot reproduce the hang and the SegFault. Please try to reproduce on main branch on your machine and give at least some traces.
Would be nice to have test cases it hangs on.
Likely not, the SegFault is definitely not.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just saw
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have attached gdb to the frozen process, here's the 10 most recent stack frames: It seems like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe #445 is related? total guess
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Likely not. In fact that issue addressed a possible cause of hanging. Inspecting the log, I see
which is not valid because assumption is that the argument must represent a
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I ran
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @magniloquency please resolve this and make issue about what's left (if you are doing it in misc fixes branch then there's no need for issue). |
||
| self._loop.add_signal_handler(signal.SIGINT, self.__destroy) | ||
|
|
||
| def __destroy(self): | ||
| self._task.cancel() | ||
Uh oh!
There was an error while loading. Please reload this page.