1010import numpy as np
1111from tqdm import tqdm
1212
13- from pychunkedgraph .graph import ChunkedGraph
13+ from pychunkedgraph .graph import ChunkedGraph , edges
1414from pychunkedgraph .graph .attributes import Connectivity , Hierarchy
15- from pychunkedgraph .graph .edges import get_latest_edges_wrapper
1615from pychunkedgraph .graph .utils import serializers
1716from pychunkedgraph .graph .types import empty_2d
1817from pychunkedgraph .utils .general import chunked
@@ -105,7 +104,6 @@ def _populate_cx_edges_with_timestamps(
105104 row_id = serializers .serialize_uint64 (node )
106105 val_dict = {Hierarchy .StaleTimeStamp : 0 }
107106 rows .append (cg .client .mutate_row (row_id , val_dict , time_stamp = node_end_ts ))
108-
109107 cg .client .write (rows )
110108
111109
@@ -119,7 +117,7 @@ def update_cross_edges(cg: ChunkedGraph, layer, node, node_ts) -> list:
119117 for ts , cx_edges_d in CX_EDGES [node ].items ():
120118 if ts < node_ts :
121119 continue
122- cx_edges_d , edge_nodes = get_latest_edges_wrapper (cg , cx_edges_d , parent_ts = ts )
120+ cx_edges_d , edge_nodes = edges . get_latest_edges_wrapper (cg , cx_edges_d , parent_ts = ts )
123121 if edge_nodes .size == 0 :
124122 continue
125123
@@ -138,13 +136,7 @@ def update_cross_edges(cg: ChunkedGraph, layer, node, node_ts) -> list:
138136 return rows
139137
140138
141- def _update_cross_edges_helper_thread (args ):
142- cg , layer , node , node_ts = args
143- return update_cross_edges (cg , layer , node , node_ts )
144-
145-
146139def _update_cross_edges_helper (args ):
147- rows = []
148140 clean_task = os .environ .get ("CLEAN_CHUNKS" , "false" ) == "clean"
149141 cg_info , layer , nodes , nodes_ts = args
150142 cg = ChunkedGraph (** cg_info )
@@ -167,12 +159,9 @@ def _update_cross_edges_helper(args):
167159 fix_corrupt_nodes (cg , corrupt_nodes , CHILDREN )
168160 return
169161
170- with ThreadPoolExecutor (max_workers = 4 ) as executor :
171- futures = [
172- executor .submit (_update_cross_edges_helper_thread , task ) for task in tasks
173- ]
174- for future in tqdm (as_completed (futures ), total = len (futures )):
175- rows .extend (future .result ())
162+ rows = []
163+ for task in tasks :
164+ rows .extend (update_cross_edges (* task ))
176165 cg .client .write (rows )
177166
178167
@@ -204,12 +193,13 @@ def update_chunk(
204193
205194 if debug :
206195 rows = []
196+ logging .info (f"processing { len (nodes )} nodes with 1 worker." )
207197 for node , node_ts in zip (nodes , nodes_ts ):
208198 rows .extend (update_cross_edges (cg , layer , node , node_ts ))
209199 logging .info (f"total elaspsed time: { time .time () - start } " )
210200 return
211201
212- task_size = int (math .ceil (len (nodes ) / mp .cpu_count () / 2 ))
202+ task_size = int (math .ceil (len (nodes ) / mp .cpu_count ()))
213203 chunked_nodes = chunked (nodes , task_size )
214204 chunked_nodes_ts = chunked (nodes_ts , task_size )
215205 cg_info = cg .get_serialized_info ()
0 commit comments