Skip to content

Commit 56ff953

Browse files
committed
migration: add gc collect
1 parent ac71248 commit 56ff953

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

pychunkedgraph/ingest/upgrade/parent_layer.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# pylint: disable=invalid-name, missing-docstring, c-extension-no-member
22

3-
import logging, random, time, os
3+
import logging, random, time, os, gc
44
import multiprocessing as mp
55
from collections import defaultdict
66
from datetime import datetime, timezone
@@ -175,6 +175,7 @@ def _update_cross_edges_helper(args):
175175
edges.PARENTS_CACHE.clear()
176176
edges.CHILDREN_CACHE.clear()
177177
cg.client.write(rows)
178+
gc.collect()
178179

179180

180181
def update_chunk(
@@ -215,7 +216,7 @@ def update_chunk(
215216
logging.info(f"total elaspsed time: {time.time() - start}")
216217
return
217218

218-
task_size = int(os.environ.get("TASK_SIZE", 10))
219+
task_size = int(os.environ.get("TASK_SIZE", 1))
219220
chunked_nodes = chunked(nodes, task_size)
220221
chunked_nodes_ts = chunked(nodes_ts, task_size)
221222
cg_info = cg.get_serialized_info()
@@ -225,7 +226,8 @@ def update_chunk(
225226
args = (cg_info, layer, chunk, ts_chunk)
226227
tasks.append(args)
227228

228-
processes = min(mp.cpu_count() * 5, len(tasks))
229+
process_multiplier = int(os.environ.get("PROCESS_MULTIPLIER", 5))
230+
processes = min(mp.cpu_count() * process_multiplier, len(tasks))
229231
logging.info(f"processing {len(nodes)} nodes with {processes} workers.")
230232
with mp.Pool(processes) as pool:
231233
_ = list(
@@ -235,3 +237,4 @@ def update_chunk(
235237
)
236238
)
237239
logging.info(f"total elaspsed time: {time.time() - start}")
240+
gc.collect()

0 commit comments

Comments
 (0)