Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 230 additions & 0 deletions src/agentic_ci/iterate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
"""MR/PR iteration lifecycle helpers.

Provides reusable functions for managing the iterate phase of MR/PR workflows:
extracting MR URLs from bot comments, counting iterations, checking readiness,
resolving review threads, and collecting feedback from forges.
"""

from __future__ import annotations

import logging
import re

import requests

from agentic_ci.forge import ForgeError, detect_forge

log = logging.getLogger(__name__)

DEFAULT_BOT_IDENTIFIER = "agentic-ci bot"


def extract_mr_url(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should just be a method in the Forge class; perhaps different per forge type since the logic here is slighly changed for github vs gitlab.

comments: list[dict],
bot_pattern: re.Pattern[str] | None = None,
) -> str | None:
"""Extract MR/PR URL from bot comments.

Scans comments for those matching the bot header pattern, then extracts
the first GitLab MR or GitHub PR URL found.

Args:
comments: List of comment dicts with ``"body"`` keys.
bot_pattern: Compiled regex to identify bot comments.
Defaults to matching ``jira-autofix bot`` headers.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It defaults to DEFAULT_BOT_IDENTIFIER.


Returns:
The MR/PR URL string, or ``None`` if not found.
"""
if bot_pattern is None:
bot_pattern = re.compile(
rf"^(h3\.\s+|#{{1,6}}\s+)?{re.escape(DEFAULT_BOT_IDENTIFIER)}",
re.MULTILINE,
)
for c in comments:
body = c.get("body", "")
if not bot_pattern.search(body):
continue
m = re.search(r"https?://gitlab\.com/\S+/-/merge_requests/\d+", body)
if m:
return m.group(0)
m = re.search(r"https?://github\.com/\S+/pull/\d+", body)
if m:
return m.group(0)
Comment on lines +48 to +53

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Overly permissive URL regex can return malformed MR/PR links

At Line 48 and Line 51, \S+ may include trailing punctuation from markdown text (for example ) or .), which can break downstream forge detection/parsing.

Suggested fix
-        m = re.search(r"https?://gitlab\.com/\S+/-/merge_requests/\d+", body)
+        m = re.search(
+            r"https?://gitlab\.com/[A-Za-z0-9._\-/]+/-/merge_requests/\d+\b",
+            body,
+        )
         if m:
-            return m.group(0)
-        m = re.search(r"https?://github\.com/\S+/pull/\d+", body)
+            return m.group(0).rstrip(").,;:!?]>\"'")
+        m = re.search(
+            r"https?://github\.com/[A-Za-z0-9._-]+/[A-Za-z0-9._-]+/pull/\d+\b",
+            body,
+        )
         if m:
-            return m.group(0)
+            return m.group(0).rstrip(").,;:!?]>\"'")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
m = re.search(r"https?://gitlab\.com/\S+/-/merge_requests/\d+", body)
if m:
return m.group(0)
m = re.search(r"https?://github\.com/\S+/pull/\d+", body)
if m:
return m.group(0)
m = re.search(
r"https?://gitlab\.com/[A-Za-z0-9._\-/]+/-/merge_requests/\d+\b",
body,
)
if m:
return m.group(0).rstrip(").,;:!?]>\"'")
m = re.search(
r"https?://github\.com/[A-Za-z0-9._-]+/[A-Za-z0-9._-]+/pull/\d+\b",
body,
)
if m:
return m.group(0).rstrip(").,;:!?]>\"'")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/agentic_ci/iterate.py` around lines 48 - 53, The regexes currently use
\S+ which can capture trailing punctuation; update the two re.search patterns in
iterate.py (the calls assigning to variable m) to use a stricter repo path
pattern and a boundary/lookahead so links don't include trailing punctuation —
e.g. replace \S+ with a pattern that matches repo paths like
[A-Za-z0-9_.-]+(?:/[A-Za-z0-9_.-]+)+ and append a lookahead such as
(?=$|\s|[)\]\>.,'"]) or a negative lookahead for common trailing punctuation;
ensure you update both the GitLab pattern (https?://gitlab\.com/...) and the
GitHub pattern (https?://github\.com/...) so returned m.group(0) never contains
trailing markdown punctuation.

return None


def count_iterations(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this Jira comments or MR/PR comments? I think Jira, in which case it should probably be a part of the jira module.

comments: list[dict],
bot_identifier: str = DEFAULT_BOT_IDENTIFIER,
) -> int:
"""Count iteration comments posted by the bot.

Looks for comments containing the bot identifier and an
``Iteration N/M:`` pattern.

Args:
comments: List of comment dicts with ``"body"`` keys.
bot_identifier: String that identifies bot comments.

Returns:
Number of iteration comments found.
"""
pattern = re.compile(r"Iteration \d+/\d+:")
return sum(
1
for c in comments
if bot_identifier in c.get("body", "") and pattern.search(c.get("body", ""))
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have comments that have multiple iterations in a single comment, this needs to be updated to findall() since search() will return the first pattern only.



def already_notified_ready(
comments: list[dict],
bot_pattern: re.Pattern[str] | None = None,
) -> bool:
"""Check if last bot comment is a 'ready for merge' notification.

Args:
comments: List of comment dicts with ``"body"`` keys.
bot_pattern: Compiled regex to identify bot comments.
Defaults to matching ``jira-autofix bot`` headers.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.


Returns:
``True`` if the last bot comment indicates the MR/PR is ready.
"""
if bot_pattern is None:
bot_pattern = re.compile(
rf"^(h3\.\s+|#{{1,6}}\s+)?{re.escape(DEFAULT_BOT_IDENTIFIER)}",
re.MULTILINE,
)
last_bot_body = ""
for c in comments:
if bot_pattern.search(c.get("body", "")):
last_bot_body = c.get("body", "")
return (
"ready for a maintainer to review and merge" in last_bot_body
or "ready for a maintainer to merge" in last_bot_body
)
Comment on lines +101 to +107

@coderabbitai coderabbitai Bot Jun 3, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

“Last bot comment” logic is order-dependent and can return wrong readiness state

At Line 101–Line 107, the function assumes comments are oldest→newest. If the caller provides newest-first (common API behavior), readiness detection flips.

Suggested fix
-    last_bot_body = ""
-    for c in comments:
-        if bot_pattern.search(c.get("body", "")):
-            last_bot_body = c.get("body", "")
+    bot_comments = [c for c in comments if bot_pattern.search(c.get("body", ""))]
+    if not bot_comments:
+        return False
+
+    # Prefer explicit timestamps when present; fallback to input order.
+    def _created_at(comment: dict) -> str:
+        return str(comment.get("created_at", ""))
+
+    last_bot_body = max(bot_comments, key=_created_at).get("body", "") if any(
+        "created_at" in c for c in bot_comments
+    ) else bot_comments[-1].get("body", "")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/agentic_ci/iterate.py` around lines 101 - 107, The current detection of
the "last bot comment" is order-dependent; fix the logic in iterate.py by
explicitly finding the most recent bot comment instead of assuming comments are
oldest→newest: locate the last bot comment by either iterating
reversed(comments) to pick the first match or by selecting the comment with the
newest timestamp (e.g., max(comments, key=lambda c: c.get("created_at"))) that
matches bot_pattern, then set last_bot_body from that comment and evaluate the
existing readiness check using last_bot_body.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid, we should sort the comments.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mprpic, here's a clean minimal patch for already_notified_ready that sorts by created_at when timestamps are present, and falls back safely to the last element in input order:

def already_notified_ready(
    comments: list[dict],
    bot_pattern: re.Pattern[str] | None = None,
) -> bool:
    if bot_pattern is None:
        bot_pattern = re.compile(
            rf"^(h3\.\s+|#{{1,6}}\s+)?{re.escape(DEFAULT_BOT_IDENTIFIER)}",
            re.MULTILINE,
        )
    bot_comments = [c for c in comments if bot_pattern.search(c.get("body", ""))]
    if not bot_comments:
        return False

    if any("created_at" in c for c in bot_comments):
        last = max(bot_comments, key=lambda c: c.get("created_at", ""))
    else:
        last = bot_comments[-1]

    last_bot_body = last.get("body", "")
    return (
        "ready for a maintainer to review and merge" in last_bot_body
        or "ready for a maintainer to merge" in last_bot_body
    )

Key changes:

  • Collects all bot comments first; returns False early if none found (avoids a silent miss).
  • Uses max(..., key=created_at) when timestamps exist — handles both oldest→newest and newest→first API responses correctly.
  • Falls back to bot_comments[-1] (last in input order) when no created_at keys are present, preserving existing behaviour for callers that pre-sort.

🧠 Learnings used
Learnt from: mprpic
Repo: opendatahub-io/agentic-ci PR: 21
File: src/agentic_ci/log.py:24-26
Timestamp: 2026-05-23T02:11:19.002Z
Learning: When reviewing this repo’s code, don’t conflate `agentic_ci.log.info` with Python’s stdlib `logging.Logger.info`. `src/agentic_ci/log.py` defines an `info(msg: str)` helper that only accepts a single string, so calls to `agentic_ci.log.info` must pass a fully-formed message (no `%`/format placeholders with extra arguments). If you see `log.info` usage like `log.info("[%s] ...", arg1, ...)`, that pattern matches stdlib `logging.Logger`’s `%`-style formatting (i.e., multiple arguments), not the `agentic_ci.log.info` helper—review suggestions should respect which one is being called.



def resolve_threads(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a method of the Forge class then? Same goes for the two methods below that require forge access.

mr_url: str,
review_comments: list[dict],
reply_message: str = "Addressed in the latest revision.",
github_token: str | None = None,
) -> int:
"""Reply to and resolve review threads on an MR/PR.

For each comment with a ``thread_id``, posts a reply and resolves
the thread via the detected forge API.

Args:
mr_url: Full URL of the MR/PR.
review_comments: List of review comment dicts with ``"thread_id"`` keys.
reply_message: Message to post as a reply before resolving.
github_token: Token for GitHub API authentication.

Returns:
Number of threads successfully resolved.
"""
if not review_comments:
return 0
try:
forge = detect_forge(mr_url, github_token=github_token)
except (ForgeError, requests.RequestException, ValueError, OSError) as exc:
log.warning("forge detection error for %s: %s", mr_url, exc)
return 0

resolved = 0
for comment in review_comments:
thread_id = comment.get("thread_id")
if not thread_id:
continue
try:
forge.reply(mr_url, str(thread_id), reply_message)
forge.resolve(mr_url, str(thread_id))
resolved += 1
except (ForgeError, requests.RequestException, ValueError, OSError) as exc:
log.warning("forge thread resolve error: %s", exc)
return resolved


def check_mr_state(
mr_url: str,
github_token: str | None = None,
) -> dict | None:
"""Check MR/PR state via the forge API.

Args:
mr_url: Full URL of the MR/PR.
github_token: Token for GitHub API authentication.

Returns:
Dict with ``state``, ``source_branch``, ``pipeline_status`` keys,
or ``None`` on error.
"""
try:
forge = detect_forge(mr_url, github_token=github_token)
return forge.mr_status(mr_url)
except (ForgeError, requests.RequestException, ValueError, OSError) as exc:
log.warning("mr_status error for %s: %s", mr_url, exc)
return None


def collect_feedback(
mr_url: str,
github_token: str | None = None,
) -> dict:
"""Collect review comments, general comments, and CI failures from an MR/PR.

Makes three independent forge API calls, catching errors individually
so partial results are still returned.

Args:
mr_url: Full URL of the MR/PR.
github_token: Token for GitHub API authentication.

Returns:
Dict with keys: ``review_comments``, ``general_comments``,
``ci_failures``, ``review_count``, ``general_count``, ``ci_fail_count``.
"""
empty: dict = {
"review_comments": [],
"general_comments": [],
"ci_failures": {},
"review_count": 0,
"general_count": 0,
"ci_fail_count": 0,
}
try:
forge = detect_forge(mr_url, github_token=github_token)
except (ForgeError, requests.RequestException, ValueError, OSError) as exc:
log.warning("forge error for %s: %s", mr_url, exc)
return empty

review_comments: list[dict] = []
general_comments: list[dict] = []
ci_failures: dict = {}

try:
review_comments = forge.review_comments(mr_url) or []
except (ForgeError, requests.RequestException, ValueError, OSError) as exc:
log.warning("review_comments error for %s: %s", mr_url, exc)
try:
general_comments = forge.general_comments(mr_url) or []
except (ForgeError, requests.RequestException, ValueError, OSError) as exc:
log.warning("general_comments error for %s: %s", mr_url, exc)
try:
ci_failures = forge.pipeline_failures(mr_url) or {}
except (ForgeError, requests.RequestException, ValueError, OSError) as exc:
log.warning("pipeline_failures error for %s: %s", mr_url, exc)

ci_fail_count = len(ci_failures.get("failed_jobs", [])) if isinstance(ci_failures, dict) else 0
return {
"review_comments": review_comments,
"general_comments": general_comments,
"ci_failures": ci_failures,
"review_count": len(review_comments),
"general_count": len(general_comments),
"ci_fail_count": ci_fail_count,
}
Loading
Loading