diff --git a/Containerfile.c10s b/Containerfile.c10s
index 552a77bd..3ad6c858 100644
--- a/Containerfile.c10s
+++ b/Containerfile.c10s
@@ -38,6 +38,7 @@ RUN dnf -y install --allowerasing \
     sed \
     gawk \
     rsync \
+    python3-tabulate \
     && dnf clean all
 
 RUN pip3 install --no-cache-dir \
@@ -46,7 +47,9 @@ RUN pip3 install --no-cache-dir \
     openinference-instrumentation-beeai \
     arize-phoenix-otel \
     redis \
-    specfile
+    specfile \
+    pytest \
+    pytest-asyncio
 
 # Create user
 RUN useradd -m -G wheel beeai
diff --git a/Makefile b/Makefile
index e8c6b0fa..d4db9d98 100644
--- a/Makefile
+++ b/Makefile
@@ -27,8 +27,12 @@ run-triage-agent-standalone:
 		-e MOCK_JIRA=$(MOCK_JIRA) \
 		triage-agent
 
-
-
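+# Run the triage-agent e2e test suite in a container (Jira is always mocked here)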
+.PHONY: run-triage-agent-e2e-tests
+run-triage-agent-e2e-tests:
+	$(COMPOSE_AGENTS) run --rm \
+		-e MOCK_JIRA="true" \
+		-e DRY_RUN=$(DRY_RUN) \
+		triage-agent-e2e-tests
 
 .PHONY: run-rebase-agent-c9s-standalone
 run-rebase-agent-c9s-standalone:
diff --git a/README-agents.md b/README-agents.md
index c60fa9f3..01ed5d6e 100644
--- a/README-agents.md
+++ b/README-agents.md
@@ -19,6 +19,23 @@ Three agents process tasks through Redis queues:
 - **Modify JIRA issues** (add comments, update fields, apply labels)
 - **Create GitLab merge requests** and push commits
 
+## Jira mocking
+
+If you clone the testing Jira files from
+`git@gitlab.cee.redhat.com:jotnar-project/testing-jiras.git`,
+the agents can work against them instead of a real Jira server.
+
+Example:
+
+`make run-triage-agent-standalone JIRA_ISSUE=RHEL-15216 MOCK_JIRA=true`
+
+If used together with `DRY_RUN`, the agents won't edit the Jira files;
+otherwise they will.
+
+Example:
+
+`make run-triage-agent-standalone JIRA_ISSUE=RHEL-15216 DRY_RUN=true MOCK_JIRA=true`
+
 ## Setup
 
 ### Required API Tokens & Authentication
diff --git a/agents/backport_agent.py b/agents/backport_agent.py
index 77425719..439b5772 100644
--- a/agents/backport_agent.py
+++ b/agents/backport_agent.py
@@ -37,6 +37,8 @@
     LogInputSchema,
     LogOutputSchema,
     Task,
+    BackportData,
+    ErrorData,
 )
 from common.utils import redis_client, fix_await
 from constants import I_AM_JOTNAR, CAREFULLY_REVIEW_CHANGES
@@ -68,7 +70,6 @@
     GitPatchCreationTool,
     GitPreparePackageSources,
 )
-from triage_agent import BackportData, ErrorData
 from utils import (
     check_subprocess,
     get_agent_execution_config,
diff --git a/agents/metrics_middleware.py b/agents/metrics_middleware.py
new file mode 100644
index 00000000..cd3b60b2
--- /dev/null
+++ b/agents/metrics_middleware.py
@@ -0,0 +1,44 @@
+from datetime import datetime
+
+from beeai_framework.context import (
+    RunContextStartEvent,
+    RunContextFinishEvent,
+    RunMiddlewareProtocol,
+    RunContext
+)
+from beeai_framework.emitter import EmitterOptions, EventMeta
+from beeai_framework.emitter.utils import create_internal_event_matcher
+
+
+class MetricsMiddleware(RunMiddlewareProtocol):
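+    """Record simple metrics about a single agent run.
+
+    The middleware hooks into the run context's internal start/finish events
+    and exposes the elapsed wall-clock time via `duration` / `get_metrics()`.
+    """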
+    def __init__(self) -> None:
+        self.start_time: datetime | None = None
+        self.end_time: datetime | None = None
+        self.tool_calls: int = 0
+
+    def bind(self, ctx: RunContext) -> None:
+        ctx.emitter.on(
+            create_internal_event_matcher("start", ctx.instance),
+            self._on_run_context_start,
+            EmitterOptions(is_blocking=True, priority=1),
+        )
+        ctx.emitter.on(
+            create_internal_event_matcher("finish", ctx.instance),
+            self._on_run_context_finish,
+            EmitterOptions(is_blocking=True, priority=1),
+        )
+
+    async def _on_run_context_start(self, event: RunContextStartEvent, meta: EventMeta) -> None:
+        self.start_time = datetime.now()
+
+    async def _on_run_context_finish(self, event: RunContextFinishEvent, meta: EventMeta) -> None:
+        self.end_time = datetime.now()
+
+    @property
+    def duration(self) -> float:
+        if self.start_time and self.end_time:
+            return (self.end_time - self.start_time).total_seconds()
+        return 0
+
+    def get_metrics(self) -> dict[str, float]:
+        return {"duration": self.duration}
diff --git a/agents/tasks.py b/agents/tasks.py
index 8f74e4b4..afb2f1f7 100644
--- a/agents/tasks.py
+++ b/agents/tasks.py
@@ -9,9 +9,9 @@
 from common.models import LogOutputSchema, CachedMRMetadata
 from common.utils import is_cs_branch
 
-from constants import BRANCH_PREFIX, JIRA_COMMENT_TEMPLATE
-from utils import check_subprocess, run_subprocess, run_tool, mcp_tools
-from tools.specfile import UpdateReleaseTool
+from agents.constants import BRANCH_PREFIX, JIRA_COMMENT_TEMPLATE
+from agents.utils import check_subprocess, run_subprocess, run_tool, mcp_tools
+from agents.tools.specfile import UpdateReleaseTool
 
 logger = logging.getLogger(__name__)
diff --git a/agents/tests/e2e/conftest.py b/agents/tests/e2e/conftest.py
new file mode 100644
index 00000000..f52e41f5
--- /dev/null
+++ b/agents/tests/e2e/conftest.py
@@ -0,0 +1,16 @@
+from typing import Generator
+
+
+import pytest
+
+
+@pytest.hookimpl(wrapper=True)
+def pytest_terminal_summary(
+    terminalreporter: pytest.TerminalReporter, exitstatus, config: pytest.Config
+) -> Generator:
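+    """Append the metrics table collected during the e2e run to pytest's terminal summary."""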
+    yield
+    metrics = config.stash.get("metrics", None)
+
+    if metrics:
+        terminalreporter.write_sep("=", "Metrics")
+        terminalreporter.write_line(metrics, flush=True)
diff --git a/agents/tests/e2e/test_triage.py b/agents/tests/e2e/test_triage.py
new file mode 100644
index 00000000..c571b83a
--- /dev/null
+++ b/agents/tests/e2e/test_triage.py
@@ -0,0 +1,131 @@
+from tabulate import tabulate
+import pytest
+import os
+
+from agents.triage_agent import run_workflow, TriageState, create_triage_agent
+from agents.metrics_middleware import MetricsMiddleware
+from agents.observability import setup_observability
+from common.models import TriageOutputSchema, Resolution, BackportData
+
+
+class TriageAgentTestCase:
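+    """Pair a Jira issue with the triage result the agent is expected to produce."""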
+    def __init__(self, input: str, expected_output: TriageOutputSchema):
+        self.input: str = input
+        self.expected_output: TriageOutputSchema = expected_output
+        self.metrics: dict | None = None
+
+    async def run(self) -> TriageState:
+        metrics_middleware = MetricsMiddleware()
+
+        def testing_factory(gateway_tools):
+            triage_agent = create_triage_agent(gateway_tools)
+            triage_agent.middlewares.append(metrics_middleware)
+            return triage_agent
+
+        finished_state = await run_workflow(self.input, False, testing_factory)
+        self.metrics = metrics_middleware.get_metrics()
+        return finished_state
+
+
+test_cases = [
+    TriageAgentTestCase(
+        input="RHEL-15216",
+        expected_output=TriageOutputSchema(
+            resolution=Resolution.BACKPORT,
+            data=BackportData(
+                package="dnsmasq",
+                patch_urls=[
+                    "http://thekelleys.org.uk/gitweb/?p=dnsmasq.git;a=patch;h=dd33e98da09c487a58b6cb6693b8628c0b234a3b"
+                ],
+                justification="not-implemented",
+                jira_issue="RHEL-15216",
+                cve_id=None,
+                fix_version="rhel-8.10",
+            ),
+        ),
+    ),
+    TriageAgentTestCase(
+        input="RHEL-112546",
+        expected_output=TriageOutputSchema(
+            resolution=Resolution.BACKPORT,
+            data=BackportData(
+                package="libtiff",
+                patch_urls=[
+                    "https://gitlab.com/libtiff/libtiff/-/commit/d1c0719e004fbb223c571d286c73911569d4dbb6.patch"
+                ],
+                justification="not-implemented",
+                jira_issue="RHEL-112546",
+                cve_id="CVE-2025-9900",
+                fix_version="rhel-9.6.z",
+            ),
+        ),
+    ),
+    TriageAgentTestCase(
+        input="RHEL-61943",
+        expected_output=TriageOutputSchema(
+            resolution=Resolution.BACKPORT,
+            data=BackportData(
+                package="dnsmasq",
+                patch_urls=[
+                    "http://thekelleys.org.uk/gitweb/?p=dnsmasq.git;a=patch;h=eb1fe15ca80b6bc43cd6bfdf309ec6c590aff811"
+                ],
+                justification="not-implemented",
+                jira_issue="RHEL-61943",
+                cve_id=None,
+                fix_version="rhel-8.10.z",
+            ),
+        ),
+    ),
+    TriageAgentTestCase(
+        input="RHEL-29712",
+        expected_output=TriageOutputSchema(
+            resolution=Resolution.BACKPORT,
+            data=BackportData(
+                package="bind",
+                patch_urls=[
+                    "https://gitlab.isc.org/isc-projects/bind9/-/commit/7e2f50c36958f8c98d54e6d131f088a4837ce269"
+                ],
+                justification="not-implemented",
+                jira_issue="RHEL-29712",
+                cve_id=None,
+                fix_version="rhel-8.10.z",
+            ),
+        ),
+    ),
+]
+
+
+@pytest.fixture(scope="session", autouse=True)
+def observability_fixture():
+    return setup_observability(os.environ["COLLECTOR_ENDPOINT"])
+
+
+@pytest.fixture(scope="session", autouse=True)
+def collect_metrics(request):
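+    """After the session finishes, tabulate per-issue metrics and stash them for the terminal summary."""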
+    yield
+    collected_metrics = []
+    for test_case in test_cases:
+        if test_case.metrics is None:
+            continue
+        collected_metrics.append([test_case.input] + list(test_case.metrics.values()))
+    request.config.stash["metrics"] = tabulate(collected_metrics, ["Issue", "Time"])
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "test_case",
+    test_cases,
+)
+async def test_triage_agent(test_case: TriageAgentTestCase):
+    def verify_result(
+        real_output: TriageOutputSchema, expected_output: TriageOutputSchema
+    ):
+        assert real_output.resolution == expected_output.resolution
+        assert real_output.data.package == expected_output.data.package
+        assert real_output.data.patch_urls == expected_output.data.patch_urls
+        assert real_output.data.jira_issue == expected_output.data.jira_issue
+        assert real_output.data.cve_id == expected_output.data.cve_id
+        assert real_output.data.fix_version == expected_output.data.fix_version
+
+    finished_state = await test_case.run()
+    verify_result(finished_state.triage_result, test_case.expected_output)
diff --git a/agents/tools/commands.py b/agents/tools/commands.py
index f93e9f30..83fda5da 100644
--- a/agents/tools/commands.py
+++ b/agents/tools/commands.py
@@ -8,7 +8,7 @@
 from beeai_framework.emitter import Emitter
 from beeai_framework.tools import JSONToolOutput, Tool, ToolError, ToolRunOptions
 
-from utils import run_subprocess
+from agents.utils import run_subprocess
 
 TIMEOUT = 10 * 60  # seconds
 ELLIPSIZED_LINES = 200
diff --git a/agents/tools/specfile.py b/agents/tools/specfile.py
index db5000af..09ecb75b 100644
--- a/agents/tools/specfile.py
+++ b/agents/tools/specfile.py
@@ -15,7 +15,7 @@
 from common.constants import BREWHUB_URL
 from common.validators import NonEmptyString
 
-from utils import get_absolute_path
+from agents.utils import get_absolute_path
 
 
 class GetPackageInfoToolInput(BaseModel):
diff --git a/agents/triage_agent.py b/agents/triage_agent.py
index 4583e4de..f675a200 100644
--- a/agents/triage_agent.py
+++ b/agents/triage_agent.py
@@ -1,13 +1,10 @@
 import asyncio
-import json
 import logging
 import os
 import re
 import sys
 import traceback
-from enum import Enum
 from textwrap import dedent
-from typing import Union
 
 from pydantic import BaseModel, Field
 
@@ -24,15 +21,13 @@
 from beeai_framework.workflows import Workflow
 from beeai_framework.utils.strings import to_json
 
-import tasks
+import agents.tasks as tasks
 from common.config import load_rhel_config
 from common.models import (
     Task,
     TriageInputSchema as InputSchema,
     TriageOutputSchema as OutputSchema,
     Resolution,
-    RebaseData,
-    BackportData,
     ClarificationNeededData,
     NoActionData,
     ErrorData,
@@ -40,12 +35,12 @@
 )
 from common.utils import redis_client, fix_await
 from common.constants import JiraLabels, RedisQueues
-from observability import setup_observability
-from tools.commands import RunShellCommandTool
-from tools.patch_validator import PatchValidatorTool
-from tools.version_mapper import VersionMapperTool
-from tools.upstream_search import UpstreamSearchTool
-from utils import get_agent_execution_config, get_chat_model, get_tool_call_checker_config, mcp_tools, run_tool
+from agents.observability import setup_observability
+from agents.tools.commands import RunShellCommandTool
+from agents.tools.patch_validator import PatchValidatorTool
+from agents.tools.version_mapper import VersionMapperTool
+from agents.tools.upstream_search import UpstreamSearchTool
+from agents.utils import get_agent_execution_config, get_chat_model, get_tool_call_checker_config, mcp_tools, run_tool
 
 logger = logging.getLogger(__name__)
 
@@ -291,228 +286,234 @@ def render_prompt(input: InputSchema) -> str:
     return PromptTemplate(schema=InputSchema, template=template).render(input)
 
 
-async def main() -> None:
-    logging.basicConfig(level=logging.INFO)
+class TriageState(BaseModel):
+    jira_issue: str
+    cve_eligibility_result: CVEEligibilityResult | None = Field(default=None)
+    triage_result: OutputSchema | None = Field(default=None)
+    target_branch: str | None = Field(default=None)
+
+
+def create_triage_agent(gateway_tools):
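+    """Build the triage RequirementAgent; factored out so tests can wrap it (e.g. to attach middleware)."""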
+    return RequirementAgent(
+        name="TriageAgent",
+        llm=get_chat_model(),
+        tool_call_checker=get_tool_call_checker_config(),
+        tools=[ThinkTool(), RunShellCommandTool(), PatchValidatorTool(),
+               VersionMapperTool(), UpstreamSearchTool()]
+        + [t for t in gateway_tools if t.name in ["get_jira_details", "set_jira_fields"]],
+        memory=UnconstrainedMemory(),
+        requirements=[
+            ConditionalRequirement(
+                ThinkTool,
+                force_at_step=1,
+                force_after=Tool,
+                consecutive_allowed=False,
+                only_success_invocations=False,
+            ),
+            ConditionalRequirement("get_jira_details", min_invocations=1),
+            ConditionalRequirement(UpstreamSearchTool, only_after="get_jira_details"),
+            ConditionalRequirement(RunShellCommandTool, only_after="get_jira_details"),
+            ConditionalRequirement(PatchValidatorTool, only_after="get_jira_details"),
+            ConditionalRequirement("set_jira_fields", only_after="get_jira_details"),
+        ],
+        middlewares=[GlobalTrajectoryMiddleware(pretty=True)],
+        role="Red Hat Enterprise Linux developer",
+        instructions=[
+            "Use the `think` tool to reason through complex decisions and document your approach.",
+            "Be proactive in your search for fixes and do not give up easily.",
+            "For any patch URL that you are proposing for backport, you need to validate it using PatchValidator tool.",
+            "Do not modify the patch URL in your final answer after it has been validated with the PatchValidator tool.",
+            "After completing your triage analysis, if your decision is backport or rebase, always set appropriate JIRA fields per the instructions using set_jira_fields tool.",
+        ]
+    )
 
-    setup_observability(os.environ["COLLECTOR_ENDPOINT"])
-    dry_run = os.getenv("DRY_RUN", "False").lower() == "true"
 
+async def run_workflow(jira_issue, dry_run, triage_agent_factory):
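+    """Run the triage workflow for one Jira issue and return the final workflow state."""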
"get_jira_details", + available_tools=gateway_tools, + issue_key=state.jira_issue + ) + issue_status = issue_status.get("fields", {}).get("status", {}).get("name") + + if not is_rh_employee and issue_status == "New": + logger.warning(f"Issue author for {state.jira_issue} is not verified as RH employee - ending triage with clarification needed") + + # override triage result with clarification needed so that it gets reviewed by us + state.triage_result = OutputSchema( + resolution=Resolution.CLARIFICATION_NEEDED, + data=ClarificationNeededData( + findings="The rebase resolution was determined, but author verification failed.", + additional_info_needed="Needs human review, as the issue author is not verified as a Red Hat employee.", + jira_issue=state.jira_issue + ) ) - issue_status = issue_status.get("fields", {}).get("status", {}).get("name") - if not is_rh_employee and issue_status == "New": - logger.warning(f"Issue author for {state.jira_issue} is not verified as RH employee - ending triage with clarification needed") + return "comment_in_jira" - # override triage result with clarification needed so that it gets reviewed by us - state.triage_result = OutputSchema( - resolution=Resolution.CLARIFICATION_NEEDED, - data=ClarificationNeededData( - findings="The rebase resolution was determined, but author verification failed.", - additional_info_needed="Needs human review, as the issue author is not verified as a Red Hat employee.", - jira_issue=state.jira_issue - ) - ) + logger.info(f"Issue author for {state.jira_issue} verified as RH employee or issue is not in new status - proceeding with rebase") - return "comment_in_jira" + return "determine_target_branch" - logger.info(f"Issue author for {state.jira_issue} verified as RH employee or issue is not in new status - proceeding with rebase") + async def comment_in_jira(state): + comment_text = state.triage_result.format_for_comment() + logger.info(f"Result to be put in Jira comment: {comment_text}") + if dry_run: + return Workflow.END + await tasks.comment_in_jira( + jira_issue=state.jira_issue, + agent_type="Triage", + comment_text=comment_text, + available_tools=gateway_tools, + ) + return Workflow.END - return "determine_target_branch" + workflow.add_step("check_cve_eligibility", check_cve_eligibility) + workflow.add_step("run_triage_analysis", run_triage_analysis) + workflow.add_step("verify_rebase_author", verify_rebase_author) + workflow.add_step("determine_target_branch", determine_target_branch_step) + workflow.add_step("comment_in_jira", comment_in_jira) - async def comment_in_jira(state): - comment_text = state.triage_result.format_for_comment() - logger.info(f"Result to be put in Jira comment: {comment_text}") - if dry_run: - return Workflow.END - await tasks.comment_in_jira( - jira_issue=state.jira_issue, - agent_type="Triage", - comment_text=comment_text, - available_tools=gateway_tools, - ) - return Workflow.END + response = await workflow.run(TriageState(jira_issue=jira_issue)) + return response.state - workflow.add_step("check_cve_eligibility", check_cve_eligibility) - workflow.add_step("run_triage_analysis", run_triage_analysis) - workflow.add_step("verify_rebase_author", verify_rebase_author) - workflow.add_step("determine_target_branch", determine_target_branch_step) - workflow.add_step("comment_in_jira", comment_in_jira) - response = await workflow.run(State(jira_issue=jira_issue)) - return response.state +async def main() -> None: + logging.basicConfig(level=logging.INFO) + + 
+    if os.getenv("DRY_RUN", "False").lower() == "true":
+        return f"Dry run, not adding comment to {issue_key} (this is expected, not an error)"
+
     async with aiohttp.ClientSession() as session:
         try:
             async with session.post(