Skip to content

Commit 2144202

Browse files
Marcel-MueckPMBio
andauthored
added support to add absplice scores manually, add test cases (#170)
* added support to add absplice scores manually, add test cases * fixup! Format Python code with psf/black pull_request * added section in the doc to explain how to add absplice scores manually * added duckdb to environment * sorted values before testing equality in pytest as order may vary * fixup! Format Python code with psf/black pull_request * formatted docstrings * sorted values by gene ids as well * fixup! Format Python code with psf/black pull_request * removed unneccessray/commented out sections * fixup! Format Python code with psf/black pull_request * removed unnecessary/commented out sections * made test data smaller for faster run * pulled for loop out of recursive np.max function * trigger rerun * fixup! Format Python code with psf/black pull_request * minor changes to annotations.py to avoid recursion * fixup! Format Python code with psf/black pull_request * used only values of absplice columns to compute aggregated max --------- Co-authored-by: PMBio <[email protected]>
1 parent 1a151b9 commit 2144202

File tree

19 files changed

+371
-0
lines changed

19 files changed

+371
-0
lines changed

deeprvat/annotations/annotations.py

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
from tqdm import tqdm, trange
2121
from fastparquet import ParquetFile
2222
import yaml
23+
import duckdb
24+
25+
import pyarrow as pa
26+
import pyarrow.parquet as pq
2327

2428

2529
def precision(y_true, y_pred):
@@ -2069,3 +2073,253 @@ def select_rename_fill_annotations(
20692073

20702074
if __name__ == "__main__":
20712075
cli()
2076+
2077+
2078+
@cli.command()
2079+
@click.option(
2080+
"--absplice-dir",
2081+
type=click.Path(exists=True, file_okay=False, dir_okay=True),
2082+
required=True,
2083+
help="Path to the directory containing AbSplice outputs",
2084+
)
2085+
@click.option(
2086+
"--ab-splice-agg-score-file",
2087+
type=click.Path(),
2088+
required=True,
2089+
help="Path to save the aggregated AbSplice score file",
2090+
)
2091+
def aggregate_and_concat_absplice(
2092+
absplice_dir: str, ab_splice_agg_score_file: str
2093+
) -> None:
2094+
"""
2095+
Aggregates AbSplice scores from multiple files in a directory and saves them to a single Parquet file.
2096+
This function performs the following steps:
2097+
1. Iterates through all files in the specified directory.
2098+
2. Reads each file and aggregates the AbSplice scores using the specified aggregation function.
2099+
3. Saves the final aggregated scores to a specified Parquet file.
2100+
2101+
Parameters:
2102+
- absplice_dir (str): Path to the directory containing AbSplice outputs.
2103+
- ab_splice_agg_score_file (str): Path to save the aggregated AbSplice score file.
2104+
2105+
Returns:
2106+
None
2107+
"""
2108+
abs_splice_res_dir = Path(absplice_dir)
2109+
2110+
tissue_agg_function = "max"
2111+
if not Path(ab_splice_agg_score_file).exists():
2112+
# logger.info(f"removing existing file {ab_splice_agg_score_file}")
2113+
# os.remove(ab_splice_agg_score_file)
2114+
for i, chrom_file in enumerate(os.listdir(abs_splice_res_dir)):
2115+
logger.info(f"Reading file {chrom_file}")
2116+
# ab_splice_res = pd.read_parquet(abs_splice_res_dir/ chrom_file).reset_index()
2117+
ab_splice_res = pd.read_table(abs_splice_res_dir / chrom_file).reset_index()
2118+
absplice_columns = [
2119+
j for j in ab_splice_res.columns if "AbSplice_DNA_" in j
2120+
]
2121+
#### aggregate tissue specific ab splice scores
2122+
ab_splice_res["AbSplice_DNA"] = np.max(
2123+
ab_splice_res[absplice_columns].values,
2124+
axis=1,
2125+
)
2126+
ab_splice_res["pos"] = ab_splice_res["pos"].astype(int)
2127+
logger.info(f"Number of rows of ab_splice df {len(ab_splice_res)}")
2128+
# merged = ab_splice_res.merge(current_annotations, how = 'left', on = ['chrom', 'pos', 'ref', 'alt'])
2129+
res = ab_splice_res[
2130+
["chrom", "pos", "ref", "alt", "gene_id", "AbSplice_DNA"]
2131+
]
2132+
del ab_splice_res
2133+
table = pa.Table.from_pandas(res)
2134+
if i == 0:
2135+
logger.info(f"creating new file {ab_splice_agg_score_file}")
2136+
pqwriter = pq.ParquetWriter(ab_splice_agg_score_file, table.schema)
2137+
pqwriter.write_table(table)
2138+
del res
2139+
2140+
if pqwriter:
2141+
pqwriter.close()
2142+
2143+
2144+
@cli.command()
2145+
@click.option(
2146+
"--annotations",
2147+
type=click.Path(exists=True),
2148+
required=True,
2149+
help="Path to annotations.parquet",
2150+
)
2151+
@click.option(
2152+
"--variants",
2153+
type=click.Path(exists=True),
2154+
required=True,
2155+
help="Path to variants.parquet",
2156+
)
2157+
@click.option(
2158+
"--genes", type=click.Path(exists=True), required=True, help="Path to genes.parquet"
2159+
)
2160+
@click.option(
2161+
"--ab-splice-agg-score-file",
2162+
type=click.Path(exists=True),
2163+
required=True,
2164+
help="Path to splice.parquet",
2165+
)
2166+
@click.option(
2167+
"--output",
2168+
type=click.Path(),
2169+
required=True,
2170+
help="Path to output splice_anno.parquet",
2171+
)
2172+
@click.option("--verbose", is_flag=True, help="Enable verbose logging")
2173+
@click.option("--mem-limit", type=int, default=0, help="memory limit for duck DB in GB")
2174+
@click.option("--fill-nan", is_flag=True, help="Fill AbSplice_DNA NA values with 0")
2175+
def merge_absplice_scores(
2176+
annotations,
2177+
variants,
2178+
genes,
2179+
ab_splice_agg_score_file,
2180+
output,
2181+
verbose,
2182+
mem_limit,
2183+
fill_nan,
2184+
):
2185+
"""
2186+
Processes and merges sbsplice scores to annotation parquet file
2187+
2188+
This function performs the following steps:
2189+
1. Loads and transforms genes.parquet by renaming columns and splitting the gene field.
2190+
2. Loads annotations.parquet, excluding specified columns if they exist.
2191+
3. Merges the modified genes table with the annotations table on the 'gene_id' column.
2192+
4. Loads variants.parquet and merges it with the previous result on the 'id' column.
2193+
5. Loads splice.parquet and merges it with the previous result on columns 'chrom', 'pos', 'ref', 'alt', and 'gene_id'.
2194+
6. Saves the final merged table to a specified output parquet file.
2195+
2196+
Parameters:
2197+
- annotations (str): Path to the annotations.parquet file.
2198+
- variants (str): Path to the variants.parquet file.
2199+
- genes (str): Path to the genes.parquet file.
2200+
- splice (str): Path to the splice.parquet file.
2201+
- output (str): Path to save the output splice_anno.parquet file.
2202+
- verbose (bool): If True, prints detailed logging information at each step.
2203+
2204+
Returns:
2205+
None
2206+
"""
2207+
2208+
con = duckdb.connect(database=":memory:")
2209+
2210+
if mem_limit > 0:
2211+
logger.info(f"Setting memory limit to {mem_limit}GB")
2212+
# Set memory limit for DuckDB
2213+
con.execute(f"SET memory_limit = '{mem_limit}GB'")
2214+
2215+
if verbose:
2216+
logger.info(f"Loading and transforming {genes}...")
2217+
2218+
con.execute(
2219+
f"""
2220+
CREATE TEMP TABLE genes_mod AS
2221+
SELECT
2222+
id AS gene_id,
2223+
split_part(gene, '.', 1) AS gene,
2224+
split_part(gene, '.', 2) AS exon
2225+
FROM read_parquet('{genes}');
2226+
"""
2227+
)
2228+
2229+
if verbose:
2230+
logger.info(f"Loading {annotations}...")
2231+
2232+
all_columns = (
2233+
con.execute(f"SELECT * FROM read_parquet('{annotations}') LIMIT 0")
2234+
.fetchdf()
2235+
.columns.tolist()
2236+
)
2237+
exclude_columns = {"chrom", "pos", "ref", "alt", "AbSplice_DNA"}
2238+
selected_columns = [col for col in all_columns if col not in exclude_columns]
2239+
select_clause = ", ".join(selected_columns)
2240+
con.execute(
2241+
f"""
2242+
CREATE TEMP TABLE annotations AS
2243+
SELECT {select_clause}
2244+
FROM read_parquet('{annotations}');
2245+
"""
2246+
)
2247+
2248+
if verbose:
2249+
logger.info("Merging genes_mod with annotations...")
2250+
con.execute(
2251+
"""
2252+
CREATE TEMP TABLE merged1 AS
2253+
SELECT a.*, g.gene, g.exon
2254+
FROM annotations a
2255+
LEFT JOIN genes_mod g
2256+
ON a.gene_id = g.gene_id;
2257+
"""
2258+
)
2259+
2260+
if verbose:
2261+
logger.info(f"Loading {variants}...")
2262+
con.execute(
2263+
f"""
2264+
CREATE TEMP TABLE variants AS
2265+
SELECT * FROM read_parquet('{variants}');
2266+
"""
2267+
)
2268+
2269+
if verbose:
2270+
logger.info("Merging merged1 with variants...")
2271+
con.execute(
2272+
"""
2273+
CREATE TEMP TABLE merged2 AS
2274+
SELECT m1.*, v.chrom, v.pos, v.ref, v.alt
2275+
FROM merged1 m1
2276+
LEFT JOIN variants v
2277+
ON m1.id = v.id;
2278+
"""
2279+
)
2280+
2281+
if verbose:
2282+
logger.info(f"Loading {ab_splice_agg_score_file}...")
2283+
con.execute(
2284+
f"""
2285+
CREATE TEMP TABLE splice AS
2286+
SELECT * FROM read_parquet('{ab_splice_agg_score_file}');
2287+
"""
2288+
)
2289+
2290+
if verbose:
2291+
logger.info("Merging merged2 with absplice scores...")
2292+
con.execute(
2293+
"""
2294+
CREATE TEMP TABLE final_merge AS
2295+
SELECT m2.*, s.AbSplice_DNA
2296+
FROM merged2 m2
2297+
LEFT JOIN splice s
2298+
ON m2.chrom = s.chrom
2299+
AND m2.pos = s.pos
2300+
AND m2.ref = s.ref
2301+
AND m2.alt = s.alt
2302+
AND m2.gene = s.gene_id;
2303+
"""
2304+
)
2305+
2306+
if fill_nan:
2307+
if verbose:
2308+
print(f"Filling NULL values in splice column with 0...")
2309+
con.execute(
2310+
f"""
2311+
UPDATE final_merge
2312+
SET AbSplice_DNA = 0
2313+
WHERE AbSplice_DNA IS NULL;
2314+
"""
2315+
)
2316+
2317+
if verbose:
2318+
logger.info(f"Saving final merged table to {output}...")
2319+
con.execute(
2320+
f"""
2321+
COPY final_merge TO '{output}' (FORMAT 'parquet');
2322+
"""
2323+
)
2324+
2325+
logger.info(f"Successfully saved final merged table to {output}")

deeprvat_annotations.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ dependencies:
77
- click=8.0.4
88
- scikit-learn=1.0.2
99
- dask=2021.5.0
10+
- duckdb=1.2.1
1011
- tqdm=4.59.0
1112
- pybedtools=0.9.0
1213
- keras=2.11.0

docs/annotations.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ Alternatively, when the user want to select a specific set of genes to consider,
160160
- column `id`:`int` unique id for each gene
161161
Each row represents a gene the user want to include in the analysis.
162162

163+
## Manually adding precomputed and downloaded AbSplice scores
164+
165+
Instead of running the absplice part of the annotation pipeline, users can include `include_absplice: false` in the annotation config file and manually add Absplice annotations after the pipeline ran through. To do this, after downloading and unpacking the precomputed absplice scores from [zenodo](https://zenodo.org/records/10781457), and running the annotation pipeline without absplice, users can run `deeprvat_annotations aggregate_and_concat_absplice` inside the `deeprvat_annotations` environment with `--absplice-dir` pointing to the unpacked directory containing the absplice scores and `--ab-splice-agg-score-file` pointing to the path where the intermediate aggregated absplice scores should be saved.
166+
Run `deeprvat_annotations merge_absplice_scores` afterwards with the annotations, variants and `protein_coding_genes.parquet` as well as the aggregated absplice scores as input. Finally state where you want the output annotation file stored (`--output`).
163167

164168
## References
165169

tests/annotations/test_annotations.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -819,3 +819,115 @@ def test_compute_plof(test_data_name_dir, annotations_in, expected, tmp_path):
819819
assert_frame_equal(
820820
written_results, expected_results[written_results.columns], check_exact=False
821821
)
822+
823+
824+
@pytest.mark.parametrize(
825+
"test_data_name_dir, absplice_dir, expected",
826+
[
827+
(
828+
"aggregate_abplice_manually_medium",
829+
"absplice_dir",
830+
"aggregated_absplice_scores_expected.parquet",
831+
),
832+
],
833+
)
834+
def test_aggregate_absplice_manually(
835+
test_data_name_dir, absplice_dir, expected, tmp_path
836+
):
837+
current_test_data_dir = (
838+
tests_data_dir / "aggregate_absplice_manually" / test_data_name_dir
839+
)
840+
absplice_dir_path = current_test_data_dir / "input" / absplice_dir
841+
expected_path = current_test_data_dir / "expected" / expected
842+
output_path = tmp_path / "out.parquet"
843+
cli_runner = CliRunner()
844+
cli_parameters = [
845+
"aggregate-and-concat-absplice",
846+
"--absplice-dir",
847+
absplice_dir_path.as_posix(),
848+
"--ab-splice-agg-score-file",
849+
output_path.as_posix(),
850+
]
851+
result = cli_runner.invoke(annotations_cli, cli_parameters, catch_exceptions=False)
852+
assert result.exit_code == 0
853+
written_results = pd.read_parquet(output_path)
854+
expected_results = pd.read_parquet(expected_path)
855+
assert written_results.shape == expected_results.shape
856+
assert_frame_equal(
857+
written_results.sort_values(
858+
by=["chrom", "pos", "ref", "alt", "gene_id"]
859+
).reset_index(drop=True),
860+
expected_results[written_results.columns]
861+
.sort_values(by=["chrom", "pos", "ref", "alt", "gene_id"])
862+
.reset_index(drop=True),
863+
check_exact=False,
864+
)
865+
866+
867+
@pytest.mark.parametrize(
868+
"test_data_name_dir, annotations_in, variants_in, genes_in, absplice_in, expected",
869+
[
870+
(
871+
"merge_absplice_scores_medium",
872+
"annotations.parquet",
873+
"variants.parquet",
874+
"protein_coding_genes.parquet",
875+
"aggregated_absplice_scores.parquet",
876+
"expected.parquet",
877+
)
878+
],
879+
)
880+
def test_merge_absplice_scores_manually(
881+
test_data_name_dir,
882+
annotations_in,
883+
variants_in,
884+
genes_in,
885+
absplice_in,
886+
expected,
887+
tmp_path,
888+
):
889+
current_test_data_dir = (
890+
tests_data_dir / "merge_absplice_scores_manually" / test_data_name_dir
891+
)
892+
annotations_in_path = current_test_data_dir / "input" / annotations_in
893+
variants_in_path = current_test_data_dir / "input" / variants_in
894+
genes_in_path = current_test_data_dir / "input" / genes_in
895+
splice_in_path = current_test_data_dir / "input" / absplice_in
896+
expected_path = current_test_data_dir / "expected" / expected
897+
output_path = tmp_path / "splice_anno.parquet"
898+
899+
cli_runner = CliRunner()
900+
cli_parameters = [
901+
"merge-absplice-scores",
902+
"--annotations",
903+
annotations_in_path.as_posix(),
904+
"--variants",
905+
variants_in_path.as_posix(),
906+
"--genes",
907+
genes_in_path.as_posix(),
908+
"--ab-splice-agg-score-file",
909+
splice_in_path.as_posix(),
910+
"--output",
911+
output_path.as_posix(),
912+
"--verbose",
913+
]
914+
915+
# Add '--fill-splice-nan' if testing the fill_splice_merge case
916+
if "fill_splice" in test_data_name_dir:
917+
cli_parameters.extend(["--fill-splice-nan", "--fill-value", "0"])
918+
919+
result = cli_runner.invoke(annotations_cli, cli_parameters, catch_exceptions=False)
920+
921+
assert result.exit_code == 0, f"CLI failed: {result.output}"
922+
923+
written_results = pd.read_parquet(output_path)
924+
expected_results = pd.read_parquet(expected_path)
925+
926+
assert written_results.shape == expected_results.shape, "Shapes mismatch"
927+
assert_frame_equal(
928+
written_results.sort_values(by=["id"]).reset_index(drop=True),
929+
expected_results[written_results.columns]
930+
.sort_values(by=["id"])
931+
.reset_index(drop=True),
932+
check_exact=False,
933+
)

0 commit comments

Comments
 (0)