diff --git a/CHANGELOG.md b/CHANGELOG.md index c7fb386691..d293df076e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Development] +### Fixed +- **GFQL / GPU traversal**: Added a narrow one-hop undirected `hop()` fast path that avoids doubled edge-pair materialization for the common no-predicate traversal shape. On DGX-backed RAPIDS validation, warm `gplus` pipeline time decreased by `39.67%` on `25.02` and by `39.27%` on `26.02`. ## [0.53.8 - 2026-03-31] diff --git a/graphistry/compute/dataframe_utils.py b/graphistry/compute/dataframe_utils.py index 02a01dc004..ece9c4af84 100644 --- a/graphistry/compute/dataframe_utils.py +++ b/graphistry/compute/dataframe_utils.py @@ -1,4 +1,4 @@ -from typing import Any, Optional, Sequence +from typing import Any, Optional, Sequence, cast import pandas as pd @@ -20,6 +20,14 @@ def df_cons(template_df: DataFrameT, data: dict) -> DataFrameT: return pd.DataFrame(data) +def column_frame(df: DataFrameT, col: str) -> DataFrameT: + return cast(DataFrameT, df[[col]]) + + +def column_values(df: DataFrameT, col: str) -> SeriesT: + return cast(SeriesT, df[col]) + + def concat_frames(frames: Sequence[DataFrameT]) -> Optional[DataFrameT]: non_empty = [f for f in frames if f is not None and len(f) > 0] if not non_empty: diff --git a/graphistry/compute/hop.py b/graphistry/compute/hop.py index 608047b942..fbfe20de3b 100644 --- a/graphistry/compute/hop.py +++ b/graphistry/compute/hop.py @@ -17,6 +17,7 @@ from .filter_by_dict import filter_by_dict from graphistry.Engine import safe_merge from .typing import DataFrameT, DomainT +from .dataframe_utils import column_frame, column_values from .util import generate_safe_column_name @@ -314,28 +315,6 @@ def _build_allowed_ids( allowed_target_intermediate = base_target_nodes[node_col] allowed_target_final = target_wave_front[[node_col]].drop_duplicates()[node_col] - pairs: DataFrameT - FROM_COL: str - TO_COL: str 
- FROM_COL = generate_safe_column_name('__gfql_from__', edges_indexed, prefix='__gfql_', suffix='__') - TO_COL = generate_safe_column_name('__gfql_to__', edges_indexed, prefix='__gfql_', suffix='__') - - def _build_pairs(src_col: str, dst_col: str) -> DataFrameT: - return edges_indexed[[src_col, dst_col, EDGE_ID]].rename( - columns={src_col: FROM_COL, dst_col: TO_COL} - ) - - if direction == 'forward': - pairs = _build_pairs(g2._source, g2._destination) - elif direction == 'reverse': - pairs = _build_pairs(g2._destination, g2._source) - else: - pairs = concat( - [_build_pairs(g2._source, g2._destination), _build_pairs(g2._destination, g2._source)], - ignore_index=True, - sort=False, - ).drop_duplicates(subset=[FROM_COL, TO_COL, EDGE_ID]) - node_hop_records = None edge_hop_records = None seen_node_ids = None @@ -365,12 +344,80 @@ def _build_pairs(src_col: str, dst_col: str) -> DataFrameT: if fast_path_override in {"0", "false", "off", "no"}: fast_path_enabled = False + simple_single_hop_undirected_fast_path = ( + fast_path_enabled + and direction == 'undirected' + and not to_fixed_point + and resolved_max_hops == 1 + and resolved_min_hops is not None + and resolved_min_hops <= 1 + ) + + def _concat_node_frames(frames: List[DataFrameT]) -> DataFrameT: + non_empty = [df for df in frames if len(df) > 0] + if len(non_empty) == 0: + return starting_nodes[[node_col]][:0] + if len(non_empty) == 1: + return non_empty[0].drop_duplicates(subset=[node_col]) + return concat(non_empty, ignore_index=True, sort=False).drop_duplicates(subset=[node_col]) + first_iter = True combined_node_ids = None current_hop = 0 max_reached_hop = 0 skip_full_loop = False - if fast_path_enabled: + + pairs: DataFrameT + FROM_COL: str + TO_COL: str + FROM_COL = generate_safe_column_name('__gfql_from__', edges_indexed, prefix='__gfql_', suffix='__') + TO_COL = generate_safe_column_name('__gfql_to__', edges_indexed, prefix='__gfql_', suffix='__') + + if simple_single_hop_undirected_fast_path: + 
seed_ids = _domain_unique(starting_nodes[node_col]) + if _domain_is_empty(seed_ids): + matches_nodes = starting_nodes[[node_col]][:0] + matches_edges = edges_indexed[[EDGE_ID]][:0] + else: + incident_mask = edges_indexed[source_col].isin(seed_ids) | edges_indexed[destination_col].isin(seed_ids) + incident_edges = edges_indexed[incident_mask] + src_hits = incident_edges[incident_edges[source_col].isin(seed_ids)] + dst_hits = incident_edges[incident_edges[destination_col].isin(seed_ids)] + + node_frames: List[DataFrameT] = [ + src_hits[[destination_col]].rename(columns={destination_col: node_col}), + dst_hits[[source_col]].rename(columns={source_col: node_col}), + ] + if not return_as_wave_front: + node_frames.extend([ + src_hits[[source_col]].rename(columns={source_col: node_col}), + dst_hits[[destination_col]].rename(columns={destination_col: node_col}), + ]) + + matches_nodes = _concat_node_frames(node_frames) + matches_edges = incident_edges[[EDGE_ID]] + if len(matches_nodes) > 0: + max_reached_hop = 1 + skip_full_loop = True + pairs = edges_indexed[[EDGE_ID]][:0] + else: + def _build_pairs(src_col: str, dst_col: str) -> DataFrameT: + return edges_indexed[[src_col, dst_col, EDGE_ID]].rename( + columns={src_col: FROM_COL, dst_col: TO_COL} + ) + + if direction == 'forward': + pairs = _build_pairs(g2._source, g2._destination) + elif direction == 'reverse': + pairs = _build_pairs(g2._destination, g2._source) + else: + pairs = concat( + [_build_pairs(g2._source, g2._destination), _build_pairs(g2._destination, g2._source)], + ignore_index=True, + sort=False, + ).drop_duplicates(subset=[FROM_COL, TO_COL, EDGE_ID]) + + if fast_path_enabled and not skip_full_loop: frontier_ids = _domain_unique(starting_nodes[node_col]) visited_node_ids = None visited_edge_ids = None @@ -897,8 +944,8 @@ def _undirected_cycle_nodes(edges_df: DataFrameT) -> set: pass seeds_mask = None if seeds_provided and not label_seeds and starting_nodes is not None and node_col in starting_nodes.columns: 
- seed_ids = starting_nodes[[node_col]].drop_duplicates() - seeds_mask = g_out._nodes[node_col].isin(seed_ids[node_col]) + seed_ids_df = cast(DataFrameT, column_frame(starting_nodes, node_col).drop_duplicates()) + seeds_mask = g_out._nodes[node_col].isin(column_values(seed_ids_df, node_col)) missing_mask = g_out._nodes[node_hop_col].isna() if seeds_mask is not None: missing_mask = missing_mask & ~seeds_mask @@ -981,8 +1028,8 @@ def _undirected_cycle_nodes(edges_df: DataFrameT) -> set: ).drop_duplicates(subset=[node_col]) mask = mask | g_out._nodes[node_col].isin(endpoint_ids[node_col]) if label_seeds and seeds_provided and starting_nodes is not None and node_col in starting_nodes.columns: - seed_ids = starting_nodes[[node_col]].drop_duplicates() - mask = mask | g_out._nodes[node_col].isin(seed_ids[node_col]) + label_seed_ids_df = cast(DataFrameT, column_frame(starting_nodes, node_col).drop_duplicates()) + mask = mask | g_out._nodes[node_col].isin(column_values(label_seed_ids_df, node_col)) g_out = g_out.nodes(g_out._nodes[mask].drop_duplicates(subset=[node_col])) except Exception: pass @@ -1004,21 +1051,23 @@ def _undirected_cycle_nodes(edges_df: DataFrameT) -> set: and starting_nodes is not None and node_col in starting_nodes.columns ): - seed_ids = starting_nodes[[node_col]].drop_duplicates() + wavefront_seed_ids_df = cast(DataFrameT, column_frame(starting_nodes, node_col).drop_duplicates()) if direction == 'undirected' and to_fixed_point: - keep_seed_ids = _undirected_component_seed_keep_ids(final_edges, seed_ids) + keep_seed_ids = _undirected_component_seed_keep_ids(final_edges, wavefront_seed_ids_df) keep_seed_ids |= _undirected_cycle_nodes(final_edges) - seed_mask = g_out._nodes[node_col].isin(seed_ids[node_col]) + seed_mask = g_out._nodes[node_col].isin(column_values(wavefront_seed_ids_df, node_col)) if keep_seed_ids: keep_mask = g_out._nodes[node_col].isin(list(keep_seed_ids)) filtered_nodes = g_out._nodes[~seed_mask | keep_mask] else: filtered_nodes = 
g_out._nodes[~seed_mask] else: - seeds_not_reached = seed_ids + seeds_not_reached_df = wavefront_seed_ids_df if matches_nodes is not None and node_col in matches_nodes.columns: - seeds_not_reached = seed_ids[~seed_ids[node_col].isin(matches_nodes[node_col])] - filtered_nodes = g_out._nodes[~g_out._nodes[node_col].isin(seeds_not_reached[node_col])] + seeds_not_reached_df = wavefront_seed_ids_df[ + ~column_values(wavefront_seed_ids_df, node_col).isin(column_values(matches_nodes, node_col)) + ] + filtered_nodes = g_out._nodes[~g_out._nodes[node_col].isin(column_values(seeds_not_reached_df, node_col))] g_out = g_out.nodes(filtered_nodes) return g_out diff --git a/graphistry/tests/compute/test_hop.py b/graphistry/tests/compute/test_hop.py index df0f17fe86..8e2a9391cd 100644 --- a/graphistry/tests/compute/test_hop.py +++ b/graphistry/tests/compute/test_hop.py @@ -7,6 +7,45 @@ from graphistry.tests.test_compute import CGFull +def _sorted_records(df: pd.DataFrame): + cols = sorted(df.columns) + if not cols: + return [] + return df.sort_values(cols).reset_index(drop=True).to_dict(orient='records') + + +def _assert_one_hop_undirected_matches_fallback( + g: CGFull, + seed_nodes: pd.DataFrame, + *, + return_as_wave_front: bool, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv('GRAPHISTRY_HOP_FAST_PATH', raising=False) + fast = g.hop( + nodes=seed_nodes, + hops=1, + to_fixed_point=False, + direction='undirected', + return_as_wave_front=return_as_wave_front, + ) + + monkeypatch.setenv('GRAPHISTRY_HOP_FAST_PATH', '0') + slow = g.hop( + nodes=seed_nodes, + hops=1, + to_fixed_point=False, + direction='undirected', + return_as_wave_front=return_as_wave_front, + ) + monkeypatch.delenv('GRAPHISTRY_HOP_FAST_PATH', raising=False) + + assert sorted(fast._nodes.columns.tolist()) == sorted(slow._nodes.columns.tolist()) + assert sorted(fast._edges.columns.tolist()) == sorted(slow._edges.columns.tolist()) + assert _sorted_records(fast._nodes) == 
_sorted_records(slow._nodes) + assert _sorted_records(fast._edges) == _sorted_records(slow._edges) + + @pytest.fixture(scope='module') def g_long_forwards_chain() -> CGFull: """ @@ -182,6 +221,242 @@ def test_hop_fixedpoint_undirected_does_not_revisit_seed_via_same_edge(self, g_l {'s': 'd', 'd': 'e'} ] + def test_hop_one_hop_undirected_keeps_rediscovered_seed_from_other_seed(self, g_long_forwards_chain: CGFull): + seed_nodes = g_long_forwards_chain._nodes[g_long_forwards_chain._nodes['v'].isin(['a', 'b'])] + + g2 = g_long_forwards_chain.hop( + nodes=seed_nodes, + hops=1, + to_fixed_point=False, + direction='undirected', + return_as_wave_front=True + ) + assert set(g2._nodes['v'].tolist()) == {'a', 'b', 'c'} + assert g2._edges[['s', 'd']].sort_values(['s', 'd']).to_dict(orient='records') == [ + {'s': 'a', 'd': 'b'}, + {'s': 'b', 'd': 'c'} + ] + + def test_hop_one_hop_undirected_excludes_disconnected_explicit_seed(self): + g_disconnected = ( + CGFull() + .edges( + pd.DataFrame( + { + 's': ['a'], + 'd': ['b'], + } + ), + 's', + 'd', + ) + .nodes(pd.DataFrame({'v': ['a', 'b', 'x']}), 'v') + ) + seed_nodes = g_disconnected._nodes[g_disconnected._nodes['v'].isin(['a', 'x'])] + + g2 = g_disconnected.hop( + nodes=seed_nodes, + hops=1, + to_fixed_point=False, + direction='undirected', + return_as_wave_front=False + ) + assert set(g2._nodes['v'].tolist()) == {'a', 'b'} + assert g2._edges[['s', 'd']].sort_values(['s', 'd']).to_dict(orient='records') == [ + {'s': 'a', 'd': 'b'} + ] + + @pytest.mark.parametrize('return_as_wave_front', [False, True]) + def test_hop_one_hop_undirected_duplicate_seed_rows_match_fallback(self, g_long_forwards_chain: CGFull, return_as_wave_front: bool, monkeypatch: pytest.MonkeyPatch): + seed_nodes = pd.DataFrame({'v': ['a', 'a', 'b']}) + + _assert_one_hop_undirected_matches_fallback( + g_long_forwards_chain, + seed_nodes, + return_as_wave_front=return_as_wave_front, + monkeypatch=monkeypatch, + ) + + @pytest.mark.parametrize('return_as_wave_front', 
[False, True]) + def test_hop_one_hop_undirected_numeric_ids_match_fallback(self, return_as_wave_front: bool, monkeypatch: pytest.MonkeyPatch): + g_numeric = ( + CGFull() + .edges( + pd.DataFrame( + { + 's': [0, 1, 1, 2], + 'd': [1, 2, 2, 3], + } + ), + 's', + 'd', + ) + .nodes(pd.DataFrame({'v': [0, 1, 2, 3]}), 'v') + ) + seed_nodes = g_numeric._nodes[g_numeric._nodes['v'].isin([0, 1])] + + _assert_one_hop_undirected_matches_fallback( + g_numeric, + seed_nodes, + return_as_wave_front=return_as_wave_front, + monkeypatch=monkeypatch, + ) + + @pytest.mark.parametrize('return_as_wave_front', [False, True]) + def test_hop_one_hop_undirected_self_loop_matches_fallback(self, return_as_wave_front: bool, monkeypatch: pytest.MonkeyPatch): + g_loop = ( + CGFull() + .edges( + pd.DataFrame( + { + 's': ['a', 'a'], + 'd': ['a', 'b'], + } + ), + 's', + 'd', + ) + .nodes(pd.DataFrame({'v': ['a', 'b']}), 'v') + ) + seed_nodes = g_loop._nodes[g_loop._nodes['v'].isin(['a'])] + + _assert_one_hop_undirected_matches_fallback( + g_loop, + seed_nodes, + return_as_wave_front=return_as_wave_front, + monkeypatch=monkeypatch, + ) + + @pytest.mark.parametrize('return_as_wave_front', [False, True]) + def test_hop_one_hop_undirected_parallel_edges_match_fallback(self, return_as_wave_front: bool, monkeypatch: pytest.MonkeyPatch): + g_parallel = ( + CGFull() + .edges( + pd.DataFrame( + { + 's': ['a', 'a', 'b'], + 'd': ['b', 'b', 'c'], + } + ), + 's', + 'd', + ) + .nodes(pd.DataFrame({'v': ['a', 'b', 'c']}), 'v') + ) + seed_nodes = g_parallel._nodes[g_parallel._nodes['v'].isin(['a'])] + + _assert_one_hop_undirected_matches_fallback( + g_parallel, + seed_nodes, + return_as_wave_front=return_as_wave_front, + monkeypatch=monkeypatch, + ) + + @pytest.mark.parametrize('return_as_wave_front', [False, True]) + def test_hop_one_hop_undirected_overlapping_seed_neighborhood_matches_fallback(self, return_as_wave_front: bool, monkeypatch: pytest.MonkeyPatch): + g_overlap = ( + CGFull() + .edges( + 
pd.DataFrame( + { + 's': ['a', 'b', 'b', 'c'], + 'd': ['b', 'c', 'd', 'e'], + } + ), + 's', + 'd', + ) + .nodes(pd.DataFrame({'v': ['a', 'b', 'c', 'd', 'e']}), 'v') + ) + seed_nodes = g_overlap._nodes[g_overlap._nodes['v'].isin(['a', 'c'])] + + _assert_one_hop_undirected_matches_fallback( + g_overlap, + seed_nodes, + return_as_wave_front=return_as_wave_front, + monkeypatch=monkeypatch, + ) + + @pytest.mark.parametrize('return_as_wave_front', [False, True]) + def test_hop_one_hop_undirected_node_source_conflict_matches_fallback(self, return_as_wave_front: bool, monkeypatch: pytest.MonkeyPatch): + g_conflict = ( + CGFull() + .edges( + pd.DataFrame( + { + 'id': ['a', 'b'], + 'dst': ['b', 'c'], + 'weight': [1.5, 2.5], + } + ), + 'id', + 'dst', + ) + .nodes(pd.DataFrame({'id': ['a', 'b', 'c'], 'group': ['x', 'y', 'z']}), 'id') + ) + seed_nodes = g_conflict._nodes[g_conflict._nodes['id'].isin(['a'])] + + _assert_one_hop_undirected_matches_fallback( + g_conflict, + seed_nodes, + return_as_wave_front=return_as_wave_front, + monkeypatch=monkeypatch, + ) + + @pytest.mark.parametrize('return_as_wave_front', [False, True]) + def test_hop_one_hop_undirected_node_destination_conflict_matches_fallback(self, return_as_wave_front: bool, monkeypatch: pytest.MonkeyPatch): + g_conflict = ( + CGFull() + .edges( + pd.DataFrame( + { + 'src': ['a', 'b'], + 'id': ['b', 'c'], + 'weight': [1.5, 2.5], + } + ), + 'src', + 'id', + ) + .nodes(pd.DataFrame({'id': ['a', 'b', 'c'], 'group': ['x', 'y', 'z']}), 'id') + ) + seed_nodes = g_conflict._nodes[g_conflict._nodes['id'].isin(['b'])] + + _assert_one_hop_undirected_matches_fallback( + g_conflict, + seed_nodes, + return_as_wave_front=return_as_wave_front, + monkeypatch=monkeypatch, + ) + + @pytest.mark.parametrize('return_as_wave_front', [False, True]) + def test_hop_one_hop_undirected_explicit_edge_id_and_attrs_match_fallback(self, return_as_wave_front: bool, monkeypatch: pytest.MonkeyPatch): + g_attr = ( + CGFull() + .edges( + pd.DataFrame( + { 
+ 's': ['a', 'a', 'b'], + 'd': ['b', 'b', 'c'], + 'eid': ['e1', 'e2', 'e3'], + 'weight': [1.0, 2.0, 3.0], + } + ), + 's', + 'd', + 'eid', + ) + .nodes(pd.DataFrame({'v': ['a', 'b', 'c'], 'color': ['red', 'blue', 'green']}), 'v') + ) + seed_nodes = g_attr._nodes[g_attr._nodes['v'].isin(['a'])] + + _assert_one_hop_undirected_matches_fallback( + g_attr, + seed_nodes, + return_as_wave_front=return_as_wave_front, + monkeypatch=monkeypatch, + ) + def test_hop_fixedpoint_undirected_keeps_seed_when_reachable_via_real_cycle(self): g_cycle = ( CGFull()