Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added traversal algorithm to nx_parallel #91

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions _nx_parallel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ def get_info():
'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the isolated nodes as input and returns an iterable `isolate_chunks`. The default chunking is done by slicing the `isolates` into `n_jobs` number of chunks."
},
},
"parallel_bfs": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/traversal/breadth_first_search.py#L10",
"additional_docs": "Perform a parallelized Breadth-First Search (BFS) on the graph.",
"additional_parameters": {
"G : graph": 'A NetworkX graph. source : node, optional Starting node for the BFS traversal. If None, BFS is performed for all nodes. get_chunks : str or function (default="chunks") A function to divide nodes into chunks for parallel processing. If "chunks", the nodes are split into `n_jobs` chunks automatically. n_jobs : int, optional Number of jobs to run in parallel. If None, defaults to the number of CPUs.',
"bfs_result : dict": "A dictionary where keys are nodes and values are their BFS traversal order.",
},
},
"square_clustering": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/cluster.py#L11",
"additional_docs": "The nodes are chunked into `node_chunks` and then the square clustering coefficient for all `node_chunks` are computed in parallel over `n_jobs` number of CPU cores.",
Expand Down
2 changes: 2 additions & 0 deletions nx_parallel/algorithms/traversal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .depth_first_search import *

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from .depth_first_search import *

from .breadth_first_search import *
96 changes: 96 additions & 0 deletions nx_parallel/algorithms/traversal/breadth_first_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from collections import deque

from joblib import Parallel, delayed
from networkx.utils import py_random_state

import nx_parallel as nxp

__all__ = ["parallel_bfs"]


@nxp._configure_if_nx_active()
def parallel_bfs(G, source=None, get_chunks="chunks", n_jobs=None):
    """Perform a parallelized Breadth-First Search (BFS) on the graph.

    The source nodes are split into chunks and a BFS is run from each
    chunk in parallel via joblib; the per-chunk results are merged into
    one dictionary.

    Parameters
    ----------
    G : graph
        A NetworkX graph (or an nx-parallel wrapper exposing
        ``graph_object``).
    source : node, optional
        Starting node for the BFS traversal. If None, BFS is performed
        starting from every node of the graph.
    get_chunks : str or function (default="chunks")
        A function that takes the iterable of source nodes and returns
        chunks for parallel processing. If "chunks", the nodes are split
        into `n_jobs` chunks automatically.
    n_jobs : int, optional
        Number of jobs to run in parallel. If None, the value is taken
        from the nx-parallel configuration (``nxp.get_n_jobs()``).

    Returns
    -------
    bfs_result : dict
        A dictionary mapping each reached node to its BFS traversal
        order. NOTE(review): when chunks overlap in the nodes they
        reach, later chunks overwrite earlier entries — confirm this is
        the intended merge semantics.
    """
    # NOTE: the original version applied @py_random_state(3), which would
    # have coerced `n_jobs` (positional index 3) into a random.Random
    # instance, breaking the `n_jobs is None` check and the Parallel
    # call. BFS uses no randomness, so no random-state handling is needed.
    if hasattr(G, "graph_object"):
        # Unwrap the nx-parallel graph wrapper to the underlying graph.
        G = G.graph_object

    # Either traverse from every node or from the single requested source.
    nodes = G.nodes if source is None else [source]

    if n_jobs is None:
        n_jobs = nxp.get_n_jobs()

    # Create node chunks: default chunking or a user-supplied chunker.
    if get_chunks == "chunks":
        node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes)
    else:
        node_chunks = get_chunks(nodes)

    # Run BFS on each chunk in parallel.
    bfs_results = Parallel(n_jobs=n_jobs)(
        delayed(_bfs_chunk)(G, chunk) for chunk in node_chunks
    )

    # Combine results from all chunks (later chunks win on key collisions).
    combined_result = {}
    for result in bfs_results:
        combined_result.update(result)

    return combined_result


def _bfs_chunk(G, nodes):
"""
Perform BFS for a subset of nodes.

Parameters
----------
G : graph
A NetworkX graph.
nodes : list
A list of nodes to start BFS from.

Returns
-------
bfs_result : dict
BFS traversal order for the given subset of nodes.
"""
bfs_result = {}
for node in nodes:
if node not in bfs_result:
visited = set()
queue = [node]
order = 0

while queue:
current = queue.pop(0)
if current not in visited:
visited.add(current)
bfs_result[current] = order
order += 1
queue.extend(
neighbor
for neighbor in G.neighbors(current)
if neighbor not in visited
)

return bfs_result
Loading