Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions nf_core/configs/create/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

from nf_core.configs.create.utils import ConfigsCreateConfig, generate_config_entry
from nf_core.configs.create.serial import NextflowSerial
from re import sub
from pathlib import Path

Expand Down Expand Up @@ -356,7 +357,10 @@ def write_to_file(self):
else:
raise ValueError(f'Invalid config type: {self.config_type}')

serial_data = NextflowSerial.dumps(self.template_config.serial())

with open(filename, "w+") as file:
## Write params
file.write(params_section_str)
file.write(process_section_str)
file.write(serial_data)
#file.write(params_section_str)
#file.write(process_section_str)
60 changes: 60 additions & 0 deletions nf_core/configs/create/serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from typing import Any, Optional

class NextflowSerial:
def __init__(self, data_dict: Optional[dict], tab_indent: Optional[int] = 4):
self.data_dict = data_dict

def __getitem__(self, key: Optional[Any]) -> Any:
"""Returns value from data"""
return self.data_dict[key]

def __setitem__(self, key: Optional[Any], value: Optional[any]) -> None:
"""Sets value in data"""
self.data_dict[key] = value

@staticmethod
def _stringify(data: Optional[Any], quote: Optional[bool] = True) -> str:
"""Return nextflow compatible value"""
quote = "'"*quote
if isinstance(data, str):
return f"{quote}{data}{quote}"
if isinstance(data, bool):
return 'true' if data else 'false'
if isinstance(data, type(None)):
return 'null'
return str(data) # Fallback on the Python str function if no matches

@staticmethod
def dumps(data_dict: Optional[Any], tab_indent: Optional[int] = 4, current_indent: Optional[int] = 0, end: Optional[str] = '\n', indent_start: Optional[bool] = True, one_line: Optional[bool] = False) -> str:
"""Recursive function to create configuration file"""
output = ""
if isinstance(data_dict, dict):
for k, v in data_dict.items():
if isinstance(v, dict):
output += " "*current_indent*int(indent_start)
output += f"{k} {{\n"
output += NextflowSerial.dumps(v, tab_indent = tab_indent, current_indent = current_indent + tab_indent, end = end)
output += " "*current_indent
output += f"}}{end}"
elif isinstance(v, list):
output += " "*current_indent
output += f"{k} = ["
vi = []
for i in v:
oneliner = bool(len(i.keys()) != 1)
if isinstance(i, dict):
vi.append("{"*oneliner + NextflowSerial.dumps(i, tab_indent, current_indent=0, end = '', indent_start = False, one_line = not oneliner) + "}"*oneliner)
continue
vi.append(NextflowSerial.dumps(i, tab_indent, current_indent=0, end = end))
output += ", ".join(vi)
output += f"]{end}"
else:
output += " "*current_indent*indent_start
if one_line:
# False is set here to disable quotes on strings
output += f"{k}: {NextflowSerial._stringify(v, False)}{end}"
else:
output += f"{k} = {NextflowSerial._stringify(v)}{end}"
return output
else:
return NextflowSerial._stringify(data_dict)
71 changes: 71 additions & 0 deletions nf_core/configs/create/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,77 @@ def __init__(self, /, **data: Any) -> None:
context=_init_context_var.get(),
)

def serial_hpc(self):
"""Returns a dictionary of the config"""
ret = {
"params": {
"config_profile_contact": self.config_profile_contact,
"config_profile_description": self.config_profile_description,
"config_profile_url": self.config_profile_url,
"igenomes_base": self.igenomes_cachedir
},
"process": {
"executor": self.scheduler,
"queue": self.queue,
"resourceLimits": [
{"cpus": int(self.cpus)},
{"memory": self.memory + "GB"},
{"time": str(float(self.time)) + "h"}
],
"scratch": self.scratch_dir,
"maxRetries": int(self.retries),
},
self.container_system: {
"enabled": True,
"cacheDir": self.cachedir,
"autoMounts": True
},
"cleanup": self.delete_work_dir
}

return ret

def serial_pipeline(self):
"""Returns a dictionary of the pipeline config"""
ret = {
"params": {
"config_profile_description": self.config_profile_description,
},
"process": {
"cpus": self.default_process_ncpus,
"memory": self.default_process_memgb + "GB",
"time": self.default_process_hours + "h"
}
}

for named in self.named_process_resources.keys():
ret["process"][f"withName: '{named}'"] = {
"cpus": self.named_process_resources[named]["custom_process_ncpus"],
"memory": self.named_process_resources[named]["custom_process_memgb"],
"time": self.named_process_resources[named]["custom_process_hours"] + 'h'
}
if "custom_process_queue" in self.named_process_resources[named].keys():
ret["process"][f"withName: '{named}'"]["queue"] = self.named_process_resources[named]["custom_process_queue"]
if "executor" in self.named_process_resources[named].keys():
ret["process"][f"withName: '{named}'"]["executor"] = self.named_process_resources[named]["custom_process_queue"]
for labelled in self.labelled_process_resources.keys():
ret["process"][f"withLabel: '{labelled}'"] = {
"cpus": self.labelled_process_resources[labelled]["custom_process_ncpus"],
"memory": self.labelled_process_resources[labelled]["custom_process_memgb"],
"time": self.labelled_process_resources[labelled]["custom_process_hours"] + 'h'
}
if "custom_process_queue" in self.labelled_process_resources[labelled].keys():
ret["process"][f"withLabel: '{labelled}'"]["queue"] = self.labelled_process_resources[labelled]["custom_process_queue"]
if "executor" in self.labelled_process_resources[labelled].keys():
ret["process"][f"withLabel: '{labelled}'"]["executor"] = self.labelled_process_resources[labelled]["executor"]
return ret

def serial(self):
if self.is_infrastructure:
return self.serial_hpc()
else:
return self.serial_pipeline()

@field_validator("general_config_name", "config_profile_description")
@classmethod
def notempty(cls, v: str) -> str:
Expand Down
Loading