@@ -815,7 +815,7 @@ impl<'a> Pregel<'a> {
815815 let initial_message = & self . initial_message ;
816816 let mut current_vertices = vertices
817817 . to_owned ( )
818- . select ( vec ! [
818+ . select ( [
819819 all ( ) , // we select all the columns of the graph vertices
820820 initial_message
821821 . to_owned ( )
@@ -895,28 +895,32 @@ impl<'a> Pregel<'a> {
895895 col ( Column :: Id . as_ref ( ) ) , // id column of the current_vertices DataFrame
896896 Column :: msg ( Some ( Column :: Id ) ) , // msg.id column of the message_df DataFrame
897897 )
898- . with_column (
899- // we replace the null values by 0 for the aggregation to work properly
900- when ( Column :: msg ( None ) . is_null ( ) ) // if a node has no incoming edges, the msg column is null
901- . then ( self . replace_nulls . to_owned ( ) )
902- . otherwise ( Column :: msg ( None ) )
903- . alias ( Column :: Pregel . as_ref ( ) ) ,
904- )
905- . select ( vec ! [
898+ . with_column ( Column :: msg ( None ) . fill_null ( self . replace_nulls . to_owned ( ) ) )
899+ . select ( & [
906900 col ( Column :: Id . as_ref ( ) ) ,
907901 v_prog ( ) . alias ( self . vertex_column . as_ref ( ) ) ,
908902 ] ) ;
909903 // We update the `current_vertices` DataFrame with the new values for the vertices. We
910904 // do so by performing an inner join between the `current_vertices` DataFrame and the
911905 // `vertex_columns` DataFrame. The join is performed on the `id` column of the
912906 // `current_vertices` DataFrame and the `id` column of the `vertex_columns` DataFrame.
913- current_vertices = vertices
914- . to_owned ( )
915- . inner_join (
916- vertex_columns,
917- col ( Column :: Id . as_ref ( ) ) ,
918- col ( Column :: Id . as_ref ( ) ) ,
919- )
907+ let current_vertices_lf = vertices. to_owned ( ) . inner_join (
908+ vertex_columns,
909+ col ( Column :: Id . as_ref ( ) ) ,
910+ col ( Column :: Id . as_ref ( ) ) ,
911+ ) ;
912+
913+ println ! (
914+ "{}" ,
915+ current_vertices_lf
916+ . clone( )
917+ . with_common_subplan_elimination( false )
918+ . with_streaming( true )
919+ . describe_optimized_plan( )
920+ . unwrap( )
921+ ) ;
922+
923+ current_vertices = current_vertices_lf
920924 . with_common_subplan_elimination ( false )
921925 . with_streaming ( true )
922926 . collect ( ) ?;
0 commit comments