|
| 1 | +import argparse |
| 2 | +import tempfile |
| 3 | + |
| 4 | +from ark_sdk_python.auth import ArkISPAuth |
| 5 | +from ark_sdk_python.common.ark_jwt_utils import ArkJWTUtils |
| 6 | +from ark_sdk_python.common.connections.ssh.ark_pty_ssh_connection import ArkPTYSSHConnection |
| 7 | +from ark_sdk_python.models.auth import ArkAuthMethod, ArkAuthProfile, ArkSecret |
| 8 | +from ark_sdk_python.models.auth.ark_auth_method import IdentityServiceUserArkAuthMethodSettings |
| 9 | +from ark_sdk_python.models.common.connections.ark_connection_command import ArkConnectionCommand |
| 10 | +from ark_sdk_python.models.common.connections.ark_connection_credentials import ArkConnectionCredentials |
| 11 | +from ark_sdk_python.models.common.connections.ark_connection_details import ArkConnectionDetails |
| 12 | +from ark_sdk_python.models.services.sia.sso.ark_sia_sso_get_ssh_key import ArkSIASSOGetSSHKey |
| 13 | +from ark_sdk_python.services.sia.sso import ArkSIASSOService |
| 14 | + |
| 15 | + |
| 16 | +def login_to_identity_security_platform(service_user: str, service_token: str, application_name: str) -> ArkISPAuth: |
| 17 | + """ |
| 18 | + This will perform login to the tenant with the given service user credentials |
| 19 | + Caching the logged in token is disabled and will perform a full login on each call |
| 20 | + Will return an identity security platform authenticated class |
| 21 | +
|
| 22 | + Args: |
| 23 | + service_user (str): _description_ |
| 24 | + service_token (str): _description_ |
| 25 | + application_name (str): _description_ |
| 26 | +
|
| 27 | + Returns: |
| 28 | + ArkISPAuth: _description_ |
| 29 | + """ |
| 30 | + print('Logging in to the tenant') |
| 31 | + isp_auth = ArkISPAuth(cache_authentication=False) |
| 32 | + isp_auth.authenticate( |
| 33 | + auth_profile=ArkAuthProfile( |
| 34 | + username=service_user, |
| 35 | + auth_method=ArkAuthMethod.IdentityServiceUser, |
| 36 | + auth_method_settings=IdentityServiceUserArkAuthMethodSettings(identity_authorization_application=application_name), |
| 37 | + ), |
| 38 | + secret=ArkSecret(secret=service_token), |
| 39 | + ) |
| 40 | + print('Logged in successfully') |
| 41 | + return isp_auth |
| 42 | + |
| 43 | + |
| 44 | +def construct_ssh_proxy_address(isp_auth: ArkISPAuth) -> str: |
| 45 | + """ |
| 46 | + For a given authentication, construct the SSH proxy address |
| 47 | + This'll grab from the ID token of the logged in user the name of the tenant (subdomain) |
| 48 | + And the platform domain (cyberark.cloud) |
| 49 | + And will concatenante everything to construct the SSH proxy address |
| 50 | + will return the constructed ssh proxy address |
| 51 | +
|
| 52 | + Args: |
| 53 | + isp_auth (ArkISPAuth): _description_ |
| 54 | +
|
| 55 | + Returns: |
| 56 | + str: _description_ |
| 57 | + """ |
| 58 | + token = isp_auth.token.token.get_secret_value() |
| 59 | + return f'{ArkJWTUtils.get_subdomain_from_token(token)}.ssh.{ArkJWTUtils.get_platform_domain_from_token(token)}' |
| 60 | + |
| 61 | + |
| 62 | +def generate_mfa_caching_ssh_key(isp_auth: ArkISPAuth, folder: str) -> str: |
| 63 | + """ |
| 64 | + This function will use the logged in user |
| 65 | + And generate an SSH key into a given folder |
| 66 | + Will return the path to the SSH key |
| 67 | +
|
| 68 | + Args: |
| 69 | + isp_auth (ArkISPAuth): _description_ |
| 70 | + temp_folder (str): _description_ |
| 71 | +
|
| 72 | + Returns: |
| 73 | + str: _description_ |
| 74 | + """ |
| 75 | + print('Generating SSH MFA Caching Key') |
| 76 | + sso_service = ArkSIASSOService(isp_auth=isp_auth) |
| 77 | + path = sso_service.short_lived_ssh_key(get_ssh_key=ArkSIASSOGetSSHKey(folder=folder)) |
| 78 | + print(f'MFA Caching SSH Key generated to [{path}]') |
| 79 | + return path |
| 80 | + |
| 81 | + |
| 82 | +def connect_and_validate_connection(ssh_key_path: str, proxy_address: str, connection_string: str, command: str) -> None: |
| 83 | + """ |
| 84 | + This function will perform a connection via SSH and via SIA proxy to the target |
| 85 | + For the given proxy address and connection string, a connection will be made with the ssh key |
| 86 | + Once connected, the given command will be performed, expecting a return code of 0 as success |
| 87 | +
|
| 88 | + Args: |
| 89 | + ssh_key_path (str): _description_ |
| 90 | + proxy_address (str): _description_ |
| 91 | + connection_string (str): _description_ |
| 92 | + command (str): _description_ |
| 93 | + """ |
| 94 | + print(f'Connecting to proxy [{proxy_address}] and connection string [{connection_string}]') |
| 95 | + ssh_connection = ArkPTYSSHConnection() |
| 96 | + ssh_connection.connect( |
| 97 | + ArkConnectionDetails( |
| 98 | + address=proxy_address, |
| 99 | + credentials=ArkConnectionCredentials( |
| 100 | + user=connection_string, |
| 101 | + private_key_filepath=ssh_key_path, |
| 102 | + ), |
| 103 | + ), |
| 104 | + ) |
| 105 | + print(f'Running test command [{command}]') |
| 106 | + ssh_connection.run_command( |
| 107 | + ArkConnectionCommand(command=command, expected_rc=0), |
| 108 | + ) |
| 109 | + print('Finished Successfully!') |
| 110 | + |
| 111 | + |
| 112 | +if __name__ == '__main__': |
| 113 | + # Construct an argument parser for CLI parameters for the script |
| 114 | + parser = argparse.ArgumentParser() |
| 115 | + parser.add_argument('--service-user', required=True, help='Service user to login and perform the operation with') |
| 116 | + parser.add_argument('--service-token', required=True, help='Service user token to use for logging in and connecting') |
| 117 | + parser.add_argument( |
| 118 | + '--service-application', |
| 119 | + default='__idaptive_cybr_user_oidc', |
| 120 | + help='Service application to login to, defaults to cyberark ISP application', |
| 121 | + ) |
| 122 | + parser.add_argument( |
| 123 | + '--connection-string', |
| 124 | + required=True, |
| 125 | + help='SSH connection string to use for the connection, without the proxy address.' |
| 126 | + 'A connection string may look like the following' |
| 127 | + '<service-user>#<tenant_subdomain>@<target_user>@<target_address>#<optional_network_name>', |
| 128 | + ) |
| 129 | + parser.add_argument('--test-command', default='ls -l', help='Test command to use once connected to the target, defaults to ls -l') |
| 130 | + |
| 131 | + # Parse the CLI parameters |
| 132 | + args = parser.parse_args() |
| 133 | + |
| 134 | + # Construct a temporary folder to be used for the MFA Caching SSH key generation |
| 135 | + # At the end of the execution, the temporary folder will be deleted alongside the SSH key |
| 136 | + with tempfile.TemporaryDirectory() as temp_folder: |
| 137 | + # Perform the flow: |
| 138 | + # - Login to the tenant |
| 139 | + # - Generate an MFA Caching SSH key |
| 140 | + # - Construct the ssh proxy address |
| 141 | + # - Connect in SSH to the proxy / target with the MFA Caching SSH Key and perform the command |
| 142 | + isp_auth = login_to_identity_security_platform(args.service_user, args.service_token, args.service_application) |
| 143 | + ssh_key_path = generate_mfa_caching_ssh_key(isp_auth, temp_folder) |
| 144 | + proxy_address = construct_ssh_proxy_address(isp_auth) |
| 145 | + connect_and_validate_connection(ssh_key_path, proxy_address, args.connection_string, args.test_command) |
0 commit comments