Skip to content

Increasing value of OMP_NUM_THREADS reduces performance even when controlling for n_workers and threads_per_worker #8985

Open
@Obi-Wan

Description

@Obi-Wan

Description:

I am working with external libraries that rely on NumPy's internal parallelization for certain heavy operations (like .dot between large matrices). I would like to distribute a certain number of these large calculations with distributed, but I encounter bad performance. In particular, when I change OMP_NUM_THREADS (and related variables) to the number of desired threads, performance gets worse!
I make sure not to oversubscribe the CPU, because I explicitly balance the number of OMP threads with the number of workers/worker threads, to match the system number of cores.

The example here below shows using one worker with one thread.
The performance of distributed is already underwhelming with one thread (even compared to the ProcessPoolExecutor), but with more threads it gets progressively worse.

As a side note, the CPU utilization on the system does indeed raise to the number of threads selected, despite the lower performance.
I am not aware of any other issue open on this subject or similar subjects.

Minimal Complete Verifiable Example:

(The commented lines were used to make sure the correct value of the variables were used, and yes, tqdm should not be used for performance assessment, but the difference is pretty clear, it is just a convenience tool)

from distributed import Client, get_worker, LocalCluster
from mkl import get_max_threads
import numpy as np
from tqdm.auto import tqdm
from numpy.typing import NDArray
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

NUM_THREADS = 16


def get_env(num_threads: int = NUM_THREADS) -> dict[str, str]:
    return {var: f"{num_threads}" for var in ["OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS"]}


def op(M: NDArray, y: NDArray, ii: int) -> float:
    # try:
    #     print(f"{ii = } - {get_worker() = }, {get_max_threads() = }")
    # except:
    #     print(f"{ii = } - {get_max_threads() = }")
    x = np.zeros(5_000)
    n1 = np.abs(M).dot(np.ones_like(x))
    n2 = np.abs(M).T.dot(np.ones_like(y))
    for _ in range(1_000):
        x += M.T.dot((y - M.dot(x)) / n1) / n2
    return float(np.linalg.norm(y - M.dot(x)))


if __name__ == "__main__":
    M = np.random.randn(500, 5_000)
    y = np.random.randn(500)

    N_TRIES = 11

    res_f = [op(M, y, ii) for ii in tqdm(range(N_TRIES), desc="For loop")]

    with ThreadPoolExecutor(max_workers=1) as executor:
        futures = [executor.submit(op, M, y, ii) for ii in range(N_TRIES)]
        res_d = [f.result() for f in tqdm(futures, desc=f"ThreadPoolExecutor ({NUM_THREADS})", total=N_TRIES)]

    with ProcessPoolExecutor(max_workers=1) as executor:
        futures = [executor.submit(op, M, y, ii) for ii in range(N_TRIES)]
        res_d = [f.result() for f in tqdm(futures, desc=f"ProcessPoolExecutor ({NUM_THREADS})", total=N_TRIES)]

    with LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            print(client.dashboard_link)
            M_dd = client.scatter(M, broadcast=True)
            y_dd = client.scatter(y, broadcast=True)
            futures = [client.submit(op, M_dd, y_dd, ii) for ii in range(N_TRIES)]
            res_d = [f.result() for f in tqdm(futures, desc="Distributed (1)", total=N_TRIES)]

    with LocalCluster(n_workers=1, threads_per_worker=1, env=get_env()) as cluster:
        with Client(cluster) as client:
            print(client.dashboard_link)
            M_dd = client.scatter(M, broadcast=True)
            y_dd = client.scatter(y, broadcast=True)
            futures = [client.submit(op, M_dd, y_dd, ii) for ii in range(N_TRIES)]
            res_d = [f.result() for f in tqdm(futures, desc=f"Distributed ({NUM_THREADS})", total=N_TRIES)]

Output of the script:

ThreadPoolExecutor (16): 100%|██████████████████████████████████████████████████████| 11/11 [00:02<00:00,  5.14it/s]
ProcessPoolExecutor (16): 100%|█████████████████████████████████████████████████████| 11/11 [00:03<00:00,  2.98it/s]
http://127.0.0.1:8787/status
Distributed (1): 100%|██████████████████████████████████████████████████████████████| 11/11 [02:30<00:00, 13.72s/it]
http://127.0.0.1:8787/status
Distributed (16): 100%|█████████████████████████████████████████████████████████████| 11/11 [03:16<00:00, 17.88s/it]

Environment:

  • Dask version: 2024.8.2
  • Python version: any from 3.10 until 3.12 included
  • Operating System: Linux (ubuntu 2020.4)
  • Install method (conda, pip, source): conda

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething is brokendiscussionDiscussing a topic with no specific actions yet

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions