Skip to content

[QST]: How to run multi GPU cugraph leiden for a large graph #4884

Open
@abs51295

Description

@abs51295

What is your question?

Hey, I am trying to run multi-GPU Leiden clustering on a large graph with 3.7 billion edges. I have 4 NVIDIA A100 GPUs with 80 GB of VRAM each. I run into memory errors when I try to run it, and I was wondering if you have any suggestions on how to handle such a large graph. Here's my code:

import argparse
from cuml.dask.common import utils as dask_utils
from cugraph.dask.common.read_utils import get_n_workers
from cugraph.dask.common.mg_utils import get_visible_devices, run_gc_on_dask_cluster
from cugraph.dask.common.part_utils import load_balance_func, persist_dask_df_equal_parts_per_worker
import dask
from dask_cuda import LocalCUDACluster
from distributed import Client
import dask_cudf
import numpy as np
from cugraph import Graph
from cugraph.dask import leiden as culeiden
import os
from cugraph.testing.mg_utils import (
    start_dask_client,
    stop_dask_client,
)
import math
from dask.distributed import wait
from cudf.utils.performance_tracking import print_memory_report
from rmm.statistics import enable_statistics
import cudf
from cugraph.dask.comms import comms as Comms

def parse_args():
    """Parse the command-line options for the multi-GPU Leiden script.

    Options:
        --file             (required) path of the input edge list; it is read
                           with ``dask_cudf.read_parquet``, so it must be a
                           parquet file or dataset.
        --output-dir       (required) directory the result is written to.
        --resolution       float, default 0.5; forwarded to Leiden.
        --memory-fraction  float, default 0.7; fraction of GPU memory used
                           for the RMM pool.

    Returns:
        argparse.Namespace: with attributes ``file``, ``output_dir``,
        ``resolution`` and ``memory_fraction``.
    """
    parser = argparse.ArgumentParser(description='Compute leiden using multiple GPUs')
    # The input is loaded with read_parquet, so the help text must say parquet
    # (it previously claimed the input was a numpy file).
    parser.add_argument('--file', required=True, help='Input parquet file path')
    parser.add_argument('--output-dir', required=True, help='Output directory')
    parser.add_argument('--resolution', type=float, default=0.5, help='Resolution for leiden')
    parser.add_argument('--memory-fraction', type=float, default=0.7,
                        help='Fraction of GPU memory to use for RMM pool')
    return parser.parse_args()

def save_leiden_output(dask_df, resolution, output_dir):
    """Write the Leiden result to a parquet dataset under ``output_dir``.

    Args:
        dask_df: Object exposing ``to_parquet(path)``; ``main`` passes the
            partition frame returned by the Leiden call.
        resolution: Resolution used for the run; embedded in the output name
            so that runs with different resolutions do not overwrite each
            other.
        output_dir: Directory in which the output is created.

    The destination is ``<output_dir>/leiden_resolution_<resolution>.parquet``.
    """
    file_path = os.path.join(output_dir, f"leiden_resolution_{resolution}.parquet")
    # The original message referenced an undefined name (`output_path`).
    print(f"Saving leiden output to {file_path}")
    # Write the data frame itself; the original called to_parquet on the
    # path string, which raises AttributeError.
    dask_df.to_parquet(file_path)
    
def load_and_persist_data(client, file_path):
    """Read the parquet edge list and persist it on the cluster's workers.

    Args:
        client: Connected distributed ``Client``; its ``has_what()`` keys are
            used as the list of target workers.
        file_path: Path handed to ``dask_cudf.read_parquet``.

    Returns:
        The single persisted frame unpacked from the result of
        ``dask_utils.persist_across_workers``, holding the ``source`` and
        ``destination`` columns cast to int64 and ``weights`` cast to float32.
    """
    column_dtypes = {'source': np.int64, 'destination': np.int64, 'weights': np.float32}

    # Only the three edge-list columns are read; the stored index is ignored.
    edges = dask_cudf.read_parquet(
        path=file_path,
        columns=['source', 'destination', 'weights'],
        index=False,
        blocksize='1GiB',
    )
    edges = edges.astype(column_dtypes)

    # NOTE(review): persist_across_workers presumably spreads the partitions
    # over the listed workers -- confirm against the cuml helper.
    worker_addresses = list(client.has_what().keys())
    persisted = dask_utils.persist_across_workers(
        client, [edges], workers=worker_addresses
    )

    # One frame went in, so exactly one is expected back.
    (persisted_df,) = persisted
    return persisted_df

def create_graph(persisted_df):
    """Build a cugraph ``Graph`` from a distributed edge list.

    Args:
        persisted_df: Edge-list frame with ``source``, ``destination`` and
            ``weights`` columns.

    Returns:
        The ``Graph`` (constructed with default arguments) after
        ``from_dask_cudf_edgelist`` has been called on it.
    """
    # Map the edge-list roles onto the column names used in the input frame.
    edge_columns = dict(source="source", destination="destination", weight="weights")

    graph = Graph()
    graph.from_dask_cudf_edgelist(persisted_df, **edge_columns)
    return graph

def main():
    """Run multi-GPU Leiden end to end: cluster, load, graph, cluster, save.

    Steps:
        1. Parse the CLI options and configure Dask.
        2. Start a ``LocalCUDACluster`` and initialise the cuGraph comms.
        3. Load and persist the edge list, then build the graph.
        4. Run Leiden and write the partition frame to parquet.

    The comms and the Dask cluster are torn down in a ``finally`` block so
    that a failure in any step (for example an out-of-memory error while the
    graph is created) does not leave workers or communicators behind.
    """
    args = parse_args()

    dask.config.set({
        "dask.dataframe.backend": "cudf",
        "distributed.scheduler.worker-ttl": None
    })

    print(f"Devices available are: {get_visible_devices()}")

    client = Client(LocalCUDACluster(
        CUDA_VISIBLE_DEVICES="4,5,6,7", #should use all devices by default
        # --memory-fraction: fraction of GPU memory used for the RMM pool.
        rmm_pool_size=args.memory_fraction,
        memory_limit=0.85,
        device_memory_limit=0.85,
        enable_cudf_spill=True,
        local_directory="/tmp",
        cudf_spill_stats=1,
        protocol="ucx",
        enable_tcp_over_ucx=True,
        enable_nvlink=True,
        enable_infiniband=False
    ))

    try:
        # Four devices are listed in CUDA_VISIBLE_DEVICES above, so wait for
        # four workers before initialising the comms.
        client.wait_for_workers(n_workers=4)
        Comms.initialize(p2p=True)

        print(f"Number of workers started are {len(client.has_what().keys())}")

        persisted_df = load_and_persist_data(client=client, file_path=args.file)
        wait(persisted_df)

        ## explicitly test if all workers have equal parts
        dist = np.array([len(v) for v in client.has_what().values()])
        print(f"Worker distribution is {dist}")

        # NOTE(review): `shape` is lazy here, so the row count prints as an
        # unevaluated Dask scalar rather than a number.
        print(f"Loaded data with shape: {persisted_df.shape} and number of partitions are {persisted_df.npartitions}")

        print("Creating a dask undirected weighted graph to run leiden.......")
        graph = create_graph(persisted_df)

        print("Running multi-gpu leiden algorithm for 100 iterations (default).........")
        leiden_parts, modularity = culeiden(
            graph,
            resolution=args.resolution,
            random_state=0,
            max_iter=100,
        )

        print(f"Leiden completed with modularity score of {modularity}")
        save_leiden_output(dask_df=leiden_parts, resolution=args.resolution, output_dir=args.output_dir)

        print("Processing completed successfully")
    finally:
        # Release the cuGraph communicator first, then stop the cluster; the
        # nested try/finally guarantees the cluster is shut down even if
        # destroying the comms raises.
        try:
            Comms.destroy()
        finally:
            client.shutdown()

# Run the pipeline only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Here's the error message I get:

Worker distribution is [8 8 8 8]
Loaded data with shape: (<dask_expr.expr.Scalar: expr=FromGraph(bca9c83).size() // 3, dtype=int64>, 3) and number of partitions are 32
Creating a dask undirected weighted graph to run leiden.......
/home/shaha4/miniforge3/envs/rapids-25.02/lib/python3.11/site-packages/cudf/core/reshape.py:384: FutureWarning: The behavior of array concatenation with empty entries is deprecated. In a future version, this will no longer exclude empty items when determining the result dtype. To retain the old behavior, exclude the empty entries before the concat operation.
  warnings.warn(
[2025-01-22 12:58:34.783] [RMM] [error] [A][Stream 0x2][Upstream 7451880192B][FAILURE maximum pool size exceeded]
[2025-01-22 12:58:37.504] [RMM] [error] [A][Stream 0x2][Upstream 7451880192B][FAILURE maximum pool size exceeded]
[2025-01-22 12:58:38.862] [RMM] [error] [A][Stream 0x2][Upstream 7451880192B][FAILURE maximum pool size exceeded]
[2025-01-22 12:58:42.402] [RMM] [error] [A][Stream 0x2][Upstream 1862663936B][FAILURE maximum pool size exceeded]
2025-01-22 12:58:43,473 - distributed.worker - ERROR - Compute Failed
Key:       _make_plc_graph-266693e6-ddc3-4888-8e4c-5ac4fc9bef3a
State:     executing
Task:  <Task '_make_plc_graph-266693e6-ddc3-4888-8e4c-5ac4fc9bef3a' _make_plc_graph(...)>
Exception: "RuntimeError('non-success value returned from cugraph_mg_graph_create(): CUGRAPH_UNKNOWN_ERROR NCCL error encountered at: file=/home/shaha4/miniforge3/envs/rapids-25.02/include/raft/comms/detail/std_comms.hpp line=632: ')"
Traceback: '  File "/home/shaha4/miniforge3/envs/rapids-25.02/lib/python3.11/site-packages/cugraph/structure/graph_implementation/simpleDistributedGraph.py", line 140, in _make_plc_graph\n    plc_graph = MGGraph(\n                ^^^^^^^^\n  File "graphs.pyx", line 485, in pylibcugraph.graphs.MGGraph.__cinit__\n  File "utils.pyx", line 53, in pylibcugraph.utils.assert_success\n'

2025-01-22 12:58:43,473 - distributed.worker - ERROR - Compute Failed
Key:       _make_plc_graph-e77be12f-3695-45b5-a3d7-a8f9db4c9e0f
State:     executing
Task:  <Task '_make_plc_graph-e77be12f-3695-45b5-a3d7-a8f9db4c9e0f' _make_plc_graph(...)>
Exception: "RuntimeError('non-success value returned from cugraph_mg_graph_create(): CUGRAPH_UNKNOWN_ERROR NCCL error encountered at: file=/home/shaha4/miniforge3/envs/rapids-25.02/include/raft/comms/detail/std_comms.hpp line=632: ')"
Traceback: '  File "/home/shaha4/miniforge3/envs/rapids-25.02/lib/python3.11/site-packages/cugraph/structure/graph_implementation/simpleDistributedGraph.py", line 140, in _make_plc_graph\n    plc_graph = MGGraph(\n                ^^^^^^^^\n  File "graphs.pyx", line 485, in pylibcugraph.graphs.MGGraph.__cinit__\n  File "utils.pyx", line 53, in pylibcugraph.utils.assert_success\n'

Code of Conduct

  • I agree to follow cuGraph's Code of Conduct
  • I have searched the open issues and have found no duplicates for this question

Activity

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Metadata

Labels

question — Further information is requested

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions