@@ -1102,3 +1102,39 @@ def setup(self, worker):
11021102
11031103 def teardown (self , worker ):
11041104 self ._exit_stack .close ()
1105+
1106+
1107+ class MultiprocessingAuthkeyPlugin (WorkerPlugin ):
1108+ """
1109+ A WorkerPlugin to propagate the main process's ``multiprocessing.current_process().authkey``
1110+ to Dask workers.
1111+
1112+ This is necessary when using a ``multiprocessing.Manager`` for communication between the
1113+ main process and its workers, especially in distributed settings such as with
1114+ ``dask_jobqueue.SLURMCluster``. In standard multiprocessing, the ``authkey`` is automatically
1115+ propagated to child processes, but in distributed clusters, this must be done manually.
1116+
1117+ This plugin securely forwards the ``authkey`` from the client process to all workers by
1118+ setting the environment variable ``_DASK_MULTIPROCESSING_AUTHKEY`` and updating the worker's
1119+ ``multiprocessing.current_process().authkey`` accordingly.
1120+
1121+ Examples
1122+ --------
1123+ >>> from distributed.diagnostics.plugin import MultiprocessingAuthkeyPlugin
1124+ >>> client.register_plugin(MultiprocessingAuthkeyPlugin())
1125+ """
1126+
1127+ name = "multiprocessing-authkey"
1128+ idempotent = True
1129+
1130+ def __init__ (self ) -> None :
1131+ import multiprocessing .process
1132+ os .environ ["_DASK_MULTIPROCESSING_AUTHKEY" ] = multiprocessing .process .current_process ().authkey .hex ()
1133+
1134+ def setup (self , worker : Worker ) -> None :
1135+ import multiprocessing .process
1136+
1137+ multiprocessing .process .current_process ().authkey = bytes .fromhex (
1138+ os .environ ["_DASK_MULTIPROCESSING_AUTHKEY" ]
1139+ )
1140+
0 commit comments