Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

from .models.utils import ExitCode
from .commands.configure import configure_group
from .commands.submit import submit_command
from .commands.stop import stop_command
from .commands.approve import approve_command
from .commands.status import status_command

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,27 +52,14 @@ def util_group(ctx):
@click.argument('shell', type=click.Choice(['bash', 'zsh', 'fish']))
@click.pass_context
def completion(ctx, shell):
"""Generate shell completion script and instructions for setup.
"""Generate shell completion script for bash, zsh, or fish.

Supported shells: bash, zsh, fish
Example setup:
Bash: workflow completion bash > /etc/bash_completion.d/workflow
Zsh: workflow completion zsh > "${fpath[1]}/_workflow"
Fish: workflow completion fish > ~/.config/fish/completions/workflow.fish

To enable completion:

Bash:
workflow completion bash > /etc/bash_completion.d/workflow
# Then restart your shell

Zsh:
# If shell completion is not already enabled in your environment,
# you will need to enable it. You can execute the following once:
echo "autoload -U compinit; compinit" >> ~/.zshrc

workflow completion zsh > "${fpath[1]}/_workflow"
# Then restart your shell

Fish:
workflow completion fish > ~/.config/fish/completions/workflow.fish
# Then restart your shell
Restart your shell after installation.
"""
completion_class = get_completion_class(shell)
if completion_class is None:
Expand All @@ -88,6 +79,10 @@ def completion(ctx, shell):

# Add command groups
workflow_cli.add_command(configure_group)
workflow_cli.add_command(submit_command)
workflow_cli.add_command(stop_command)
workflow_cli.add_command(approve_command)
workflow_cli.add_command(status_command)
workflow_cli.add_command(util_group)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Approve command for workflow CLI - resumes suspended workflows in Argo Workflows."""

import logging
import os
import click

from ..models.utils import ExitCode
from ..services.workflow_service import WorkflowService

logger = logging.getLogger(__name__)


@click.command(name="approve")
@click.argument('workflow_name', required=False)
@click.option(
'--argo-server',
default=lambda: os.environ.get(
'ARGO_SERVER',
f"http://{os.environ.get('ARGO_SERVER_SERVICE_HOST', 'localhost')}:"
f"{os.environ.get('ARGO_SERVER_SERVICE_PORT', '2746')}"
),
help='Argo Server URL (default: auto-detected from Kubernetes service env vars, or ARGO_SERVER env var)'
)
@click.option(
'--namespace',
default='ma',
help='Kubernetes namespace for the workflow (default: ma)'
)
@click.option(
'--insecure',
is_flag=True,
default=False,
help='Skip TLS certificate verification'
)
@click.option(
'--token',
help='Bearer token for authentication'
)
@click.pass_context
def approve_command(ctx, workflow_name, argo_server, namespace, insecure, token):
"""Approve/resume a suspended workflow in Argo Workflows.

If workflow_name is not provided, auto-detects the single workflow
in the specified namespace.

Example:
workflow approve
workflow approve my-workflow
workflow approve --argo-server https://10.105.13.185:2746 --insecure
"""

try:
service = WorkflowService()

# Auto-detect workflow if name not provided
if not workflow_name:
list_result = service.list_workflows(
namespace=namespace,
argo_server=argo_server,
token=token,
insecure=insecure,
phase_filter='Running'
)

if not list_result['success']:
click.echo(f"Error listing workflows: {list_result['error']}", err=True)
ctx.exit(ExitCode.FAILURE.value)

if list_result['count'] == 0:
click.echo(f"Error: No workflows found in namespace {namespace}", err=True)
ctx.exit(ExitCode.FAILURE.value)
elif list_result['count'] > 1:
workflows_list = ', '.join(list_result['workflows'])
click.echo(
f"Error: Multiple workflows found. Please specify which one to approve.\n"
f"Found workflows: {workflows_list}",
err=True
)
ctx.exit(ExitCode.FAILURE.value)

workflow_name = list_result['workflows'][0]
click.echo(f"Auto-detected workflow: {workflow_name}")

# Resume the workflow
result = service.approve_workflow(
workflow_name=workflow_name,
namespace=namespace,
argo_server=argo_server,
token=token,
insecure=insecure
)

if result['success']:
click.echo(f"Workflow {workflow_name} resumed successfully")
else:
click.echo(f"Error: {result['message']}", err=True)
ctx.exit(ExitCode.FAILURE.value)

except Exception as e:
click.echo(f"Error: {str(e)}", err=True)
ctx.exit(ExitCode.FAILURE.value)
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
"""Status command for workflow CLI - shows detailed status of workflows."""

import logging
import os
import click

from ..models.utils import ExitCode
from ..services.workflow_service import WorkflowService

logger = logging.getLogger(__name__)


@click.command(name="status")
@click.argument('workflow_name', required=False)
@click.option(
'--argo-server',
default=f"http://{os.environ.get('ARGO_SERVER_SERVICE_HOST', 'localhost')}"
f":{os.environ.get('ARGO_SERVER_SERVICE_PORT', '2746')}",
help='Argo Server URL (default: auto-detected from Kubernetes service env vars, or ARGO_SERVER env var)'
)
@click.option(
'--namespace',
default='ma',
help='Kubernetes namespace for the workflow (default: ma)'
)
@click.option(
'--insecure',
is_flag=True,
default=False,
help='Skip TLS certificate verification'
)
@click.option(
'--token',
help='Bearer token for authentication'
)
@click.option(
'--all',
'show_all',
is_flag=True,
default=False,
help='Show all workflows including completed ones (default: only running)'
)
@click.pass_context
def status_command(ctx, workflow_name, argo_server, namespace, insecure, token, show_all):

Check failure on line 44 in TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/console_link/console_link/workflow/commands/status.py

View workflow job for this annotation

GitHub Actions / Run SonarQube Analysis

python:S3776

Refactor this function to reduce its Cognitive Complexity from 18 to the 15 allowed.
"""Show detailed status of workflows.

Displays workflow progress, completed steps, and approval status.
By default, only shows running workflows. Use --all to see completed workflows too.

Example:
workflow status
workflow status my-workflow
workflow status --all
"""

try:
service = WorkflowService()

if workflow_name:
# Show detailed status for specific workflow
result = service.get_workflow_status(
workflow_name=workflow_name,
namespace=namespace,
argo_server=argo_server,
token=token,
insecure=insecure
)

if not result['success']:
click.echo(f"Error: {result['error']}", err=True)
ctx.exit(ExitCode.FAILURE.value)

_display_workflow_status(result)
else:
# List all workflows with status
list_result = service.list_workflows(
namespace=namespace,
argo_server=argo_server,
token=token,
insecure=insecure,
exclude_completed=not show_all
)

if not list_result['success']:
click.echo(f"Error: {list_result['error']}", err=True)
ctx.exit(ExitCode.FAILURE.value)

if list_result['count'] == 0:
if show_all:
click.echo(f"No workflows found in namespace {namespace}")
else:
click.echo(f"No running workflows found in namespace {namespace}")
click.echo("Use --all to see completed workflows")
return

click.echo(f"Found {list_result['count']} workflow(s) in namespace {namespace}:")
click.echo("")

# Sort workflows alphabetically by name
sorted_workflows = sorted(list_result['workflows'])

for wf_name in sorted_workflows:
result = service.get_workflow_status(
workflow_name=wf_name,
namespace=namespace,
argo_server=argo_server,
token=token,
insecure=insecure
)

if result['success']:
_display_workflow_status(result)

except Exception as e:
click.echo(f"Error: {str(e)}", err=True)
ctx.exit(ExitCode.FAILURE.value)


def _get_phase_symbol(phase: str) -> str:
"""Get symbol for workflow phase.

Args:
phase: Workflow phase

Returns:
Symbol character for the phase
"""
phase_symbols = {
'Running': '*',
'Succeeded': '+',
'Failed': '-',
'Error': '-',
'Pending': '>',
'Stopped': 'X',
}
return phase_symbols.get(phase, '?')


def _get_step_symbol(step_phase: str, step_type: str) -> str:
"""Get symbol for workflow step.

Args:
step_phase: Step phase
step_type: Step type

Returns:
Symbol string for the step
"""
if step_phase == 'Succeeded':
return ' +'
elif step_phase == 'Running':
return ' |' if step_type == 'Suspend' else ' *'
elif step_phase in ('Failed', 'Error'):
return ' -'
elif step_phase == 'Pending':
return ' >'
else:
return ' ?'


def _display_workflow_header(name: str, phase: str, progress: str, started_at: str, finished_at: str):
"""Display workflow header information.

Args:
name: Workflow name
phase: Workflow phase
progress: Workflow progress
started_at: Start timestamp
finished_at: Finish timestamp
"""
phase_symbol = _get_phase_symbol(phase)
click.echo(f"[{phase_symbol}] Workflow: {name}")
click.echo(f" Phase: {phase}")
click.echo(f" Progress: {progress}")
if started_at:
click.echo(f" Started: {started_at}")
if finished_at:
click.echo(f" Finished: {finished_at}")


def _display_workflow_steps(steps: list):
"""Display workflow steps.

Args:
steps: List of step dictionaries
"""
if not steps:
return

click.echo("\n Steps:")
for step in steps:
step_name = step['name']
step_phase = step['phase']
step_type = step['type']

symbol = _get_step_symbol(step_phase, step_type)

if step_type == 'Suspend' and step_phase == 'Running':
click.echo(f"{symbol} {step_name} - WAITING FOR APPROVAL")
else:
click.echo(f"{symbol} {step_name} ({step_phase})")


def _display_workflow_status(result: dict):
"""Display formatted workflow status.

Args:
result: WorkflowStatusResult dict from get_workflow_status
"""
name = result['workflow_name']
phase = result['phase']
progress = result['progress']
started_at = result['started_at']
finished_at = result['finished_at']

_display_workflow_header(name, phase, progress, started_at, finished_at)
_display_workflow_steps(result['steps'])
Loading
Loading