.pr_agent_auto_best_practices
Pattern 1: When accessing nested configuration or attributes from external clients or settings, use safe accessors (getattr/get with defaults) and validate allowed values to prevent AttributeError and misconfiguration; raise clear errors on invalid options.
Example code before:
explicit_auth_type = settings.GITLAB.AUTH_TYPE
if "gitlab.com" in gitlab_url:
method = "oauth_token"
token = client.oauth_token or client.private_token
Example code after:
explicit_auth_type = get_settings().get("GITLAB.AUTH_TYPE")
if explicit_auth_type and explicit_auth_type not in {"oauth_token","private_token"}:
raise ValueError(f"Unsupported GITLAB.AUTH_TYPE: {explicit_auth_type}")
from urllib.parse import urlparse
hostname = (urlparse(gitlab_url).hostname or "").lower()
method = "oauth_token" if hostname.endswith("gitlab.com") or hostname.endswith("gitlab.io") else "private_token"
token = getattr(client, "oauth_token", None) or getattr(client, "private_token", None)
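Pulled together as a self-contained sketch of this pattern; the plain dict stands in for the real settings object, and resolve_auth / FakeClient are illustrative names rather than the project's API:
```python
from urllib.parse import urlparse

ALLOWED_AUTH_TYPES = {"oauth_token", "private_token"}

def resolve_auth(settings: dict, gitlab_url: str, client):
    # Safe accessor with a default instead of attribute access that can raise.
    explicit_auth_type = settings.get("GITLAB.AUTH_TYPE")
    if explicit_auth_type and explicit_auth_type not in ALLOWED_AUTH_TYPES:
        raise ValueError(f"Unsupported GITLAB.AUTH_TYPE: {explicit_auth_type}")
    # Parse once and compare hostnames rather than substrings.
    hostname = (urlparse(gitlab_url).hostname or "").lower()
    method = explicit_auth_type or (
        "oauth_token"
        if hostname == "gitlab.com" or hostname.endswith(".gitlab.com")
        else "private_token"
    )
    # getattr with a default avoids AttributeError on clients lacking the field.
    token = getattr(client, "oauth_token", None) or getattr(client, "private_token", None)
    return method, token

class FakeClient:
    private_token = "glpat-example"

print(resolve_auth({}, "https://gitlab.com/org/repo", FakeClient()))
# ('oauth_token', 'glpat-example')
```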
Relevant past accepted suggestions:
Suggestion 1:
Validate and normalize status
Validate the configured status to prevent invalid values from being passed to Azure DevOps. Normalize and restrict to supported statuses (e.g., "active" or "closed") with a safe default fallback to "closed".
pr_agent/git_providers/azuredevops_provider.py [355-358]
-status = get_settings().azure_devops.default_comment_status
-if status is None:
+status = getattr(get_settings().azure_devops, "default_comment_status", None)
+if isinstance(status, str):
+ status_normalized = status.strip().lower()
+ status = status_normalized if status_normalized in {"active", "closed"} else "closed"
+else:
status = "closed"
thread = CommentThread(comments=[comment], thread_context=thread_context, status=status)
Suggestion 2:
Add safe config access
Guard access to nested config attributes in case azure_devops is missing. Use .get or getattr with defaults to avoid AttributeError and ensure a safe fallback.
pr_agent/git_providers/azuredevops_provider.py [355-358]
-status = get_settings().azure_devops.default_comment_status
-if status is None:
- status = "closed"
+settings = get_settings()
+status = getattr(getattr(settings, "azure_devops", None), "default_comment_status", None) or "closed"
thread = CommentThread(comments=[comment], thread_context=thread_context, status=status)
Suggestion 3:
Add null safety for attribute access
The code directly accesses oauth_token and private_token attributes without checking if they exist, which could raise AttributeError if the GitLab instance doesn't have these attributes. Add defensive checks using getattr() with default values to prevent runtime exceptions.
pr_agent/git_providers/gitlab_provider.py [722]
-access_token = self.gl.oauth_token or self.gl.private_token
+access_token = getattr(self.gl, 'oauth_token', None) or getattr(self.gl, 'private_token', None)
Suggestion 4:
Add safe attribute access
This assumes both oauth_token and private_token attributes exist on the GitLab instance. Add validation to ensure the expected token attribute exists before accessing it.
pr_agent/git_providers/gitlab_provider.py [722]
-access_token = self.gl.oauth_token or self.gl.private_token
+access_token = getattr(self.gl, 'oauth_token', None) or getattr(self.gl, 'private_token', None)
Suggestion 5:
Validate explicit authentication type configuration
The code doesn't validate the value of explicit_auth_type from settings. If a user provides an unsupported value, the init method will silently default to using private_token, which can lead to confusing authentication failures. It's better to validate the configuration and raise an error for invalid values.
pr_agent/git_providers/gitlab_provider.py [76-78]
explicit_auth_type = get_settings().get("GITLAB.AUTH_TYPE", None)
if explicit_auth_type:
+ if explicit_auth_type not in ["oauth_token", "private_token"]:
+ raise ValueError(f"Unsupported GITLAB.AUTH_TYPE: '{explicit_auth_type}'. Must be 'oauth_token' or 'private_token'.")
return explicit_auth_type
Suggestion 6:
Use robust URL parsing for logic
Using a simple substring check (in) on the URL to determine the authentication method is fragile. It can produce incorrect matches when "gitlab.com" or "gitlab.io" appears unexpectedly in a subdomain or path. A more robust approach is to parse the URL and check the hostname.
pr_agent/git_providers/gitlab_provider.py [80-83]
+from urllib.parse import urlparse
# Default strategy: gitlab.com and gitlab.io use oauth_token, others use private_token
-if "gitlab.com" in gitlab_url or "gitlab.io" in gitlab_url:
- return "oauth_token"
+try:
+ hostname = urlparse(gitlab_url).hostname
+ if hostname and (hostname == "gitlab.com" or hostname.endswith(".gitlab.com") or hostname == "gitlab.io" or hostname.endswith(".gitlab.io")):
+ return "oauth_token"
+except Exception:
+ # Fallback for invalid URLs, though gitlab.Gitlab would likely fail later anyway
+ pass
return "private_token"Suggestion 7:
Fix inconsistent settings access logic
The setting access method in the if condition is inconsistent with how settings are defined and accessed elsewhere. The check get_settings().get("LITELLM.MODEL_ID", None) will likely fail to find the setting, which is defined under the [litellm] section in the configuration files. To ensure the setting is correctly retrieved, it's better to fetch it once, store it in a variable, and then use that variable for both the check and the assignment.
pr_agent/algo/ai_handlers/litellm_ai_handler.py [356-358]
-if get_settings().get("LITELLM.MODEL_ID", None) and 'bedrock/' in model:
- kwargs["model_id"] = get_settings().litellm.model_id
- get_logger().info(f"Using Bedrock custom inference profile: {get_settings().litellm.model_id}")
+model_id = get_settings().get("litellm.model_id")
+if model_id and 'bedrock/' in model:
+ kwargs["model_id"] = model_id
+ get_logger().info(f"Using Bedrock custom inference profile: {model_id}")
Suggestion 8:
Use consistent setting access pattern
The code uses inconsistent access patterns for the same setting. Use get_settings().litellm.extra_body consistently instead of mixing get_settings().get("LITELLM.EXTRA_BODY", None) and get_settings().litellm.extra_body.
pr_agent/algo/ai_handlers/litellm_ai_handler.py [368-375]
-if get_settings().get("LITELLM.EXTRA_BODY", None):
+if get_settings().litellm.extra_body:
try:
litellm_extra_body = json.loads(get_settings().litellm.extra_body)
if not isinstance(litellm_extra_body, dict):
raise ValueError("LITELLM.EXTRA_BODY must be a JSON object")
kwargs.update(litellm_extra_body)
except json.JSONDecodeError as e:
raise ValueError(f"LITELLM.EXTRA_BODY contains invalid JSON: {str(e)}")Suggestion 9:
Handle missing configuration safely
The code is directly accessing aws_secrets_manager.secret_arn which will raise an AttributeError if the aws_secrets_manager section doesn't exist in the configuration. Use the get() method with a default value instead, similar to how region_name is retrieved.
pr_agent/secret_providers/aws_secrets_manager_provider.py [20-22]
-self.secret_arn = get_settings().aws_secrets_manager.secret_arn
+self.secret_arn = get_settings().get("aws_secrets_manager.secret_arn")
if not self.secret_arn:
raise ValueError("AWS Secrets Manager ARN is not configured")Suggestion 10:
Validate required configuration value
The code doesn't check if secret_arn is None or empty before using it. This could lead to runtime errors when attempting to access AWS Secrets Manager without a valid ARN. Add validation to ensure secret_arn is properly configured.
pr_agent/secret_providers/aws_secrets_manager_provider.py [11-24]
def __init__(self):
try:
region_name = get_settings().get("aws_secrets_manager.region_name") or \
get_settings().get("aws.AWS_REGION_NAME")
if region_name:
self.client = boto3.client('secretsmanager', region_name=region_name)
else:
self.client = boto3.client('secretsmanager')
self.secret_arn = get_settings().aws_secrets_manager.secret_arn
+ if not self.secret_arn:
+ raise ValueError("AWS Secrets Manager ARN is not configured")
except Exception as e:
get_logger().error(f"Failed to initialize AWS Secrets Manager Provider: {e}")
raise e
Suggestion 11:
Missing parameter in function call
The get_settings() call is missing the use_context=False parameter that was present in the original code. This could lead to context-related issues when retrieving the Anthropic API key.
pr_agent/algo/token_handler.py [100-106]
def _calc_claude_tokens(self, patch: str) -> int:
try:
import anthropic
from pr_agent.algo import MAX_TOKENS
- client = anthropic.Anthropic(api_key=get_settings().get('anthropic.key'))
+ client = anthropic.Anthropic(api_key=get_settings(use_context=False).get('anthropic.key'))
max_tokens = MAX_TOKENS[get_settings().config.model]
Pattern 2: Add null-safety and structure checks when parsing webhook or API payloads and lists; avoid chained gets on possibly missing keys and validate required components before concatenation or use.
Example code before:
sender = data["pullRequest"]["author"]["user"]["name"]
repo_full_name = pr["toRef"]["repository"]["project"]["key"] + "/" + pr["toRef"]["repository"]["slug"]
files = [f for f in files if not r.match(f['path']['toString'])]
Example code after:
pr = data.get("pullRequest", {}) or {}
author = pr.get("author", {}) or {}
user = author.get("user", {}) or {}
sender = user.get("name", "")
repo = (pr.get("toRef", {}) or {}).get("repository", {}) or {}
proj = repo.get("project", {}) or {}
repo_full_name = f"{proj.get('key')}/{repo.get('slug')}" if proj.get('key') and repo.get('slug') else ""
files = [f for f in files if (f.get('path', {}) or {}).get('toString') and not r.match(f['path']['toString'])]
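The same null-safety idea as a runnable sketch; the payload shape mirrors the Bitbucket webhook examples above, and parse_pr_event is an illustrative helper, not project code:
```python
def parse_pr_event(data: dict) -> dict:
    # Each `or {}` also guards keys that are present but explicitly null.
    pr = data.get("pullRequest") or {}
    user = (pr.get("author") or {}).get("user") or {}
    repo = (pr.get("toRef") or {}).get("repository") or {}
    proj = repo.get("project") or {}
    key, slug = proj.get("key"), repo.get("slug")
    return {
        "sender": user.get("name", ""),
        # Only concatenate when both components exist.
        "repo_full_name": f"{key}/{slug}" if key and slug else "",
    }

# Missing and null keys both degrade to safe defaults instead of raising.
print(parse_pr_event({"pullRequest": {"author": None}}))
# {'sender': '', 'repo_full_name': ''}
```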
Relevant past accepted suggestions:
Suggestion 1:
Add safe dictionary access
Add error handling for potential KeyError when accessing nested dictionary keys. The current code assumes f['path']['toString'] always exists, which could cause crashes if the structure is different.
pr_agent/algo/file_filter.py [59-60]
elif platform == 'bitbucket_server':
- files = [f for f in files if not r.match(f['path']['toString'])]
+ files = [f for f in files if f.get('path', {}).get('toString') and not r.match(f['path']['toString'])]
Suggestion 2:
Add null safety checks
The code chains multiple .get() calls without null safety checks, which could cause AttributeError if intermediate values are None. Add proper null checks before accessing nested properties to prevent runtime errors.
pr_agent/servers/bitbucket_server_webhook.py [46-52]
pr_data = data.get("pullRequest", {})
title = pr_data.get("title", "")
-source_branch = pr_data.get("fromRef", {}).get("displayId", "")
-target_branch = pr_data.get("toRef", {}).get("displayId", "")
-sender = pr_data.get("author", {}).get("user", {}).get("name", "")
-project_key = pr_data.get("toRef", {}).get("repository", {}).get("project", {}).get("key", "")
-repo_slug = pr_data.get("toRef", {}).get("repository", {}).get("slug", "")
+from_ref = pr_data.get("fromRef", {})
+source_branch = from_ref.get("displayId", "") if from_ref else ""
+
+to_ref = pr_data.get("toRef", {})
+target_branch = to_ref.get("displayId", "") if to_ref else ""
+
+author = pr_data.get("author", {})
+user = author.get("user", {}) if author else {}
+sender = user.get("name", "") if user else ""
+
+repository = to_ref.get("repository", {}) if to_ref else {}
+project = repository.get("project", {}) if repository else {}
+project_key = project.get("key", "") if project else ""
+repo_slug = repository.get("slug", "") if repository else ""
+
Suggestion 3:
Validate repository name components
The string concatenation could result in malformed repository names if either key or slug is empty. Add validation to ensure both components exist before concatenation.
pr_agent/servers/bitbucket_server_webhook.py [50]
-repo_full_name = pr_data.get("toRef", {}).get("repository", {}).get("project", {}).get("key", "") + "/" + pr_data.get("toRef", {}).get("repository", {}).get("slug", "")
+project_key = pr_data.get("toRef", {}).get("repository", {}).get("project", {}).get("key", "")
+repo_slug = pr_data.get("toRef", {}).get("repository", {}).get("slug", "")
+repo_full_name = f"{project_key}/{repo_slug}" if project_key and repo_slug else ""
Suggestion 4:
Add null check
The method is accessing self.pr.head.ref directly without checking if self.pr.head exists first. This could cause a NoneType error if the PR head is None.
pr_agent/git_providers/gitea_provider.py [540-546]
def get_pr_branch(self) -> str:
"""Get the branch name of the PR"""
if not self.pr:
self.logger.error("Failed to get PR branch")
return ""
+ if not self.pr.head:
+ self.logger.error("PR head is None")
+ return ""
+
return self.pr.head.ref if self.pr.head.ref else ""
Suggestion 5:
Add null check for SHA
Add a null check for the commit SHA before using it in the API call. If the SHA is None or empty, the API call might fail with an unexpected error.
pr_agent/git_providers/gitea_provider.py [100-108]
-if file_path:
+if file_path and self.sha:
try:
content = self.repo_api.get_file_content(
owner=self.owner,
repo=self.repo,
commit_sha=self.sha,
filepath=file_path
)
self.file_contents[file_path] = content
Suggestion 6:
Handle string configuration values properly
Add validation to ensure code_generators is a list before iterating. If the configuration value is a string, it should be converted to a list to prevent iteration over individual characters.
pr_agent/algo/file_filter.py [22-27]
code_generators = get_settings().config.get('ignore_language_framework', [])
+if isinstance(code_generators, str):
+ code_generators = [code_generators]
for cg in code_generators:
glob_patterns = get_settings().generated_code.get(cg, [])
if isinstance(glob_patterns, str):
glob_patterns = [glob_patterns]
patterns += translate_globs_to_regexes(glob_patterns)
Suggestion 7:
Handle single TodoItem type safely
This code misbehaves when value is a single TodoItem dict rather than a list: len() on a dict counts its keys rather than entries, and the unparenthesized ternary drops the count entirely when there is a single entry. Add a type check to handle both cases properly.
-todo_entry_label = f"{len(value)} " + "entries" if len(value) > 1 else "entry"
+if isinstance(value, list):
+ todo_entry_label = f"{len(value)} " + ("entries" if len(value) > 1 else "entry")
+else:
+ todo_entry_label = "1 entry"
Pattern 3: Improve exception handling by preserving context (raise from), capturing exceptions into variables, sanitizing logs to avoid secrets leakage, and converting returns of exception objects into proper raises.
Example code before:
except TypeError:
logger.warning("Fallback", artifact={"error": e})
...
self.logger.error(error_msg)
return RuntimeError(error_msg)
Example code after:
except TypeError as e:
logger.warning("Fallback", artifact={"error": str(e)})
...
self.logger.error(error_msg)
raise RuntimeError(error_msg)
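A compact sketch combining the three habits (capture the exception into a variable, keep secret-bearing exception text out of the log line, chain with raise ... from); load_secret and broken_fetch are hypothetical placeholders:
```python
import logging

logger = logging.getLogger(__name__)

def load_secret(fetch, name: str) -> str:
    try:
        return fetch(name)
    except Exception as e:
        # Sanitized log line: the exception text may embed credentials.
        logger.error("Failed to load secret %r", name)
        # `raise ... from e` preserves the original traceback as __cause__.
        raise RuntimeError(f"Could not load secret {name!r}") from e

def broken_fetch(_name):
    raise ConnectionError("token=abc123 would leak if logged verbatim")

try:
    load_secret(broken_fetch, "api_key")
except RuntimeError as err:
    assert isinstance(err.__cause__, ConnectionError)  # context preserved
```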
Relevant past accepted suggestions:
Suggestion 1:
Fix undefined exception variable
In the except TypeError block, capture the exception as e (i.e., except TypeError as e:) to fix the NameError when logging the error.
pr_agent/git_providers/utils.py [39-53]
try:
new_settings = Dynaconf(settings_files=[repo_settings_file],
# Disable all dynamic loading features
load_dotenv=False, # Don't load .env files
merge_enabled=False, # Don't allow merging from other sources
)
-except TypeError:
+except TypeError as e:
import traceback
# Fallback for older Dynaconf versions that don't support these parameters
get_logger().warning(
"Your Dynaconf version does not support disabled 'load_dotenv'/'merge_enabled' parameters. "
"Loading repo settings without these security features. "
"Please upgrade Dynaconf for better security.",
artifact={"error": e, "traceback": traceback.format_exc()})
new_settings = Dynaconf(settings_files=[repo_settings_file])
Suggestion 2:
Preserve original exception context on failure
Chain the last caught exception e when raising the final Exception to preserve the original error context for better debugging.
pr_agent/algo/pr_processing.py [336-337]
if i == len(all_models) - 1: # If it's the last iteration
- raise Exception(f"Failed to generate prediction with any model of {all_models}")
+ raise Exception(f"Failed to generate prediction with any model of {all_models}") from e
Suggestion 3:
Prevent credential leakage in logs
Avoid logging the raw exception message if it may include credentials from the client init path. Log a sanitized message and attach the original exception as the cause when re-raising to prevent credential leakage.
pr_agent/git_providers/bitbucket_server_provider.py [48-65]
try:
if self.bearer_token:
self.bitbucket_client = bitbucket_client or Bitbucket(
url=self.bitbucket_server_url,
token=self.bearer_token
)
else:
if not username or not password:
raise ValueError("Bitbucket authentication requires either 'BITBUCKET_SERVER.BEARER_TOKEN' or both 'BITBUCKET_SERVER.USERNAME' and 'BITBUCKET_SERVER.PASSWORD'.")
self.bitbucket_client = bitbucket_client or Bitbucket(
url=self.bitbucket_server_url,
username=username,
password=password
)
except Exception as e:
- get_logger().error(f"Failed to initialize Bitbucket client for {self.bitbucket_server_url}: {e}")
+ get_logger().error(f"Failed to initialize Bitbucket client for {self.bitbucket_server_url}")
raise
Suggestion 4:
Add error handling for GitLab instantiation
The GitLab instance creation can fail due to network issues, invalid tokens, or server errors. Add try-catch blocks around the GitLab instantiation to handle potential exceptions gracefully. This prevents crashes and provides meaningful error messages to users when authentication fails.
pr_agent/git_providers/gitlab_provider.py [41-51]
# Create GitLab instance based on authentication method
-if auth_method == "oauth_token":
- self.gl = gitlab.Gitlab(
- url=gitlab_url,
- oauth_token=gitlab_access_token
- )
-else: # private_token
- self.gl = gitlab.Gitlab(
- url=gitlab_url,
- private_token=gitlab_access_token
- )
+try:
+ if auth_method == "oauth_token":
+ self.gl = gitlab.Gitlab(
+ url=gitlab_url,
+ oauth_token=gitlab_access_token
+ )
+ else: # private_token
+ self.gl = gitlab.Gitlab(
+ url=gitlab_url,
+ private_token=gitlab_access_token
+ )
+except Exception as e:
+ get_logger().error(f"Failed to create GitLab instance: {e}")
+ raise ValueError(f"Unable to authenticate with GitLab: {e}")Suggestion 5:
Validate streaming response content
The method should validate that full_response is not empty before returning. An empty response could indicate a streaming failure or incomplete data transfer that should be handled appropriately.
pr_agent/algo/ai_handlers/litellm_ai_handler.py [414-439]
async def _handle_streaming_response(self, response):
"""
Handle streaming response from acompletion and collect the full response.
Args:
response: The streaming response object from acompletion
Returns:
tuple: (full_response_content, finish_reason)
"""
full_response = ""
finish_reason = None
try:
async for chunk in response:
if chunk.choices and len(chunk.choices) > 0:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
full_response += delta.content
if chunk.choices[0].finish_reason:
finish_reason = chunk.choices[0].finish_reason
except Exception as e:
get_logger().error(f"Error handling streaming response: {e}")
raise
+ if not full_response:
+ get_logger().warning("Streaming response resulted in empty content")
+ raise openai.APIError("Empty streaming response received")
+
return full_response, finish_reason
Suggestion 6:
Fix exception handling
The function is returning a RuntimeError object instead of raising it when an unexpected response format is received. This will cause issues as the caller expects a string return value, not an exception object.
pr_agent/git_providers/gitea_provider.py [753-762]
if hasattr(response, 'data'):
raw_data = response.data.read()
return raw_data.decode('utf-8')
elif isinstance(response, tuple):
raw_data = response[0].read()
return raw_data.decode('utf-8')
else:
error_msg = f"Unexpected response format received from API: {type(response)}"
self.logger.error(error_msg)
- return RuntimeError(error_msg)
+ raise RuntimeError(error_msg)
Suggestion 7:
Add signature verification error handling
The signature verification is missing error handling. If the signature verification fails, the function verify_signature will likely raise an exception, but there's no try-except block to catch it and return an appropriate HTTP error response.
pr_agent/servers/gitea_app.py [45-54]
# Verify webhook signature
webhook_secret = getattr(get_settings().gitea, 'webhook_secret', None)
if webhook_secret:
body_bytes = await request.body()
signature_header = request.headers.get('x-gitea-signature', None)
if not signature_header:
get_logger().error("Missing signature header")
raise HTTPException(status_code=400, detail="Missing signature header")
- verify_signature(body_bytes, webhook_secret, f"sha256={signature_header}")
+ try:
+ verify_signature(body_bytes, webhook_secret, f"sha256={signature_header}")
+ except Exception as e:
+ get_logger().error(f"Invalid signature: {e}")
+ raise HTTPException(status_code=401, detail="Invalid signature")
Suggestion 8:
Handle UTF-8 decoding errors gracefully
Add error handling for UTF-8 decoding failures to prevent crashes when encountering files with different encodings or corrupted byte sequences.
pr_agent/git_providers/gitlab_provider.py [115-121]
def _ensure_string_content(self, content: Union[str, bytes, None]) -> str:
"""Convert bytes content to UTF-8 string if needed."""
if content is None:
return ""
if isinstance(content, bytes):
- return content.decode('utf-8')
+ try:
+ return content.decode('utf-8')
+ except UnicodeDecodeError:
+ return content.decode('utf-8', errors='replace')
return content
Suggestion 9:
Add null safety check
Add a null check to handle cases where content might be None. This prevents potential AttributeError when trying to call decode() on None, which would cause runtime errors.
pr_agent/git_providers/gitlab_provider.py [115-119]
-def _ensure_string_content(self, content: Union[str, bytes]) -> str:
+def _ensure_string_content(self, content: Union[str, bytes, None]) -> str:
"""Convert bytes content to UTF-8 string if needed."""
+ if content is None:
+ return ""
if isinstance(content, bytes):
return content.decode('utf-8')
return content
Pattern 4: Normalize and validate external-facing inputs (status enums, URL formats, hostnames) before use; pre-parse and sanitize once per operation to avoid repeated calls and brittle string checks.
Example code before:
status = settings.azure_devops.default_comment_status
if "gitlab.com" in url:
...
for item in items:
status = get_settings().azure_devops.get("default_comment_status", "closed")
Example code after:
settings_obj = get_settings()
status_raw = getattr(getattr(settings_obj, "azure_devops", None), "default_comment_status", None)
status = (status_raw or "").strip().lower()
status = status if status in {"active","closed"} else "closed"
from urllib.parse import urlparse
hostname = (urlparse(host_input).hostname or "").lower()
# use hostname and enforce scheme/no trailing slash
cfg_status = status # fetched once outside loops
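A normalize-once sketch under the same assumptions (function names are ours; real providers would read the raw values from get_settings()):
```python
from urllib.parse import urlparse

ALLOWED_STATUSES = {"active", "closed"}

def normalize_status(raw) -> str:
    # Normalize once, then restrict to the supported enum with a safe default.
    s = raw.strip().lower() if isinstance(raw, str) else ""
    return s if s in ALLOWED_STATUSES else "closed"

def normalize_host(url: str) -> str:
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        raise ValueError(f"Expected scheme://host, got: {url!r}")
    # Canonical form: lowercase hostname, no path, no trailing slash.
    return f"{parsed.scheme}://{parsed.hostname.lower()}"

print(normalize_status(" Active "))                      # active
print(normalize_status(None))                            # closed
print(normalize_host("https://GitLab.MyCorp-Inc.com/"))  # https://gitlab.mycorp-inc.com
```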
Relevant past accepted suggestions:
Suggestion 1:
Clarify and normalize host input
**Clarify and normalize the expected Host Address format to avoid brittle URL handling; specify the scheme and example patterns explicitly.**
[docs/docs/installation/qodo_merge.md [80]](https://github.com/qodo-ai/pr-agent/pull/2034/files#diff-d087251981cb031769fd0aa0a248474e542021bae50492d3b1bc9dc216b92d43R80-R80)
```diff
-- **Host Address**: Leave empty if using gitlab.com ([for self-hosted GitLab servers](#gitlab-server), enter your GitLab instance URL)
+- **Host Address**: Leave empty if using gitlab.com. For self-hosted, enter your GitLab base URL including scheme (e.g., "https://gitlab.mycorp-inc.com") without trailing slash. Do not include paths or query strings.
```
Suggestion 2:
Specify strict host URL format
**Explicitly state the required URL format for Host Address to avoid failures (e.g., missing scheme or trailing slash). Users often enter values like gitlab.mycorp.com or with trailing slashes, causing validation or authorization issues.**
[docs/docs/installation/qodo_merge.md [78-81]](https://github.com/qodo-ai/pr-agent/pull/2034/files#diff-d087251981cb031769fd0aa0a248474e542021bae50492d3b1bc9dc216b92d43R78-R81)
```diff
 1. Browse to: <https://register.oauth.app.gitlab.merge.qodo.ai>
 2. Fill in the registration form:
-   - **Host Address**: Leave empty if using gitlab.com ([for self-hosted GitLab servers](#gitlab-server), enter your GitLab instance URL)
+   - **Host Address**: Leave empty for gitlab.com. For self-hosted, enter the full base URL including scheme, no trailing slash (e.g., `https://gitlab.mycorp-inc.com`).
```
Suggestion 3:
Avoid repeated calls in a loop
The get_settings() function is called on every iteration of the loop, which is inefficient. Since the settings are not expected to change during the loop, you should fetch the status value once before the loop begins. This will improve performance, especially when publishing a large number of suggestions.
pr_agent/git_providers/azuredevops_provider.py [82]
+status = get_settings().azure_devops.get("default_comment_status", "closed")
for suggestion in code_suggestions:
body = suggestion['body']
relevant_file = suggestion['relevant_file']
relevant_lines_start = suggestion['relevant_lines_start']
relevant_lines_end = suggestion['relevant_lines_end']
if not relevant_lines_start or relevant_lines_start == -1:
get_logger().warning(
f"Failed to publish code suggestion, relevant_lines_start is {relevant_lines_start}")
continue
if relevant_lines_end < relevant_lines_start:
get_logger().warning(f"Failed to publish code suggestion, "
f"relevant_lines_end is {relevant_lines_end} and "
f"relevant_lines_start is {relevant_lines_start}")
continue
thread_context = CommentThreadContext(
file_path=relevant_file,
right_file_start=CommentPosition(offset=1, line=relevant_lines_start),
right_file_end=CommentPosition(offset=1, line=relevant_lines_end))
comment = Comment(content=body, comment_type=1)
- status = get_settings().azure_devops.get("default_comment_status", "closed")
thread = CommentThread(comments=[comment], thread_context=thread_context, status=status)
try:
self.azure_devops_client.create_thread(
comment_thread=thread,
project=self.workspace_slug,
repository_id=self.repo_id,
pull_request_id=self.pr_num
)
post_parameters_list.append(True)
except Exception as e:
get_logger().error(f"Failed to publish code suggestion: {e}")
post_parameters_list.append(False)
Suggestion 4:
Validate and normalize status
Validate the configured status to prevent invalid values from being passed to Azure DevOps. Normalize and restrict to supported statuses (e.g., "active" or "closed") with a safe default fallback to "closed".
pr_agent/git_providers/azuredevops_provider.py [355-358]
-status = get_settings().azure_devops.default_comment_status
-if status is None:
+status = getattr(get_settings().azure_devops, "default_comment_status", None)
+if isinstance(status, str):
+ status_normalized = status.strip().lower()
+ status = status_normalized if status_normalized in {"active", "closed"} else "closed"
+else:
status = "closed"
thread = CommentThread(comments=[comment], thread_context=thread_context, status=status)
Suggestion 5:
Move URL validation before credential retrieval
The URL validation should occur before retrieving authentication credentials to avoid unnecessary processing. Move this check earlier in the initialization flow.
pr_agent/git_providers/bitbucket_server_provider.py [45-46]
+self.bitbucket_server_url = self._parse_bitbucket_server(url=pr_url)
if not self.bitbucket_server_url:
raise ValueError("Invalid or missing Bitbucket Server URL parsed from PR URL.")
+# Get username and password from settings
+username = get_settings().get("BITBUCKET_SERVER.USERNAME", None)
+password = get_settings().get("BITBUCKET_SERVER.PASSWORD", None)
+
Pattern 5: Harden streaming and response handling by validating non-empty content, checking finish_reason, and safely accessing chunk fields with getattr; log aggregated responses for debugging.
Example code before:
full_response = ""
async for chunk in response:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
full_response += delta.content
if not full_response:
raise APIError("Empty")
Example code after:
full_response, finish_reason = "", None
async for chunk in response:
choice = chunk.choices[0]
content = getattr(choice.delta, "content", None)
if content:
full_response += content
if choice.finish_reason:
finish_reason = choice.finish_reason
if not full_response and not finish_reason:
logger.warning("Empty content and no finish reason")
raise APIError("Empty streaming response received")
Relevant past accepted suggestions:
Suggestion 1:
Improve empty response validation logic
The empty response check should also validate finish_reason to ensure the streaming completed properly. An empty response with a valid finish reason might be acceptable in some cases.
pr_agent/algo/ai_handlers/litellm_ai_handler.py [462-464]
-if not full_response:
+if not full_response and finish_reason != "stop":
get_logger().warning("Streaming response resulted in empty content")
raise openai.APIError("Empty streaming response received")
Suggestion 2:
Refine empty stream response handling
The check for an empty response should also consider cases where a finish_reason is present but the content is empty. This can happen, for example, if a content filter is triggered. Raising an error only when both content and finish_reason are missing prevents incorrectly flagging valid empty responses.
pr_agent/algo/ai_handlers/litellm_ai_handler.py [462-464]
-if not full_response:
- get_logger().warning("Streaming response resulted in empty content")
+if not full_response and not finish_reason:
+ get_logger().warning("Streaming response resulted in empty content and no finish reason")
raise openai.APIError("Empty streaming response received")
Suggestion 3:
Validate streaming response content
The method should validate that full_response is not empty before returning. An empty response could indicate a streaming failure or incomplete data transfer that should be handled appropriately.
pr_agent/algo/ai_handlers/litellm_ai_handler.py [414-439]
async def _handle_streaming_response(self, response):
"""
Handle streaming response from acompletion and collect the full response.
Args:
response: The streaming response object from acompletion
Returns:
tuple: (full_response_content, finish_reason)
"""
full_response = ""
finish_reason = None
try:
async for chunk in response:
if chunk.choices and len(chunk.choices) > 0:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
full_response += delta.content
if chunk.choices[0].finish_reason:
finish_reason = chunk.choices[0].finish_reason
except Exception as e:
get_logger().error(f"Error handling streaming response: {e}")
raise
+ if not full_response:
+ get_logger().warning("Streaming response resulted in empty content")
+ raise openai.APIError("Empty streaming response received")
+
return full_response, finish_reason
Suggestion 4:
Enable debug logging for streaming
The current logic prevents logging the full response for streaming models because the complete response object isn't available. However, for debugging purposes, it's valuable to log the aggregated response. You can construct a mock response object for streaming models to enable consistent logging.
pr_agent/algo/ai_handlers/litellm_ai_handler.py [403-406]
# log the full response for debugging
-if not (model in self.streaming_required_models):
+if model in self.streaming_required_models:
+ # for streaming, we don't have the full response object, so we create a mock one
+ response_log = self.prepare_logs(
+ {"choices": [{"message": {"content": resp}, "finish_reason": finish_reason}]},
+ system, user, resp, finish_reason
+ )
+else:
response_log = self.prepare_logs(response, system, user, resp, finish_reason)
- get_logger().debug("Full_response", artifact=response_log)
+get_logger().debug("Full_response", artifact=response_log)
Suggestion 5:
Improve streaming response handling robustness
The current implementation for handling streaming responses might be fragile. The delta object within a stream chunk may not always have a content attribute, and checking with hasattr can be simplified. A more robust approach is to use getattr to safely access delta.content.
pr_agent/algo/ai_handlers/litellm_ai_handler.py [414-439]
async def _handle_streaming_response(self, response):
"""
Handle streaming response from acompletion and collect the full response.
Args:
response: The streaming response object from acompletion
Returns:
tuple: (full_response_content, finish_reason)
"""
full_response = ""
finish_reason = None
try:
async for chunk in response:
if chunk.choices and len(chunk.choices) > 0:
- delta = chunk.choices[0].delta
- if hasattr(delta, 'content') and delta.content:
- full_response += delta.content
- if chunk.choices[0].finish_reason:
- finish_reason = chunk.choices[0].finish_reason
+ choice = chunk.choices[0]
+ delta = choice.delta
+ content = getattr(delta, 'content', None)
+ if content:
+ full_response += content
+ if choice.finish_reason:
+ finish_reason = choice.finish_reason
except Exception as e:
get_logger().error(f"Error handling streaming response: {e}")
raise
return full_response, finish_reason

[Auto-generated best practices - 2025-10-29]