diff --git a/src/cherami/pipelines/strep_pneumo.py b/src/cherami/pipelines/strep_pneumo.py index 958faa4..ca122da 100644 --- a/src/cherami/pipelines/strep_pneumo.py +++ b/src/cherami/pipelines/strep_pneumo.py @@ -85,8 +85,12 @@ def generate_samplesheet( ) def should_run(self, context: PipelineContext) -> bool: - """Determine whether the Strep pneumo pipeline should run for the given sample. - When this returns False, the worker calls `on_skip()` instead of launching the pipeline. + """ + Determine whether the Strep pneumo pipeline should run for the given + sample. + Pull the claspar Kraken outputs, and check whether Strep is present. + When this returns False, the worker calls `on_skip()` instead of + launching the pipeline. Arguments: context: PipleineContext object that contains key information like @@ -95,34 +99,97 @@ def should_run(self, context: PipelineContext) -> bool: Returns: `True` when the pipeline should run, otherwise `False`. """ - # Get classifer calls info from onyx: - with OnyxClient(onyx_config()) as client: - climb_records = client.get( - project=context.server, - climb_id=context.climb_id, - include=[ - "classifier_calls__taxon_id", - "classifier_calls__count_descendants", - ], - ) - # Set criteria for pipeline running - currently 100 reads of Strep pneumo strep_pneumo_taxon_id = 1313 - min_descendant_reads = 100 - strep_finder = ( - taxa_dict - for taxa_dict in climb_records["classifier_calls"] - if (taxa_dict.get("taxon_id") == strep_pneumo_taxon_id) - and (taxa_dict.get("count_descendants") >= min_descendant_reads) - ) - # Iterate through list of dicts - return taxon_dict if taxon present, None if taxon not present - strep_present = next(strep_finder, None) + select_table_name = "claspar-kraken-bacteria" + + # Has the pipeline with the current context been run before? 
+ first_should_run = super().should_run(context) - if bool(strep_present): - should_run_response = super().should_run(context) - return should_run_response - else: + if not first_should_run: + # if we have run before with a valid table for the pipeline, + # don't run return False + from onyx_analysis_helper import onyx_analysis_helper_functions as oa + + # 1) collate the current context from context object + current_context = ( + context.onyx_versions_hash, + context.orange_box_version, + ) + + # 2) get all the claspar analysis tables with name select_table_name + # associated with the sample. + analysis_tables: dict + exitcode: int + analysis_tables, exitcode = oa.get_analysis_records( + sample_id=context.climb_id, + server=context.server, + fields=[ + "methods", + "name", + "result_metrics", + ], + ) + + claspar_tables = { + aid: table + for aid, table in analysis_tables.items() + if table["name"] == select_table_name + } + + # 3.) get the claspar table that matches the current context + for aid, table in claspar_tables.items(): + # get onyx versions hashes from analysis tables: + onyx_versions_hash: str = table["methods"]["onyx_versions_hash"] + + # Get the orange box version from the analysis tables + versions: list[dict] = table["methods"]["versions"] + versions_dict: dict = { + ver["name"]: ver["version"] for ver in versions + } + orange_box_version: str | None = versions_dict.get( + "orange_box_version" + ) + + if (onyx_versions_hash, orange_box_version) == current_context: + # check the results: + for result in table["result_metrics"].values(): + if ( + int(result["profile_taxon_id"]) + == strep_pneumo_taxon_id + and result["kraken_confidence"] == "high" + ): + logger.debug( + "Incoming sample %s has claspar " + "analysis table (id: %s) with 'high' strep pneumo - " + "Decision: run", + context.climb_id, + aid, + ) + return True + elif ( + int(result["profile_taxon_id"]) + == strep_pneumo_taxon_id + and result["kraken_confidence"] == "low" + ): + 
logger.debug( + "Incoming sample %s has claspar " + "analysis table (id: %s) with 'low' strep pneumo - " + "Decision: not run", + context.climb_id, + aid, + ) + return False + logger.debug( + "Incoming sample %s has %s claspar kraken tables " + "(ids: %s) without Strep pneumoniae. Decision: not run", + context.climb_id, + len(claspar_tables), + list(claspar_tables.keys()), + ) + return False + def build_worker( config: CheramiConfig, diff --git a/tests/conftest.py b/tests/conftest.py index 1cfe36e..6365244 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -36,13 +36,24 @@ class MockedSample: context: PipelineContext -class MockContext(PipelineContext): +class TestContext(PipelineContext): def __init__(self, payload, server, pipeline_version, onyx_hash): super().__init__(payload, server, pipeline_version) self.onyx_versions_hash = onyx_hash self.orange_box_version = "1.2.3" +@pytest.fixture +def test_context(): + payload = { + "climb_id": "ID-123456", + "match_uuid": "ABC123", + "test": "test2", + } + context = TestContext(payload, "server", "1.0.0", "") + return context + + ONYX_RECORD = { "climb_id": "ID-123456", "site": "test", @@ -149,7 +160,7 @@ def mock_analysis_1(): }, orange_box_version="1.2.3", payload=PAYLOAD, - context=MockContext( + context=TestContext( PAYLOAD, "server", ANALYSIS_TABLE["pipeline_version"], @@ -210,7 +221,7 @@ def mock_analysis_old_ob(): }, orange_box_version="1.2.3", payload=PAYLOAD, - context=MockContext( + context=TestContext( PAYLOAD, "server", ANALYSIS_TABLE["pipeline_version"], @@ -328,7 +339,7 @@ def mock_multiple_analyses(): }, orange_box_version="1.2.3", payload=PAYLOAD, - context=MockContext( + context=TestContext( PAYLOAD, "server", "1.0.0", @@ -363,7 +374,7 @@ def mock_analysis_empty(): "match_uuid": "XXX000", "test": "test2", }, - context=MockContext( + context=TestContext( payload={ "climb_id": "ID-000000", "match_uuid": "XXX000", @@ -399,7 +410,7 @@ def mock_analysis_2(): }, orange_box_version="1.2.3", 
payload=payload, - context=MockContext( + context=TestContext( payload=payload, server="server", pipeline_version="1.0.0", diff --git a/tests/pipelines/test_strep_pipeline.py b/tests/pipelines/test_strep_pipeline.py new file mode 100644 index 0000000..95a71e8 --- /dev/null +++ b/tests/pipelines/test_strep_pipeline.py @@ -0,0 +1,171 @@ +import logging +from unittest.mock import patch + +import pytest + +from cherami.pipelines.strep_pneumo import StrepPneumoPipeline + + +@pytest.fixture +def mock_config(mocker): + return mocker.Mock() + + +@pytest.fixture +def strep_pipeline(mock_config, global_config): + pipeline = StrepPneumoPipeline(mock_config, global_config) + return pipeline + + +@pytest.fixture +def claspar_analysis_table_with_strep(): + """Analysis table that has high strep present.""" + return { + "AID-12345678": { + "name": "claspar-kraken-bacteria", + "methods": { + "versions": [ + { + "name": "classifier_version", + "version": "1.0.0", + }, # onyx version + { + "name": "classifier_db_date", + "version": "1970-01-01", + }, # onyx version + { + "name": "ncbi_taxonomy_date", + "version": "1970-01-01", + }, # onyx version + { + "name": "scylla_version", + "version": "1.0.0", + }, # onyx version + { + "name": "sylph_db_version", + "version": "1.0.0", + }, # onyx version + { + "name": "alignment_db_version", + "version": "1.0.0", + }, # onyx version + {"name": "module_dependency_db", "version": "2000-01-01"}, + {"name": "orange_box_version", "version": "1.2.3"}, + ], + "onyx_versions_hash": "e0c8c12a02fa86494059858c41af311d94c086a286bf4c62d53c21261e90f614", + "thresholds": {"limit": 10}, + }, + "result_metrics": { + "0": { + "profile": "test_profile_1", + "profile_taxon_id": 573, + "kraken_confidence": "low", + "profile_taxon_match": "Klebsiella pneumoniae", + }, + "1": { + "profile": "Strep", + "profile_taxon_id": 1313, + "kraken_confidence": "high", + "profile_taxon_match": "Streptococcus pneumoniae", + }, + }, + } + } + + +@patch( +
"onyx_analysis_helper.onyx_analysis_helper_functions.get_analysis_records", +) +# @patch("cherami.pipelines.Pipeline.should_run", return_value=True) +def test_should_run_high_strep( + mock_onyx, + strep_pipeline, + test_context, + claspar_analysis_table_with_strep, + caplog, +): + """Should run - strep found and 'high' confidence""" + caplog.set_level(logging.DEBUG) + test_context.onyx_versions_hash = ( + "e0c8c12a02fa86494059858c41af311d94c086a286bf4c62d53c21261e90f614" + ) + + mock_onyx.side_effect = [({}, 0), (claspar_analysis_table_with_strep, 0)] + + assert strep_pipeline.should_run(test_context) + assert "Decision: run" in caplog.text + + +@pytest.fixture +def claspar_analysis_table_with_low_strep(claspar_analysis_table_with_strep): + """Analysis table that has strep present at 'low' confidence.""" + low_strep = claspar_analysis_table_with_strep + low_strep["AID-12345678"]["result_metrics"]["1"]["kraken_confidence"] = ( + "low" + ) + return low_strep + + +@patch( + "onyx_analysis_helper.onyx_analysis_helper_functions.get_analysis_records", +) +# @patch("cherami.pipelines.Pipeline.should_run", return_value=True) +def test_should_run_low_strep( + mock_onyx, + strep_pipeline, + test_context, + claspar_analysis_table_with_low_strep, + caplog, +): + """Should not run - strep found but 'low' confidence.""" + caplog.set_level(logging.DEBUG) + test_context.onyx_versions_hash = ( + "e0c8c12a02fa86494059858c41af311d94c086a286bf4c62d53c21261e90f614" + ) + + mock_onyx.side_effect = [ + ({}, 0), + (claspar_analysis_table_with_low_strep, 0), + ] + + assert not strep_pipeline.should_run(test_context) + assert "Decision: not run" in caplog.text + + +@pytest.fixture +def claspar_analysis_table_no_strep(claspar_analysis_table_with_strep): + """Analysis table that has no strep present.""" + no_strep = claspar_analysis_table_with_strep + no_strep["AID-12345678"]["result_metrics"]["1"] = { + "profile": "test_profile_1", + "profile_taxon_id": 1496, + "kraken_confidence": "high", +
"profile_taxon_match": "Clostridioides difficile", + } + return no_strep + + +@patch( + "onyx_analysis_helper.onyx_analysis_helper_functions.get_analysis_records", +) +# @patch("cherami.pipelines.Pipeline.should_run", return_value=True) +def test_should_run_no_strep( + mock_onyx, + strep_pipeline, + test_context, + claspar_analysis_table_no_strep, + caplog, +): + """Should not run - no strep found in claspar results.""" + caplog.set_level(logging.DEBUG) + test_context.onyx_versions_hash = ( + "e0c8c12a02fa86494059858c41af311d94c086a286bf4c62d53c21261e90f614" + ) + + mock_onyx.side_effect = [ + ({}, 0), + (claspar_analysis_table_no_strep, 0), + ] + + assert not strep_pipeline.should_run(test_context) + assert "Decision: not run" in caplog.text