change dask default client #499
Conversation
Just to be really pedantic, it isn't that the threaded scheduler isn't helpful on our cluster, but that it isn't helpful for the workload we run on it! Most of the runtime hotspots in pseudopeople can only effectively use one thread.
```python
# Generate a new (non-fixture) dataset for a single year but mocked such
# that no noise actually happens (otherwise the years would get noised and
# we couldn't tell if the filter was working properly)
mocker.patch("pseudopeople.dataset.Dataset._noise_dataset")
```
These mocks were added prior to implementing the config = psp.NO_NOISE option, but that option doesn't work on a distributed cluster since the no-noise call is split among different processes.
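For context, the test pattern being described looks roughly like this (a hedged sketch using pytest-mock; the test name and the column assertion are illustrative, not the actual pseudopeople test):

```python
import pseudopeople as psp


def test_year_filter(mocker):
    # Patch out the noising step so any surviving rows reflect the year
    # filter alone, not noise that may have altered year values.
    mocker.patch("pseudopeople.dataset.Dataset._noise_dataset")
    data = psp.generate_decennial_census(year=2020)
    # Hypothetical assertion: every remaining row is from the requested year.
    assert (data["year"] == 2020).all()
```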
```python
####################
# HELPER FUNCTIONS #
####################
def _get_column_noise_level(
```
This just wasn't being used anymore
```python
    available_memory = float(os.environ["SLURM_MEM_PER_NODE"]) / 1024
except KeyError:
    raise RuntimeError(
        "You are on Slurm but SLURM_MEM_PER_NODE is not set. "
```
Happy to work on this message. It only shows up if you run pytest while SSHed into a cluster node (the obvious example being that you're using VS Code on the cluster).
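For reference, a reconstruction of the surrounding logic (a sketch; the function name is an assumption and the error message is truncated in the diff above):

```python
import os


def _get_available_memory_gb() -> float:  # hypothetical name
    try:
        # Slurm exports SLURM_MEM_PER_NODE in MB; convert to GB.
        return float(os.environ["SLURM_MEM_PER_NODE"]) / 1024
    except KeyError:
        # The full message is elided in the diff above.
        raise RuntimeError("You are on Slurm but SLURM_MEM_PER_NODE is not set. ...")
```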
```python
from dask.system import CPU_COUNT

# extract the memory limit from the environment variable
cluster = LocalCluster(  # type: ignore [no-untyped-call]
```
I couldn't get mypy to be happy without just ignoring these dask untyped calls. It does mean that the @overload blocks I added throughout this file aren't strictly required for mypy, but they are more correct regardless.
```diff
@@ -126,6 +155,8 @@ def _generate_dataset(
+    import dask
+    import dask.dataframe as dd
```
@zmbc Remind me why you prefer having dask imports in local scope? It leads to a weird thing where we import dask.dataframe as dd if type-checking and then import it here at runtime if the engine is dask.
It's so that a user doesn't have to have dask in their environment to run pseudopeople if they are using the pandas engine.
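The pattern under discussion, as a minimal self-contained sketch (simplified signatures, not the actual pseudopeople source):

```python
from __future__ import annotations

from typing import TYPE_CHECKING, Literal

import pandas as pd

if TYPE_CHECKING:
    # Seen only by type checkers, so dask need not be installed at runtime.
    import dask.dataframe as dd


def _generate_dataset(
    engine: Literal["pandas", "dask"] = "pandas",
) -> pd.DataFrame | dd.DataFrame:
    if engine == "dask":
        # Runtime import, reached only when the user asked for the dask engine.
        import dask.dataframe as dd

        return dd.from_pandas(pd.DataFrame({"a": [1]}), npartitions=1)
    return pd.DataFrame({"a": [1]})
```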
```python
@overload
def _generate_dataset(
```
This is for type-hinting to work?
Well, kinda. Technically mypy didn't care anyway b/c I am ignoring the dask_data.map_partitions() call ([no-untyped-call]) and so mypy has no way of actually knowing what type it is. But this seems to be the correct way to handle return types that are argument-specific.
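In the abstract, the pattern is (a toy sketch, unrelated to pseudopeople's actual signatures):

```python
from typing import Literal, overload


@overload
def load(fmt: Literal["text"]) -> str: ...
@overload
def load(fmt: Literal["bytes"]) -> bytes: ...


def load(fmt: Literal["text", "bytes"]) -> str | bytes:
    # One runtime implementation; the overloads above tell type checkers
    # which return type corresponds to which literal argument.
    data = b"payload"
    return data.decode() if fmt == "text" else data
```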
zmbc left a comment
Looks great, thank you for all the tricky debugging you did to get this working @stevebachmeier!
```diff
 @overload
 def generate_decennial_census(
     source: Path | str | None = None,
     seed: int = 0,
     config: Path | str | dict[str, Any] | None = None,
     year: int | None = 2020,
     state: str | None = None,
     verbose: bool = False,
-    engine: Literal["pandas", "dask"] = "pandas",
+    engine: Literal["pandas"] = "pandas",
 ) -> pd.DataFrame:
     ...


 @overload
 def generate_decennial_census(
     source: Path | str | None,
     seed: int,
     config: Path | str | dict[str, Any] | None,
     year: int | None,
     state: str | None,
     verbose: bool,
     engine: Literal["dask"],
 ) -> dd.DataFrame:
     ...
```
Note: I'm curious how overloads look in the docs, will take a look at this after I finish reviewing the code.
The overloads don't show up 😞 Oh well, at least they could be helpful for autocomplete etc
Hmm, that's too bad. I didn't think to check myself, but Sphinx def worked on it.
```python
    n_workers=CPU_COUNT,
    threads_per_worker=1,
)
cluster.get_client()  # type: ignore [no-untyped-call]
```
I have a vague memory of needing to have this client object in scope for it to be used. But I presume your testing here has ensured this client is used? I wonder if .get_client is even necessary?
Oh, I should probably add to the test that the client is actually of type LocalCluster. Then I think I'd be happy b/c that's def not just the default.
Shoot, the default dask client happens to be a LocalCluster as well. I'll add a name to our new default (name = "pseudopeople_dask_cluster" unless you disagree) and assert that it's correct.
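Something along these lines would make that assertion unambiguous (a sketch; the test name is hypothetical and the cluster name is the one proposed above):

```python
from distributed import get_client


def test_uses_pseudopeople_default_cluster() -> None:
    # dask's own default client is also backed by a LocalCluster, so checking
    # the type alone proves nothing; check the name we set instead.
    client = get_client()
    assert client.cluster.name == "pseudopeople_dask_cluster"
```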
Co-authored-by: Zeb Burke-Conte <zmbc@users.noreply.github.com>
Change default Dask client
Description
Dask by default uses a threaded scheduler, which isn't helpful for these workloads. This fixes it so that it first checks to see if a dask cluster is already set up and, if so, just uses that. If one isn't, it uses a LocalCluster with threads_per_worker = 1 (see the sketch after the testing notes below).
Testing
pytest --runslow
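For concreteness, the client-selection logic described above amounts to something like this (a hedged sketch: the function name is an assumption, and only the use-an-existing-client-or-build-a-named-LocalCluster shape comes from this PR):

```python
from dask.system import CPU_COUNT
from distributed import Client, LocalCluster


def _ensure_dask_client() -> None:  # hypothetical name
    try:
        # If the user already set up their own cluster/client, use it as-is.
        Client.current()
        return
    except ValueError:
        pass
    # Otherwise fall back to one single-threaded worker per CPU, since
    # pseudopeople's hotspots can each only use one thread effectively.
    cluster = LocalCluster(
        name="pseudopeople_dask_cluster",
        n_workers=CPU_COUNT,
        threads_per_worker=1,
    )
    cluster.get_client()
```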