Outdated and/or unclear documentation about SchedulerPlugin #8719

Open
@RaphaelRobidas

Describe the issue:
The current documentation on giving a SchedulerPlugin full access to task state shows the following code as an example:

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
         self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
         # Get full TaskState
         ts = self.scheduler.tasks[key]

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)
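
For context, `dask_setup` appears to be the hook used by distributed's preloading mechanism: when a module is passed with `--preload` (for example `dask scheduler --preload my_scheduler_plugin.py`), the scheduler imports it and calls `dask_setup(scheduler)` with the live `Scheduler` object. As far as I can tell from the preloading docs, a plain function is accepted, and `@click.command()` is only needed when the preload should take its own command-line options. Below is a minimal self-contained version of the documented example, assuming the hypothetical file name `my_scheduler_plugin.py` and a hypothetical `seen` dict so the plugin does something observable:

```python
# my_scheduler_plugin.py (hypothetical file name), loaded for example with:
#     dask scheduler --preload my_scheduler_plugin.py
from distributed.diagnostics.plugin import SchedulerPlugin


class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        # The live Scheduler object, handed in by dask_setup below.
        self.scheduler = scheduler
        # Hypothetical bookkeeping: last observed TaskState.state per key.
        self.seen = {}

    def transition(self, key, start, finish, *args, **kwargs):
        # Full TaskState; .get() in case the task is no longer in scheduler.tasks.
        ts = self.scheduler.tasks.get(key)
        if ts is not None:
            self.seen[key] = ts.state


def dask_setup(scheduler):
    # Called by the preloading machinery with the Scheduler instance.
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)
```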

The dask_setup function appears out of nowhere, and click is never imported. Moreover, this approach does not seem to register the plugin correctly. For example, the following code runs successfully even though the plugin's transition method raises an exception:

import click
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed import Client, LocalCluster

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
         self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
         ts = self.scheduler.tasks[key]
         raise Exception

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)

def job(i):
    return i*2

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)
    ret = client.submit(job, 1).result()
    print(ret)

The console prints "2", whereas we would expect an Exception to be raised.
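
A likely explanation is that nothing in the script above loads the module as a preload, so `dask_setup` is never called and `LocalCluster` starts without the plugin. Separately, as far as I can tell the scheduler catches exceptions raised inside plugin hooks and only logs them, so even a registered plugin would not make `.result()` raise. The sketch below is a workaround, not the documented method: it runs the same registration logic on the scheduler through `Client.run_on_scheduler`, whose function receives the real `Scheduler` as its `dask_scheduler` argument, and it records transitions instead of raising. The names `install`, `seen_transitions`, `demo`, and the plugin name `"my-plugin"` are hypothetical; `processes=False` only keeps the example in one process.

```python
from distributed import Client, LocalCluster
from distributed.diagnostics.plugin import SchedulerPlugin


class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.seen = []  # (key, start, finish, TaskState.state) tuples

    def transition(self, key, start, finish, *args, **kwargs):
        # Record instead of raising: an exception here would (as far as I can
        # tell) only be logged by the scheduler, not raised in the client.
        ts = self.scheduler.tasks.get(key)
        self.seen.append((key, start, finish, ts.state if ts is not None else None))


def install(dask_scheduler):
    # Executes on the scheduler, so dask_scheduler is the real Scheduler object;
    # this mirrors the body of the documented dask_setup.
    dask_scheduler.add_plugin(MyPlugin(dask_scheduler), name="my-plugin")


def seen_transitions(dask_scheduler):
    # Read the recorded transitions back from the scheduler-side plugin.
    return dask_scheduler.plugins["my-plugin"].seen


def job(i):
    return i * 2


def demo():
    # processes=False keeps scheduler and worker in this process for the sketch.
    with LocalCluster(
        n_workers=1, threads_per_worker=1, processes=False, dashboard_address=":0"
    ) as cluster, Client(cluster) as client:
        client.run_on_scheduler(install)
        future = client.submit(job, 1)
        result = future.result()
        return result, future.key, client.run_on_scheduler(seen_transitions)


if __name__ == "__main__":
    result, key, rows = demo()
    print(result)
    for row in rows:
        print(row)
```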

It is also unclear how to register a scheduler plugin that can access the full task state. client.scheduler does not seem to be serializable, so the following code fails with the error TypeError: cannot pickle 'coroutine' object:

from distributed.diagnostics.plugin import SchedulerPlugin
from distributed import Client, LocalCluster

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
         self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
         ts = self.scheduler.tasks[key]
         raise Exception

def job(i):
    return i*2

if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)

    plugin = MyPlugin(client.scheduler)
    client.register_scheduler_plugin(plugin)

    ret = client.submit(job, 1).result()
    print(ret)

Environment:

  • Dask version: 2024.4.2
  • Python version: 3.10.12
  • Operating System: Linux Mint 21.2
  • Install method (conda, pip, source): pip

Labels: documentation, enhancement

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions