Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions gridfm_graphkit/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ def main():
default=False,
help="Enable TF32 on Ampere+ GPUs via torch.set_float32_matmul_precision('high').",
)
_mp_context_kwargs = dict(
dest="mp_context",
type=str,
default="spawn",
choices=["spawn", "fork", "forkserver"],
help=(
"Multiprocessing start method for DataLoader workers. "
"'spawn' (default) is safest and works everywhere. "
"'fork' avoids re-importing modules but is unsafe after CUDA init. "
"'forkserver' uses a clean server process but requires file-descriptor passing."
),
)

# ---- TRAIN SUBCOMMAND ----
train_parser = subparsers.add_parser("train", help="Run training")
Expand Down Expand Up @@ -143,6 +155,7 @@ def main():
action="store_true",
help="Print the last training epoch time and a single test metric to stdout.",
)
train_parser.add_argument("--mp_context", **_mp_context_kwargs)

# ---- FINETUNE SUBCOMMAND ----
finetune_parser = subparsers.add_parser("finetune", help="Run fine-tuning")
Expand Down Expand Up @@ -196,6 +209,7 @@ def main():
action="store_true",
help="Print the last training epoch time and a single test metric to stdout.",
)
finetune_parser.add_argument("--mp_context", **_mp_context_kwargs)

# ---- EVALUATE SUBCOMMAND ----
evaluate_parser = subparsers.add_parser(
Expand Down Expand Up @@ -262,6 +276,7 @@ def main():
"--save_output",
action="store_true",
)
evaluate_parser.add_argument("--mp_context", **_mp_context_kwargs)

# ---- PREDICT SUBCOMMAND ----
predict_parser = subparsers.add_parser("predict", help="Run prediction")
Expand Down Expand Up @@ -312,6 +327,7 @@ def main():
default=None,
choices=["simple", "advanced", "pytorch"],
)
predict_parser.add_argument("--mp_context", **_mp_context_kwargs)

# ---- BENCHMARK SUBCOMMAND ----
benchmark_parser = subparsers.add_parser(
Expand Down Expand Up @@ -350,6 +366,7 @@ def main():
default=[],
help="Python packages to import for plugin registration.",
)
benchmark_parser.add_argument("--mp_context", **_mp_context_kwargs)

args = parser.parse_args()

Expand Down
11 changes: 9 additions & 2 deletions gridfm_graphkit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def benchmark_cli(args):
args.data_path,
dataset_wrapper=dataset_wrapper,
dataset_wrapper_cache_dir=dataset_wrapper_cache_dir,
multiprocessing_context=getattr(args, "mp_context", "spawn") or "spawn",
)
dm.setup(stage="fit")
setup_time = time.perf_counter() - t0
Expand Down Expand Up @@ -161,6 +162,12 @@ def main_cli(args):
run_name=args.run_name,
)

# When using torch.compile with Triton, capturing CUDA graphs for graphs
# that have dynamic shapes can cause out-of-memory errors during
# autotuning on some kernels. Setting cudagraph_skip_dynamic_graphs makes
# Inductor skip CUDA-graph capture for those dynamic-shape graphs, so
# they are skipped gracefully instead of causing errors.
torch._inductor.config.triton.cudagraph_skip_dynamic_graphs = True

with open(args.config, "r") as f:
base_config = yaml.safe_load(f)

Expand Down Expand Up @@ -190,6 +197,7 @@ def main_cli(args):
normalizer_stats_path=normalizer_stats_path,
dataset_wrapper=dataset_wrapper,
dataset_wrapper_cache_dir=dataset_wrapper_cache_dir,
multiprocessing_context=getattr(args, "mp_context", "spawn") or "spawn",
)
model = get_task(config_args, litGrid.data_normalizers)
if args.command != "train":
Expand Down Expand Up @@ -234,9 +242,8 @@ def main_cli(args):
if _accelerator not in ("mps", "cpu") and isinstance(_strategy, str) and _strategy in (
"auto",
"ddp",
"ddp_find_unused_parameters_true",
): # when using mps, we don't want to use ddp.
_strategy = DDPStrategy(find_unused_parameters=True)
_strategy = DDPStrategy(find_unused_parameters=False)

trainer = L.Trainer(
logger=logger,
Expand Down
18 changes: 4 additions & 14 deletions gridfm_graphkit/datasets/hetero_powergrid_datamodule.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,13 @@ def __init__(
normalizer_stats_path: str = None,
dataset_wrapper: str = None,
dataset_wrapper_cache_dir: str = None,
multiprocessing_context: str = "spawn",
):
super().__init__()
self.data_dir = data_dir
self.dataset_wrapper = dataset_wrapper
self.dataset_wrapper_cache_dir = dataset_wrapper_cache_dir
self.multiprocessing_context = multiprocessing_context
self.batch_size = int(args.training.batch_size)
self.split_by_load_scenario_idx = getattr(
args.data,
Expand Down Expand Up @@ -425,20 +427,8 @@ def _dataloader_kwargs(self):
pin_memory=torch.cuda.is_available(),
persistent_workers=num_workers > 0,
)
# Use 'fork' on Linux. It avoids the forkserver intermediary pipe which
# is fragile when the process has many threads (e.g. OpenBLAS). In
# container environments (Kubernetes) fork works correctly. On
# traditional HPC systems with strict fd-passing restrictions the
# original 'forkserver' may be needed, but the pipe truncation it
# produces under thread pressure is worse than the ancdata warning.
if (
num_workers > 0
and torch.multiprocessing.get_start_method(allow_none=True) != "spawn"
):
import platform

if platform.system() == "Linux":
kwargs["multiprocessing_context"] = "fork"
if num_workers > 0:
kwargs["multiprocessing_context"] = self.multiprocessing_context
return kwargs

def train_dataloader(self):
Expand Down
Loading