Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Development]
<!-- Do Not Erase This Section - Used for tracking unreleased changes -->
### Fixed
- **GFQL / GPU traversal**: Added a narrow one-hop undirected `hop()` fast path that avoids doubled edge-pair materialization for the common no-predicate traversal shape. On DGX-backed RAPIDS validation, warm `gplus` pipeline time improved `-39.67%` on `25.02` and `-39.27%` on `26.02`.

## [0.53.8 - 2026-03-31]

Expand Down
10 changes: 9 additions & 1 deletion graphistry/compute/dataframe_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Optional, Sequence
from typing import Any, Optional, Sequence, cast

import pandas as pd

Expand All @@ -20,6 +20,14 @@ def df_cons(template_df: DataFrameT, data: dict) -> DataFrameT:
return pd.DataFrame(data)


def column_frame(df: DataFrameT, col: str) -> DataFrameT:
return cast(DataFrameT, df[[col]])


def column_values(df: DataFrameT, col: str) -> SeriesT:
return cast(SeriesT, df[col])


def concat_frames(frames: Sequence[DataFrameT]) -> Optional[DataFrameT]:
non_empty = [f for f in frames if f is not None and len(f) > 0]
if not non_empty:
Expand Down
115 changes: 82 additions & 33 deletions graphistry/compute/hop.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .filter_by_dict import filter_by_dict
from graphistry.Engine import safe_merge
from .typing import DataFrameT, DomainT
from .dataframe_utils import column_frame, column_values
from .util import generate_safe_column_name


Expand Down Expand Up @@ -314,28 +315,6 @@ def _build_allowed_ids(
allowed_target_intermediate = base_target_nodes[node_col]
allowed_target_final = target_wave_front[[node_col]].drop_duplicates()[node_col]

pairs: DataFrameT
FROM_COL: str
TO_COL: str
FROM_COL = generate_safe_column_name('__gfql_from__', edges_indexed, prefix='__gfql_', suffix='__')
TO_COL = generate_safe_column_name('__gfql_to__', edges_indexed, prefix='__gfql_', suffix='__')

def _build_pairs(src_col: str, dst_col: str) -> DataFrameT:
return edges_indexed[[src_col, dst_col, EDGE_ID]].rename(
columns={src_col: FROM_COL, dst_col: TO_COL}
)

if direction == 'forward':
pairs = _build_pairs(g2._source, g2._destination)
elif direction == 'reverse':
pairs = _build_pairs(g2._destination, g2._source)
else:
pairs = concat(
[_build_pairs(g2._source, g2._destination), _build_pairs(g2._destination, g2._source)],
ignore_index=True,
sort=False,
).drop_duplicates(subset=[FROM_COL, TO_COL, EDGE_ID])

node_hop_records = None
edge_hop_records = None
seen_node_ids = None
Expand Down Expand Up @@ -365,12 +344,80 @@ def _build_pairs(src_col: str, dst_col: str) -> DataFrameT:
if fast_path_override in {"0", "false", "off", "no"}:
fast_path_enabled = False

simple_single_hop_undirected_fast_path = (
fast_path_enabled
and direction == 'undirected'
and not to_fixed_point
and resolved_max_hops == 1
and resolved_min_hops is not None
and resolved_min_hops <= 1
)

def _concat_node_frames(frames: List[DataFrameT]) -> DataFrameT:
non_empty = [df for df in frames if len(df) > 0]
if len(non_empty) == 0:
return starting_nodes[[node_col]][:0]
if len(non_empty) == 1:
return non_empty[0].drop_duplicates(subset=[node_col])
return concat(non_empty, ignore_index=True, sort=False).drop_duplicates(subset=[node_col])

first_iter = True
combined_node_ids = None
current_hop = 0
max_reached_hop = 0
skip_full_loop = False
if fast_path_enabled:

pairs: DataFrameT
FROM_COL: str
TO_COL: str
FROM_COL = generate_safe_column_name('__gfql_from__', edges_indexed, prefix='__gfql_', suffix='__')
TO_COL = generate_safe_column_name('__gfql_to__', edges_indexed, prefix='__gfql_', suffix='__')

if simple_single_hop_undirected_fast_path:
seed_ids = _domain_unique(starting_nodes[node_col])
if _domain_is_empty(seed_ids):
matches_nodes = starting_nodes[[node_col]][:0]
matches_edges = edges_indexed[[EDGE_ID]][:0]
else:
incident_mask = edges_indexed[source_col].isin(seed_ids) | edges_indexed[destination_col].isin(seed_ids)
incident_edges = edges_indexed[incident_mask]
src_hits = incident_edges[incident_edges[source_col].isin(seed_ids)]
dst_hits = incident_edges[incident_edges[destination_col].isin(seed_ids)]

node_frames: List[DataFrameT] = [
src_hits[[destination_col]].rename(columns={destination_col: node_col}),
dst_hits[[source_col]].rename(columns={source_col: node_col}),
]
if not return_as_wave_front:
node_frames.extend([
src_hits[[source_col]].rename(columns={source_col: node_col}),
dst_hits[[destination_col]].rename(columns={destination_col: node_col}),
])

matches_nodes = _concat_node_frames(node_frames)
matches_edges = incident_edges[[EDGE_ID]]
if len(matches_nodes) > 0:
max_reached_hop = 1
skip_full_loop = True
pairs = edges_indexed[[EDGE_ID]][:0]
else:
def _build_pairs(src_col: str, dst_col: str) -> DataFrameT:
return edges_indexed[[src_col, dst_col, EDGE_ID]].rename(
columns={src_col: FROM_COL, dst_col: TO_COL}
)

if direction == 'forward':
pairs = _build_pairs(g2._source, g2._destination)
elif direction == 'reverse':
pairs = _build_pairs(g2._destination, g2._source)
else:
pairs = concat(
[_build_pairs(g2._source, g2._destination), _build_pairs(g2._destination, g2._source)],
ignore_index=True,
sort=False,
).drop_duplicates(subset=[FROM_COL, TO_COL, EDGE_ID])

if fast_path_enabled and not skip_full_loop:
frontier_ids = _domain_unique(starting_nodes[node_col])
visited_node_ids = None
visited_edge_ids = None
Expand Down Expand Up @@ -897,8 +944,8 @@ def _undirected_cycle_nodes(edges_df: DataFrameT) -> set:
pass
seeds_mask = None
if seeds_provided and not label_seeds and starting_nodes is not None and node_col in starting_nodes.columns:
seed_ids = starting_nodes[[node_col]].drop_duplicates()
seeds_mask = g_out._nodes[node_col].isin(seed_ids[node_col])
seed_ids_df = cast(DataFrameT, column_frame(starting_nodes, node_col).drop_duplicates())
seeds_mask = g_out._nodes[node_col].isin(column_values(seed_ids_df, node_col))
missing_mask = g_out._nodes[node_hop_col].isna()
if seeds_mask is not None:
missing_mask = missing_mask & ~seeds_mask
Expand Down Expand Up @@ -981,8 +1028,8 @@ def _undirected_cycle_nodes(edges_df: DataFrameT) -> set:
).drop_duplicates(subset=[node_col])
mask = mask | g_out._nodes[node_col].isin(endpoint_ids[node_col])
if label_seeds and seeds_provided and starting_nodes is not None and node_col in starting_nodes.columns:
seed_ids = starting_nodes[[node_col]].drop_duplicates()
mask = mask | g_out._nodes[node_col].isin(seed_ids[node_col])
label_seed_ids_df = cast(DataFrameT, column_frame(starting_nodes, node_col).drop_duplicates())
mask = mask | g_out._nodes[node_col].isin(column_values(label_seed_ids_df, node_col))
g_out = g_out.nodes(g_out._nodes[mask].drop_duplicates(subset=[node_col]))
except Exception:
pass
Expand All @@ -1004,21 +1051,23 @@ def _undirected_cycle_nodes(edges_df: DataFrameT) -> set:
and starting_nodes is not None
and node_col in starting_nodes.columns
):
seed_ids = starting_nodes[[node_col]].drop_duplicates()
wavefront_seed_ids_df = cast(DataFrameT, column_frame(starting_nodes, node_col).drop_duplicates())
if direction == 'undirected' and to_fixed_point:
keep_seed_ids = _undirected_component_seed_keep_ids(final_edges, seed_ids)
keep_seed_ids = _undirected_component_seed_keep_ids(final_edges, wavefront_seed_ids_df)
keep_seed_ids |= _undirected_cycle_nodes(final_edges)
seed_mask = g_out._nodes[node_col].isin(seed_ids[node_col])
seed_mask = g_out._nodes[node_col].isin(column_values(wavefront_seed_ids_df, node_col))
if keep_seed_ids:
keep_mask = g_out._nodes[node_col].isin(list(keep_seed_ids))
filtered_nodes = g_out._nodes[~seed_mask | keep_mask]
else:
filtered_nodes = g_out._nodes[~seed_mask]
else:
seeds_not_reached = seed_ids
seeds_not_reached_df = wavefront_seed_ids_df
if matches_nodes is not None and node_col in matches_nodes.columns:
seeds_not_reached = seed_ids[~seed_ids[node_col].isin(matches_nodes[node_col])]
filtered_nodes = g_out._nodes[~g_out._nodes[node_col].isin(seeds_not_reached[node_col])]
seeds_not_reached_df = wavefront_seed_ids_df[
~column_values(wavefront_seed_ids_df, node_col).isin(column_values(matches_nodes, node_col))
]
filtered_nodes = g_out._nodes[~g_out._nodes[node_col].isin(column_values(seeds_not_reached_df, node_col))]
g_out = g_out.nodes(filtered_nodes)

return g_out
Loading
Loading