diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/Pipfile.lock b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/Pipfile.lock index e4532ee7ea..2a511f69e1 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/Pipfile.lock +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/Pipfile.lock @@ -994,12 +994,12 @@ }, "urllib3": { "hashes": [ - "sha256:3fc47733c7e419d4bc3f6b3dc2b4f890bb743906a30d56ba4a5bfa4bbff92760", - "sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc" + "sha256:1cee9ad369867bfdbbb48b7dd50374c0967a0bb7710050facf0dd6911440e3df", + "sha256:f8c5449b3cf0861679ce7e0503c7b44b5ec981bec0d1d3795a07f1ba96f0204d" ], "index": "pypi", "markers": "python_version >= '3.9'", - "version": "==2.5.0" + "version": "==2.3.0" }, "uvicorn": { "extras": [ diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/cli.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/cli.py index 6fde253cad..a91054d5ee 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/cli.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/cli.py @@ -7,6 +7,10 @@ from .models.utils import ExitCode from .commands.configure import configure_group +from .commands.submit import submit_command +from .commands.stop import stop_command +from .commands.approve import approve_command +from .commands.status import status_command logger = logging.getLogger(__name__) @@ -48,27 +52,14 @@ def util_group(ctx): @click.argument('shell', type=click.Choice(['bash', 'zsh', 'fish'])) @click.pass_context def completion(ctx, shell): - """Generate shell completion script and instructions for setup. + """Generate shell completion script for bash, zsh, or fish. - Supported shells: bash, zsh, fish + Example setup: + Bash: workflow completion bash > /etc/bash_completion.d/workflow + Zsh: workflow completion zsh > "${fpath[1]}/_workflow" + Fish: workflow completion fish > ~/.config/fish/completions/workflow.fish - To enable completion: - - Bash: - workflow completion bash > /etc/bash_completion.d/workflow - # Then restart your shell - - Zsh: - # If shell completion is not already enabled in your environment, - # you will need to enable it. You can execute the following once: - echo "autoload -U compinit; compinit" >> ~/.zshrc - - workflow completion zsh > "${fpath[1]}/_workflow" - # Then restart your shell - - Fish: - workflow completion fish > ~/.config/fish/completions/workflow.fish - # Then restart your shell + Restart your shell after installation. 
""" completion_class = get_completion_class(shell) if completion_class is None: @@ -88,6 +79,10 @@ def completion(ctx, shell): # Add command groups workflow_cli.add_command(configure_group) +workflow_cli.add_command(submit_command) +workflow_cli.add_command(stop_command) +workflow_cli.add_command(approve_command) +workflow_cli.add_command(status_command) workflow_cli.add_command(util_group) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/approve.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/approve.py new file mode 100644 index 0000000000..8987eb1984 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/approve.py @@ -0,0 +1,101 @@ +"""Approve command for workflow CLI - resumes suspended workflows in Argo Workflows.""" + +import logging +import os +import click + +from ..models.utils import ExitCode +from ..services.workflow_service import WorkflowService + +logger = logging.getLogger(__name__) + + +@click.command(name="approve") +@click.argument('workflow_name', required=False) +@click.option( + '--argo-server', + default=lambda: os.environ.get( + 'ARGO_SERVER', + f"http://{os.environ.get('ARGO_SERVER_SERVICE_HOST', 'localhost')}:" + f"{os.environ.get('ARGO_SERVER_SERVICE_PORT', '2746')}" + ), + help='Argo Server URL (default: auto-detected from Kubernetes service env vars, or ARGO_SERVER env var)' +) +@click.option( + '--namespace', + default='ma', + help='Kubernetes namespace for the workflow (default: ma)' +) +@click.option( + '--insecure', + is_flag=True, + default=False, + help='Skip TLS certificate verification' +) +@click.option( + '--token', + help='Bearer token for authentication' +) +@click.pass_context +def approve_command(ctx, workflow_name, argo_server, namespace, insecure, token): + """Approve/resume a suspended workflow in Argo Workflows. + + If workflow_name is not provided, auto-detects the single workflow + in the specified namespace. + + Example: + workflow approve + workflow approve my-workflow + workflow approve --argo-server https://10.105.13.185:2746 --insecure + """ + + try: + service = WorkflowService() + + # Auto-detect workflow if name not provided + if not workflow_name: + list_result = service.list_workflows( + namespace=namespace, + argo_server=argo_server, + token=token, + insecure=insecure, + phase_filter='Running' + ) + + if not list_result['success']: + click.echo(f"Error listing workflows: {list_result['error']}", err=True) + ctx.exit(ExitCode.FAILURE.value) + + if list_result['count'] == 0: + click.echo(f"Error: No workflows found in namespace {namespace}", err=True) + ctx.exit(ExitCode.FAILURE.value) + elif list_result['count'] > 1: + workflows_list = ', '.join(list_result['workflows']) + click.echo( + f"Error: Multiple workflows found. 
Please specify which one to approve.\n"
+                    f"Found workflows: {workflows_list}",
+                    err=True
+                )
+                ctx.exit(ExitCode.FAILURE.value)
+
+            workflow_name = list_result['workflows'][0]
+            click.echo(f"Auto-detected workflow: {workflow_name}")
+
+        # Resume the workflow
+        result = service.approve_workflow(
+            workflow_name=workflow_name,
+            namespace=namespace,
+            argo_server=argo_server,
+            token=token,
+            insecure=insecure
+        )
+
+        if result['success']:
+            click.echo(f"Workflow {workflow_name} resumed successfully")
+        else:
+            click.echo(f"Error: {result['message']}", err=True)
+            ctx.exit(ExitCode.FAILURE.value)
+
+    except Exception as e:
+        click.echo(f"Error: {str(e)}", err=True)
+        ctx.exit(ExitCode.FAILURE.value)
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/status.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/status.py
new file mode 100644
index 0000000000..641ef72db2
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/status.py
@@ -0,0 +1,217 @@
+"""Status command for workflow CLI - shows detailed status of workflows."""
+
+import logging
+import os
+import click
+
+from ..models.utils import ExitCode
+from ..services.workflow_service import WorkflowService
+
+logger = logging.getLogger(__name__)
+
+
+@click.command(name="status")
+@click.argument('workflow_name', required=False)
+@click.option(
+    '--argo-server',
+    default=lambda: os.environ.get(
+        'ARGO_SERVER',
+        f"http://{os.environ.get('ARGO_SERVER_SERVICE_HOST', 'localhost')}:"
+        f"{os.environ.get('ARGO_SERVER_SERVICE_PORT', '2746')}"
+    ),
+    help='Argo Server URL (default: auto-detected from Kubernetes service env vars, or ARGO_SERVER env var)'
+)
+@click.option(
+    '--namespace',
+    default='ma',
+    help='Kubernetes namespace for the workflow (default: ma)'
+)
+@click.option(
+    '--insecure',
+    is_flag=True,
+    default=False,
+    help='Skip TLS certificate verification'
+)
+@click.option(
+    '--token',
+    help='Bearer token for authentication'
+)
+@click.option(
+    '--all',
+    'show_all',
+    is_flag=True,
+    default=False,
+    help='Show all workflows including completed ones (default: only running)'
+)
+@click.pass_context
+def status_command(ctx, workflow_name, argo_server, namespace, insecure, token, show_all):
+    """Show detailed status of workflows.
+
+    Displays workflow progress, completed steps, and approval status.
+    By default, only shows running workflows. Use --all to see completed workflows too.
+ + Example: + workflow status + workflow status my-workflow + workflow status --all + """ + + try: + service = WorkflowService() + + if workflow_name: + # Show detailed status for specific workflow + result = service.get_workflow_status( + workflow_name=workflow_name, + namespace=namespace, + argo_server=argo_server, + token=token, + insecure=insecure + ) + + if not result['success']: + click.echo(f"Error: {result['error']}", err=True) + ctx.exit(ExitCode.FAILURE.value) + + _display_workflow_status(result) + else: + # List all workflows with status + list_result = service.list_workflows( + namespace=namespace, + argo_server=argo_server, + token=token, + insecure=insecure, + exclude_completed=not show_all + ) + + if not list_result['success']: + click.echo(f"Error: {list_result['error']}", err=True) + ctx.exit(ExitCode.FAILURE.value) + + if list_result['count'] == 0: + if show_all: + click.echo(f"No workflows found in namespace {namespace}") + else: + click.echo(f"No running workflows found in namespace {namespace}") + click.echo("Use --all to see completed workflows") + return + + click.echo(f"Found {list_result['count']} workflow(s) in namespace {namespace}:") + click.echo("") + + # Sort workflows alphabetically by name + sorted_workflows = sorted(list_result['workflows']) + + for wf_name in sorted_workflows: + result = service.get_workflow_status( + workflow_name=wf_name, + namespace=namespace, + argo_server=argo_server, + token=token, + insecure=insecure + ) + + if result['success']: + _display_workflow_status(result) + + except Exception as e: + click.echo(f"Error: {str(e)}", err=True) + ctx.exit(ExitCode.FAILURE.value) + + +def _get_phase_symbol(phase: str) -> str: + """Get symbol for workflow phase. + + Args: + phase: Workflow phase + + Returns: + Symbol character for the phase + """ + phase_symbols = { + 'Running': '*', + 'Succeeded': '+', + 'Failed': '-', + 'Error': '-', + 'Pending': '>', + 'Stopped': 'X', + } + return phase_symbols.get(phase, '?') + + +def _get_step_symbol(step_phase: str, step_type: str) -> str: + """Get symbol for workflow step. + + Args: + step_phase: Step phase + step_type: Step type + + Returns: + Symbol string for the step + """ + if step_phase == 'Succeeded': + return ' +' + elif step_phase == 'Running': + return ' |' if step_type == 'Suspend' else ' *' + elif step_phase in ('Failed', 'Error'): + return ' -' + elif step_phase == 'Pending': + return ' >' + else: + return ' ?' + + +def _display_workflow_header(name: str, phase: str, progress: str, started_at: str, finished_at: str): + """Display workflow header information. + + Args: + name: Workflow name + phase: Workflow phase + progress: Workflow progress + started_at: Start timestamp + finished_at: Finish timestamp + """ + phase_symbol = _get_phase_symbol(phase) + click.echo(f"[{phase_symbol}] Workflow: {name}") + click.echo(f" Phase: {phase}") + click.echo(f" Progress: {progress}") + if started_at: + click.echo(f" Started: {started_at}") + if finished_at: + click.echo(f" Finished: {finished_at}") + + +def _display_workflow_steps(steps: list): + """Display workflow steps. 
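+
+    For a two-step run with a pending approval gate, the rendered output
+    looks roughly like this (step names illustrative):
+
+          Steps:
+         + step1 (Succeeded)
+         | approval-gate - WAITING FOR APPROVAL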
+
+    Args:
+        steps: List of step dictionaries
+    """
+    if not steps:
+        return
+
+    click.echo("\n  Steps:")
+    for step in steps:
+        step_name = step['name']
+        step_phase = step['phase']
+        step_type = step['type']
+
+        symbol = _get_step_symbol(step_phase, step_type)
+
+        if step_type == 'Suspend' and step_phase == 'Running':
+            click.echo(f"{symbol} {step_name} - WAITING FOR APPROVAL")
+        else:
+            click.echo(f"{symbol} {step_name} ({step_phase})")
+
+
+def _display_workflow_status(result: dict):
+    """Display formatted workflow status.
+
+    Args:
+        result: WorkflowStatusResult dict from get_workflow_status
+    """
+    name = result['workflow_name']
+    phase = result['phase']
+    progress = result['progress']
+    started_at = result['started_at']
+    finished_at = result['finished_at']
+
+    _display_workflow_header(name, phase, progress, started_at, finished_at)
+    _display_workflow_steps(result['steps'])
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/stop.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/stop.py
new file mode 100644
index 0000000000..b3160bbc87
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/stop.py
@@ -0,0 +1,98 @@
+"""Stop command for workflow CLI - stops running workflows in Argo Workflows."""
+
+import logging
+import os
+import click
+
+from ..models.utils import ExitCode
+from ..services.workflow_service import WorkflowService
+
+logger = logging.getLogger(__name__)
+
+
+@click.command(name="stop")
+@click.argument('workflow_name', required=False)
+@click.option(
+    '--argo-server',
+    default=lambda: os.environ.get(
+        'ARGO_SERVER',
+        f"http://{os.environ.get('ARGO_SERVER_SERVICE_HOST', 'localhost')}:"
+        f"{os.environ.get('ARGO_SERVER_SERVICE_PORT', '2746')}"
+    ),
+    help='Argo Server URL (default: auto-detected from Kubernetes service env vars, or ARGO_SERVER env var)'
+)
+@click.option(
+    '--namespace',
+    default='ma',
+    help='Kubernetes namespace for the workflow (default: ma)'
+)
+@click.option(
+    '--insecure',
+    is_flag=True,
+    default=False,
+    help='Skip TLS certificate verification'
+)
+@click.option(
+    '--token',
+    help='Bearer token for authentication'
+)
+@click.pass_context
+def stop_command(ctx, workflow_name, argo_server, namespace, insecure, token):
+    """Stop a running workflow in Argo Workflows.
+
+    If workflow_name is not provided, auto-detects the single workflow
+    in the specified namespace.
+
+    Example:
+        workflow stop
+        workflow stop my-workflow
+        workflow stop --argo-server https://10.105.13.185:2746 --insecure
+    """
+
+    try:
+        service = WorkflowService()
+
+        # Auto-detect workflow if name not provided
+        if not workflow_name:
+            list_result = service.list_workflows(
+                namespace=namespace,
+                argo_server=argo_server,
+                token=token,
+                insecure=insecure,
+                exclude_completed=True
+            )
+
+            if not list_result['success']:
+                click.echo(f"Error listing workflows: {list_result['error']}", err=True)
+                ctx.exit(ExitCode.FAILURE.value)
+
+            if list_result['count'] == 0:
+                click.echo(f"Error: No workflows found in namespace {namespace}", err=True)
+                ctx.exit(ExitCode.FAILURE.value)
+            elif list_result['count'] > 1:
+                workflows_list = ', '.join(list_result['workflows'])
+                click.echo(
+                    f"Error: Multiple workflows found. Please specify which one to stop.\n"
+                    f"Found workflows: {workflows_list}",
+                    err=True
+                )
+                ctx.exit(ExitCode.FAILURE.value)
+
+            workflow_name = list_result['workflows'][0]
+            click.echo(f"Auto-detected workflow: {workflow_name}")
+
+        # Stop the workflow
+        result = service.stop_workflow(
+            workflow_name=workflow_name,
+            namespace=namespace,
+            argo_server=argo_server,
+            token=token,
+            insecure=insecure
+        )
+
+        if result['success']:
+            click.echo(f"Workflow {workflow_name} stopped successfully")
+        else:
+            click.echo(f"Error: {result['message']}", err=True)
+            ctx.exit(ExitCode.FAILURE.value)
+
+    except Exception as e:
+        click.echo(f"Error: {str(e)}", err=True)
+        ctx.exit(ExitCode.FAILURE.value)
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/submit.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/submit.py
new file mode 100644
index 0000000000..5fcc9db163
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/submit.py
@@ -0,0 +1,196 @@
+"""Submit command for workflow CLI - submits workflows to Argo Workflows."""
+
+import logging
+import os
+import click
+
+from ..models.utils import ExitCode
+from ..models.store import WorkflowConfigStore
+from ..services.workflow_service import WorkflowService
+
+logger = logging.getLogger(__name__)
+
+# Terminal workflow phases
+ENDING_PHASES = ["Succeeded", "Failed", "Error", "Stopped", "Terminated"]
+
+
+def _load_and_inject_config(service: WorkflowService, workflow_spec: dict, namespace: str, session: str) -> dict:
+    """Load configuration and inject parameters into workflow spec.
+
+    Args:
+        service: WorkflowService instance
+        workflow_spec: Workflow specification
+        namespace: Kubernetes namespace
+        session: Session name
+
+    Returns:
+        Updated workflow spec with injected parameters
+    """
+    try:
+        store = WorkflowConfigStore(namespace=namespace)
+        config = store.load_config(session_name=session)
+
+        if config:
+            click.echo(f"Injecting parameters from session: {session}")
+            return service.inject_parameters(workflow_spec, config)
+        else:
+            logger.debug(f"No configuration found for session: {session}")
+            return workflow_spec
+    except Exception as e:
+        logger.warning(f"Could not load workflow config: {e}")
+        return workflow_spec
+
+
+def _handle_workflow_wait(service: WorkflowService, namespace: str, workflow_name: str, timeout: int, wait_interval: int):
+    """Handle waiting for workflow completion.
+
+    Args:
+        service: WorkflowService instance
+        namespace: Kubernetes namespace
+        workflow_name: Name of workflow
+        timeout: Timeout in seconds
+        wait_interval: Interval between checks
+    """
+    click.echo(f"\nWaiting for workflow to complete (timeout: {timeout}s)...")
+
+    try:
+        phase, output_message = service.wait_for_workflow_completion(
+            namespace=namespace,
+            workflow_name=workflow_name,
+            timeout=timeout,
+            interval=wait_interval
+        )
+
+        click.echo(f"\nWorkflow completed with phase: {phase}")
+
+        if output_message:
+            click.echo(f"Container output: {output_message}")
+
+    except TimeoutError as e:
+        click.echo(f"\n{str(e)}", err=True)
+        click.echo(f"Workflow {workflow_name} is still running", err=True)
+    except Exception as e:
+        click.echo(f"\nError monitoring workflow: {str(e)}", err=True)
+
+
+@click.command(name="submit")
+@click.option(
+    '--argo-server',
+    default=lambda: os.environ.get(
+        'ARGO_SERVER',
+        f"http://{os.environ.get('ARGO_SERVER_SERVICE_HOST', 'localhost')}:"
+        f"{os.environ.get('ARGO_SERVER_SERVICE_PORT', '2746')}"
+    ),
+    help='Argo Server URL (default: auto-detected from Kubernetes service env vars, or ARGO_SERVER env var)'
+)
+@click.option(
+    '--namespace',
+    default='ma',
+    help='Kubernetes namespace for the workflow (default: ma)'
+)
+@click.option(
+    '--name',
+    help='Workflow name (will use generateName if not provided)'
+)
+@click.option(
+    '--insecure',
+    is_flag=True,
+    default=False,
+    help='Skip TLS certificate verification'
+)
+@click.option(
+    '--token',
+    help='Bearer token for authentication'
+)
+@click.option(
+    '--wait',
+    is_flag=True,
+    default=False,
+    help='Wait for workflow completion (default: return immediately after submission)'
+)
+@click.option(
+    '--timeout',
+    default=120,
+    type=int,
+    help='Timeout in seconds to wait for workflow completion (only used with --wait, default: 120)'
+)
+@click.option(
+    '--wait-interval',
+    default=2,
+    type=int,
+    help='Interval in seconds between status checks (only used with --wait, default: 2)'
+)
+@click.option(
+    '--session',
+    default='default',
+    help='Configuration session name to load parameters from (default: default)'
+)
+@click.pass_context
+def submit_command(ctx, argo_server, namespace, name, insecure, token, wait, timeout, wait_interval, session):
+    """Submit a workflow to Argo Workflows.
+
+    This command submits a workflow to Argo Workflows using the Argo Server REST API.
+    By default, it returns immediately after submission without waiting for completion.
+    Use the --wait flag to wait for the workflow to complete.
+
+    The default workflow includes a suspend step that requires manual approval via
+    'workflow approve' before completion.
+
+    To use a custom workflow template, set the WORKFLOW_TEMPLATE_PATH environment
+    variable to point to a workflow YAML file. Parameters can be injected from a
+    WorkflowConfig stored in a Kubernetes ConfigMap (use 'workflow configure' to set).
+ + Example: + workflow submit + workflow submit --wait + workflow submit --wait --timeout 300 + WORKFLOW_TEMPLATE_PATH=/path/to/workflow.yaml workflow submit + """ + try: + service = WorkflowService() + template_result = service.load_workflow_template() + + if template_result['error']: + click.echo(f"Warning: {template_result['error']}", err=True) + click.echo("Using default workflow instead", err=True) + + click.echo(f"Using workflow template from: {template_result['source']}") + workflow_spec = template_result['workflow_spec'] + + # Load configuration and inject parameters + workflow_spec = _load_and_inject_config(service, workflow_spec, namespace, session) + + # Add name or generateName if provided + if name: + workflow_spec['metadata']['name'] = name + elif 'name' not in workflow_spec['metadata'] and 'generateName' not in workflow_spec['metadata']: + workflow_spec['metadata']['generateName'] = 'workflow-' + + # Submit the workflow + submit_result = service.submit_workflow_to_argo( + workflow_spec=workflow_spec, + namespace=namespace, + argo_server=argo_server, + token=token, + insecure=insecure + ) + + if not submit_result['success']: + click.echo(f"Error: {submit_result['error']}", err=True) + ctx.exit(ExitCode.FAILURE.value) + + workflow_name = submit_result['workflow_name'] + workflow_uid = submit_result['workflow_uid'] + + click.echo("Workflow submitted successfully") + click.echo(f" Name: {workflow_name}") + click.echo(f" UID: {workflow_uid}") + click.echo(f" Namespace: {namespace}") + + logger.info(f"Workflow {workflow_name} submitted successfully") + + # Wait for workflow completion if requested + if wait: + _handle_workflow_wait(service, namespace, workflow_name, timeout, wait_interval) + + except Exception as e: + logger.exception(f"Unexpected error submitting workflow: {e}") + click.echo(f"Error: {str(e)}", err=True) + ctx.exit(ExitCode.FAILURE.value) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/services/__init__.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/services/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/services/suspend_workflow_template.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/services/suspend_workflow_template.py new file mode 100644 index 0000000000..39af0a43ef --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/services/suspend_workflow_template.py @@ -0,0 +1,67 @@ +"""Template for a workflow with suspend node for testing approve functionality.""" + +from datetime import datetime + + +def get_suspend_workflow_spec(): + """Get a workflow specification with a suspend node. + + This workflow template includes a suspend node that requires manual approval + to continue. It's useful for testing the approve/resume functionality. 
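+
+    A typical end-to-end exercise through this CLI looks like (flow
+    illustrative; submit must be pointed at a spec like this one):
+
+        workflow submit     # pauses at the 'approve' suspend step
+        workflow status     # the suspend step reports WAITING FOR APPROVAL
+        workflow approve    # resumes the workflow so step2 can run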
+ + Returns: + Dict containing the workflow spec with suspend node + """ + timestamp = datetime.now().isoformat() + + return { + "metadata": { + "generateName": "suspend-workflow-", + "labels": { + "workflows.argoproj.io/completed": "false" + } + }, + "spec": { + "templates": [ + { + "name": "main", + "steps": [ + [ + { + "name": "step1", + "template": "whalesay" + } + ], + [ + { + "name": "approve", + "template": "approve-gate" + } + ], + [ + { + "name": "step2", + "template": "whalesay" + } + ] + ] + }, + { + "name": "whalesay", + "container": { + "name": "", + "image": "busybox", + "command": ["sh", "-c"], + "args": [f'echo "Workflow step at {timestamp}"'], + "resources": {} + } + }, + { + "name": "approve-gate", + "suspend": {} + } + ], + "entrypoint": "main", + "arguments": {} + } + } diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/services/workflow_service.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/services/workflow_service.py new file mode 100644 index 0000000000..8bab68bd4a --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/services/workflow_service.py @@ -0,0 +1,980 @@ +"""Service layer for workflow operations. + +This module provides a service layer that encapsulates all workflow business logic, +making it reusable by CLI commands, REST APIs, or other interfaces. +""" + +import logging +import os +import time +import copy +from typing import Dict, Any, Optional, Tuple, TypedDict, List +from datetime import datetime + +import yaml +import requests +from kubernetes import client + +from ..models.config import WorkflowConfig + +logger = logging.getLogger(__name__) + +# Terminal workflow phases +ENDING_PHASES = ["Succeeded", "Failed", "Error", "Stopped", "Terminated"] + +# Constants +FALLBACK_MESSAGE = "Falling back to default hello-world workflow" +CONTENT_TYPE_JSON = "application/json" + + +class WorkflowTemplateResult(TypedDict): + """Result of template loading operation.""" + success: bool + workflow_spec: Dict[str, Any] + source: str + error: Optional[str] + + +class WorkflowSubmitResult(TypedDict): + """Result of workflow submission operation.""" + success: bool + workflow_name: str + workflow_uid: str + namespace: str + phase: Optional[str] + output_message: Optional[str] + error: Optional[str] + + +class WorkflowStopResult(TypedDict): + """Result of workflow stop operation.""" + success: bool + workflow_name: str + namespace: str + message: str + error: Optional[str] + + +class WorkflowApproveResult(TypedDict): + """Result of workflow approve/resume operation.""" + success: bool + workflow_name: str + namespace: str + message: str + error: Optional[str] + + +class WorkflowListResult(TypedDict): + """Result of workflow list operation.""" + success: bool + workflows: List[str] + count: int + error: Optional[str] + + +class WorkflowStatusResult(TypedDict): + """Result of workflow status operation.""" + success: bool + workflow_name: str + namespace: str + phase: str + progress: str + started_at: Optional[str] + finished_at: Optional[str] + steps: List[Dict[str, str]] + error: Optional[str] + + +class WorkflowService: + """Service class for workflow operations. 
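+
+    Minimal usage sketch (server URL illustrative):
+
+        service = WorkflowService()
+        template = service.load_workflow_template()
+        result = service.submit_workflow_to_argo(
+            workflow_spec=template['workflow_spec'],
+            namespace='ma',
+            argo_server='http://localhost:2746'
+        )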
+ + This class encapsulates all workflow business logic including: + - Template loading from file system or environment + - Parameter injection from configuration + - Workflow submission to Argo + - Workflow status monitoring + + The service is stateless (except for caching the default template) and + can be reused by any interface (CLI, API, scripts). + """ + + def __init__(self): + """Initialize the WorkflowService.""" + self._default_workflow: Optional[Dict[str, Any]] = None + + def get_default_workflow_spec(self) -> Dict[str, Any]: + """Get the embedded hello-world workflow specification with approval gate. + + Returns: + Dict containing the default workflow spec with suspend node + """ + if self._default_workflow is None: + # Generate unique message for verification + timestamp = datetime.now().isoformat() + + self._default_workflow = { + "metadata": { + "generateName": "hello-world-", + "labels": { + "workflows.argoproj.io/completed": "false" + } + }, + "spec": { + "serviceAccountName": "argo-workflow-executor", + "templates": [ + { + "name": "main", + "steps": [ + [ + { + "name": "step1", + "template": "hello-step" + } + ], + [ + { + "name": "approval-gate", + "template": "approve" + } + ], + [ + { + "name": "step2", + "template": "goodbye-step" + } + ] + ] + }, + { + "name": "hello-step", + "container": { + "name": "", + "image": "busybox", + "command": ["sh", "-c"], + "args": [ + f'echo "Step 1: Hello from workflow at {timestamp}"' + ], + "resources": {} + } + }, + { + "name": "approve", + "suspend": {} + }, + { + "name": "goodbye-step", + "outputs": { + "parameters": [ + { + "name": "message", + "valueFrom": { + "path": "/tmp/message.txt" + } + } + ] + }, + "container": { + "name": "", + "image": "busybox", + "command": ["sh", "-c"], + "args": [ + f'echo "Step 2: Goodbye from workflow at {timestamp}" | tee /tmp/message.txt' + ], + "resources": {} + } + } + ], + "entrypoint": "main", + "arguments": {} + } + } + + return copy.deepcopy(self._default_workflow) + + def load_workflow_template( + self, + template_path: Optional[str] = None + ) -> WorkflowTemplateResult: + """Load workflow template from file path or use default. 
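+
+        Resolution order: an explicit template_path argument wins, then the
+        WORKFLOW_TEMPLATE_PATH environment variable, then the embedded
+        hello-world default. For example (path illustrative):
+
+            WORKFLOW_TEMPLATE_PATH=/workflows/migrate.yaml workflow submit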
+ + Args: + template_path: Optional file path to workflow YAML + + Returns: + WorkflowTemplateResult dict with success status, workflow_spec, source, and error + """ + # Determine the template path + path = template_path or os.environ.get('WORKFLOW_TEMPLATE_PATH') + + if not path: + # No template path specified, use default + logger.info("No WORKFLOW_TEMPLATE_PATH set, using default hello-world workflow") + return WorkflowTemplateResult( + success=True, + workflow_spec=self.get_default_workflow_spec(), + source='embedded', + error=None + ) + + # Try to load from file + try: + logger.info(f"Loading workflow template from: {path}") + + with open(path, 'r') as f: + template_data = yaml.safe_load(f) + + if not template_data: + raise ValueError("Template file is empty") + + # Basic validation - check for required workflow structure + if 'spec' not in template_data: + raise ValueError("Template must contain 'spec' section") + + logger.info(f"Successfully loaded workflow template from: {path}") + return WorkflowTemplateResult( + success=True, + workflow_spec=template_data, + source=path, + error=None + ) + + except FileNotFoundError: + error_msg = f"Template file not found: {path}" + logger.error(error_msg) + logger.info(FALLBACK_MESSAGE) + return WorkflowTemplateResult( + success=False, + workflow_spec=self.get_default_workflow_spec(), + source='embedded', + error=error_msg + ) + + except yaml.YAMLError as e: + error_msg = f"Invalid YAML in template file {path}: {e}" + logger.error(error_msg) + logger.info(FALLBACK_MESSAGE) + return WorkflowTemplateResult( + success=False, + workflow_spec=self.get_default_workflow_spec(), + source='embedded', + error=error_msg + ) + + except Exception as e: + error_msg = f"Error loading template from {path}: {e}" + logger.error(error_msg) + logger.info(FALLBACK_MESSAGE) + return WorkflowTemplateResult( + success=False, + workflow_spec=self.get_default_workflow_spec(), + source='embedded', + error=error_msg + ) + + def inject_parameters( + self, + workflow_spec: Dict[str, Any], + config: Optional[WorkflowConfig] + ) -> Dict[str, Any]: + """Inject parameters from WorkflowConfig into workflow specification. 
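+
+        For example, a config of {'parameters': {'source-version': 'ES_7.10'}}
+        (parameter name illustrative) leaves the spec containing:
+
+            {'spec': {'arguments': {'parameters': [
+                {'name': 'source-version', 'value': 'ES_7.10'}]}}}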
+ + Args: + workflow_spec: The workflow specification dict + config: WorkflowConfig containing parameters to inject + + Returns: + Modified workflow_spec with parameters injected + """ + # Create deep copy to avoid mutating the input + workflow = copy.deepcopy(workflow_spec) + + # If no config or config is empty, return workflow as-is + if not config or not config.data: + logger.debug("No config provided or config is empty, no parameters to inject") + return workflow + + # Get parameters from config + config_params = config.data.get('parameters', {}) + + if not config_params: + logger.debug("No parameters found in config") + return workflow + + logger.info(f"Injecting {len(config_params)} parameters from config") + + # Ensure spec.arguments.parameters exists + if 'spec' not in workflow: + workflow['spec'] = {} + if 'arguments' not in workflow['spec']: + workflow['spec']['arguments'] = {} + if 'parameters' not in workflow['spec']['arguments']: + workflow['spec']['arguments']['parameters'] = [] + + params_list = workflow['spec']['arguments']['parameters'] + + # Inject each parameter from config + for param_name, param_value in config_params.items(): + # Check if parameter already exists in workflow + existing_param = None + for param in params_list: + if param.get('name') == param_name: + existing_param = param + break + + if existing_param: + # Update existing parameter + existing_param['value'] = param_value + logger.debug(f"Updated existing parameter: {param_name}") + else: + # Add new parameter + params_list.append({ + 'name': param_name, + 'value': param_value + }) + logger.debug(f"Added new parameter: {param_name}") + + return workflow + + def submit_workflow_to_argo( + self, + workflow_spec: Dict[str, Any], + namespace: str, + argo_server: str, + token: Optional[str] = None, + insecure: bool = False + ) -> WorkflowSubmitResult: + """Submit workflow to Argo Workflows via REST API. 
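+
+        Issues a POST to {argo_server}/api/v1/workflows/{namespace} with a
+        body of the form (workflow contents elided):
+
+            {"namespace": "...", "serverDryRun": false, "workflow": {...}}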
+ + Args: + workflow_spec: Complete workflow specification + namespace: Kubernetes namespace + argo_server: Argo Server URL + token: Bearer token for authentication + insecure: Whether to skip TLS verification + + Returns: + WorkflowSubmitResult dict with success status, workflow_name, workflow_uid, and error + """ + try: + # Prepare the request body + request_body = { + "namespace": namespace, + "serverDryRun": False, + "workflow": workflow_spec + } + + # Ensure namespace is set in workflow metadata + if 'metadata' not in workflow_spec: + request_body['workflow']['metadata'] = {} + request_body['workflow']['metadata']['namespace'] = namespace + + # Prepare headers + headers = { + "Content-Type": CONTENT_TYPE_JSON + } + + if token: + headers["Authorization"] = f"Bearer {token}" + + # Submit the workflow + url = f"{argo_server}/api/v1/workflows/{namespace}" + + logger.info(f"Submitting workflow to {url}") + logger.debug(f"Workflow spec: {workflow_spec}") + + response = requests.post( + url, + json=request_body, + headers=headers, + verify=not insecure + ) + + response.raise_for_status() + + # Parse response + result = response.json() + workflow_name = result.get("metadata", {}).get("name", "unknown") + workflow_uid = result.get("metadata", {}).get("uid", "unknown") + + logger.info(f"Workflow {workflow_name} submitted successfully") + + return WorkflowSubmitResult( + success=True, + workflow_name=workflow_name, + workflow_uid=workflow_uid, + namespace=namespace, + phase=None, + output_message=None, + error=None + ) + + except requests.exceptions.RequestException as e: + error_msg = f"Failed to submit workflow: {e}" + logger.error(error_msg) + + if hasattr(e, 'response') and e.response is not None: + try: + error_detail = e.response.json() + error_msg = f"Failed to submit workflow: {error_detail}" + except Exception: + error_msg = f"Failed to submit workflow: {e.response.text}" + + return WorkflowSubmitResult( + success=False, + workflow_name="", + workflow_uid="", + namespace=namespace, + phase=None, + output_message=None, + error=error_msg + ) + + except Exception as e: + error_msg = f"Unexpected error submitting workflow: {e}" + logger.exception(error_msg) + + return WorkflowSubmitResult( + success=False, + workflow_name="", + workflow_uid="", + namespace=namespace, + phase=None, + output_message=None, + error=error_msg + ) + + def list_workflows( + self, + namespace: str, + argo_server: str, + token: Optional[str] = None, + insecure: bool = False, + exclude_completed: bool = False, + phase_filter: Optional[str] = None + ) -> WorkflowListResult: + """List workflows in a namespace via Argo Workflows REST API. 
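+
+        A successful call returns a dict shaped like (names illustrative):
+
+            {'success': True, 'workflows': ['wf-a', 'wf-b'], 'count': 2,
+             'error': None}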
+ + Args: + namespace: Kubernetes namespace + argo_server: Argo Server URL + token: Bearer token for authentication + insecure: Whether to skip TLS verification + exclude_completed: Only return running workflows + phase_filter: Filter by specific phase + + Returns: + WorkflowListResult dict with success status, workflows list, count, and error + """ + try: + headers = self._prepare_headers(token) + url = f"{argo_server}/api/v1/workflows/{namespace}" + + logger.info(f"Listing workflows in namespace {namespace} (exclude_completed={exclude_completed})") + logger.debug(f"List request URL: {url}") + + response = requests.get(url, headers=headers, verify=not insecure) + + if response.status_code == 200: + result = response.json() + items = result.get("items", []) + workflow_names = self._filter_workflows(items, exclude_completed, phase_filter) + + logger.info(f"Found {len(workflow_names)} workflows in namespace {namespace}") + return WorkflowListResult( + success=True, + workflows=workflow_names, + count=len(workflow_names), + error=None + ) + else: + error_msg = self._format_error_message("list workflows", response) + logger.error(error_msg) + return WorkflowListResult( + success=False, + workflows=[], + count=0, + error=error_msg + ) + + except requests.exceptions.RequestException as e: + error_msg = f"Network error listing workflows: {e}" + logger.error(error_msg) + return WorkflowListResult( + success=False, + workflows=[], + count=0, + error=str(e) + ) + + except Exception as e: + error_msg = f"Unexpected error listing workflows: {e}" + logger.exception(error_msg) + return WorkflowListResult( + success=False, + workflows=[], + count=0, + error=str(e) + ) + + def get_workflow_status( + self, + workflow_name: str, + namespace: str, + argo_server: str, + token: Optional[str] = None, + insecure: bool = False + ) -> WorkflowStatusResult: + """Get detailed status of a specific workflow. 
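+
+        A successful call returns a dict shaped like (values illustrative):
+
+            {'success': True, 'workflow_name': 'wf-a', 'namespace': 'ma',
+             'phase': 'Running', 'progress': '1/2',
+             'started_at': '2024-01-01T10:00:00Z', 'finished_at': None,
+             'steps': [...], 'error': None}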
+ + Args: + workflow_name: Name of the workflow + namespace: Kubernetes namespace + argo_server: Argo Server URL + token: Optional bearer token for authentication + insecure: Whether to skip TLS verification + + Returns: + WorkflowStatusResult with detailed status information + """ + try: + headers = self._prepare_headers(token) + url = f"{argo_server}/api/v1/workflows/{namespace}/{workflow_name}" + + logger.info(f"Getting status for workflow {workflow_name}") + + response = requests.get(url, headers=headers, verify=not insecure) + + if response.status_code != 200: + error_msg = f"Failed to get workflow status: HTTP {response.status_code}" + return self._create_error_status_result(workflow_name, namespace, error_msg) + + workflow = response.json() + status = workflow.get("status", {}) + + phase = status.get("phase", "Unknown") + progress = status.get("progress", "0/0") + started_at = status.get("startedAt") + finished_at = status.get("finishedAt") + + steps = self._extract_workflow_steps(status.get("nodes", {})) + + return WorkflowStatusResult( + success=True, + workflow_name=workflow_name, + namespace=namespace, + phase=phase, + progress=progress, + started_at=started_at, + finished_at=finished_at, + steps=steps, + error=None + ) + + except Exception as e: + error_msg = f"Error getting workflow status: {e}" + logger.error(error_msg) + return self._create_error_status_result(workflow_name, namespace, error_msg) + + def stop_workflow( + self, + workflow_name: str, + namespace: str, + argo_server: str, + token: Optional[str] = None, + insecure: bool = False + ) -> WorkflowStopResult: + """Stop a running workflow via Argo Workflows REST API. + + Args: + workflow_name: Name of the workflow to stop + namespace: Kubernetes namespace + argo_server: Argo Server URL + token: Bearer token for authentication + insecure: Whether to skip TLS verification + + Returns: + WorkflowStopResult dict with success status, message, and error + """ + try: + headers = self._prepare_headers(token) + + # Construct URL for stop endpoint + url = f"{argo_server}/api/v1/workflows/{namespace}/{workflow_name}/stop" + + logger.info(f"Stopping workflow {workflow_name} in namespace {namespace}") + logger.debug(f"Stop request URL: {url}") + + # Make PUT request to stop the workflow + response = requests.put( + url, + headers=headers, + verify=not insecure + ) + + # Handle response + if response.status_code == 200: + logger.info(f"Workflow {workflow_name} stopped successfully") + return WorkflowStopResult( + success=True, + workflow_name=workflow_name, + namespace=namespace, + message=f"Workflow {workflow_name} stopped successfully", + error=None + ) + elif response.status_code == 404: + error_msg = f"Workflow {workflow_name} not found in namespace {namespace}" + logger.error(error_msg) + return WorkflowStopResult( + success=False, + workflow_name=workflow_name, + namespace=namespace, + message=error_msg, + error=error_msg + ) + else: + error_msg = f"Failed to stop workflow: HTTP {response.status_code}" + try: + error_detail = response.json() + error_msg = f"Failed to stop workflow: {error_detail}" + except Exception: + error_msg = f"Failed to stop workflow: {response.text}" + + logger.error(error_msg) + return WorkflowStopResult( + success=False, + workflow_name=workflow_name, + namespace=namespace, + message=error_msg, + error=error_msg + ) + + except requests.exceptions.RequestException as e: + error_msg = f"Network error stopping workflow: {e}" + logger.error(error_msg) + + return WorkflowStopResult( + success=False, + 
workflow_name=workflow_name, + namespace=namespace, + message=error_msg, + error=str(e) + ) + + except Exception as e: + error_msg = f"Unexpected error stopping workflow: {e}" + logger.exception(error_msg) + + return WorkflowStopResult( + success=False, + workflow_name=workflow_name, + namespace=namespace, + message=error_msg, + error=str(e) + ) + + def approve_workflow( + self, + workflow_name: str, + namespace: str, + argo_server: str, + token: Optional[str] = None, + insecure: bool = False + ) -> WorkflowApproveResult: + """Approve/resume a suspended workflow via Argo Workflows REST API. + + Args: + workflow_name: Name of the workflow to approve + namespace: Kubernetes namespace + argo_server: Argo Server URL + token: Bearer token for authentication + insecure: Whether to skip TLS verification + + Returns: + WorkflowApproveResult dict with success status, message, and error + """ + try: + headers = self._prepare_headers(token) + + # Construct URL for resume endpoint + url = f"{argo_server}/api/v1/workflows/{namespace}/{workflow_name}/resume" + + logger.info(f"Resuming workflow {workflow_name} in namespace {namespace}") + logger.debug(f"Resume request URL: {url}") + + # Make PUT request to resume the workflow + response = requests.put( + url, + headers=headers, + verify=not insecure + ) + + # Handle response + if response.status_code == 200: + logger.info(f"Workflow {workflow_name} resumed successfully") + return WorkflowApproveResult( + success=True, + workflow_name=workflow_name, + namespace=namespace, + message=f"Workflow {workflow_name} resumed successfully", + error=None + ) + elif response.status_code == 404: + error_msg = f"Workflow {workflow_name} not found in namespace {namespace}" + logger.error(error_msg) + return WorkflowApproveResult( + success=False, + workflow_name=workflow_name, + namespace=namespace, + message=error_msg, + error=error_msg + ) + else: + error_msg = f"Failed to resume workflow: HTTP {response.status_code}" + try: + error_detail = response.json() + error_msg = f"Failed to resume workflow: {error_detail}" + except Exception: + error_msg = f"Failed to resume workflow: {response.text}" + + logger.error(error_msg) + return WorkflowApproveResult( + success=False, + workflow_name=workflow_name, + namespace=namespace, + message=error_msg, + error=error_msg + ) + + except requests.exceptions.RequestException as e: + error_msg = f"Network error resuming workflow: {e}" + logger.error(error_msg) + + return WorkflowApproveResult( + success=False, + workflow_name=workflow_name, + namespace=namespace, + message=error_msg, + error=str(e) + ) + + except Exception as e: + error_msg = f"Unexpected error resuming workflow: {e}" + logger.exception(error_msg) + + return WorkflowApproveResult( + success=False, + workflow_name=workflow_name, + namespace=namespace, + message=error_msg, + error=str(e) + ) + + def _prepare_headers(self, token: Optional[str] = None) -> Dict[str, str]: + """Prepare HTTP headers for API requests. + + Args: + token: Optional bearer token for authentication + + Returns: + Dictionary of HTTP headers + """ + headers = {"Content-Type": CONTENT_TYPE_JSON} + if token: + headers["Authorization"] = f"Bearer {token}" + return headers + + def _format_error_message(self, operation: str, response) -> str: + """Format error message from HTTP response. 
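+
+        For example, a 404 whose body is not valid JSON yields a message like
+        (text illustrative):
+
+            Failed to stop workflow: workflow not found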
+ + Args: + operation: Description of the operation that failed + response: HTTP response object + + Returns: + Formatted error message + """ + error_msg = f"Failed to {operation}: HTTP {response.status_code}" + try: + error_detail = response.json() + error_msg = f"Failed to {operation}: {error_detail}" + except Exception: + error_msg = f"Failed to {operation}: {response.text}" + return error_msg + + def _filter_workflows( + self, + items: List[Dict[str, Any]], + exclude_completed: bool, + phase_filter: Optional[str] + ) -> List[str]: + """Filter workflow items based on criteria. + + Args: + items: List of workflow items from API + exclude_completed: Whether to exclude completed workflows + phase_filter: Optional phase to filter by + + Returns: + List of workflow names matching criteria + """ + workflow_names = [] + for item in items: + name = item.get("metadata", {}).get("name", "") + if not name: + continue + + phase = item.get("status", {}).get("phase", "Unknown") + + if exclude_completed and phase in ENDING_PHASES: + continue + + if phase_filter == 'Running': + if phase != 'Running': + continue + if not self._has_active_suspend(item): + continue + elif phase_filter and phase != phase_filter: + continue + + workflow_names.append(name) + return workflow_names + + def _has_active_suspend(self, workflow_item: Dict[str, Any]) -> bool: + """Check if workflow has an active suspend node. + + Args: + workflow_item: Workflow item from API + + Returns: + True if workflow has active suspend node + """ + nodes = workflow_item.get("status", {}).get("nodes", {}) + for node in nodes.values(): + if node.get("type") == "Suspend" and node.get("phase") == "Running": + return True + return False + + def _extract_workflow_steps(self, nodes: Dict[str, Any]) -> List[Dict[str, str]]: + """Extract step information from workflow nodes. + + Args: + nodes: Dictionary of workflow nodes + + Returns: + List of step dictionaries with name, phase, type, and started_at + """ + steps = [] + for node in nodes.values(): + node_type = node.get("type", "") + if node_type in ["Pod", "Suspend"]: + steps.append({ + "name": node.get("displayName", ""), + "phase": node.get("phase", "Unknown"), + "type": node_type, + "started_at": node.get("startedAt", "") + }) + + # Sort chronologically by start time + steps.sort(key=lambda x: x.get("started_at", "9999-99-99")) + return steps + + def _create_error_status_result( + self, + workflow_name: str, + namespace: str, + error_msg: str + ) -> WorkflowStatusResult: + """Create an error status result. + + Args: + workflow_name: Name of the workflow + namespace: Kubernetes namespace + error_msg: Error message + + Returns: + WorkflowStatusResult with error information + """ + return WorkflowStatusResult( + success=False, + workflow_name=workflow_name, + namespace=namespace, + phase="Unknown", + progress="0/0", + started_at=None, + finished_at=None, + steps=[], + error=error_msg + ) + + def wait_for_workflow_completion( + self, + namespace: str, + workflow_name: str, + timeout: int = 120, + interval: int = 2 + ) -> Tuple[str, Optional[str]]: + """Wait for workflow to reach terminal state and retrieve output. 
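+
+        Usage sketch (workflow name illustrative):
+
+            phase, message = service.wait_for_workflow_completion(
+                namespace='ma', workflow_name='hello-world-abc12', timeout=120)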
+ + Args: + namespace: Kubernetes namespace + workflow_name: Name of workflow to monitor + timeout: Maximum seconds to wait + interval: Seconds between status checks + + Returns: + Tuple of (phase, output_message) + + Raises: + TimeoutError: If timeout is exceeded + """ + start_time = time.time() + custom_api = client.CustomObjectsApi() + + while time.time() - start_time < timeout: + try: + # Get workflow status + workflow = custom_api.get_namespaced_custom_object( + group="argoproj.io", + version="v1alpha1", + namespace=namespace, + plural="workflows", + name=workflow_name + ) + + phase = workflow.get("status", {}).get("phase", "Unknown") + elapsed = int(time.time() - start_time) + + logger.debug(f"[{elapsed}s] Workflow {workflow_name} phase: {phase}") + + # Check if workflow reached a terminal state + if phase in ENDING_PHASES: + # Extract output parameter + output_message = None + nodes = workflow.get("status", {}).get("nodes", {}) + + for node_id, node in nodes.items(): + outputs = node.get("outputs", {}) + parameters = outputs.get("parameters", []) + + for param in parameters: + if param.get("name") == "message": + output_message = param.get("value", "").strip() + break + + if output_message: + break + + logger.info(f"Workflow {workflow_name} completed with phase: {phase}") + return phase, output_message + + # Wait before next check + time.sleep(interval) + + except Exception as e: + logger.error(f"Error checking workflow status: {e}") + raise + + # Timeout reached + error_msg = f"Workflow {workflow_name} did not complete within {timeout} seconds" + logger.error(error_msg) + raise TimeoutError(error_msg) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_cli_commands.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_cli_commands.py new file mode 100644 index 0000000000..02c324da4f --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_cli_commands.py @@ -0,0 +1,276 @@ +"""Integration tests for workflow CLI commands.""" + +from click.testing import CliRunner +from unittest.mock import Mock, patch + +from console_link.workflow.cli import workflow_cli +from console_link.workflow.models.config import WorkflowConfig + + +class TestWorkflowCLICommands: + """Test suite for workflow CLI command integration.""" + + @patch('console_link.workflow.commands.submit.WorkflowService') + @patch('console_link.workflow.commands.submit.WorkflowConfigStore') + def test_submit_command_basic(self, mock_store_class, mock_service_class): + """Test basic submit command execution.""" + runner = CliRunner() + + # Mock the service + mock_service = Mock() + mock_service_class.return_value = mock_service + + mock_service.load_workflow_template.return_value = { + 'success': True, + 'workflow_spec': {'metadata': {'generateName': 'test-'}, 'spec': {}}, + 'source': 'embedded', + 'error': None + } + + mock_service.submit_workflow_to_argo.return_value = { + 'success': True, + 'workflow_name': 'test-workflow-abc', + 'workflow_uid': 'uid-123', + 'namespace': 'ma', + 'phase': None, + 'output_message': None, + 'error': None + } + + # Mock the store + mock_store = Mock() + mock_store_class.return_value = mock_store + mock_store.load_config.return_value = None + + result = runner.invoke(workflow_cli, ['submit']) + + assert result.exit_code == 0 + assert 'submitted successfully' in result.output + assert 
'test-workflow-abc' in result.output + + @patch('console_link.workflow.commands.submit.WorkflowService') + @patch('console_link.workflow.commands.submit.WorkflowConfigStore') + def test_submit_command_with_wait(self, mock_store_class, mock_service_class): + """Test submit command with --wait flag.""" + runner = CliRunner() + + # Mock the service + mock_service = Mock() + mock_service_class.return_value = mock_service + + mock_service.load_workflow_template.return_value = { + 'success': True, + 'workflow_spec': {'metadata': {'generateName': 'test-'}, 'spec': {}}, + 'source': 'embedded', + 'error': None + } + + mock_service.submit_workflow_to_argo.return_value = { + 'success': True, + 'workflow_name': 'test-workflow-abc', + 'workflow_uid': 'uid-123', + 'namespace': 'ma', + 'phase': None, + 'output_message': None, + 'error': None + } + + mock_service.wait_for_workflow_completion.return_value = ('Succeeded', 'Hello World') + + # Mock the store + mock_store = Mock() + mock_store_class.return_value = mock_store + mock_store.load_config.return_value = None + + result = runner.invoke(workflow_cli, ['submit', '--wait', '--timeout', '60']) + + assert result.exit_code == 0 + assert 'submitted successfully' in result.output + assert 'Waiting for workflow to complete' in result.output + assert 'Succeeded' in result.output + + @patch('console_link.workflow.commands.status.WorkflowService') + def test_status_command_single_workflow(self, mock_service_class): + """Test status command for a specific workflow.""" + runner = CliRunner() + + # Mock the service + mock_service = Mock() + mock_service_class.return_value = mock_service + + mock_service.get_workflow_status.return_value = { + 'success': True, + 'workflow_name': 'test-workflow', + 'namespace': 'ma', + 'phase': 'Running', + 'progress': '1/2', + 'started_at': '2024-01-01T10:00:00Z', + 'finished_at': None, + 'steps': [ + {'name': 'step1', 'phase': 'Succeeded', 'type': 'Pod', 'started_at': '2024-01-01T10:00:00Z'}, + {'name': 'step2', 'phase': 'Running', 'type': 'Pod', 'started_at': '2024-01-01T10:01:00Z'} + ], + 'error': None + } + + result = runner.invoke(workflow_cli, ['status', 'test-workflow']) + + assert result.exit_code == 0 + assert 'test-workflow' in result.output + assert 'Running' in result.output + assert 'step1' in result.output + assert 'step2' in result.output + + @patch('console_link.workflow.commands.status.WorkflowService') + def test_status_command_list_all(self, mock_service_class): + """Test status command listing all workflows.""" + runner = CliRunner() + + # Mock the service + mock_service = Mock() + mock_service_class.return_value = mock_service + + mock_service.list_workflows.return_value = { + 'success': True, + 'workflows': ['workflow-1', 'workflow-2'], + 'count': 2, + 'error': None + } + + mock_service.get_workflow_status.side_effect = [ + { + 'success': True, + 'workflow_name': 'workflow-1', + 'namespace': 'ma', + 'phase': 'Running', + 'progress': '1/2', + 'started_at': '2024-01-01T10:00:00Z', + 'finished_at': None, + 'steps': [], + 'error': None + }, + { + 'success': True, + 'workflow_name': 'workflow-2', + 'namespace': 'ma', + 'phase': 'Succeeded', + 'progress': '2/2', + 'started_at': '2024-01-01T09:00:00Z', + 'finished_at': '2024-01-01T09:05:00Z', + 'steps': [], + 'error': None + } + ] + + result = runner.invoke(workflow_cli, ['status']) + + assert result.exit_code == 0 + assert 'Found 2 workflow(s)' in result.output + assert 'workflow-1' in result.output + assert 'workflow-2' in result.output + + 
@patch('console_link.workflow.commands.stop.WorkflowService') + def test_stop_command_auto_detect(self, mock_service_class): + """Test stop command with auto-detection.""" + runner = CliRunner() + + # Mock the service + mock_service = Mock() + mock_service_class.return_value = mock_service + + # Mock list_workflows to return single workflow + mock_service.list_workflows.return_value = { + 'success': True, + 'workflows': ['test-workflow'], + 'count': 1, + 'error': None + } + + mock_service.stop_workflow.return_value = { + 'success': True, + 'workflow_name': 'test-workflow', + 'namespace': 'ma', + 'message': 'Workflow test-workflow stopped successfully', + 'error': None + } + + result = runner.invoke(workflow_cli, ['stop']) + + assert result.exit_code == 0 + assert 'Auto-detected workflow' in result.output + assert 'stopped successfully' in result.output + + @patch('console_link.workflow.commands.approve.WorkflowService') + def test_approve_command_auto_detect(self, mock_service_class): + """Test approve command with auto-detection.""" + runner = CliRunner() + + # Mock the service + mock_service = Mock() + mock_service_class.return_value = mock_service + + # Mock list_workflows to return single workflow + mock_service.list_workflows.return_value = { + 'success': True, + 'workflows': ['test-workflow'], + 'count': 1, + 'error': None + } + + mock_service.approve_workflow.return_value = { + 'success': True, + 'workflow_name': 'test-workflow', + 'namespace': 'ma', + 'message': 'Workflow test-workflow resumed successfully', + 'error': None + } + + result = runner.invoke(workflow_cli, ['approve']) + + assert result.exit_code == 0 + assert 'Auto-detected workflow' in result.output + assert 'resumed successfully' in result.output + + @patch('console_link.workflow.commands.submit.WorkflowService') + @patch('console_link.workflow.commands.submit.WorkflowConfigStore') + def test_submit_command_with_config_injection(self, mock_store_class, mock_service_class): + """Test submit command with parameter injection from config.""" + runner = CliRunner() + + # Mock the service + mock_service = Mock() + mock_service_class.return_value = mock_service + + mock_service.load_workflow_template.return_value = { + 'success': True, + 'workflow_spec': {'metadata': {'generateName': 'test-'}, 'spec': {}}, + 'source': 'embedded', + 'error': None + } + + mock_service.inject_parameters.return_value = { + 'metadata': {'generateName': 'test-'}, + 'spec': {'arguments': {'parameters': [{'name': 'param1', 'value': 'value1'}]}} + } + + mock_service.submit_workflow_to_argo.return_value = { + 'success': True, + 'workflow_name': 'test-workflow-abc', + 'workflow_uid': 'uid-123', + 'namespace': 'ma', + 'phase': None, + 'output_message': None, + 'error': None + } + + # Mock the store with config + mock_store = Mock() + mock_store_class.return_value = mock_store + mock_config = WorkflowConfig({'parameters': {'param1': 'value1'}}) + mock_store.load_config.return_value = mock_config + + result = runner.invoke(workflow_cli, ['submit']) + + assert result.exit_code == 0 + assert 'Injecting parameters' in result.output + assert 'submitted successfully' in result.output diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_integration.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_integration.py index 13ce932743..9e1e347259 100644 --- 
a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_integration.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_integration.py @@ -9,7 +9,9 @@ import os import pytest import tempfile +import time import uuid +import requests from click.testing import CliRunner from kubernetes import client, config from kubernetes.client.rest import ApiException @@ -25,7 +27,7 @@ @pytest.fixture(scope="session") def k3s_container(): """Set up k3s container for all workflow tests""" - print("\nStarting k3s container for workflow tests...") + logger.info("\nStarting k3s container for workflow tests...") # Start k3s container container = K3SContainer(image="rancher/k3s:latest") @@ -47,7 +49,7 @@ def k3s_container(): yield container - print("\nCleaning up k3s container...") + logger.info("\nCleaning up k3s container...") # Clean up container.stop() if os.path.exists(kubeconfig_path): @@ -56,6 +58,121 @@ def k3s_container(): del os.environ['KUBECONFIG'] +@pytest.fixture(scope="session") +def argo_workflows(k3s_container): + """Install Argo Workflows in the k3s cluster""" + logger.info("\nInstalling Argo Workflows in k3s...") + + # Argo Workflows version to install + argo_version = "v3.5.12" + argo_namespace = "argo" + + v1 = client.CoreV1Api() + apps_v1 = client.AppsV1Api() + + # Create argo namespace + namespace = client.V1Namespace( + metadata=client.V1ObjectMeta(name=argo_namespace) + ) + try: + v1.create_namespace(body=namespace) + logger.info(f"Created namespace: {argo_namespace}") + except ApiException as e: + if e.status != 409: # Ignore if already exists + raise + logger.info(f"Namespace {argo_namespace} already exists") + + # Download and apply the Argo Workflows manifest + manifest_url = ( + f"https://github.com/argoproj/argo-workflows/releases/download/" + f"{argo_version}/quick-start-minimal.yaml" + ) + + try: + logger.info(f"Downloading Argo Workflows manifest from {manifest_url}") + response = requests.get(manifest_url, timeout=30) + response.raise_for_status() + manifest_content = response.text + + # Write manifest to temporary file + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + f.write(manifest_content) + manifest_path = f.name + + # Apply the manifest using Kubernetes Python client + logger.info("Applying Argo Workflows manifest...") + from kubernetes import utils + k8s_client = client.ApiClient() + + try: + utils.create_from_yaml( + k8s_client, + manifest_path, + namespace=argo_namespace + ) + logger.info("Argo Workflows manifest applied successfully") + except Exception as apply_error: + # Some resources might already exist, which is okay + logger.info(f"Note during apply: {apply_error}") + logger.info("Continuing with installation verification...") + + # Clean up temporary file + if os.path.exists(manifest_path): + os.unlink(manifest_path) + + except Exception as e: + logger.info(f"Error installing Argo Workflows: {e}") + raise + + # Wait for Argo Workflows pods to be ready + logger.info("Waiting for Argo Workflows pods to be ready...") + max_wait_time = 120 # 2 minutes + start_time = time.time() + + while time.time() - start_time < max_wait_time: + try: + # Check if argo-server deployment is ready + server_deployment = apps_v1.read_namespaced_deployment( + name="argo-server", + namespace=argo_namespace + ) + + # Check if workflow-controller deployment is ready + controller_deployment = 
apps_v1.read_namespaced_deployment( + name="workflow-controller", + namespace=argo_namespace + ) + + server_ready = ( + server_deployment.status.ready_replicas is not None and + server_deployment.status.ready_replicas >= 1 + ) + + controller_ready = ( + controller_deployment.status.ready_replicas is not None and + controller_deployment.status.ready_replicas >= 1 + ) + + if server_ready and controller_ready: + logger.info("Argo Workflows is ready!") + break + + except ApiException: + pass # Deployments might not exist yet + + time.sleep(2) + else: + raise TimeoutError("Argo Workflows pods did not become ready in time") + + yield { + "namespace": argo_namespace, + "version": argo_version + } + + # Cleanup is handled by k3s_container fixture + logger.info("\nArgo Workflows cleanup (handled by k3s container cleanup)") + + @pytest.fixture(scope="function") def test_namespace(k3s_container): """Create a unique namespace for each test and clean it up afterwards""" @@ -70,7 +187,7 @@ def test_namespace(k3s_container): try: v1.create_namespace(body=namespace) - print(f"\nCreated test namespace: {namespace_name}") + logger.info(f"\nCreated test namespace: {namespace_name}") except ApiException as e: if e.status != 409: # Ignore if already exists raise @@ -78,7 +195,7 @@ def test_namespace(k3s_container): yield namespace_name # Clean up the namespace after the test - print(f"\nDeleting test namespace: {namespace_name}") + logger.info(f"\nDeleting test namespace: {namespace_name}") try: v1.delete_namespace(name=namespace_name) except ApiException as e: @@ -399,6 +516,193 @@ def test_workflow_util_completions_fish(self, runner): assert "complete" in result.output +@pytest.mark.slow +class TestArgoWorkflows: + """Integration tests for Argo Workflows installation in k3s""" + + def test_argo_workflows_installation(self, argo_workflows): + """Test that Argo Workflows is properly installed in k3s""" + argo_namespace = argo_workflows["namespace"] + argo_version = argo_workflows["version"] + + logger.info(f"\nVerifying Argo Workflows {argo_version} installation in namespace {argo_namespace}") + + v1 = client.CoreV1Api() + apps_v1 = client.AppsV1Api() + + # Verify argo namespace exists + namespaces = v1.list_namespace() + namespace_names = [ns.metadata.name for ns in namespaces.items] + assert argo_namespace in namespace_names, f"Argo namespace {argo_namespace} not found" + logger.info(f"✓ Namespace {argo_namespace} exists") + + # Verify argo-server deployment exists and is ready + server_deployment = apps_v1.read_namespaced_deployment( + name="argo-server", + namespace=argo_namespace + ) + assert server_deployment is not None, "argo-server deployment not found" + assert server_deployment.status.ready_replicas >= 1, "argo-server deployment not ready" + logger.info(f"✓ argo-server deployment is ready ({server_deployment.status.ready_replicas} replicas)") + + # Verify workflow-controller deployment exists and is ready + controller_deployment = apps_v1.read_namespaced_deployment( + name="workflow-controller", + namespace=argo_namespace + ) + assert controller_deployment is not None, "workflow-controller deployment not found" + assert controller_deployment.status.ready_replicas >= 1, "workflow-controller deployment not ready" + logger.info( + f"✓ workflow-controller deployment is ready " + f"({controller_deployment.status.ready_replicas} replicas)") + + # Verify argo-server service exists + services = v1.list_namespaced_service(namespace=argo_namespace) + service_names = [svc.metadata.name for svc in 
services.items] + assert "argo-server" in service_names, "argo-server service not found" + logger.info("✓ argo-server service exists") + + # Verify pods are running + pods = v1.list_namespaced_pod(namespace=argo_namespace) + running_pods = [pod for pod in pods.items if pod.status.phase == "Running"] + assert len(running_pods) >= 2, f"Expected at least 2 running pods, found {len(running_pods)}" + logger.info(f"✓ Found {len(running_pods)} running pods in {argo_namespace} namespace") + + for pod in running_pods: + logger.info(f" - {pod.metadata.name}: {pod.status.phase}") + + logger.info(f"\n✓ Argo Workflows {argo_version} is successfully installed and running!") + + def test_workflow_submit_hello_world(self, argo_workflows): + """Test submitting a hello-world workflow to Argo via Kubernetes API with output verification""" + argo_namespace = argo_workflows["namespace"] + + logger.info(f"\nTesting workflow submission to Argo in namespace {argo_namespace}") + + # Create unique message for this test + test_message = f"hello world from test {uuid.uuid4().hex[:8]}" + + # Create workflow specification as a Kubernetes custom resource with output parameter + workflow_spec = { + "apiVersion": "argoproj.io/v1alpha1", + "kind": "Workflow", + "metadata": { + "generateName": "test-hello-world-", + "namespace": argo_namespace, + "labels": { + "workflows.argoproj.io/completed": "false" + } + }, + "spec": { + "templates": [ + { + "name": "hello-world", + "outputs": { + "parameters": [ + { + "name": "message", + "valueFrom": { + "path": "/tmp/message.txt" + } + } + ] + }, + "container": { + "image": "busybox", + "command": ["sh", "-c"], + "args": [f'echo "{test_message}" | tee /tmp/message.txt'] + } + } + ], + "entrypoint": "hello-world" + } + } + + # Submit workflow using Kubernetes API + custom_api = client.CustomObjectsApi() + + try: + logger.info("Submitting workflow via Kubernetes API...") + + # Create the workflow custom resource + result = custom_api.create_namespaced_custom_object( + group="argoproj.io", + version="v1alpha1", + namespace=argo_namespace, + plural="workflows", + body=workflow_spec + ) + + workflow_name = result.get("metadata", {}).get("name") + workflow_uid = result.get("metadata", {}).get("uid") + + assert workflow_name is not None, "Workflow name not returned" + assert workflow_name.startswith("test-hello-world-"), f"Unexpected workflow name: {workflow_name}" + assert workflow_uid is not None, "Workflow UID not returned" + + logger.info("✓ Workflow submitted successfully!") + logger.info(f" Name: {workflow_name}") + logger.info(f" UID: {workflow_uid}") + + # Wait for workflow to complete + logger.info("Waiting for workflow to complete...") + max_wait = 60 # 60 seconds timeout + start_time = time.time() + workflow_phase = "Unknown" + + while time.time() - start_time < max_wait: + workflow = custom_api.get_namespaced_custom_object( + group="argoproj.io", + version="v1alpha1", + namespace=argo_namespace, + plural="workflows", + name=workflow_name + ) + + workflow_phase = workflow.get("status", {}).get("phase", "Unknown") + logger.info(f" Workflow phase: {workflow_phase}") + + # Check if workflow reached a terminal state + if workflow_phase in ["Succeeded", "Failed", "Error"]: + break + + time.sleep(2) + + assert workflow is not None, "Workflow not found in Kubernetes" + assert workflow["metadata"]["name"] == workflow_name + logger.info("✓ Workflow verified in Kubernetes") + + # Verify workflow succeeded + assert workflow_phase == "Succeeded", f"Workflow did not succeed, phase: 
{workflow_phase}" + logger.info(f"✓ Workflow completed successfully with phase: {workflow_phase}") + + # Extract and verify output parameter + output_message = None + nodes = workflow.get("status", {}).get("nodes", {}) + + for node_id, node in nodes.items(): + outputs = node.get("outputs", {}) + parameters = outputs.get("parameters", []) + + for param in parameters: + if param.get("name") == "message": + output_message = param.get("value", "").strip() + break + + if output_message: + break + + assert output_message is not None, "Could not retrieve workflow output" + assert test_message in output_message, \ + f"Output doesn't match expected message. Expected: '{test_message}', Got: '{output_message}'" + + logger.info(f"✓ Container output verified: {output_message}") + logger.info("✓ Output verification successful - container executed correctly!") + + except ApiException as e: + pytest.fail(f"Failed to submit workflow via Kubernetes API: {e}") + + def test_k3s_container_support(): """Test that k3s container support is available""" try: diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_service.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_service.py new file mode 100644 index 0000000000..adb910c776 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_service.py @@ -0,0 +1,1129 @@ +"""Unit tests for WorkflowService class.""" + +import os +import pytest +from unittest.mock import Mock, patch, mock_open + +from console_link.workflow.services.workflow_service import WorkflowService +from console_link.workflow.models.config import WorkflowConfig + + +class TestWorkflowServiceTemplateLoading: + """Test suite for workflow template loading functionality.""" + + def test_get_default_workflow_spec(self): + """Test that default workflow spec is returned correctly.""" + service = WorkflowService() + + workflow = service.get_default_workflow_spec() + + # Verify structure + assert 'metadata' in workflow + assert 'spec' in workflow + assert 'templates' in workflow['spec'] + assert 'entrypoint' in workflow['spec'] + assert workflow['spec']['entrypoint'] == 'main' + + # Verify it returns a copy (not the cached instance) + workflow2 = service.get_default_workflow_spec() + assert workflow is not workflow2 + assert workflow == workflow2 + + def test_load_workflow_template_default_no_env_var(self): + """Test loading default workflow when WORKFLOW_TEMPLATE_PATH is not set.""" + service = WorkflowService() + + with patch.dict(os.environ, {}, clear=True): + result = service.load_workflow_template() + + assert result['success'] is True + assert result['source'] == 'embedded' + assert result['error'] is None + assert 'spec' in result['workflow_spec'] + + def test_load_workflow_template_from_explicit_path(self): + """Test loading workflow from explicit path parameter.""" + service = WorkflowService() + + workflow_yaml = """ +metadata: + name: test-workflow +spec: + templates: + - name: test + container: + image: busybox + entrypoint: test +""" + + with patch('builtins.open', mock_open(read_data=workflow_yaml)): + result = service.load_workflow_template('/tmp/workflow.yaml') + + assert result['success'] is True + assert result['source'] == '/tmp/workflow.yaml' + assert result['error'] is None + assert result['workflow_spec']['metadata']['name'] == 'test-workflow' + + def 
test_load_workflow_template_from_env_var(self): + """Test loading workflow from WORKFLOW_TEMPLATE_PATH environment variable.""" + service = WorkflowService() + + workflow_yaml = """ +metadata: + name: env-workflow +spec: + templates: + - name: env-test + container: + image: alpine + entrypoint: env-test +""" + + with patch.dict(os.environ, {'WORKFLOW_TEMPLATE_PATH': '/path/to/workflow.yaml'}): + with patch('builtins.open', mock_open(read_data=workflow_yaml)): + result = service.load_workflow_template() + + assert result['success'] is True + assert result['source'] == '/path/to/workflow.yaml' + assert result['error'] is None + assert result['workflow_spec']['metadata']['name'] == 'env-workflow' + + def test_load_workflow_template_file_not_found(self): + """Test handling of missing template file.""" + service = WorkflowService() + + with patch('builtins.open', side_effect=FileNotFoundError("File not found")): + result = service.load_workflow_template('/missing/workflow.yaml') + + # Should fall back to default + assert result['success'] is False + assert result['source'] == 'embedded' + assert 'not found' in result['error'] + # Still returns a valid workflow spec (the default) + assert 'spec' in result['workflow_spec'] + + def test_load_workflow_template_invalid_yaml(self): + """Test handling of invalid YAML syntax.""" + service = WorkflowService() + + # This is truly invalid YAML - unmatched brackets + invalid_yaml = """ +metadata: + name: broken + items: [unclosed bracket + other: value +""" + + with patch('builtins.open', mock_open(read_data=invalid_yaml)): + result = service.load_workflow_template('/tmp/invalid.yaml') + + # Should fall back to default + assert result['success'] is False + assert result['source'] == 'embedded' + assert 'Invalid YAML' in result['error'] or 'YAML' in result['error'] + assert 'spec' in result['workflow_spec'] + + def test_load_workflow_template_empty_file(self): + """Test handling of empty template file.""" + service = WorkflowService() + + with patch('builtins.open', mock_open(read_data="")): + result = service.load_workflow_template('/tmp/empty.yaml') + + # Should fall back to default + assert result['success'] is False + assert result['source'] == 'embedded' + assert 'empty' in result['error'].lower() + + def test_load_workflow_template_missing_spec(self): + """Test handling of template without required 'spec' section.""" + service = WorkflowService() + + invalid_workflow = """ +metadata: + name: no-spec +""" + + with patch('builtins.open', mock_open(read_data=invalid_workflow)): + result = service.load_workflow_template('/tmp/no-spec.yaml') + + # Should fall back to default + assert result['success'] is False + assert result['source'] == 'embedded' + assert 'spec' in result['error'] + + +class TestWorkflowServiceParameterInjection: + """Test suite for parameter injection functionality.""" + + def test_inject_parameters_empty_config(self): + """Test parameter injection with empty config.""" + service = WorkflowService() + + workflow = {'spec': {'arguments': {'parameters': []}}} + config = WorkflowConfig({}) + + result = service.inject_parameters(workflow, config) + + assert result == workflow + assert result is not workflow # Returns a copy + + def test_inject_parameters_no_config(self): + """Test parameter injection with None config.""" + service = WorkflowService() + + workflow = {'spec': {'arguments': {'parameters': []}}} + + result = service.inject_parameters(workflow, None) + + assert result == workflow + + def 
test_inject_parameters_with_values(self): + """Test parameter injection with actual values.""" + service = WorkflowService() + + workflow = { + 'metadata': {'name': 'test'}, + 'spec': { + 'arguments': { + 'parameters': [] + } + } + } + + config = WorkflowConfig({ + 'parameters': { + 'param1': 'value1', + 'param2': 'value2' + } + }) + + result = service.inject_parameters(workflow, config) + + params = result['spec']['arguments']['parameters'] + assert len(params) == 2 + assert {'name': 'param1', 'value': 'value1'} in params + assert {'name': 'param2', 'value': 'value2'} in params + + def test_inject_parameters_update_existing(self): + """Test parameter injection updates existing parameters.""" + service = WorkflowService() + + workflow = { + 'spec': { + 'arguments': { + 'parameters': [ + {'name': 'param1', 'value': 'old-value'} + ] + } + } + } + + config = WorkflowConfig({ + 'parameters': { + 'param1': 'new-value', + 'param2': 'another-value' + } + }) + + result = service.inject_parameters(workflow, config) + + params = result['spec']['arguments']['parameters'] + assert len(params) == 2 + + param1 = next(p for p in params if p['name'] == 'param1') + assert param1['value'] == 'new-value' + + param2 = next(p for p in params if p['name'] == 'param2') + assert param2['value'] == 'another-value' + + def test_inject_parameters_creates_structure(self): + """Test parameter injection creates missing structure.""" + service = WorkflowService() + + workflow = {'metadata': {'name': 'test'}} + + config = WorkflowConfig({ + 'parameters': { + 'param1': 'value1' + } + }) + + result = service.inject_parameters(workflow, config) + + assert 'spec' in result + assert 'arguments' in result['spec'] + assert 'parameters' in result['spec']['arguments'] + assert len(result['spec']['arguments']['parameters']) == 1 + + def test_inject_parameters_no_parameters_in_config(self): + """Test parameter injection when config has no 'parameters' key.""" + service = WorkflowService() + + workflow = {'spec': {'arguments': {'parameters': []}}} + config = WorkflowConfig({'other_key': 'value'}) + + result = service.inject_parameters(workflow, config) + + assert result['spec']['arguments']['parameters'] == [] + + +class TestWorkflowServiceSubmission: + """Test suite for workflow submission functionality.""" + + @patch('console_link.workflow.services.workflow_service.requests.post') + def test_submit_workflow_success(self, mock_post): + """Test successful workflow submission.""" + service = WorkflowService() + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'metadata': { + 'name': 'test-workflow-abc123', + 'uid': 'uid-12345' + } + } + mock_post.return_value = mock_response + + workflow_spec = {'metadata': {'name': 'test'}, 'spec': {}} + + result = service.submit_workflow_to_argo( + workflow_spec=workflow_spec, + namespace='argo', + argo_server='https://localhost:2746', + insecure=True + ) + + assert result['success'] is True + assert result['workflow_name'] == 'test-workflow-abc123' + assert result['workflow_uid'] == 'uid-12345' + assert result['namespace'] == 'argo' + assert result['error'] is None + + # Verify request was made correctly + mock_post.assert_called_once() + call_args = mock_post.call_args + assert call_args[0][0] == 'https://localhost:2746/api/v1/workflows/argo' + assert call_args[1]['verify'] is False + + @patch('console_link.workflow.services.workflow_service.requests.post') + def test_submit_workflow_with_token(self, mock_post): + 
"""Test workflow submission with authentication token.""" + service = WorkflowService() + + mock_response = Mock() + mock_response.json.return_value = { + 'metadata': {'name': 'test', 'uid': 'uid'} + } + mock_post.return_value = mock_response + + workflow_spec = {'metadata': {}, 'spec': {}} + + result = service.submit_workflow_to_argo( + workflow_spec=workflow_spec, + namespace='argo', + argo_server='https://localhost:2746', + token='secret-token' + ) + + # Verify Authorization header was set + call_args = mock_post.call_args + headers = call_args[1]['headers'] + assert 'Authorization' in headers + assert headers['Authorization'] == 'Bearer secret-token' + + @patch('console_link.workflow.services.workflow_service.requests.post') + def test_submit_workflow_api_error(self, mock_post): + """Test workflow submission with API error.""" + service = WorkflowService() + + # Mock API error + mock_response = Mock() + mock_response.status_code = 400 + mock_response.json.return_value = {'message': 'Invalid workflow'} + mock_post.side_effect = Exception("Request failed") + + workflow_spec = {'metadata': {}, 'spec': {}} + + result = service.submit_workflow_to_argo( + workflow_spec=workflow_spec, + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is False + assert result['error'] is not None + assert 'error' in result['error'].lower() + + @patch('console_link.workflow.services.workflow_service.requests.post') + def test_submit_workflow_ensures_namespace(self, mock_post): + """Test that namespace is always set in workflow metadata.""" + service = WorkflowService() + + mock_response = Mock() + mock_response.json.return_value = { + 'metadata': {'name': 'test', 'uid': 'uid'} + } + mock_post.return_value = mock_response + + # Workflow without namespace in metadata + workflow_spec = {'metadata': {'name': 'test'}, 'spec': {}} + + service.submit_workflow_to_argo( + workflow_spec=workflow_spec, + namespace='custom-namespace', + argo_server='https://localhost:2746' + ) + + # Verify namespace was added to request body + call_args = mock_post.call_args + request_body = call_args[1]['json'] + assert request_body['workflow']['metadata']['namespace'] == 'custom-namespace' + + +class TestWorkflowServiceMonitoring: + """Test suite for workflow monitoring functionality.""" + + @patch('console_link.workflow.services.workflow_service.client.CustomObjectsApi') + def test_wait_for_workflow_completion_success(self, mock_api_class): + """Test successful workflow completion monitoring.""" + service = WorkflowService() + + # Mock Kubernetes API + mock_api = Mock() + mock_api_class.return_value = mock_api + + # First call: workflow running, second call: workflow succeeded + mock_api.get_namespaced_custom_object.side_effect = [ + { + 'status': {'phase': 'Running', 'nodes': {}} + }, + { + 'status': { + 'phase': 'Succeeded', + 'nodes': { + 'node-1': { + 'outputs': { + 'parameters': [ + {'name': 'message', 'value': 'Hello World'} + ] + } + } + } + } + } + ] + + phase, output = service.wait_for_workflow_completion( + namespace='argo', + workflow_name='test-workflow', + timeout=10, + interval=1 + ) + + assert phase == 'Succeeded' + assert output == 'Hello World' + + @patch('console_link.workflow.services.workflow_service.client.CustomObjectsApi') + def test_wait_for_workflow_completion_timeout(self, mock_api_class): + """Test workflow monitoring timeout.""" + service = WorkflowService() + + mock_api = Mock() + mock_api_class.return_value = mock_api + + # Always return running status + 
mock_api.get_namespaced_custom_object.return_value = { + 'status': {'phase': 'Running', 'nodes': {}} + } + + with pytest.raises(TimeoutError) as exc_info: + service.wait_for_workflow_completion( + namespace='argo', + workflow_name='test-workflow', + timeout=2, + interval=1 + ) + + assert 'did not complete' in str(exc_info.value) + + @patch('console_link.workflow.services.workflow_service.client.CustomObjectsApi') + def test_wait_for_workflow_completion_failed(self, mock_api_class): + """Test monitoring workflow that fails.""" + service = WorkflowService() + + mock_api = Mock() + mock_api_class.return_value = mock_api + + mock_api.get_namespaced_custom_object.return_value = { + 'status': {'phase': 'Failed', 'nodes': {}} + } + + phase, output = service.wait_for_workflow_completion( + namespace='argo', + workflow_name='test-workflow', + timeout=10, + interval=1 + ) + + assert phase == 'Failed' + assert output is None + + @patch('console_link.workflow.services.workflow_service.client.CustomObjectsApi') + def test_wait_for_workflow_completion_no_output(self, mock_api_class): + """Test monitoring workflow that completes without output.""" + service = WorkflowService() + + mock_api = Mock() + mock_api_class.return_value = mock_api + + mock_api.get_namespaced_custom_object.return_value = { + 'status': {'phase': 'Succeeded', 'nodes': {}} + } + + phase, output = service.wait_for_workflow_completion( + namespace='argo', + workflow_name='test-workflow', + timeout=10, + interval=1 + ) + + assert phase == 'Succeeded' + assert output is None + + @patch('console_link.workflow.services.workflow_service.client.CustomObjectsApi') + def test_wait_for_workflow_completion_api_error(self, mock_api_class): + """Test handling of Kubernetes API errors during monitoring.""" + service = WorkflowService() + + mock_api = Mock() + mock_api_class.return_value = mock_api + + mock_api.get_namespaced_custom_object.side_effect = Exception("API Error") + + with pytest.raises(Exception) as exc_info: + service.wait_for_workflow_completion( + namespace='argo', + workflow_name='test-workflow', + timeout=10, + interval=1 + ) + + assert 'API Error' in str(exc_info.value) + + +class TestWorkflowServiceStop: + """Test suite for workflow stop functionality.""" + + @patch('console_link.workflow.services.workflow_service.requests.put') + def test_stop_workflow_success(self, mock_put): + """Test successful workflow stop.""" + service = WorkflowService() + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_put.return_value = mock_response + + result = service.stop_workflow( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746', + insecure=True + ) + + assert result['success'] is True + assert result['workflow_name'] == 'test-workflow' + assert result['namespace'] == 'argo' + assert 'stopped successfully' in result['message'] + assert result['error'] is None + + # Verify request was made correctly + mock_put.assert_called_once() + call_args = mock_put.call_args + assert call_args[0][0] == 'https://localhost:2746/api/v1/workflows/argo/test-workflow/stop' + assert call_args[1]['verify'] is False + + @patch('console_link.workflow.services.workflow_service.requests.put') + def test_stop_workflow_with_token(self, mock_put): + """Test workflow stop with authentication token.""" + service = WorkflowService() + + mock_response = Mock() + mock_response.status_code = 200 + mock_put.return_value = mock_response + + result = service.stop_workflow( + 
workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746', + token='secret-token' + ) + + assert result['success'] is True + + # Verify Authorization header was set + call_args = mock_put.call_args + headers = call_args[1]['headers'] + assert 'Authorization' in headers + assert headers['Authorization'] == 'Bearer secret-token' + + @patch('console_link.workflow.services.workflow_service.requests.put') + def test_stop_workflow_not_found(self, mock_put): + """Test workflow stop with 404 not found.""" + service = WorkflowService() + + # Mock 404 response + mock_response = Mock() + mock_response.status_code = 404 + mock_put.return_value = mock_response + + result = service.stop_workflow( + workflow_name='nonexistent-workflow', + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is False + assert result['workflow_name'] == 'nonexistent-workflow' + assert result['namespace'] == 'argo' + assert 'not found' in result['message'] + assert result['error'] is not None + + @patch('console_link.workflow.services.workflow_service.requests.put') + def test_stop_workflow_api_error(self, mock_put): + """Test workflow stop with API error.""" + service = WorkflowService() + + # Mock API error response + mock_response = Mock() + mock_response.status_code = 500 + mock_response.text = 'Internal server error' + mock_response.json.side_effect = ValueError("Not JSON") + mock_put.return_value = mock_response + + result = service.stop_workflow( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is False + assert result['workflow_name'] == 'test-workflow' + assert 'Failed to stop workflow' in result['message'] + assert result['error'] is not None + + @patch('console_link.workflow.services.workflow_service.requests.put') + def test_stop_workflow_network_error(self, mock_put): + """Test workflow stop with network error.""" + service = WorkflowService() + + # Mock network exception + import requests + mock_put.side_effect = requests.exceptions.ConnectionError("Connection failed") + + result = service.stop_workflow( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is False + assert result['workflow_name'] == 'test-workflow' + assert 'Network error' in result['message'] + assert result['error'] is not None + + @patch('console_link.workflow.services.workflow_service.requests.put') + def test_stop_workflow_insecure_flag(self, mock_put): + """Test workflow stop with insecure flag variations.""" + service = WorkflowService() + + mock_response = Mock() + mock_response.status_code = 200 + mock_put.return_value = mock_response + + # Test with insecure=True + service.stop_workflow( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746', + insecure=True + ) + + call_args = mock_put.call_args + assert call_args[1]['verify'] is False + + mock_put.reset_mock() + + # Test with insecure=False (default) + service.stop_workflow( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746', + insecure=False + ) + + call_args = mock_put.call_args + assert call_args[1]['verify'] is True + + +class TestWorkflowServiceList: + """Test suite for workflow list functionality.""" + + @patch('console_link.workflow.services.workflow_service.requests.get') + def test_list_workflows_success(self, mock_get): + """Test successful workflow listing.""" + service = 
WorkflowService() + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'items': [ + {'metadata': {'name': 'workflow-1'}}, + {'metadata': {'name': 'workflow-2'}}, + {'metadata': {'name': 'workflow-3'}} + ] + } + mock_get.return_value = mock_response + + result = service.list_workflows( + namespace='argo', + argo_server='https://localhost:2746', + insecure=True + ) + + assert result['success'] is True + assert result['count'] == 3 + assert 'workflow-1' in result['workflows'] + assert 'workflow-2' in result['workflows'] + assert 'workflow-3' in result['workflows'] + assert result['error'] is None + + # Verify request was made correctly + mock_get.assert_called_once() + call_args = mock_get.call_args + assert call_args[0][0] == 'https://localhost:2746/api/v1/workflows/argo' + assert call_args[1]['verify'] is False + + @patch('console_link.workflow.services.workflow_service.requests.get') + def test_list_workflows_empty(self, mock_get): + """Test listing workflows when none exist.""" + service = WorkflowService() + + # Mock empty response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'items': []} + mock_get.return_value = mock_response + + result = service.list_workflows( + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is True + assert result['count'] == 0 + assert result['workflows'] == [] + assert result['error'] is None + + @patch('console_link.workflow.services.workflow_service.requests.get') + def test_list_workflows_with_token(self, mock_get): + """Test workflow listing with authentication token.""" + service = WorkflowService() + + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'items': []} + mock_get.return_value = mock_response + + result = service.list_workflows( + namespace='argo', + argo_server='https://localhost:2746', + token='secret-token' + ) + + assert result['success'] is True + + # Verify Authorization header was set + call_args = mock_get.call_args + headers = call_args[1]['headers'] + assert 'Authorization' in headers + assert headers['Authorization'] == 'Bearer secret-token' + + @patch('console_link.workflow.services.workflow_service.requests.get') + def test_list_workflows_api_error(self, mock_get): + """Test workflow listing with API error.""" + service = WorkflowService() + + # Mock API error response + mock_response = Mock() + mock_response.status_code = 500 + mock_response.text = 'Internal server error' + mock_response.json.side_effect = ValueError("Not JSON") + mock_get.return_value = mock_response + + result = service.list_workflows( + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is False + assert result['count'] == 0 + assert result['workflows'] == [] + assert 'Failed to list workflows' in result['error'] + + @patch('console_link.workflow.services.workflow_service.requests.get') + def test_list_workflows_network_error(self, mock_get): + """Test workflow listing with network error.""" + service = WorkflowService() + + # Mock network exception + import requests + mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed") + + result = service.list_workflows( + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is False + assert result['count'] == 0 + assert result['workflows'] == [] + assert 'Connection failed' in result['error'] + + 
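Taken together, these list tests pin down the HTTP contract the service is expected to honor: a GET against `{argo_server}/api/v1/workflows/{namespace}`, an optional `Authorization: Bearer` header, `verify` set to the inverse of `insecure`, and names drawn from `items[].metadata.name`. A minimal client sketch consistent with those assertions follows; it is inferred from the test expectations, not copied from `workflow_service.py`, so the real implementation may differ.

    # Sketch of the list call these tests assert against; inferred from the
    # test expectations rather than taken from the real implementation.
    import requests

    def list_workflow_names(namespace, argo_server, token=None, insecure=False):
        headers = {'Authorization': f'Bearer {token}'} if token else {}
        response = requests.get(
            f'{argo_server}/api/v1/workflows/{namespace}',
            headers=headers,
            verify=not insecure,  # tests assert verify=False when insecure=True
            timeout=30,           # assumed; the tests do not constrain this
        )
        response.raise_for_status()
        items = response.json().get('items', [])
        # Skip entries without metadata.name, as the filtering test below expects.
        return [item['metadata']['name'] for item in items
                if item.get('metadata', {}).get('name')]
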
@patch('console_link.workflow.services.workflow_service.requests.get') + def test_list_workflows_filters_invalid_names(self, mock_get): + """Test workflow listing filters out workflows without names.""" + service = WorkflowService() + + # Mock response with some workflows missing names + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'items': [ + {'metadata': {'name': 'workflow-1'}}, + {'metadata': {}}, # Missing name + {'metadata': {'name': 'workflow-2'}}, + {'other': 'data'} # Missing metadata + ] + } + mock_get.return_value = mock_response + + result = service.list_workflows( + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is True + assert result['count'] == 2 + assert 'workflow-1' in result['workflows'] + assert 'workflow-2' in result['workflows'] + assert len(result['workflows']) == 2 + + @patch('console_link.workflow.services.workflow_service.requests.get') + def test_list_workflows_insecure_flag(self, mock_get): + """Test workflow listing with insecure flag variations.""" + service = WorkflowService() + + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {'items': []} + mock_get.return_value = mock_response + + # Test with insecure=True + service.list_workflows( + namespace='argo', + argo_server='https://localhost:2746', + insecure=True + ) + + call_args = mock_get.call_args + assert call_args[1]['verify'] is False + + mock_get.reset_mock() + + # Test with insecure=False (default) + service.list_workflows( + namespace='argo', + argo_server='https://localhost:2746', + insecure=False + ) + + call_args = mock_get.call_args + assert call_args[1]['verify'] is True + + +class TestWorkflowServiceStatus: + """Test suite for workflow status functionality.""" + + @patch('console_link.workflow.services.workflow_service.requests.get') + def test_get_workflow_status_success(self, mock_get): + """Test successful workflow status retrieval with step parsing.""" + service = WorkflowService() + + # Mock successful response with workflow status including steps + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'status': { + 'phase': 'Running', + 'progress': '1/2', + 'startedAt': '2024-01-01T10:00:00Z', + 'finishedAt': None, + 'nodes': { + 'node-1': { + 'type': 'Pod', + 'displayName': 'step1', + 'phase': 'Succeeded', + 'startedAt': '2024-01-01T10:00:00Z' + }, + 'node-2': { + 'type': 'Pod', + 'displayName': 'step2', + 'phase': 'Running', + 'startedAt': '2024-01-01T10:01:00Z' + } + } + } + } + mock_get.return_value = mock_response + + result = service.get_workflow_status( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746', + insecure=True + ) + + assert result['success'] is True + assert result['workflow_name'] == 'test-workflow' + assert result['phase'] == 'Running' + assert result['progress'] == '1/2' + assert len(result['steps']) == 2 + assert result['steps'][0]['name'] == 'step1' + assert result['steps'][0]['phase'] == 'Succeeded' + assert result['steps'][1]['name'] == 'step2' + assert result['steps'][1]['phase'] == 'Running' + + @patch('console_link.workflow.services.workflow_service.requests.get') + def test_get_workflow_status_with_suspend_node(self, mock_get): + """Test workflow status with suspend/approval node.""" + service = WorkflowService() + + # Mock response with suspend node + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value 
= { + 'status': { + 'phase': 'Running', + 'progress': '1/2', + 'startedAt': '2024-01-01T10:00:00Z', + 'finishedAt': None, + 'nodes': { + 'node-1': { + 'type': 'Pod', + 'displayName': 'step1', + 'phase': 'Succeeded', + 'startedAt': '2024-01-01T10:00:00Z' + }, + 'node-2': { + 'type': 'Suspend', + 'displayName': 'approval-gate', + 'phase': 'Running', + 'startedAt': '2024-01-01T10:01:00Z' + } + } + } + } + mock_get.return_value = mock_response + + result = service.get_workflow_status( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is True + assert result['phase'] == 'Running' + assert len(result['steps']) == 2 + assert result['steps'][1]['type'] == 'Suspend' + assert result['steps'][1]['phase'] == 'Running' + + @patch('console_link.workflow.services.workflow_service.requests.get') + def test_get_workflow_status_not_found(self, mock_get): + """Test workflow status when workflow doesn't exist.""" + service = WorkflowService() + + # Mock 404 response + mock_response = Mock() + mock_response.status_code = 404 + mock_get.return_value = mock_response + + result = service.get_workflow_status( + workflow_name='nonexistent', + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is False + assert result['phase'] == 'Unknown' + assert result['error'] is not None + assert 'failed' in result['error'].lower() or '404' in result['error'] + + +class TestWorkflowServiceApprove: + """Test suite for workflow approve/resume functionality.""" + + @patch('console_link.workflow.services.workflow_service.requests.put') + def test_approve_workflow_success(self, mock_put): + """Test successful workflow approve/resume.""" + service = WorkflowService() + + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_put.return_value = mock_response + + result = service.approve_workflow( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746', + insecure=True + ) + + assert result['success'] is True + assert result['workflow_name'] == 'test-workflow' + assert result['namespace'] == 'argo' + assert 'resumed successfully' in result['message'] + assert result['error'] is None + + # Verify request was made correctly + mock_put.assert_called_once() + call_args = mock_put.call_args + assert call_args[0][0] == 'https://localhost:2746/api/v1/workflows/argo/test-workflow/resume' + assert call_args[1]['verify'] is False + + @patch('console_link.workflow.services.workflow_service.requests.put') + def test_approve_workflow_with_token(self, mock_put): + """Test workflow approve with authentication token.""" + service = WorkflowService() + + mock_response = Mock() + mock_response.status_code = 200 + mock_put.return_value = mock_response + + result = service.approve_workflow( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746', + token='secret-token' + ) + + assert result['success'] is True + + # Verify Authorization header was set + call_args = mock_put.call_args + headers = call_args[1]['headers'] + assert 'Authorization' in headers + assert headers['Authorization'] == 'Bearer secret-token' + + @patch('console_link.workflow.services.workflow_service.requests.put') + def test_approve_workflow_not_found(self, mock_put): + """Test workflow approve with 404 not found.""" + service = WorkflowService() + + # Mock 404 response + mock_response = Mock() + mock_response.status_code = 404 + mock_put.return_value = 
mock_response + + result = service.approve_workflow( + workflow_name='nonexistent-workflow', + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is False + assert result['workflow_name'] == 'nonexistent-workflow' + assert result['namespace'] == 'argo' + assert 'not found' in result['message'] + assert result['error'] is not None + + @patch('console_link.workflow.services.workflow_service.requests.put') + def test_approve_workflow_api_error(self, mock_put): + """Test workflow approve with API error.""" + service = WorkflowService() + + # Mock API error response + mock_response = Mock() + mock_response.status_code = 500 + mock_response.text = 'Internal server error' + mock_response.json.side_effect = ValueError("Not JSON") + mock_put.return_value = mock_response + + result = service.approve_workflow( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is False + assert result['workflow_name'] == 'test-workflow' + assert 'Failed to resume workflow' in result['message'] + assert result['error'] is not None + + @patch('console_link.workflow.services.workflow_service.requests.put') + def test_approve_workflow_network_error(self, mock_put): + """Test workflow approve with network error.""" + service = WorkflowService() + + # Mock network exception + import requests + mock_put.side_effect = requests.exceptions.ConnectionError("Connection failed") + + result = service.approve_workflow( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746' + ) + + assert result['success'] is False + assert result['workflow_name'] == 'test-workflow' + assert 'Network error' in result['message'] + assert result['error'] is not None + + @patch('console_link.workflow.services.workflow_service.requests.put') + def test_approve_workflow_insecure_flag(self, mock_put): + """Test workflow approve with insecure flag variations.""" + service = WorkflowService() + + mock_response = Mock() + mock_response.status_code = 200 + mock_put.return_value = mock_response + + # Test with insecure=True + service.approve_workflow( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746', + insecure=True + ) + + call_args = mock_put.call_args + assert call_args[1]['verify'] is False + + mock_put.reset_mock() + + # Test with insecure=False (default) + service.approve_workflow( + workflow_name='test-workflow', + namespace='argo', + argo_server='https://localhost:2746', + insecure=False + ) + + call_args = mock_put.call_args + assert call_args[1]['verify'] is True diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_store.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_store.py new file mode 100644 index 0000000000..ae06696496 --- /dev/null +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/tests/workflow-tests/test_workflow_store.py @@ -0,0 +1,123 @@ +"""Unit tests for WorkflowConfigStore class.""" + +import pytest +from unittest.mock import Mock +from kubernetes.client.rest import ApiException + +from console_link.workflow.models.store import WorkflowConfigStore +from console_link.workflow.models.config import WorkflowConfig + + +class TestWorkflowConfigStore: + """Test suite for WorkflowConfigStore CRUD operations.""" + + def test_save_config_creates_new_configmap(self): + """Test 
save_config creates a new ConfigMap when it doesn't exist.""" + # Mock Kubernetes API client + mock_v1 = Mock() + mock_v1.patch_namespaced_config_map.side_effect = ApiException(status=404) + mock_v1.create_namespaced_config_map.return_value = None + + store = WorkflowConfigStore(namespace="test-ns", k8s_client=mock_v1) + + config = WorkflowConfig({"key": "value"}) + result = store.save_config(config, "test-session") + + assert "created" in result.lower() + mock_v1.create_namespaced_config_map.assert_called_once() + + def test_save_config_updates_existing_configmap(self): + """Test save_config updates an existing ConfigMap.""" + # Mock Kubernetes API client + mock_v1 = Mock() + mock_v1.patch_namespaced_config_map.return_value = None + + store = WorkflowConfigStore(namespace="test-ns", k8s_client=mock_v1) + + config = WorkflowConfig({"key": "updated_value"}) + result = store.save_config(config, "test-session") + + assert "updated" in result.lower() + mock_v1.patch_namespaced_config_map.assert_called_once() + + def test_load_config_success(self): + """Test load_config successfully retrieves an existing config.""" + # Mock Kubernetes API client + mock_v1 = Mock() + mock_config_map = Mock() + mock_config_map.data = { + "workflow_config.json": '{"key": "value"}' + } + mock_v1.read_namespaced_config_map.return_value = mock_config_map + + store = WorkflowConfigStore(namespace="test-ns", k8s_client=mock_v1) + + config = store.load_config("test-session") + + assert config is not None + assert config.get("key") == "value" + mock_v1.read_namespaced_config_map.assert_called_once() + + def test_load_config_not_found(self): + """Test load_config returns None when ConfigMap doesn't exist.""" + # Mock Kubernetes API client + mock_v1 = Mock() + mock_v1.read_namespaced_config_map.side_effect = ApiException(status=404) + + store = WorkflowConfigStore(namespace="test-ns", k8s_client=mock_v1) + + config = store.load_config("nonexistent-session") + + assert config is None + + def test_delete_config_success(self): + """Test delete_config successfully deletes a ConfigMap.""" + # Mock Kubernetes API client + mock_v1 = Mock() + mock_v1.delete_namespaced_config_map.return_value = None + + store = WorkflowConfigStore(namespace="test-ns", k8s_client=mock_v1) + + result = store.delete_config("test-session") + + assert "deleted" in result.lower() + mock_v1.delete_namespaced_config_map.assert_called_once() + + def test_delete_config_not_found(self): + """Test delete_config raises exception when ConfigMap doesn't exist.""" + # Mock Kubernetes API client + mock_v1 = Mock() + mock_v1.delete_namespaced_config_map.side_effect = ApiException(status=404) + + store = WorkflowConfigStore(namespace="test-ns", k8s_client=mock_v1) + + with pytest.raises(ApiException) as exc_info: + store.delete_config("nonexistent-session") + + assert exc_info.value.status == 404 + + def test_list_sessions(self): + """Test list_sessions returns all session names.""" + # Mock Kubernetes API client + mock_v1 = Mock() + + # Create mock ConfigMaps + mock_cm1 = Mock() + mock_cm1.data = {"session_name": "session1"} + mock_cm1.metadata.labels = {"session": "session1"} + + mock_cm2 = Mock() + mock_cm2.data = {"session_name": "session2"} + mock_cm2.metadata.labels = {"session": "session2"} + + mock_list = Mock() + mock_list.items = [mock_cm1, mock_cm2] + mock_v1.list_namespaced_config_map.return_value = mock_list + + store = WorkflowConfigStore(namespace="test-ns", k8s_client=mock_v1) + + sessions = store.list_sessions() + + assert len(sessions) == 2 + assert 
"session1" in sessions + assert "session2" in sessions diff --git a/deployment/k8s/charts/aggregates/migrationAssistantWithArgo/templates/resources/migrationConsole.yaml b/deployment/k8s/charts/aggregates/migrationAssistantWithArgo/templates/resources/migrationConsole.yaml index a20765154c..11b555ed5b 100644 --- a/deployment/k8s/charts/aggregates/migrationAssistantWithArgo/templates/resources/migrationConsole.yaml +++ b/deployment/k8s/charts/aggregates/migrationAssistantWithArgo/templates/resources/migrationConsole.yaml @@ -14,7 +14,7 @@ rules: resources: ["secrets"] verbs: ["list"] - apiGroups: [ "" ] - resources: ["configmaps", "persistentvolumeclaims", "pods", "pods/log"] + resources: ["configmaps", "persistentvolumeclaims", "pods", "pods/log", "services"] verbs: ["get", "watch", "list", "create", "update", "patch", "delete", "deletecollection"] - apiGroups: [""] resources: ["pods/attach"] diff --git a/libraries/testAutomation/testAutomation/test_runner.py b/libraries/testAutomation/testAutomation/test_runner.py index 38b2bfccea..298ac83cf3 100644 --- a/libraries/testAutomation/testAutomation/test_runner.py +++ b/libraries/testAutomation/testAutomation/test_runner.py @@ -16,8 +16,8 @@ logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) -VALID_SOURCE_VERSIONS = ["ES_1.5", "ES_2.4", "ES_5.6", "ES_6.8", "ES_7.10"] -VALID_TARGET_VERSIONS = ["OS_1.3", "OS_2.19", "OS_3.1"] +VALID_SOURCE_VERSIONS = ["ES_1.5", "ES_2.4", "ES_5.6", "ES_7.10"] +VALID_TARGET_VERSIONS = ["OS_1.3", "OS_2.19"] MA_RELEASE_NAME = "ma" @@ -202,16 +202,15 @@ def copy_logs(self, destination: str = "./logs") -> None: def run(self, skip_delete: bool = False, keep_workflows: bool = False, developer_mode: bool = False, reuse_clusters: bool = False, test_reports_dir: str = None) -> None: - self.k8s_service.create_namespace(self.k8s_service.namespace) - if developer_mode: - workflow_templates_dir = ( - "../../TrafficCapture/dockerSolution/src/main/docker/migrationConsole/" - "workflows/templates/" - ) - self.k8s_service.run_command([ - "kubectl", "apply", "-f", workflow_templates_dir, "-n", "ma" - ]) - logger.info("Applied workflow templates directory") + # if developer_mode: + # workflow_templates_dir = ( + # "../../TrafficCapture/dockerSolution/src/main/docker/migrationConsole/" + # "workflows/templates/" + # ) + # self.k8s_service.run_command([ + # "kubectl", "apply", "-f", workflow_templates_dir, "-n", "ma" + # ]) + # logger.info("Applied workflow templates directory") for source_version, target_version in self.combinations: try: