116 changes: 91 additions & 25 deletions plugins/action/base_action.py
@@ -32,6 +32,13 @@
from ansible.module_utils.common.arg_spec import ArgumentSpecValidator
from ansible.module_utils.six import string_types
from ansible.plugins.action import ActionBase
from ansible_collections.ansible.platform.plugins.plugin_utils.platform.exceptions import (
PlatformError,
AuthenticationError,
ValidationError,
NetworkError,
APIError
)

if TYPE_CHECKING:
from ansible_collections.ansible.platform.plugins.plugin_utils.platform.direct_client import DirectHTTPClient
@@ -212,10 +219,12 @@ def run(self, tmp=None, task_vars=None):
# Track which manager each task uses
# Key: task_uuid, Value: socket_path
_task_to_manager = {} # type: dict
MAX_RETRIES = 1

def _get_or_spawn_manager(
self,
task_vars: dict
task_vars: dict,
force_spawn: bool = False
) -> Tuple[Union['DirectHTTPClient', 'ManagerRPCClient'], Optional[Dict[str, Any]]]:
"""
Get connection client based on connection mode.
@@ -232,6 +241,7 @@ def _get_or_spawn_manager(

Args:
task_vars: Task variables from Ansible
force_spawn: If True, always spawn a new manager instead of reusing an existing one

Returns:
Tuple of (client, facts_dict):
@@ -260,7 +270,7 @@
# Route based on connection mode
if gateway_config.connection_mode == 'experimental':
# Experimental mode: Use persistent manager
return self._get_or_spawn_persistent_manager(task_vars, gateway_config)
return self._get_or_spawn_persistent_manager(task_vars, gateway_config, force_spawn=force_spawn)
else:
# Standard mode (default): Use direct HTTP client
return self._get_direct_client(task_vars, gateway_config)
@@ -296,7 +306,8 @@ def _get_direct_client(
def _get_or_spawn_persistent_manager(
self,
task_vars: dict,
gateway_config: Any
gateway_config: Any,
force_spawn: bool = False
) -> Tuple['ManagerRPCClient', Optional[Dict[str, Any]]]:
"""
Get existing persistent manager or spawn new one (experimental mode).
@@ -307,6 +318,7 @@ def _get_or_spawn_persistent_manager(
Args:
task_vars: Task variables from Ansible
gateway_config: Gateway configuration
force_spawn: Force a fresh process spawn

Returns:
Tuple of (ManagerRPCClient, facts_dict):
@@ -395,7 +407,7 @@ def _get_or_spawn_persistent_manager(
actual_socket_path = None
actual_authkey_b64 = None

if socket_path and authkey_b64:
if not force_spawn and socket_path and authkey_b64:
stored_path_exists = Path(socket_path).exists()
if stored_path_exists:
# Check if stored socket path matches expected (same credentials)
@@ -410,7 +422,7 @@
logger.info(f" Expected: {expected_socket_path}")

# Also check if expected socket path exists (in case facts weren't updated)
if not manager_found and Path(expected_socket_path).exists() and authkey_b64:
if not force_spawn and not manager_found and Path(expected_socket_path).exists() and authkey_b64:
manager_found = True
actual_socket_path = expected_socket_path
actual_authkey_b64 = authkey_b64
@@ -430,26 +442,29 @@

client = ManagerRPCClient(gateway_config.base_url, actual_socket_path_str, authkey)

# Track this task's manager
task_uuid = self._get_task_uuid(task_vars)
BaseResourceActionPlugin._task_to_manager[task_uuid] = actual_socket_path_str

# Track this manager in playbook tracking (process-safe)
play_id = self._get_play_id()
tracking = self._read_tracking_file(play_id)
if tracking:
if 'socket_paths' in tracking:
if isinstance(tracking['socket_paths'], list):
tracking['socket_paths'] = set(tracking['socket_paths'])
tracking['socket_paths'].add(actual_socket_path_str)
self._write_tracking_file(play_id, tracking)

logger.debug(f"Successfully connected to existing persistent manager: {actual_socket_path_str}")

return client, {
'platform_manager_socket': actual_socket_path_str,
'platform_manager_authkey': actual_authkey_b64
}
if client.check_health():
# Track this task's manager
task_uuid = self._get_task_uuid(task_vars)
BaseResourceActionPlugin._task_to_manager[task_uuid] = actual_socket_path_str

# Track this manager in playbook tracking (process-safe)
play_id = self._get_play_id()
tracking = self._read_tracking_file(play_id)
if tracking:
if 'socket_paths' in tracking:
if isinstance(tracking['socket_paths'], list):
tracking['socket_paths'] = set(tracking['socket_paths'])
tracking['socket_paths'].add(actual_socket_path_str)
self._write_tracking_file(play_id, tracking)

logger.debug(f"Successfully connected to existing persistent manager: {actual_socket_path_str}")

return client, {
'platform_manager_socket': actual_socket_path_str,
'platform_manager_authkey': actual_authkey_b64
}
else:
logger.warning("Manager connected but failed RPC health check")
except Exception as e:
logger.warning(f"Failed to connect to existing manager: {e}, spawning new one")
# Fall through to spawn new one
@@ -548,6 +563,57 @@ def _get_or_spawn_persistent_manager(
'gateway_url': gateway_config.base_url
}

def _handle_exception(self, e):
result = {'failed': True}
if isinstance(e, PlatformError):
result['msg'] = str(e)
result['error_type'] = e.__class__.__name__
if hasattr(e, 'status_code') and e.status_code:
result['status_code'] = e.status_code

if isinstance(e, AuthenticationError):
result['suggestion'] = "Check your gateway_username, gateway_password, or gateway_token."
elif isinstance(e, ValidationError):
result['suggestion'] = "Check your playbook parameters."
elif isinstance(e, NetworkError):
result['suggestion'] = "Check your gateway_hostname and network connectivity."
elif isinstance(e, APIError):
result['suggestion'] = "The Gateway server returned an error. Check Gateway logs or try again later."
elif isinstance(e, AnsibleError):
result['msg'] = str(e)
result['error_type'] = 'AnsibleError'
else:
result['msg'] = f"An unexpected error occurred: {str(e)}"
result['error_type'] = 'GeneralError'
import traceback
result['exception'] = traceback.format_exc()
return result

def execute_with_retry(self, manager_client, operation, module_name, data, task_vars):
"""
Execute an operation with automatic retry on connection failure.
"""
attempts = 0
current_client = manager_client

while attempts <= self.MAX_RETRIES:
try:
return current_client.execute(operation, module_name, data)
except (ConnectionError, BrokenPipeError, EOFError) as e:
attempts += 1
if attempts > self.MAX_RETRIES:
logger.error(f"Max retries ({self.MAX_RETRIES}) reached for operation {operation}")
raise e
logger.warning(f"Connection lost during {operation}. Attempting recovery ({attempts}/{self.MAX_RETRIES})...")
try:
# Retry with force_spawn=True to get a fresh manager
new_client, _ = self._get_or_spawn_manager(task_vars, force_spawn=True)
current_client = new_client
logger.info(f"Recovery successful. Retrying operation...")
except Exception as spawn_err:
logger.error(f"Failed to recover manager: {spawn_err}")
raise e

def _build_argspec_from_docs(self, documentation: str) -> dict:
"""
Build argument spec from DOCUMENTATION string.
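Note for reviewers: a minimal usage sketch of the new recovery path in base_action.py, assuming a plugin that already subclasses BaseResourceActionPlugin. Only the signatures of _get_or_spawn_manager(), execute_with_retry(), and _handle_exception() come from this diff; the module name and payload below are invented for illustration.

# Sketch only: inside a hypothetical run() of a BaseResourceActionPlugin subclass.
manager, manager_facts = self._get_or_spawn_manager(task_vars)

try:
    find_result = self.execute_with_retry(
        manager_client=manager,
        operation='find',
        module_name='team',                # hypothetical module name
        data={'name': 'platform-admins'},  # hypothetical payload
        task_vars=task_vars,               # lets a broken socket be respawned with force_spawn=True
    )
except (ConnectionError, BrokenPipeError, EOFError) as e:
    # Reached only after MAX_RETRIES respawn attempts have failed.
    return self._handle_exception(e)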
26 changes: 13 additions & 13 deletions plugins/action/user.py
@@ -155,10 +155,12 @@ def run(self, tmp=None, task_vars=None):
sys.stderr.write("PHASE 15: Checking if user exists (idempotency)\n")
sys.stderr.flush()
try:
find_result = manager.execute(
find_result = self.execute_with_retry(
manager_client=manager,
operation='find',
module_name=self.MODULE_NAME,
ansible_data={'username': user.username}
data={'username': user.username},
task_vars=task_vars
)
sys.stderr.write("PHASE 16: Find operation completed\n")
sys.stderr.flush()
@@ -176,10 +178,12 @@ def run(self, tmp=None, task_vars=None):
sys.stderr.write("PHASE 15b: Finding user to get ID for delete operation\n")
sys.stderr.flush()
try:
find_result = manager.execute(
find_result = self.execute_with_retry(
manager_client=manager,
operation='find',
module_name=self.MODULE_NAME,
ansible_data={'username': user.username}
data={'username': user.username},
task_vars=task_vars
)
sys.stderr.write("PHASE 16b: Find operation completed for delete\n")
sys.stderr.flush()
@@ -213,10 +217,12 @@ def run(self, tmp=None, task_vars=None):
# Execute via manager
sys.stderr.write(f"PHASE 18: About to execute {operation} via manager\n")
sys.stderr.flush()
manager_result = manager.execute(
manager_result = self.execute_with_retry(
manager_client=manager,
operation=operation,
module_name=self.MODULE_NAME,
ansible_data=user.__dict__
data=user.__dict__,
task_vars=task_vars
)
sys.stderr.write("PHASE 19: Manager execution completed\n")
sys.stderr.flush()
@@ -303,13 +309,7 @@ def run(self, tmp=None, task_vars=None):
import traceback
sys.stderr.write(f"TRACEBACK:\n{traceback.format_exc()}\n")
sys.stderr.flush()
self._display.vvv(f"❌ Error in action plugin: {e}")
result['failed'] = True
result['msg'] = str(e)

# Include traceback in verbose mode
if self._display.verbosity >= 3:
result['exception'] = traceback.format_exc()
return self._handle_exception(e)

sys.stderr.write("PHASE 25: Returning result from run()\n")
sys.stderr.flush()
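Note for reviewers: with run() in user.py now delegating to _handle_exception(), failures reach the playbook as a structured result instead of a bare msg. A sketch of the dict an AuthenticationError would produce, assuming the exception carries a 401 status code; the message text is invented, and the suggestion string is the one added in base_action.py.

# Example task result (values illustrative):
{
    'failed': True,
    'msg': 'Authentication failed: invalid credentials',
    'error_type': 'AuthenticationError',
    'status_code': 401,  # only present when the exception carries one
    'suggestion': 'Check your gateway_username, gateway_password, or gateway_token.'
}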
46 changes: 36 additions & 10 deletions plugins/plugin_utils/manager/platform_manager.py
@@ -33,7 +33,7 @@
TimeoutError,
classify_exception
)
from ..platform.retry import retry_http_request, RetryConfig
from ..platform.retry import retry_http_request, RetryConfig, retry_with_backoff

logger = logging.getLogger(__name__)

@@ -699,6 +699,13 @@ def _build_url(self, endpoint: str, query_params: Optional[Dict] = None) -> str:
url = f"{url}?{urlencode(query_params)}"

return url

def ping(self) -> bool:
"""
Lightweight health check.
Returns True to confirm the application layer is responsive.
"""
return True

def execute(
self,
@@ -1041,6 +1048,31 @@ def _find_resource(
ansible_instance = mixin_class.from_api(api_result, context)
from dataclasses import asdict
return asdict(ansible_instance)

@retry_with_backoff(max_retries=3, delay=1, backoff=2)
def _perform_request(self, method: str, url: str, **kwargs) -> requests.Response:
"""
Centralized request method with automatic retry and error handling.
"""
try:
# Increment HTTP request counter (thread-safe)
with self._lock:
self._http_request_count += 1
response = self.session.request(method, url, **kwargs)

if response.status_code in [401, 403]:
raise AuthenticationError(f"Authentication failed: {response.text}")
if response.status_code >= 500:
raise APIError(f"Server Error {response.status_code}: {response.text}", status_code=response.status_code)
if response.status_code >= 400:
raise ValidationError(f"Client Error {response.status_code}: {response.text}")
return response
except requests.exceptions.ConnectionError as e:
raise NetworkError("Connection refused or network down", original_exception=e)
except requests.exceptions.Timeout as e:
raise NetworkError("Request timed out", original_exception=e)
except requests.exceptions.RequestException as e:
raise NetworkError(f"Unexpected network error: {e}", original_exception=e)

def _execute_operations(
self,
@@ -1105,18 +1137,13 @@ def _execute_operations(
api_start = time.perf_counter()

try:
# Increment HTTP request counter (thread-safe)
with self._lock:
self._http_request_count += 1

response = self.session.request(
endpoint_op.method,
url,
response = self._perform_request(
method=endpoint_op.method,
url=url,
json=request_data,
timeout=self.request_timeout,
verify=self.verify_ssl
)
response.raise_for_status()

# Performance timing: API call end
api_end = time.perf_counter()
@@ -1319,4 +1346,3 @@ class PlatformManager(ThreadingMixIn, BaseManager):
def register_shutdown_method(service):
"""Register shutdown method with manager."""
PlatformManager.register('shutdown', callable=lambda: service.shutdown())

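Note for reviewers: retry_with_backoff is only imported in this file; its implementation lives in plugins/plugin_utils/platform/retry.py and is not shown in this diff. The sketch below is a generic version of the decorator pattern implied by the name and by the call site (max_retries=3, delay=1, backoff=2), not the collection's actual code.

import time
import logging
from functools import wraps

logger = logging.getLogger(__name__)

def retry_with_backoff(max_retries=3, delay=1, backoff=2, exceptions=(Exception,)):
    """Retry the wrapped call, multiplying the wait by 'backoff' after each failure."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    if attempt == max_retries:
                        raise  # attempts exhausted, surface the last error
                    logger.warning("%s failed (%s); retrying in %ss", func.__name__, exc, wait)
                    time.sleep(wait)
                    wait *= backoff
        return wrapper
    return decorator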
23 changes: 21 additions & 2 deletions plugins/plugin_utils/manager/process_manager.py
@@ -12,6 +12,7 @@
import json
import time
import logging
import socket
from pathlib import Path
from typing import Optional, Tuple, TYPE_CHECKING
from dataclasses import dataclass
@@ -223,8 +224,10 @@ def wait_for_process_startup(

for attempt in range(max_wait):
if Path(socket_path).exists():
logger.info(f"Socket created successfully after {attempt * 0.1:.1f}s")
return
if ProcessManager.is_socket_responsive(socket_path):
logger.info(f"Socket ready and responsive")
return
logger.debug(f"Socket exists but not yet responsive (attempt {attempt})...")
time.sleep(0.1)
if attempt % 10 == 0 and attempt > 0: # Log every second
logger.debug(f"Still waiting for socket... ({attempt * 0.1:.1f}s elapsed)")
@@ -244,3 +247,19 @@
error_msg += f"\n\nManager process died (exitcode: {returncode})"

raise RuntimeError(error_msg)

@staticmethod
def is_socket_responsive(socket_path: str, timeout: float = 1.0) -> bool:
"""Check if the socket is accepting connections."""
if not Path(socket_path).exists():
return False
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.settimeout(timeout)
try:
client.connect(socket_path)
return True
except (socket.timeout, ConnectionRefusedError, OSError):
return False
finally:
client.close()
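Note for reviewers: the PR now has two health-check layers. is_socket_responsive() (above) confirms the manager's Unix socket accepts connections, while the client-side check_health() used in base_action.py presumably round-trips the new ping() RPC registered on the manager. A sketch combining both, under that assumption; manager_is_healthy() is a hypothetical helper, not part of this diff.

def manager_is_healthy(socket_path, rpc_client):
    """Sketch: transport-level then application-level health check."""
    # Layer 1: the Unix socket exists and accepts a connection.
    if not ProcessManager.is_socket_responsive(socket_path):
        return False
    # Layer 2: the manager process answers an RPC round-trip
    # (check_health() is assumed to proxy the registered ping()).
    try:
        return rpc_client.check_health()
    except (ConnectionError, BrokenPipeError, EOFError):
        return False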