dennis-bilson-port
Member

@dennis-bilson-port dennis-bilson-port commented Oct 14, 2025

User description

Implemented filtering logic in the get_clusters method to exclude unavailable clusters from resource fetching operations. The integration now only processes clusters with a "Successful" connection state, preventing errors and unnecessary API calls to unreachable clusters.

Type of change

Please leave one option from the following and delete the rest:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • New Integration (non-breaking change which adds a new integration)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Non-breaking change (fix of existing functionality that will not change current behavior)
  • Documentation (added/updated documentation)

All tests should be run against the Port production environment (using a testing org).

Core testing checklist

  • Integration able to create all default resources from scratch
  • Resync finishes successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Scheduled resync able to abort existing resync and start a new one
  • Tested with at least 2 integrations from scratch
  • Tested with Kafka and Polling event listeners
  • Tested deletion of entities that don't pass the selector

Integration testing checklist

  • Integration able to create all default resources from scratch
  • Completed a full resync from a freshly installed integration and it completed successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Resync finishes successfully
  • If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the examples folder in the integration directory.
  • If resource kind is updated, run the integration with the example data and check if the expected result is achieved
  • If new resource kind is added or updated, validate that live-events for that resource are working as expected
  • Docs PR link here

Preflight checklist

  • Handled rate limiting
  • Handled pagination
  • Implemented the code in async
  • Support Multi account

Screenshots

[two screenshots, not reproduced here]

API Documentation

Provide links to the API documentation used for this integration.


PR Type

Bug fix, Enhancement


Description

  • Added cluster availability filtering to exclude unavailable clusters from resource fetching operations

  • Implemented dedicated get_clusters method with async generator pattern for paginated cluster retrieval

  • Introduced ClusterState enum to standardize cluster availability checks

  • Moved CLUSTER from ObjectKind to ResourceKindsWithSpecialHandling for specialized processing
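
The filtering and batching described in the list above can be sketched roughly as follows. This is a minimal illustration, not the integration's code: it assumes the cluster list response carries an `items` array whose entries expose `connectionState.status`, and the `fetch_clusters` stub, the sample payload, the `collect_batches` helper, and the `PAGE_SIZE` value are all hypothetical.

```python
import asyncio
from enum import Enum
from typing import Any, AsyncGenerator

PAGE_SIZE = 2  # assumed batch size, kept small for illustration


class ClusterState(Enum):
    AVAILABLE = "Successful"


async def fetch_clusters() -> dict[str, Any]:
    # Hypothetical stand-in for the HTTP request to the clusters endpoint.
    return {
        "items": [
            {"name": "a", "connectionState": {"status": "Successful"}},
            {"name": "b", "connectionState": {"status": "Failed"}},
            {"name": "c", "connectionState": {"status": "Successful"}},
            {"name": "d"},  # no connectionState at all
            {"name": "e", "connectionState": {"status": "Successful"}},
        ]
    }


async def get_clusters() -> AsyncGenerator[list[dict[str, Any]], None]:
    response = await fetch_clusters()
    batch: list[dict[str, Any]] = []
    for cluster in response.get("items", []):
        # .get() with a default keeps clusters without connectionState from raising.
        status = cluster.get("connectionState", {}).get("status")
        if status == ClusterState.AVAILABLE.value:
            batch.append(cluster)
        if len(batch) >= PAGE_SIZE:
            yield batch
            batch = []
    if batch:
        # Flush whatever is left after the loop.
        yield batch


async def collect_batches() -> list[list[str]]:
    # Collect cluster names per yielded batch.
    return [[c["name"] for c in b] async for b in get_clusters()]
```

Running `asyncio.run(collect_batches())` on the sample payload yields only the clusters whose status is "Successful", grouped into batches of at most `PAGE_SIZE`.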


Diagram Walkthrough

flowchart LR
  A["ArgoCD API"] --> B["get_clusters method"]
  B --> C["Filter by connectionState"]
  C --> D["Yield available clusters"]
  D --> E["on_clusters_resync handler"]

File Walkthrough

Relevant files
Enhancement
client.py
Implement cluster availability filtering and dedicated retrieval method

integrations/argocd/client.py

  • Added ClusterState enum with AVAILABLE status constant
  • Moved CLUSTER from ObjectKind to ResourceKindsWithSpecialHandling
  • Implemented get_clusters async generator method with availability filtering
  • Added _available_clusters instance variable for batch processing
  • Updated get_resources to use .get() for safer dictionary access
+37/-2   
main.py
Add dedicated resync handler for cluster resources             

integrations/argocd/main.py

  • Added on_clusters_resync handler for cluster resource synchronization
  • Integrated new get_clusters method with async generator pattern
+7/-0     
Tests
test_client.py
Add comprehensive test coverage for cluster filtering logic

integrations/argocd/tests/test_client.py

  • Added ClusterState import for test assertions
  • Removed cluster testing from generic test_get_resources function
  • Added test_get_clusters_with_only_available_clusters test case
  • Added test_get_clusters_filters_unavailable_clusters test case
+103/-13
Documentation
CHANGELOG.md
Document cluster availability filtering improvement           

integrations/argocd/CHANGELOG.md

  • Added version 0.1.224 release entry with improvement note
+7/-0     
Configuration changes
pyproject.toml
Update package version to 0.1.224                                               

integrations/argocd/pyproject.toml

  • Bumped version from 0.1.223 to 0.1.224
+1/-1     

@dennis-bilson-port dennis-bilson-port self-assigned this Oct 14, 2025
@dennis-bilson-port dennis-bilson-port marked this pull request as ready for review October 14, 2025 16:13
@dennis-bilson-port dennis-bilson-port requested a review from a team as a code owner October 14, 2025 16:13
Contributor

qodo-merge-pro bot commented Oct 14, 2025

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🟢
No security concerns identified: no security vulnerabilities detected by AI analysis. Human verification advised for critical code.
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
No custom compliance provided

Follow the guide to enable custom compliance check.

Compliance status legend 🟢 - Fully Compliant
🟡 - Partially Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

Contributor

qodo-merge-pro bot commented Oct 14, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
High-level
Refactor to filter resources by cluster

The current implementation only filters cluster objects, not the resources
within them. The suggestion is to first get a list of available clusters and
then use it to filter other resource requests, like applications, to avoid
making calls to unavailable clusters.

Examples:

integrations/argocd/main.py [19-25]
async def on_resources_resync(kind: str) -> RAW_RESULT:
    if kind in iter(ResourceKindsWithSpecialHandling):
        logger.info(f"Kind {kind} has a special handling. Skipping...")
        return []
    else:
        argocd_client = init_client()
        return await argocd_client.get_resources(resource_kind=ObjectKind(kind))
integrations/argocd/client.py [87-115]
    async def get_clusters(self) -> AsyncGenerator[list[dict[str, Any]], None]:
        url = f"{self.api_url}/{ResourceKindsWithSpecialHandling.CLUSTER}s"
        try:
            response_data = await self._send_api_request(url=url)
            for cluster in response_data.get("items", []):
                if (
                    cluster.get("connectionState", {}).get("status")
                    == ClusterState.AVAILABLE.value
                ):
                    self._available_clusters.append(cluster)

 ... (clipped 19 lines)

Solution Walkthrough:

Before:

# in main.py
@ocean.on_resync()
async def on_resources_resync(kind: str):
    # This is called for "application", "project", etc.
    argocd_client = init_client()
    # Fetches ALL resources (e.g., applications) without
    # considering cluster availability.
    return await argocd_client.get_resources(resource_kind=ObjectKind(kind))

# in client.py
async def get_clusters(self):
    # This method correctly filters clusters...
    response_data = await self._send_api_request(...)
    for cluster in response_data.get("items", []):
        if cluster.get("connectionState", {}).get("status") == "Successful":
            # ...but the result is only used to sync cluster objects,
            # not to filter other resources.
            yield [cluster]

After:

# A potential implementation in client.py
class ArgocdClient:
    async def get_available_clusters(self) -> list[dict[str, Any]]:
        # New or modified method to return a list of available clusters
        # This would be called first.
        ...

    async def get_resources_from_available_clusters(self, resource_kind: str):
        available_clusters = await self.get_available_clusters()
        cluster_names = [c['name'] for c in available_clusters]

        # Fetch resources (e.g. applications) only for available clusters
        all_resources = []
        for name in cluster_names:
            # The ArgoCD API supports filtering applications by cluster
            resources = await self._send_api_request(
                url=f"{self.api_url}/{resource_kind}s",
                query_params={"cluster": name}
            )
            all_resources.extend(resources.get("items", []))
        return all_resources
Suggestion importance[1-10]: 9

Why: The suggestion correctly identifies a critical flaw where the PR only filters cluster objects but fails to prevent resource fetching from unavailable clusters, thus not fully achieving its stated goal.

Impact: High
Possible issue
Fix inconsistent and buggy exception handling
Suggestion Impact: The commit removed the use of the shared _available_clusters buffer and corrected error handling by eliminating the special TimeoutException branch and adjusting general exception handling to yield [] when ignore_server_error is True and re-raise otherwise. This aligns with the suggestion's intent to fix buggy exception handling and prevent silent failures.

code diff:

         url = f"{self.api_url}/{ResourceKindsWithSpecialHandling.CLUSTER}s"
         try:
             response_data = await self._send_api_request(url=url)
+            available_clusters: list[dict[str, Any]] = []
             for cluster in response_data.get("items", []):
                 if (
                     cluster.get("connectionState", {}).get("status")
                     == ClusterState.AVAILABLE.value
                 ):
-                    self._available_clusters.append(cluster)
-                if len(self._available_clusters) >= PAGE_SIZE:
-                    yield self._available_clusters
-                    self._available_clusters = []
-
-            if self._available_clusters:
-                yield self._available_clusters
-                self._available_clusters = []
-        except httpx.TimeoutException as e:
-            logger.error(f"Request timed out while fetching clusters: {e}")
-            if self.ignore_server_error:
-                yield []
-            else:
-                yield self._available_clusters
-                self._available_clusters = []
+                    available_clusters.append(cluster)
+                if len(available_clusters) >= PAGE_SIZE:
+                    yield available_clusters
+                    available_clusters = []
+
+            if available_clusters:
+                yield available_clusters
         except Exception as e:
             logger.error(f"Failed to fetch clusters: {e}")
             if self.ignore_server_error:
                 yield []
             raise e

Correct the exception handling in get_clusters to properly propagate errors or
ignore them based on the ignore_server_error flag. This prevents silent failures
and ensures the ignore_server_error setting works as intended.

integrations/argocd/client.py [104-115]

 except httpx.TimeoutException as e:
     logger.error(f"Request timed out while fetching clusters: {e}")
     if self.ignore_server_error:
         yield []
     else:
-        yield self._available_clusters
-        self._available_clusters = []
+        raise e
 except Exception as e:
     logger.error(f"Failed to fetch clusters: {e}")
     if self.ignore_server_error:
         yield []
-    raise e
+    else:
+        raise e

[Suggestion processed]

Suggestion importance[1-10]: 8

Why: The suggestion correctly identifies two bugs in the exception handling logic that could lead to silent data loss or incorrect behavior of the ignore_server_error flag, making it a valuable correction for the reliability of the integration.
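
Why the `else` branch in the suggested diff matters can be sketched with a stub that always fails. In an async generator, a `raise` placed after `yield []` without an `else` still executes once the consumer requests the next item, so the error surfaces even when it was meant to be ignored. The names `FakeClient`, `_fail`, `unconditional_raise`, `guarded_raise`, and `drain` are hypothetical and exist only for this illustration.

```python
import asyncio
from typing import Any, AsyncGenerator


class FakeClient:
    def __init__(self, ignore_server_error: bool) -> None:
        self.ignore_server_error = ignore_server_error

    async def _fail(self) -> dict[str, Any]:
        # Hypothetical request helper that always errors.
        raise RuntimeError("server unreachable")

    async def unconditional_raise(self) -> AsyncGenerator[list[Any], None]:
        try:
            await self._fail()
        except Exception as e:
            if self.ignore_server_error:
                yield []
            # Runs in both cases: after the yield, the generator resumes here.
            raise e

    async def guarded_raise(self) -> AsyncGenerator[list[Any], None]:
        try:
            await self._fail()
        except Exception:
            if self.ignore_server_error:
                yield []
            else:
                raise


async def drain(gen: AsyncGenerator[list[Any], None]) -> str:
    # Exhaust the generator and report whether the error reached the consumer.
    try:
        async for _ in gen:
            pass
    except RuntimeError:
        return "raised"
    return "ok"
```

With `ignore_server_error=True`, draining `unconditional_raise()` still ends in an exception, while `guarded_raise()` finishes cleanly after yielding the empty batch.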

Impact: Medium
Avoid race conditions using local variables
Suggestion Impact: The commit removed the instance attribute _available_clusters and refactored get_clusters to use a local available_clusters list for buffering and yielding results, aligning with the suggestion to avoid shared state and race conditions.

code diff:

-        self._available_clusters: list[dict[str, Any]] = []
 
     async def _send_api_request(
         self,
@@ -88,42 +87,53 @@
         url = f"{self.api_url}/{ResourceKindsWithSpecialHandling.CLUSTER}s"
         try:
             response_data = await self._send_api_request(url=url)
+            available_clusters: list[dict[str, Any]] = []
             for cluster in response_data.get("items", []):
                 if (
                     cluster.get("connectionState", {}).get("status")
                     == ClusterState.AVAILABLE.value
                 ):
-                    self._available_clusters.append(cluster)
-                if len(self._available_clusters) >= PAGE_SIZE:
-                    yield self._available_clusters
-                    self._available_clusters = []
-
-            if self._available_clusters:
-                yield self._available_clusters
-                self._available_clusters = []
-        except httpx.TimeoutException as e:
-            logger.error(f"Request timed out while fetching clusters: {e}")
-            if self.ignore_server_error:
-                yield []
-            else:
-                yield self._available_clusters
-                self._available_clusters = []
+                    available_clusters.append(cluster)
+                if len(available_clusters) >= PAGE_SIZE:
+                    yield available_clusters
+                    available_clusters = []
+
+            if available_clusters:
+                yield available_clusters
         except Exception as e:

To prevent potential race conditions and data corruption, change the
_available_clusters buffer in get_clusters from an instance attribute to a local
variable, making the method re-entrant and safe for concurrent execution.

integrations/argocd/client.py [97-103]

-if len(self._available_clusters) >= PAGE_SIZE:
-    yield self._available_clusters
-    self._available_clusters = []
+available_clusters: list[dict[str, Any]] = []
+for cluster in response_data.get("items", []):
+    if (
+        cluster.get("connectionState", {}).get("status")
+        == ClusterState.AVAILABLE.value
+    ):
+        available_clusters.append(cluster)
+    if len(available_clusters) >= PAGE_SIZE:
+        yield available_clusters
+        available_clusters = []
 
-if self._available_clusters:
-    yield self._available_clusters
-    self._available_clusters = []
+if available_clusters:
+    yield available_clusters

[Suggestion processed]

Suggestion importance[1-10]: 7

Why: The suggestion correctly identifies that using an instance variable as a buffer makes the get_clusters method non-reentrant, which could lead to race conditions. Using a local variable is a good practice for making the method safer for concurrent use.
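
The re-entrancy concern can be illustrated with two toy clients, one buffering on the instance and one buffering locally. This is a sketch under assumptions: the class names, `get_items`, `consume`, and `run_both` are hypothetical, and `asyncio.sleep(0)` merely stands in for any await point (such as an HTTP request) where another task may run.

```python
import asyncio
from typing import AsyncGenerator


class SharedBufferClient:
    def __init__(self) -> None:
        self._buffer: list[str] = []  # shared across every call on this instance

    async def get_items(self, items: list[str]) -> AsyncGenerator[list[str], None]:
        for item in items:
            await asyncio.sleep(0)  # await point: other tasks may interleave here
            self._buffer.append(item)
        if self._buffer:
            yield self._buffer
            self._buffer = []


class LocalBufferClient:
    async def get_items(self, items: list[str]) -> AsyncGenerator[list[str], None]:
        buffer: list[str] = []  # private to this invocation
        for item in items:
            await asyncio.sleep(0)
            buffer.append(item)
        if buffer:
            yield buffer


async def consume(gen: AsyncGenerator[list[str], None]) -> list[str]:
    out: list[str] = []
    async for batch in gen:
        out.extend(batch)
    return out


async def run_both(client) -> list[list[str]]:
    # Two concurrent consumers on the same client instance.
    return await asyncio.gather(
        consume(client.get_items(["a1", "a2"])),
        consume(client.get_items(["b1", "b2"])),
    )
```

With the local buffer, each consumer receives exactly its own items; with the shared buffer, items from the two calls end up mixed because both generators append to the same list between await points.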

Impact: Medium

@github-actions github-actions bot added size/L and removed size/M labels Oct 14, 2025
Member

@mk-armah mk-armah left a comment

LGTM

Contributor

@ivankalinovski ivankalinovski left a comment

LGTM

@dennis-bilson-port dennis-bilson-port merged commit 5a56095 into main Oct 15, 2025
25 checks passed
@dennis-bilson-port dennis-bilson-port deleted the PORT-16575-fix-argocd-cluster-unreachability-causing-prolonged-sync-durations branch October 15, 2025 11:03