refactor: amplicon_covs.py #109
Merged: florianzwagemaker merged 1 commit into refactor_scripts_dir from IDSBIO-472-refactor-amplicon-covs on May 26, 2025 (+701 −275).
ViroConstrictor/workflow/scripts/amplicon_arg_parser.py (240 additions, 0 deletions)
""" | ||
This module provides functions for parsing and validating command-line arguments for the amplicon_covs script. | ||
|
||
The script requires the following input files: | ||
- A BED file with primers as given by AmpliGone. | ||
- A TSV file with coverages as given by TrueConsense. | ||
- A sample ID. | ||
- An output TSV file for average coverage per amplicon. | ||
|
||
The module includes the following functions: | ||
- _common_file_checks: Perform common checks on a file to ensure it exists, is not empty, and is readable. | ||
- check_primers_file: Validate the primers file to ensure it meets the required criteria. | ||
- check_coverages_file: Validate the coverages file to ensure it meets the required criteria. | ||
- check_output_file: Validate the output file to ensure it meets the required criteria. | ||
- parse_args: Parse and validate command-line arguments for the script. | ||
|
||
Each function raises an ArgumentTypeError if the input does not meet the required criteria. | ||
|
||
Example usage: | ||
-------------- | ||
To use this module, import it and call the parse_args function with the appropriate arguments. | ||
|
||
from amplicon_arg_parser import parse_args | ||
|
||
args = parse_args() | ||
print(args.primers) | ||
print(args.coverages) | ||
print(args.key) | ||
print(args.output) | ||
""" | ||
|
||
import os | ||
from argparse import ArgumentParser, ArgumentTypeError, Namespace | ||
|
||
|
||
def _common_file_checks(filename: str) -> None: | ||
""" | ||
Perform common checks on a file to ensure it exists, is not empty, and is readable. | ||
|
||
This function raises an ArgumentTypeError if any of the following conditions are not met: | ||
- The file exists. | ||
- The file is not empty. | ||
- The file is readable. | ||
|
||
Parameters | ||
---------- | ||
filename : str | ||
The path to the file to be checked. | ||
|
||
Raises | ||
------ | ||
ArgumentTypeError | ||
If the file does not exist, is empty, or is not readable. | ||
""" | ||
if not os.path.isfile(filename): | ||
raise ArgumentTypeError(f"File '{filename}' does not exist.") | ||
|
||
if os.path.getsize(filename) == 0: | ||
raise ArgumentTypeError(f"File '{filename}' is empty.") | ||
|
||
if not os.access(filename, os.R_OK): | ||
raise ArgumentTypeError(f"File '{filename}' is not readable.") | ||
|
||
|
||
def check_primers_file(filename: str) -> str: | ||
""" | ||
Validate the primers file to ensure it meets the required criteria. | ||
Does not open the file or check its contents. | ||
|
||
This function performs the following checks: | ||
- The file exists, is not empty, and is readable (using _common_file_checks). | ||
- The file has a .bed extension. | ||
|
||
Parameters | ||
---------- | ||
filename : str | ||
The path to the primers file to be checked. | ||
|
||
Returns | ||
------- | ||
str | ||
The validated filename. | ||
|
||
Raises | ||
------ | ||
ArgumentTypeError | ||
If the file does not exist, is empty, is not readable, or does not have a .bed extension. | ||
""" | ||
_common_file_checks(filename) | ||
if not filename.lower().endswith(".bed"): | ||
raise ArgumentTypeError("Primers file must be a BED file.") | ||
|
||
return filename | ||
|
||
|
||
def check_coverages_file(filename: str) -> str: | ||
""" | ||
Validate the coverages file to ensure it meets the required criteria. | ||
Does not open the file or check its contents. | ||
|
||
This function performs the following checks: | ||
- The file exists, is not empty, and is readable (using _common_file_checks). | ||
- The file has a .tsv extension. | ||
|
||
Parameters | ||
---------- | ||
filename : str | ||
The path to the coverages file to be checked. | ||
|
||
Returns | ||
------- | ||
str | ||
The validated filename. | ||
|
||
Raises | ||
------ | ||
ArgumentTypeError | ||
If the file does not exist, is empty, is not readable, or does not have a .tsv extension. | ||
""" | ||
_common_file_checks(filename) | ||
|
||
if not filename.lower().endswith(".tsv"): | ||
raise ArgumentTypeError("Coverages file must be a TSV file.") | ||
|
||
return filename | ||
|
||
|
||
def check_output_file(filename: str) -> str: | ||
""" | ||
Validate the output file to ensure it meets the required criteria. | ||
Does not open the file or check its contents. | ||
|
||
This function performs the following checks: | ||
- The file has a .csv extension. | ||
- The file does not already exist. | ||
- The directory containing the file is writable. | ||
|
||
Parameters | ||
---------- | ||
filename : str | ||
The path to the output file to be checked. | ||
|
||
Returns | ||
------- | ||
str | ||
The validated filename. | ||
|
||
Raises | ||
------ | ||
ArgumentTypeError | ||
If the file does not have a .csv extension, already exists, or the directory is not writable. | ||
""" | ||
if not filename.lower().endswith(".csv"): | ||
raise ArgumentTypeError("Output file must be a CSV file.") | ||
|
||
if os.path.isfile(filename): | ||
raise ArgumentTypeError( | ||
f"Output file '{filename}' already exists. Please choose another name." | ||
) | ||
|
||
if not os.access(os.path.dirname(filename), os.W_OK): | ||
raise ArgumentTypeError( | ||
f"Directory '{os.path.dirname(filename)}' is not writable." | ||
) | ||
|
||
return filename | ||
|
||
|
||
def parse_args(args: list[str] | None = None) -> Namespace: | ||
""" | ||
Parse command-line arguments for the amplicov_covs script. | ||
|
||
This function sets up the argument parser and defines the required arguments for the script. | ||
It validates the input files using the specified check functions. | ||
|
||
Parameters | ||
---------- | ||
args : list[str], optional | ||
A list of command-line arguments to parse. If None, the arguments are taken from sys.argv. | ||
This parameter is used for testing purposes. | ||
|
||
Returns | ||
------- | ||
Namespace | ||
An argparse.Namespace object containing the parsed arguments. | ||
|
||
Arguments | ||
--------- | ||
--primers : File | ||
Input BED file with primers as given by AmpliGone. This file is validated by check_primers_file. | ||
|
||
--coverages : File | ||
Input file with coverages as given by TrueConsense. This file is validated by check_coverages_file. | ||
|
||
--key : String | ||
Sample ID. | ||
|
||
--output : File | ||
Output file with average coverage per amplicon. This file is validated by check_output_file. | ||
|
||
Raises | ||
------ | ||
ArgumentTypeError | ||
If any of the input files do not meet the required criteria. | ||
""" | ||
parser = ArgumentParser() | ||
|
||
parser.add_argument( | ||
"--primers", | ||
metavar="File", | ||
type=check_primers_file, | ||
help="input BED file with primers as given by AmpliGone", | ||
required=True, | ||
) | ||
|
||
parser.add_argument( | ||
"--coverages", | ||
metavar="File", | ||
type=check_coverages_file, | ||
help="Input file with coverages as given by TrueConsense", | ||
required=True, | ||
) | ||
|
||
parser.add_argument( | ||
"--key", | ||
metavar="String", | ||
type=str, | ||
help="Sample ID", | ||
required=True, | ||
) | ||
|
||
parser.add_argument( | ||
"--output", | ||
metavar="File", | ||
type=check_output_file, | ||
help="Output file with average coverage per amplicon", | ||
required=True, | ||
) | ||
|
||
return parser.parse_args(args) |
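For reference, a minimal smoke test of the new parser could look like the sketch below. It relies only on the documented behaviour of parse_args accepting a list of arguments for testing; the file names are hypothetical, and the input files must already exist (non-empty and readable) while the output file must not exist yet, or the check functions will raise.

    from amplicon_arg_parser import parse_args

    # Hypothetical inputs: primers.bed and coverages.tsv must exist and be
    # non-empty and readable; out.csv must not exist yet.
    args = parse_args(
        [
            "--primers", "primers.bed",      # validated by check_primers_file (.bed)
            "--coverages", "coverages.tsv",  # validated by check_coverages_file (.tsv)
            "--key", "sample1",
            "--output", "out.csv",           # validated by check_output_file (.csv)
        ]
    )
    print(args.primers, args.coverages, args.key, args.output)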
ViroConstrictor/workflow/scripts/amplicon_covs.py (461 additions, 275 deletions)
Removed (old implementation):

import argparse
import sys

import pandas as pd

args = argparse.ArgumentParser()

args.add_argument(
    "--primers",
    metavar="File",
    type=str,
    help="input BED file with primers as given by AmpliGone",
    required=True,
)

args.add_argument(
    "--coverages",
    metavar="File",
    type=str,
    help="Input file with coverages as given by TrueConsense",
    required=True,
)

args.add_argument("--key", metavar="String", type=str, help="Sample ID", required=True)

args.add_argument(
    "--output",
    metavar="File",
    type=str,
    help="Output file with average coverage per amplicon",
    required=True,
)

flags = args.parse_args()


def split_frames(df):
    left = ["LEFT", "PLUS", "POSITIVE", "FORWARD"]
    right = ["RIGHT", "MINUS", "NEGATIVE", "REVERSE"]

    leftdf = pd.DataFrame(columns=df.columns)
    rightdf = pd.DataFrame(columns=df.columns)

    for x in df.itertuples():
        if any(l in x[1] for l in left):
            leftdf = pd.concat(
                [
                    leftdf,
                    pd.DataFrame(
                        {"name": x.name, "start": x.start, "end": x.end}, index=[0]
                    ),
                ],
                ignore_index=True,
            )
        if any(r in x[1] for r in right):
            rightdf = pd.concat(
                [
                    rightdf,
                    pd.DataFrame(
                        {"name": x.name, "start": x.start, "end": x.end}, index=[0]
                    ),
                ],
                ignore_index=True,
            )

    leftdf.reset_index(inplace=True)
    rightdf.reset_index(inplace=True)

    leftdf = leftdf.drop(columns=["index"])
    rightdf = rightdf.drop(columns=["index"])

    return leftdf, rightdf


def remove_keyword(prname):
    keywords = [
        "LEFT",
        "RIGHT",
        "PLUS",
        "MINUS",
        "POSITIVE",
        "NEGATIVE",
        "FORWARD",
        "REVERSE",
    ]
    sname = prname.split("_")
    for y, z in enumerate(sname):
        if z in keywords:
            sname.pop(y)
    return "_".join(sname)


def remove_alt_keyword(df):
    for x, y in enumerate(df["name"]):
        if "alt" in y:
            z = y.split("_")
            for a, b in enumerate(z):
                if "alt" in b:
                    z.pop(a)
            z = "_".join(z)
            df.at[x, "name"] = z
    return df


def index_to_remove_ends(one, indexone, two, indextwo):
    end_one = one.get("end")
    end_two = two.get("end")
    if end_one > end_two:
        return indextwo
    if end_two > end_one:
        return indexone
    return None


def index_to_remove_starts(one, indexone, two, indextwo):
    start_one = one.get("start")
    start_two = two.get("start")
    if start_one < start_two:
        return indextwo
    if start_two < start_one:
        return indexone
    return None


def remove_alt_primer_l(df):
    xx = df.to_dict(orient="records")
    to_rm = []
    lastindex = list(enumerate(xx))[-1][0] if xx else -1
    for a, x in enumerate(xx):
        if a != lastindex and xx[a].get("name") == xx[a + 1].get("name"):
            rm_indx = index_to_remove_ends(xx[a], a, xx[a + 1], a + 1)
            if rm_indx is not None:
                to_rm.append(rm_indx)
    return df.drop(to_rm)


def remove_alt_primer_r(df):
    xx = df.to_dict(orient="records")
    to_rm = []
    lastindex = list(enumerate(xx))[-1][0] if xx else -1
    for a, x in enumerate(xx):
        if a != lastindex and xx[a].get("name") == xx[a + 1].get("name"):
            rm_indx = index_to_remove_starts(xx[a], a, xx[a + 1], a + 1)
            if rm_indx is not None:
                to_rm.append(rm_indx)
    return df.drop(to_rm)


def Find_NonOverlap(df):
    if not df.empty:
        dd = df.to_dict(orient="records")
        startingpoint = {}
        endingpoint = {}
        lastindex = list(enumerate(dd))[-1][0]
        firstindex = list(enumerate(dd))[0][0]
        for x, v in enumerate(dd):
            t_end = v.get("rightstart")
            s = dd[x - 1].get("rightstart") if x != firstindex else v.get("leftend")
            end_override = dd[x + 1].get("leftend") if x != lastindex else None
            primerstart = s
            if end_override is not None and end_override in range(primerstart, t_end):
                primerend = end_override
            else:
                primerend = t_end
            startingpoint[primerstart] = v.get("name")
            endingpoint[primerend] = v.get("name")

        startdf = (
            pd.DataFrame.from_dict(startingpoint, orient="index")
            .reset_index()
            .rename(columns={0: "name", "index": "unique_start"})
        )
        enddf = (
            pd.DataFrame.from_dict(endingpoint, orient="index")
            .reset_index()
            .rename(columns={0: "name", "index": "unique_end"})
        )
        df = pd.merge(df, startdf, on="name", how="inner")
        df = pd.merge(df, enddf, on="name", how="inner")
        return df
    else:
        return pd.DataFrame(columns=["name", "leftstart", "leftend", "rightstart", "rightend", "unique_start", "unique_end"])


def avg(lst):
    return round(sum(lst) / len(lst), 2)


def Average_cov(primers, covs):
    covd = covs.to_dict(orient="records")
    primd = primers.to_dict(orient="records")
    averages = {}

    for v in primd:
        prstart = v.get("unique_start")
        prend = v.get("unique_end")
        pr_range = list(range(prstart, prend))
        localcov = [covd[i].get("cov") for i in pr_range]
        averages[avg(localcov)] = v.get("name")

    avgdf = (
        pd.DataFrame.from_dict(averages, orient="index")
        .reset_index()
        .rename(columns={0: "name", "index": "avg_cov"})
    )
    primers = pd.merge(primers, avgdf, on="name", how="inner")
    return primers


def pad_name(name):
    name = name.split("_")
    name[-1] = name[-1].zfill(3)
    return "_".join(name)


if __name__ == "__main__":
    covs = pd.read_csv(
        flags.coverages, sep="\t", names=["position", "cov"], index_col="position"
    )

    try:
        prims = pd.read_csv(
            flags.primers, sep="\t", usecols=[1, 2, 3], names=["start", "end", "name"]
        )[["name", "start", "end"]]
        prims = prims.sort_values(by="start").reindex()

    except Exception:
        print("Error reading primers file")
        with open(flags.output, "w") as f:
            f.write(
                f"""name,
{flags.key},
"""
            )
        sys.exit()

    if len(prims) <= 1:
        print("Primers file is empty, writing output and exiting...")
        with open(flags.output, "w") as f:
            f.write(
                f"""name,
{flags.key},
"""
            )
        sys.exit()

    lf, rf = split_frames(prims)

    lf["name"] = lf["name"].apply(remove_keyword)
    rf["name"] = rf["name"].apply(remove_keyword)

    lf = remove_alt_primer_l(remove_alt_keyword(lf))
    rf = remove_alt_primer_r(remove_alt_keyword(rf))

    # if either lf or rf is empty, write empty csv and exit
    # csv will have one row with index "flags.key" and an empty value, no column name
    if len(lf) == 0 or len(rf) == 0:
        df = pd.DataFrame({flags.key: [None]})
        print(df)
        df.to_csv(flags.output, sep=",", index=False, header=False)
        sys.exit(0)

    non_overlapping_points = Find_NonOverlap(
        pd.merge(lf, rf, on="name", how="inner")
        .rename(
            columns={
                "start_x": "leftstart",
                "end_x": "leftend",
                "start_y": "rightstart",
                "end_y": "rightend",
            }
        )
        .drop_duplicates(subset="name")
    )

    with_average = Average_cov(non_overlapping_points, covs)

    with_average = with_average.drop(
        columns=[
            "leftstart",
            "leftend",
            "rightstart",
            "rightend",
            "unique_start",
            "unique_end",
        ]
    ).rename(columns={"avg_cov": flags.key})

    # ensure the values like "MeV_1" or "MeV_19" in column "name" are padded like 001, 002, 003, etc.
    with_average["name"] = with_average["name"].apply(pad_name)

    with_average = with_average.transpose()

    with_average.to_csv(flags.output, sep=",", index=True, header=False)

Added (refactored implementation):

"""
This module provides functions and classes for parsing and validating command-line arguments,
processing primer data, calculating amplicon start and end positions, and computing mean coverage
for each amplicon.

The script requires the following input files:
- A BED file with primers as given by AmpliGone.
- A TSV file with coverages as given by TrueConsense.
- A sample ID.
- An output CSV file for average coverage per amplicon.

The module includes the following components:
- ReadDirection: An enum class to standardize read directions.
- standardize_direction: Function to standardize read direction strings.
- open_tsv_file: Function to open a TSV file and return its contents as a pandas DataFrame.
- split_primer_names: Function to split primer names in the DataFrame into separate columns.
- calculate_amplicon_start_end: Function to calculate the start and end positions of amplicons.
- create_amplicon_names_list: Function to create a list of unique amplicon names.
- write_output: Function to write the calculated amplicon sizes to a CSV file.
- main: Main function to execute the amplicon coverage calculation script.

Example usage:
--------------
To use this module, import it and call the main function with the appropriate arguments.

    from amplicon_covs import main

    if __name__ == "__main__":
        main()

Command-line usage:
-------------------
To run the script from the command line:

    $ python amplicon_covs.py --primers primers.bed --coverages coverages.tsv --key sample1 --output output.csv
"""

from enum import Enum

import pandas as pd
from amplicon_arg_parser import parse_args


class ReadDirection(Enum):
    """
    An enum class to standardize read directions.

    Attributes
    ----------
    FORWARD : list of str
        List of strings representing forward read directions.
    REVERSE : list of str
        List of strings representing reverse read directions.

    Notes
    -----
    Add any new read directions to the respective value lists.
    """

    FORWARD = ["FW", "F", "1", "LEFT", "POSITIVE", "FORWARD", "PLUS"]
    REVERSE = ["RV", "R", "2", "RIGHT", "NEGATIVE", "REVERSE", "MINUS"]


def standardize_direction(direction: str) -> str:
    """
    Standardizes the given read direction string to a standardized direction name.
    Also needs the ReadDirection class to be defined.

    Parameters
    ----------
    direction : str
        The read direction string to be standardized.

    Returns
    -------
    str
        The standardized direction name, either 'FORWARD' or 'REVERSE'.

    Raises
    ------
    ValueError
        If the given direction does not match any recognized read direction.

    Examples
    --------
    >>> standardize_direction('F')
    'FORWARD'
    >>> standardize_direction('RIGHT')
    'REVERSE'
    """
    for read_direction in ReadDirection:  # ReadDirection.FORWARD, ReadDirection.REVERSE
        if direction.upper() in read_direction.value:
            return read_direction.name
    raise ValueError(
        f"Direction {direction} does not contain a recognized read direction. "
        f"You can use {ReadDirection.FORWARD.value} for forward reads and {ReadDirection.REVERSE.value} for reverse reads."
    )


def open_tsv_file(filename: str, index_col: int | None = None) -> pd.DataFrame:
    """
    Opens a TSV file and returns its contents as a pandas DataFrame.
    This function checks if the file is properly tab-separated and contains no NaN values.
    When the input is parsed by amplicon_arg_parser, the file is guaranteed to exist and be readable, so there are no checks for that.

    Parameters
    ----------
    filename : str
        The path to the TSV file to be opened.
    index_col : int or None, optional
        Column to set as index (default is None).

    Returns
    -------
    pd.DataFrame
        The contents of the TSV file as a pandas DataFrame.

    Raises
    ------
    ValueError
        If the file contains NaN values.

    Examples
    --------
    >>> type(open_tsv_file('example.tsv'))
    <class 'pandas.core.frame.DataFrame'>
    """
    df = pd.read_csv(filename, sep="\t", header=None, index_col=index_col)
    if df.isnull().to_numpy().any():
        raise ValueError(f"File {filename} contains NaN values.")
    return df


def split_primer_names(df: pd.DataFrame) -> pd.DataFrame:
    """
    Splits the primer names in the DataFrame into separate columns.
    This function processes each row of the DataFrame by splitting the values in the 'split_name_list' column
    into 'name', 'count', 'alt', and 'direction' columns.

    Parameters
    ----------
    df : pd.DataFrame
        A pandas DataFrame containing primer data. The primer names should be in the third column (index 3).

    Returns
    -------
    pd.DataFrame
        The modified DataFrame with 'name', 'count', 'alt', and 'direction' columns populated.

    Raises
    ------
    ValueError
        If the 'split_name_list' column does not contain the expected number of elements (3 or 4).

    Examples
    --------
    >>> df = pd.DataFrame({3: ["NAME_NUMBER_DIRECTION", "NAME_NUMBER_ALT_DIRECTION"]})
    >>> split_primer_names(df)
       name   count   alt  direction
    0  NAME  NUMBER  None  DIRECTION
    1  NAME  NUMBER   ALT  DIRECTION
    """

    def _process_primer_row(row: pd.Series) -> pd.Series:
        """
        Processes a row of primer data by splitting the values from the 'split_name_list' column into their own column.

        Parameters
        ----------
        row : pd.Series
            A pandas Series representing a row of primer data. The 'split_name_list' column should contain
            a list of strings split from the primer name.

        Returns
        -------
        pd.Series
            The modified row with 'name', 'count', 'alt', and 'direction' columns populated.

        Raises
        ------
        ValueError
            If the 'split_name_list' column does not contain the expected number of elements (3 or 4).

        Examples
        --------
        >>> row = pd.Series({"split_name_list": ["NAME", "NUMBER", "DIRECTION"]})
        >>> _process_primer_row(row)
        name              NAME
        count           NUMBER
        alt               None
        direction    DIRECTION
        dtype: object
        """
        split_names = row["split_name_list"]
        if len(split_names) == 4:
            row["name"], row["count"], row["alt"], row["direction"] = row[
                "split_name_list"
            ]
        elif len(split_names) == 3:
            row["name"], row["count"], row["alt"], row["direction"] = (
                split_names[0],
                split_names[1],
                None,
                split_names[2],
            )
        else:
            raise ValueError(
                f"Primer name {row[3]} does not contain the expected number of underscores. "
                "It should either be NAME_NUMBER_DIRECTION or NAME_NUMBER_ALT_DIRECTION."
            )
        return row

    df["split_name_list"] = df[3].str.split("_", expand=False)
    df = df.apply(_process_primer_row, axis=1)
    return df.drop(columns=["split_name_list"])


def calculate_amplicon_start_end(primers: pd.DataFrame) -> pd.DataFrame:
    """
    Calculates the start and end positions of amplicons based on primer data.
    This function processes the primer data to ensure that the start and end of an amplicon
    do not contain overlapping primers. It creates new rows with the minimum start and maximum end
    positions for each amplicon that has ALT primers, and then calculates the start and end positions
    for each amplicon.

    Parameters
    ----------
    primers : pd.DataFrame
        A pandas DataFrame containing primer data. The DataFrame should have columns for 'count',
        'alt', 'direction', and primer start and end positions.

    Returns
    -------
    pd.DataFrame
        A pandas DataFrame with columns 'amplicon_number', 'start', and 'end', representing the
        calculated start and end positions for each amplicon.

    Examples
    --------
    >>> primers = pd.DataFrame({
    ...     'count': [1, 1, 2, 2],
    ...     'alt': [None, 'ALT', None, 'ALT'],
    ...     'direction': ['FORWARD', 'FORWARD', 'REVERSE', 'REVERSE'],
    ...     1: [10, 20, 30, 40],
    ...     2: [50, 60, 70, 80],
    ...     3: ['name1', 'name2', 'name3', 'name4']
    ... })
    >>> calculate_amplicon_start_end(primers)
       amplicon_number  start  end
    0                1     10   60
    1                2     30   80
    """

    def _create_minmax_row(group: pd.DataFrame, direction: str) -> pd.Series:
        """
        Creates a new row with the minimum of the primer start and the maximum of the primer end.
        This way the start and end of an amplicon will never contain overlapping primers.

        Parameters
        ----------
        group : pd.DataFrame
            A pandas DataFrame containing the group of primer data.
        direction : str
            The direction of the read (e.g., 'FORWARD' or 'REVERSE').

        Returns
        -------
        pd.Series
            A pandas Series representing the new row with '_MINMAX' appended to the third column,
            the minimum value of the column with the primer start (column[1]), the maximum value of primer end (column[2]),
            and the direction.

        Examples
        --------
        >>> group = pd.DataFrame({
        ...     1: [10, 20, 30],
        ...     2: [40, 50, 60],
        ...     3: ['SC2_LEFT', 'SC2_LEFT_ALT1', 'SC2_LEFT_ALT2'],
        ...     4: ['FORWARD', 'FORWARD', 'FORWARD']
        ... })
        >>> _create_minmax_row(group, 'FORWARD')
        1                 10
        2                 60
        3    SC2_LEFT_MINMAX
        4            FORWARD
        dtype: object
        """
        new_row = group.iloc[0].copy()
        new_row[3] = new_row[3] + "_MINMAX"
        new_row[1] = group.loc[group[1].idxmin(), 1]
        new_row[2] = group.loc[group[2].idxmax(), 2]
        new_row[4] = direction
        return new_row

    df = pd.DataFrame(primers.loc[:, "count"].unique(), columns=["amplicon_number"])

    for amplicon_number in df["amplicon_number"]:
        amplicon_number_group = primers[primers["count"] == amplicon_number]
        if "ALT" in list(amplicon_number_group["alt"]):
            fw_group = amplicon_number_group[
                amplicon_number_group["direction"] == ReadDirection.FORWARD.name
            ]
            rv_group = amplicon_number_group[
                amplicon_number_group["direction"] == ReadDirection.REVERSE.name
            ]

            new_fw_row = _create_minmax_row(fw_group, ReadDirection.FORWARD.name)
            new_rv_row = _create_minmax_row(rv_group, ReadDirection.REVERSE.name)

            primers.loc[len(primers)] = new_fw_row
            primers.loc[len(primers)] = new_rv_row

            idxes_to_drop = fw_group.index.append(rv_group.index)
            primers = primers.drop(idxes_to_drop)

        # x = amplicon number
        # rows[1] = start primer
        # rows[2] = end primer
        # .values[0] = start position primer
        # .values[1] = end position primer
        if int(amplicon_number) == 1:  # make exceptions for first and last amplicon
            df.loc[df["amplicon_number"] == amplicon_number, "start"] = primers[
                primers["count"] == amplicon_number
            ][1].values[
                0
            ]  # no modification needed
        else:
            df.loc[df["amplicon_number"] == amplicon_number, "start"] = (
                primers[primers["count"] == str(int(amplicon_number) - 1)][2].values[1]
                + 1
            )  # add 1 to avoid overlapping primers
        if int(amplicon_number) == len(df["amplicon_number"]):
            df.loc[df["amplicon_number"] == amplicon_number, "end"] = primers[
                primers["count"] == amplicon_number
            ][2].values[1]
        else:
            df.loc[df["amplicon_number"] == amplicon_number, "end"] = primers[
                primers["count"] == str(int(amplicon_number) + 1)
            ][1].values[
                0
            ]  # no modification needed for end position
    return df


def create_amplicon_names_list(primers: pd.DataFrame) -> list[str]:
    """
    Create a list of unique amplicon names based on the primers DataFrame.
    This function generates a list of amplicon names by appending unique counts to the base amplicon name.

    Parameters
    ----------
    primers : pd.DataFrame
        A pandas DataFrame containing primer information. It must have columns "name" and "count".

    Returns
    -------
    list[str]
        A list of unique amplicon names.

    Examples
    --------
    >>> primers = pd.DataFrame({"name": ["amplicon1", "amplicon1"], "count": [1, 2]})
    >>> create_amplicon_names_list(primers)
    ['amplicon1_1', 'amplicon1_2']
    """
    amplicon_name = primers.loc[0, "name"]
    return [f"{amplicon_name}_{x}" for x in primers["count"].unique()]


def write_output(amplicon_sizes: pd.DataFrame, output_file: str) -> None:
    """
    Writes the calculated amplicon sizes to a CSV file.

    Parameters
    ----------
    amplicon_sizes : pd.DataFrame
        A pandas DataFrame containing amplicon size data.
    output_file : str
        The path to the output file to be written.
    """
    amplicon_sizes.to_csv(output_file, sep=",", index=True)


def main(args: list[str] | None = None) -> None:
    """
    Main function to execute the amplicon coverage calculation script.
    This function parses command-line arguments, reads input files, processes primer data,
    calculates amplicon start and end positions, computes mean coverage for each amplicon,
    and writes the results to an output file.

    Parameters
    ----------
    args : list[str], optional
        A list of command-line arguments to parse. If None, the arguments are taken from sys.argv.
        This parameter is used for testing purposes.

    Workflow
    --------
    1. Parse command-line arguments using parse_args.
    2. Open and read the primers file using open_tsv_file.
    3. Split primer names into separate columns using split_primer_names.
    4. Standardize primer directions using standardize_direction.
    5. Calculate amplicon start and end positions using calculate_amplicon_start_end.
    6. Open and read the coverages file using open_tsv_file.
    7. Calculate mean coverage for each amplicon.
    8. Create a list of unique amplicon names using create_amplicon_names_list.
    9. Write the final DataFrame to the output file using write_output.

    Examples
    --------
    To run the script from the command line:

        $ python amplicon_covs.py --primers primers.bed --coverages coverages.tsv --key sample1 --output output.csv
    """
    flags = parse_args(args)
    primers = open_tsv_file(flags.primers)
    primers = split_primer_names(primers)
    primers["direction"] = primers.loc[:, "direction"].apply(standardize_direction)
    amplicon_sizes = calculate_amplicon_start_end(primers)

    coverages = open_tsv_file(flags.coverages, index_col=0)

    def calculate_mean(input_array: pd.Series, coverages: pd.DataFrame) -> pd.Series:
        """
        Calculates the mean coverage for a given amplicon.
        This function computes the mean of all nucleotide coverages for the specified amplicon.
        It takes into account both complete and partial reads to measure primer efficiency.

        Parameters
        ----------
        input_array : pd.Series
            A pandas Series representing a row from the amplicon sizes DataFrame. It should contain
            the start and end positions of the amplicon.
        coverages : pd.DataFrame
            A pandas DataFrame containing coverage data. The index should represent nucleotide positions.

        Returns
        -------
        pd.Series
            A pandas Series with the mean coverage for the specified amplicon.

        Examples
        --------
        >>> input_array = pd.Series({"start": 10, "end": 20})
        >>> coverages = pd.DataFrame({"coverage": [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]})
        >>> calculate_mean(input_array, coverages)
        coverage    55.0
        dtype: float64
        """
        return coverages.iloc[
            int(input_array.iloc[1]) - 1 : int(input_array.iloc[2])
        ].mean()

    amplicon_sizes["coverage"] = amplicon_sizes.apply(
        lambda x: calculate_mean(x, coverages), axis=1
    )

    amplicon_sizes["amplicon_names"] = create_amplicon_names_list(primers)

    final_df = pd.DataFrame(
        [amplicon_sizes["coverage"].values],
        columns=[amplicon_sizes["amplicon_names"]],
        index=[flags.key],
    )

    write_output(final_df, flags.output)


if __name__ == "__main__":
    main()
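As a rough end-to-end check of the refactored script, something like the sketch below should exercise the whole pipeline. All data, paths, and primer names (AMP_1_LEFT and so on) are made up here, following the NAME_NUMBER_DIRECTION scheme documented in split_primer_names; out.csv must not exist yet or check_output_file will raise.

    import pandas as pd
    from amplicon_covs import main

    # Synthetic 4-column BED: reference, start, end, primer name
    # (hypothetical names in the NAME_NUMBER_DIRECTION scheme).
    pd.DataFrame(
        [
            ["ref", 1, 10, "AMP_1_LEFT"],
            ["ref", 90, 100, "AMP_1_RIGHT"],
            ["ref", 80, 90, "AMP_2_LEFT"],
            ["ref", 190, 200, "AMP_2_RIGHT"],
        ]
    ).to_csv("primers.bed", sep="\t", header=False, index=False)

    # Synthetic per-position coverage TSV: position, depth.
    pd.DataFrame({"pos": range(1, 201), "cov": [50] * 200}).to_csv(
        "coverages.tsv", sep="\t", header=False, index=False
    )

    main(["--primers", "primers.bed", "--coverages", "coverages.tsv",
          "--key", "sample1", "--output", "out.csv"])
    print(open("out.csv").read())  # one row (sample1) with a mean coverage per amplicon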
Review comment:

So this might be a bit of a nitpick, but we probably should change some of the function names and/or class names in this file before we include it in the 1.6.0 release. I think the structure of this file is already a big improvement when it comes to readability and maintainability in the long term, but using something like ReadDirection as a class name, combined with its description, might cause confusion in future maintenance, as this class (and actually the entire script) is not dealing with the direction of reads but with the direction of primers.
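If that rename goes ahead, a minimal version (hypothetical, not part of this PR) could keep the values and behaviour identical and only change the name:

    from enum import Enum

    class PrimerDirection(Enum):
        # Same values as the merged ReadDirection enum; only the name changes,
        # to reflect that the script classifies primers rather than sequencing reads.
        FORWARD = ["FW", "F", "1", "LEFT", "POSITIVE", "FORWARD", "PLUS"]
        REVERSE = ["RV", "R", "2", "RIGHT", "NEGATIVE", "REVERSE", "MINUS"]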