-
Notifications
You must be signed in to change notification settings - Fork 85
Description
Hi OpenGATE Team,
I am experiencing an issue where child processes randomly hang (freeze) when I try to run multiple GATE simulations in parallel using Python's subprocess module on a Windows system.
I have a "manager" script that launches multiple instances of another Python script (creat_sim.py). This child script imports the opengate library, sets up a simulation, and then runs it.
The Problem
My manager script successfully launches the jobs, and most of them complete correctly. However, a few processes (it seems random) will hang indefinitely. They never exit, so proc.poll() always returns None, and my manager script gets stuck waiting for them to finish.
There are no error messages, as the processes never exit with an error code. They just stop making progress.
Process Management Logic
I suspect the problem might be related to multi-threading or resource conflicts, even though my intention is for each child process to be single-threaded.
My manager script (manager.py) is multi-processing. To avoid resource conflicts, I am explicitly setting each child simulation in creat_sim.py to use only one thread by calling sim.set_g4_mt_nb_threads(1).
Despite forcing each child process to be single-threaded, the random hanging/freezing still occurs.
Code Example
Here is my manager script (manager.py).
import subprocess
import sys,os
import json
import time
from datetime import datetime
if __name__ == "__main__":
from pathlib import Path
data_path = Path("./data") # Data directory
max_workers = (os.cpu_count() - 2) or 1
procs = []
finished = 0
angles_idx = range(0, 36, 1) # Unit: keV (This is from my original comment)
start_time = datetime.now()
print("======================================================")
# Note: The print statement mentions ProcessPoolExecutor, but the code uses Popen
print(f"MANAGER: Starting parallel simulation using ProcessPoolExecutor")
print(f"MANAGER: Total {len(angles_idx)} tasks, using max {max_workers} CPU cores.")
print(f"Simulation start time :{start_time.strftime("%Y-%m-%d %H:%M:%S")}")
print("======================================================\n")
from tqdm import tqdm
with tqdm(total=len(angles_idx), bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} ') as pbar:
for p in angles_idx:
while len(procs) >= max_workers:
for proc, param in procs:
ret = proc.poll()
if ret is not None:
out, err = proc.communicate()
if ret != 0:
print(f"\nParameter {param} failed to run: {err}\n")
else:
pbar.update(1) # Progress bar +1
procs.remove((proc, param))
time.sleep(0.01)
proc = subprocess.Popen(
[sys.executable, "creat_sim.py", str(p)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
procs.append((proc, p))
time.sleep(0.01)
# Wait for all remaining processes
while len(procs) != 0:
for proc, param in procs:
ret = proc.poll()
if ret is not None:
out, err = proc.communicate()
if ret != 0:
print(f"Parameter {param} failed to run: {err}")
else:
pbar.update(1) # Progress bar +1
procs.remove((proc, param))
time.sleep(0.01)
end_time = datetime.now()
total_time = end_time - start_time
print("\n======================================================")
print("MANAGER: All simulation tasks have been executed!")
print(f"Simulation end time :{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}")
print(f"Total time: {total_time}.")
print("======================================================")
Child Script (creat_sim.py)
import sys
import os
import opengate as gate
import numpy as np
from scipy.spatial.transform import Rotation
from pathlib import Path
import json
from dimension import *
from sim_init import sim_init # 导入初始化脚本
from CT_init import CT_init # 导入CT初始化脚本
data_path = Path("./data") # 数据目录
def singleSimRun(i=0,bool_start_new_process=False):
from filelock import FileLock
lockfile = "gobal_init.lock"
with FileLock(lockfile, timeout=300):
with open("config.json", "r", encoding="utf-8") as f:
config = json.load(f)
# 创建模拟实例
sim = gate.Simulation()
sim_init(sim) # 初始化模拟参数
# 定义世界体素(模拟空间)
world = sim.world
patient = sim.add_volume("Box", "patient")
patient.size = [5 * mm, 50 * mm, 50 * mm]
patient.translation = [0 * cm, 0 * cm, 0 * cm]
patient.material = "G4_WATER" # 使用水作为患者材料
patient.color = [0, 0, 1, 1] # this is RGBa (a=alpha=opacity), so blue here
patient.rotation = Rotation.from_euler("x", 0, degrees=True).as_matrix() # 设置患者姿态
gantry, detector_plane , source = CT_init(sim) # 初始化CT组件
# 设置探测器数据采集器
detector_actor = sim.add_actor("FluenceActor", "detector_actor")
detector_actor.attached_to = detector_plane
detector_actor.spacing = [1.6 * mm, 1.6 * mm, 128*1.6 * mm] # 探测器像素大小:厚度10mm,xy平面1mm
detector_actor.size = [1, 128, 1] # 探测器分辨率:1层,242×242像素,完全匹配探测器物理尺寸
# 统计信息收集器
stats = sim.add_actor("SimulationStatisticsActor", "stats")
stats.track_types_flag = True # 记录粒子类型
stats.output_filename = "stats"+str(i)+".txt" # 统计输出文件
stats.write_to_disk = True # 保存到磁盘
# 添加 DigitizerHitsCollectionActor
# hits_actor1 = sim.add_actor("DigitizerHitsCollectionActor", "PhotonHits1")
# hits_actor1.attached_to = detector_plane # 绑定到探测器平面
# hits_actor1.output_filename = "photon_hits.root" # 输出文件名
# hits_actor1.attributes = [
# "KineticEnergy", "PostPosition", "GlobalTime", "ParticleName"
# ] # 记录属性
# # 添加能量过滤器
# energy_filter1 = sim.add_filter("KineticEnergyFilter", "EnergyFilter1")
# energy_filter1.energy_min = 20 * keV # 设置能量下限为 10 keV
# detector_actor.filters.append(energy_filter1) # 将过滤器附加到 Actor
# hits_actor2 = sim.add_actor("DigitizerHitsCollectionActor", "PhotonHits2")
# hits_actor2.attached_to = detector_plane # 绑定到探测器平面
# hits_actor2.output_filename = "photon_hits.root" # 输出文件名
# hits_actor2.attributes = [
# "KineticEnergy", "PostPosition", "GlobalTime", "ParticleName"
# ] # 记录属性
# # 添加能量过滤器
# energy_filter2 = sim.add_filter("KineticEnergyFilter", "EnergyFilter2")
# energy_filter2.energy_min = 40 * keV # 设置能量下限为 10 keV
# hits_actor2.filters.append(energy_filter2) # 将过滤器附加到 Actor
# hits_actor3 = sim.add_actor("DigitizerHitsCollectionActor", "PhotonHits3")
# hits_actor3.attached_to = detector_plane # 绑定到探测器平面
# hits_actor3.output_filename = "photon_hits.root" # 输出文件名
# hits_actor3.attributes = [
# "KineticEnergy", "PostPosition", "GlobalTime", "ParticleName"
# ] # 记录属性
# # 添加能量过滤器
# energy_filter3 = sim.add_filter("KineticEnergyFilter", "EnergyFilter3")
# energy_filter3.energy_min = 60 * keV # 设置能量下限为 10 keV
# hits_actor3.filters.append(energy_filter3) # 将过滤器附加到 Actor
# 运行模拟
detector_actor.output_filename = str(i) + ".mhd" # 输出文件名
angle_deg = -(i/config["geo"]["projectNum"])
patient.rotation = Rotation.from_euler("x", angle_deg, degrees=True).as_matrix() # 设置患者姿态
try:
sim.run(bool_start_new_process)
# print(f"[进程 {os.getpid()}] 完成任务 {i}")
# 返回成功信息或结果文件路径
# return f"成功: 任务 {index} 在角度 {angle}° 完成"
except Exception as e:
print(f"[进程 {os.getpid()}] 任务 {i} 失败: {e}")
finally:
return str(i)
if __name__ == "__main__":
try:
i = int(sys.argv[1]) if len(sys.argv) > 1 else 0
except Exception:
i = 0
singleSimRun(i)