diff --git a/.gitignore b/.gitignore index 078b804..50ae1bd 100644 --- a/.gitignore +++ b/.gitignore @@ -55,3 +55,6 @@ __pycache__ ### Idea .idea + +### Kiro +.kiro \ No newline at end of file diff --git a/key-import-export/ecdh-tr31-import-export/README.md b/key-import-export/ecdh-tr31-import-export/README.md new file mode 100644 index 0000000..7fb228e --- /dev/null +++ b/key-import-export/ecdh-tr31-import-export/README.md @@ -0,0 +1,150 @@ +# AWS Payment Cryptography ECDH-TR31 Key Exchange + +This application demonstrates the integration between AWS Payment Cryptography and a local CA +for secure key management and cryptographic operations using ECDH key exchange with TR31 format. + +## Features + +- Create and manage a local Certificate Authority (CA) +- Generate ECDH key pairs locally and in AWS Payment Cryptography +- Perform secure key exchange using ECDH and TR31 format +- Support for both key import and export operations +- Support for multiple key types: + - AES-256 (32 bytes / 256 bits) + - Triple DES 2-Key (16 bytes / 128 bits) + - Triple DES 3-Key (24 bytes / 192 bits) +- Interactive CLI for operation selection + +## Prerequisites + +- Python 3.8+ +- AWS account with Payment Cryptography service enabled +- AWS credentials configured locally + +## Installation + +1. Create and activate a virtual environment: + +```bash +python -m venv .venv +source .venv/bin/activate # On macOS/Linux +.venv\Scripts\activate # On Windows +``` + +2. Install dependencies: + +```bash +pip install -r requirements.txt +``` + +## Usage + +### Interactive Mode + +Run the application without arguments for interactive mode: + +```bash +python main.py +``` + +#### Interactive Operation Selection + +The application will present an interactive menu to select between: +- `Import a key from a raw plaintext key into AWS Payment Cryptography`: Import a key into AWS Payment Cryptography +- `Export a key from AWS Payment Cryptography to plaintext`: Export an existing key from AWS Payment Cryptography + +Use the up/down arrow keys to navigate and Enter to select an option. + +#### Key Type Selection + +You'll be prompted to select the key type: +- **AES-256 (32 bytes / 256 bits)**: Advanced Encryption Standard with 256-bit key length +- **Triple DES 2-Key (16 bytes / 128 bits)**: Triple DES with two distinct key components +- **Triple DES 3-Key (24 bytes / 192 bits)**: Triple DES with three distinct key components + +#### Import Key Options + +If you select the import operation, you'll be presented with another menu to choose how to provide the key: + +1. **Generate a random key**: The application will automatically generate a secure random key of the selected type +2. **Enter a custom hexadecimal key**: You can manually enter a key in hexadecimal format + +For custom key entry, the application will validate that: +- The key has the correct length for the selected key type: + - AES-256: 64 hexadecimal characters (32 bytes) + - TDES_2KEY: 32 hexadecimal characters (16 bytes) + - TDES_3KEY: 48 hexadecimal characters (24 bytes) +- All characters are valid hexadecimal digits (0-9, A-F, a-f) + +### Command-Line Arguments + +You can also run the application with command-line arguments to bypass the interactive prompts: + +```bash +# Export operation with AES-256 key +python main.py --operation export --key-type AES_256 + +# Export operation with TDES_2KEY +python main.py --operation export --key-type TDES_2KEY + +# Import operation with random AES-256 key +python main.py --operation import --import-method random --key-type AES_256 + +# Import operation with custom AES-256 key +python main.py --operation import --import-method custom --key-type AES_256 --key 000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F + +# Import operation with custom TDES_2KEY +python main.py --operation import --import-method custom --key-type TDES_2KEY --key 0123456789ABCDEF0123456789ABCDEF + +# Import operation with custom TDES_3KEY +python main.py --operation import --import-method custom --key-type TDES_3KEY --key 0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF +``` + +#### Available Arguments + +| Argument | Description | Values | +|----------|-------------|--------| +| `--operation` | Operation mode | `import`, `export` | +| `--key-type` | Type of key | `AES_256`, `TDES_2KEY`, `TDES_3KEY` | +| `--import-method` | Key import method | `random`, `custom` | +| `--key` | Custom key in hex format | Hex characters (length depends on key type) | + +You can mix command-line arguments with interactive prompts. For example, specifying just the operation will still prompt for the import method and key if needed. + +## Application Flow + +### Input Collection Phase +- The application first collects all necessary inputs: + - Operation mode (import or export) + - For import operations: import method (random or custom key) + - For custom key imports: the key value in hexadecimal format + +### Execution Phase + +#### Common Steps (1-7) +1. Create Local CA and import public CA key +2. Locally create an ECDH KeyPair +3. Sign the local KeyPair with Local CA +4. Create an ECDH KeyPair in AWS Payment Cryptography +5. Get the server's public key certificate +6. Generate shared information for key derivation +7. Derive shared secret for key wrapping + +#### Export Operation (Steps 8-10) +8. Create a data encryption key of the selected type (AES_256, TDES_2KEY, or TDES_3KEY) in AWS Payment Cryptography +9. Export the key wrapped under ECDH key using TR31 +10. Unwrap the exported key locally + +#### Import Operation (Steps 8-10) +8. Get key to import (random generation or use provided custom key) of the selected type +9. Wrap key using TR31 format with the appropriate algorithm identifier (A for AES, T for TDES) +10. Import the wrapped key into AWS Payment Cryptography + +#### Verification (Step 11) +11. Calculate and verify Key Check Value (KCV) + +## Directory Structure + +- `main.py`: Main application script +- `crypto_utils.py`: Cryptographic utility functions +- `tr31.py`: TR31 key block operations diff --git a/key-import-export/ecdh-tr31-import-export/crypto_utils.py b/key-import-export/ecdh-tr31-import-export/crypto_utils.py new file mode 100644 index 0000000..34088c2 --- /dev/null +++ b/key-import-export/ecdh-tr31-import-export/crypto_utils.py @@ -0,0 +1,549 @@ +import boto3 +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography import x509 +from cryptography.x509.oid import NameOID +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash +import base64 +import binascii +import secrets +import datetime +import os +import uuid + +# Python 3.8 compatibility: datetime.UTC was introduced in Python 3.11 +try: + # Python 3.11+ + UTC = datetime.UTC +except AttributeError: + # Python 3.8-3.10 + UTC = datetime.timezone.utc + +controlplane_client = boto3.client("payment-cryptography") + +# Local storage for CA information +CA_STORAGE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ca_storage") +os.makedirs(CA_STORAGE_DIR, exist_ok=True) + + +class CryptoUtils: + + @staticmethod + def generate_certificate_signing_request(private_key): + """ + Generate a Certificate Signing Request (CSR) with standard attributes + + Args: + private_key (EC private key): The private key to sign the CSR with + + Returns: + x509.CertificateSigningRequest: The generated CSR + """ + # Generate a CSR + csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([ + # Provide various details about who we are. + x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "California"), + x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "My Company"), + x509.NameAttribute(NameOID.COMMON_NAME, "mysite.com"), + ])).sign(private_key, hashes.SHA256()) + + return csr + + @staticmethod + def generate_ecdh_key_pair(): + """ + Generate an ECDH key pair using the NIST P-521 curve + + Returns: + tuple: (private_key, public_key) - The generated EC key pair + """ + private_key = ec.generate_private_key(curve=ec.SECP521R1()) + public_key = private_key.public_key() + return private_key, public_key + + @staticmethod + def generate_shared_info(): + """ + Generate random shared information for key derivation + + Returns: + bytes: 32 bytes of cryptographically secure random data + """ + return secrets.token_bytes(32) + + @staticmethod + def generate_ecc_symmetric_key_client(certificate, krd_private_key, info): + """ + Generate a symmetric key using ECDH key agreement protocol + + Args: + certificate (str): Base64-encoded certificate containing the public key + krd_private_key (EC private key): The private key for ECDH + info (bytes): Additional information for key derivation + + Returns: + bytes: The derived symmetric key + """ + pem = base64.b64decode(certificate) + certificate = x509.load_pem_x509_certificate(pem) + shared_key = krd_private_key.exchange( + ec.ECDH(), certificate.public_key()) + # Perform key derivation. + derived_key = ConcatKDFHash( # ConcatKDFHash also known as NIST SP 800-56Ar3 + algorithm=hashes.SHA512(), + length=32, # 16 is AES-128, 32 is AES-256 + otherinfo=info, + ).derive(shared_key) + + return derived_key + + @staticmethod + def sign_with_private_ca(ca_id, csr_pem, validity, template=None): + """ + Signs the client-side ECDH Key with local CA and returns the Certificate and Certificate Chain + + Args: + ca_id (str): ID of the Certificate Authority + csr_pem (str): Certificate Signing Request in PEM format + validity (dict): Validity period for the certificate + template (str): Not used in local implementation + + Returns: + tuple: (Certificate, Certificate Chain) + """ + # Load the CA certificate and private key + ca_cert_path = os.path.join(CA_STORAGE_DIR, f"{ca_id}_cert.pem") + ca_key_path = os.path.join(CA_STORAGE_DIR, f"{ca_id}_key.pem") + + with open(ca_cert_path, "rb") as f: + ca_cert_data = f.read() + ca_cert = x509.load_pem_x509_certificate(ca_cert_data) + + with open(ca_key_path, "rb") as f: + ca_key_data = f.read() + ca_key = serialization.load_pem_private_key(ca_key_data, password=None) + + # Parse the CSR + if isinstance(csr_pem, str): + csr_pem = csr_pem.encode('utf-8') + csr = x509.load_pem_x509_csr(csr_pem) + + # Calculate validity period + now = datetime.datetime.now(UTC) + if validity['Type'] == 'DAYS': + valid_until = now + datetime.timedelta(days=validity['Value']) + elif validity['Type'] == 'YEARS': + valid_until = now + datetime.timedelta(days=365 * validity['Value']) + else: + raise ValueError(f"Unsupported validity type: {validity['Type']}") + + # Generate Subject Key Identifier from the public key + ski = x509.SubjectKeyIdentifier.from_public_key(csr.public_key()) + + # Create Authority Key Identifier directly from the CA's public key + # This avoids the need to extract it from the CA certificate + aki = x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_cert.public_key()) + + # Create a certificate - aligning with payshield_hsm.py implementation + builder = x509.CertificateBuilder().subject_name( + csr.subject + ).issuer_name( + ca_cert.subject + ).public_key( + csr.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + now + ).not_valid_after( + valid_until + ).add_extension( + x509.BasicConstraints(ca=False, path_length=None), critical=True + ).add_extension( + x509.KeyUsage( + digital_signature=True, + content_commitment=True, # Changed to match payshield_hsm + key_encipherment=True, + data_encipherment=True, # Changed to match payshield_hsm + key_agreement=True, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False + ), critical=True + ).add_extension( + x509.ExtendedKeyUsage([ + x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH, + x509.oid.ExtendedKeyUsageOID.SERVER_AUTH, + x509.oid.ObjectIdentifier("1.3.6.1.4.1.311.20.2.2") # Smart Card Logon + ]), critical=False + ).add_extension( + ski, critical=False + ).add_extension( + aki, critical=False + ) + + # Sign the certificate with the CA key - using SHA256 to match payshield_hsm + certificate = builder.sign( + private_key=ca_key, + algorithm=hashes.SHA256(), + ) + + # Serialize the certificate to PEM format + cert_pem = certificate.public_bytes(serialization.Encoding.PEM).decode('utf-8') + + # Return the certificate and chain (which is just the CA certificate in this case) + ca_cert_pem = ca_cert.public_bytes(serialization.Encoding.PEM).decode('utf-8') + + return cert_pem, ca_cert_pem + +def import_ca_key_to_apc(certificate_authority): + """ + Import a Certificate Authority public key into AWS Payment Cryptography + + Args: + certificate_authority (str): PEM-encoded certificate of the CA + + Returns: + str: ARN of the imported key + """ + print("Importing CA Key") + key_arn = controlplane_client.import_key( + Enabled=True, + KeyMaterial={ + 'RootCertificatePublicKey': { + 'KeyAttributes': { + 'KeyAlgorithm': 'ECC_NIST_P521', + 'KeyClass': 'PUBLIC_KEY', + 'KeyModesOfUse': { + 'Verify': True, + }, + 'KeyUsage': 'TR31_S0_ASYMMETRIC_KEY_FOR_DIGITAL_SIGNATURE', + }, + 'PublicKeyCertificate': base64.b64encode(certificate_authority.encode('UTF-8')).decode('UTF-8') + } + }, + KeyCheckValueAlgorithm='ANSI_X9_24' + )['Key']['KeyArn'] + + return key_arn + + +def apc_generate_pgk(): + """ + Create a PIN Generation Key (PGK) in AWS Payment Cryptography + + Returns: + str: ARN of the created PGK key + """ + print("Creating new PGK") + key_arn = controlplane_client.create_key( + Exportable=True, + KeyAttributes={ + "KeyAlgorithm": "TDES_2KEY", + "KeyUsage": "TR31_V2_VISA_PIN_VERIFICATION_KEY", + "KeyClass": "SYMMETRIC_KEY", + "KeyModesOfUse": {"Generate": True, "Verify": True} + } + )['Key']['KeyArn'] + return key_arn + + +def apc_generate_pek(): + """ + Create a PIN Encryption Key (PEK) in AWS Payment Cryptography + + Returns: + str: ARN of the created PEK key + """ + print("Creating new PEK") + key_arn = controlplane_client.create_key( + Exportable=True, + KeyAttributes={ + "KeyAlgorithm": "TDES_3KEY", + "KeyUsage": "TR31_P0_PIN_ENCRYPTION_KEY", + "KeyClass": "SYMMETRIC_KEY", + "KeyModesOfUse": { + "Encrypt": True, + "Decrypt": True, + "Wrap": True, + "Unwrap": True + } + } + )['Key']['KeyArn'] + return key_arn + + +def create_local_ca(): + """ + Create a local Certificate Authority + + Returns: + str: ID of the created CA + """ + print("Creating local CA") + + # Generate a key pair for the CA + private_key = ec.generate_private_key(curve=ec.SECP521R1()) + + # Create a self-signed certificate for the CA + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "California"), + x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Local CA"), + x509.NameAttribute(NameOID.COMMON_NAME, "pindemo-ca"), + ]) + + # Create the CA certificate + now = datetime.datetime.now(UTC) + + # Generate Subject Key Identifier from the public key + ski = x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()) + + # For a root CA, the Authority Key Identifier is the same as the Subject Key Identifier + aki = x509.AuthorityKeyIdentifier.from_issuer_public_key(private_key.public_key()) + + ca_cert = x509.CertificateBuilder().subject_name( + subject + ).issuer_name( + issuer + ).public_key( + private_key.public_key() + ).serial_number( + x509.random_serial_number() + ).not_valid_before( + now + ).not_valid_after( + now + datetime.timedelta(days=3650) # 10 years + ).add_extension( + x509.BasicConstraints(ca=True, path_length=None), critical=True + ).add_extension( + x509.KeyUsage( + digital_signature=True, + content_commitment=True, + key_encipherment=True, + data_encipherment=True, + key_agreement=True, + key_cert_sign=True, + crl_sign=True, + encipher_only=False, + decipher_only=False + ), critical=True + ).add_extension( + ski, critical=False # Making Subject Key Identifier non-critical to match payshield_hsm + ).sign(private_key, hashes.SHA256()) + + # Generate a unique ID for the CA + ca_id = str(uuid.uuid4()) + + # Save the CA certificate and private key + ca_cert_path = os.path.join(CA_STORAGE_DIR, f"{ca_id}_cert.pem") + ca_key_path = os.path.join(CA_STORAGE_DIR, f"{ca_id}_key.pem") + + with open(ca_cert_path, "wb") as f: + f.write(ca_cert.public_bytes(serialization.Encoding.PEM)) + + with open(ca_key_path, "wb") as f: + f.write(private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + )) + + print(f"Local CA created with ID: {ca_id}") + return ca_id + + +def find_or_create_local_ca(): + """ + Find an existing local CA or create a new one + + Returns: + str: ID of the CA + """ + # Check if any CA exists in the storage directory + for filename in os.listdir(CA_STORAGE_DIR): + if filename.endswith("_cert.pem"): + ca_id = filename.split("_")[0] + print(f"Found existing CA: {ca_id}") + return ca_id + + # If no CA exists, create one + return create_local_ca() + + +def get_ca_certificate(ca_id): + """ + Get the certificate of a CA + + Args: + ca_id (str): ID of the CA + + Returns: + str: PEM-encoded certificate + """ + ca_cert_path = os.path.join(CA_STORAGE_DIR, f"{ca_id}_cert.pem") + with open(ca_cert_path, "r") as f: + return f.read() + + +def setup(): + """ + Set up the CA and import it into AWS Payment Cryptography + + Returns: + tuple: (CA ID, CA Key ARN) + """ + ca_id = find_or_create_local_ca() + ca_certificate = get_ca_certificate(ca_id) + ca_key_arn = import_ca_key_to_apc(ca_certificate) + return ca_id, ca_key_arn + + +def create_data_encryption_key(key_type='AES_256'): + """ + Create a data encryption key in AWS Payment Cryptography + + Args: + key_type (str): Type of key to create ('AES_256', 'TDES_2KEY', or 'TDES_3KEY') + + Returns: + str: ARN of the created key + """ + print(f"Creating new {key_type} Data Encryption Key") + key_arn = controlplane_client.create_key( + Exportable=True, + KeyAttributes={ + "KeyAlgorithm": key_type, + "KeyUsage": "TR31_D0_SYMMETRIC_DATA_ENCRYPTION_KEY", + "KeyClass": "SYMMETRIC_KEY", + "KeyModesOfUse": { + "Encrypt": True, + "Decrypt": True, + "Wrap": True, + "Unwrap": True + } + } + )['Key']['KeyArn'] + return key_arn + +def create_aes_256_data_encryption_key(): + """ + Create an AES-256 data encryption key in AWS Payment Cryptography + + This is a legacy wrapper function that calls create_data_encryption_key with 'AES_256' + + Returns: + str: ARN of the created AES-256 data encryption key + """ + return create_data_encryption_key('AES_256') + + +def create_ecdh_key_pair_in_payment_crypto(): + """ + Create an ECDH key pair in AWS Payment Cryptography service + + Creates an ECC NIST P-521 asymmetric key pair that can be used for key agreement + protocols. The key is configured for deriving TR31_K1_KEY_BLOCK_PROTECTION_KEY. + + Returns: + str: ARN of the created ECDH key pair + """ + print("Creating new ECDH Key Pair in AWS Payment Cryptography") + key_arn = controlplane_client.create_key( + Enabled=True, + Exportable=True, + KeyAttributes={ + 'KeyAlgorithm': 'ECC_NIST_P521', + 'KeyClass': 'ASYMMETRIC_KEY_PAIR', + 'KeyModesOfUse': { + 'DeriveKey': True + }, + 'KeyUsage': 'TR31_K3_ASYMMETRIC_KEY_FOR_KEY_AGREEMENT' + }, + DeriveKeyUsage= 'TR31_K1_KEY_BLOCK_PROTECTION_KEY' + )['Key']['KeyArn'] + return key_arn + + +def export_aes_key_under_tr31(aes_key_arn, client_cert_key_arn, server_ecdh_key_arn, shared_info, public_key_certificate, key_type='AES_256'): + """Export a key wrapped under ECDH key using TR31 format + + Args: + aes_key_arn (str): ARN of the key to export + client_cert_key_arn (str): ARN of the client certificate key + server_ecdh_key_arn (str): ARN of the server ECDH key + shared_info (bytes): Shared information for key derivation + public_key_certificate (str): Public key certificate + key_type (str): Type of key to export ('AES_256', 'TDES_2KEY', or 'TDES_3KEY') + + Returns: + str: TR31 key block + """ + print(f"Exporting {key_type} key wrapped under ECDH key using TR31 format") + + # Get the shared info in the format expected by AWS Payment Cryptography + shared_info_hex = binascii.hexlify(shared_info).decode().upper() + + # Using the correct boto3 API structure for export_key with DiffieHellmanTr31KeyBlock + response = controlplane_client.export_key( + ExportKeyIdentifier=aes_key_arn, + KeyMaterial={ + 'DiffieHellmanTr31KeyBlock': { + 'CertificateAuthorityPublicKeyIdentifier': client_cert_key_arn, + 'DerivationData': { + 'SharedInformation': shared_info_hex + }, + 'DeriveKeyAlgorithm': 'AES_256', + 'KeyDerivationFunction': 'NIST_SP800', + 'KeyDerivationHashAlgorithm': 'SHA_512', + 'PrivateKeyIdentifier': server_ecdh_key_arn, + 'PublicKeyCertificate': base64.b64encode(public_key_certificate.encode('UTF-8')).decode('UTF-8') + } + } + ) + + print(f"Export response: {response}") + return response['WrappedKey']['KeyMaterial'] + +def import_key_under_tr31_ecdh(client_cert_key_arn, server_ecdh_key_arn, shared_info, public_key_certificate, tr31block): + """Import a key wrapped under ECDH key using TR31 format + + Args: + client_cert_key_arn (str): ARN of the client certificate key + server_ecdh_key_arn (str): ARN of the server ECDH key + shared_info (bytes): Shared information for key derivation + public_key_certificate (str): Public key certificate + tr31block (str): TR31 key block + + Returns: + str: ARN of the imported key + """ + print(f"Importing key wrapped under ECDH key using TR31 format") + + # Get the shared info in the format expected by AWS Payment Cryptography + shared_info_hex = binascii.hexlify(shared_info).decode().upper() + + # Using the correct boto3 API structure for export_key with DiffieHellmanTr31KeyBlock + response = controlplane_client.import_key( + KeyMaterial={ + 'DiffieHellmanTr31KeyBlock': { + 'CertificateAuthorityPublicKeyIdentifier': client_cert_key_arn, + 'DerivationData': { + 'SharedInformation': shared_info_hex + }, + 'DeriveKeyAlgorithm': 'AES_256', + 'KeyDerivationFunction': 'NIST_SP800', + 'KeyDerivationHashAlgorithm': 'SHA_512', + 'PrivateKeyIdentifier': server_ecdh_key_arn, + 'PublicKeyCertificate': base64.b64encode(public_key_certificate.encode('UTF-8')).decode('UTF-8'), + 'WrappedKeyBlock': tr31block + } + } + ) + + print(f"Import response: {response}") + return response['Key']['KeyArn'] \ No newline at end of file diff --git a/key-import-export/ecdh-tr31-import-export/main.py b/key-import-export/ecdh-tr31-import-export/main.py new file mode 100644 index 0000000..33ff32f --- /dev/null +++ b/key-import-export/ecdh-tr31-import-export/main.py @@ -0,0 +1,459 @@ +#!/usr/bin/env python3 +""" +AWS Payment Cryptography and Local CA Integration Application + +This application demonstrates the integration between AWS Payment Cryptography and a local CA +for secure key management and cryptographic operations using ECDH key exchange. +The application supports both key export and import operations using TR31 format. +""" + +import base64 +import binascii +import boto3 +import secrets +import inquirer +import argparse +import sys +from tr31 import unwrap_tr31, construct_tr31_header +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.cmac import CMAC +from cryptography.hazmat.primitives import serialization +import psec +from crypto_utils import ( + CryptoUtils, find_or_create_local_ca, get_ca_certificate, import_ca_key_to_apc, + create_aes_256_data_encryption_key, create_data_encryption_key, create_ecdh_key_pair_in_payment_crypto, + export_aes_key_under_tr31, import_key_under_tr31_ecdh +) + +# Initialize AWS clients +payment_crypto_client = boto3.client('payment-cryptography') +payment_crypto_data_client = boto3.client('payment-cryptography-data') +KEY_ALIAS_PREFIX = "pindemo-" +TAG_KEY = "pindemo" + +def select_operation(): + """ + Interactive CLI prompt to select operation mode with descriptive labels + + Returns: + str: Selected operation ('import' or 'export') + """ + questions = [ + inquirer.List('operation', + message="Select operation mode", + choices=[ + ('Import a key from a raw plaintext key into AWS Payment Cryptography', 'import'), + ('Create a random key in AWS Payment Cryptography and export it to plaintext', 'export') + ], + carousel=True), + ] + answers = inquirer.prompt(questions) + return answers['operation'] + +def select_import_method(): + """ + Interactive CLI prompt to select key import method + + Returns: + str: Selected method ('random' or 'custom') + """ + questions = [ + inquirer.List('import_method', + message="Select key import method", + choices=[ + ('Generate a random key', 'random'), + ('Enter a custom hexadecimal key', 'custom') + ], + carousel=True), + ] + answers = inquirer.prompt(questions) + return answers['import_method'] + +def select_key_type(): + """ + Interactive CLI prompt to select key type + + Returns: + str: Selected key type ('AES_256', 'TDES_2KEY', or 'TDES_3KEY') + """ + questions = [ + inquirer.List('key_type', + message="Select key type", + choices=[ + ('AES-256 (32 bytes / 256 bits)', 'AES_256'), + ('Triple DES 2-Key (16 bytes / 128 bits)', 'TDES_2KEY'), + ('Triple DES 3-Key (24 bytes / 192 bits)', 'TDES_3KEY') + ], + carousel=True), + ] + answers = inquirer.prompt(questions) + return answers['key_type'] + +def get_custom_key(key_type): + """ + Prompt user to enter a custom key in hexadecimal format based on key type + + Args: + key_type (str): Type of key ('AES_256', 'TDES_2KEY', or 'TDES_3KEY') + + Returns: + bytes: The key as bytes + """ + valid_key = False + key_bytes = None + + # Define validation parameters based on key type + if key_type == 'AES_256': + expected_length = 64 # 32 bytes = 64 hex chars + message = "Enter 32-byte (64 hex characters) AES-256 key" + elif key_type == 'TDES_2KEY': + expected_length = 32 # 16 bytes = 32 hex chars + message = "Enter 16-byte (32 hex characters) Triple DES 2-Key" + elif key_type == 'TDES_3KEY': + expected_length = 48 # 24 bytes = 48 hex chars + message = "Enter 24-byte (48 hex characters) Triple DES 3-Key" + else: + raise ValueError(f"Unsupported key type: {key_type}") + + while not valid_key: + questions = [ + inquirer.Text('key_hex', + message=message, + validate=lambda _, x: len(x.strip()) == expected_length and all(c in '0123456789ABCDEFabcdef' for c in x.strip()) + ) + ] + answers = inquirer.prompt(questions) + key_hex = answers['key_hex'].strip().upper() + + try: + key_bytes = binascii.unhexlify(key_hex) + valid_key = True + except binascii.Error: + print("❌ Invalid hexadecimal format. Please try again.") + + return key_bytes + +def parse_arguments(): + """Parse command line arguments""" + parser = argparse.ArgumentParser( + description="AWS Payment Cryptography and Local CA Integration Application" + ) + parser.add_argument( + "--operation", + choices=["import", "export"], + help="Operation mode: import or export" + ) + parser.add_argument( + "--import-method", + choices=["random", "custom"], + help="Import method: random or custom key" + ) + parser.add_argument( + "--key-type", + choices=["AES_256", "TDES_2KEY", "TDES_3KEY"], + help="Type of key to import or export" + ) + parser.add_argument( + "--key", + help="Custom key in hexadecimal format (length depends on key type)" + ) + + args = parser.parse_args() + + # Validate arguments + if args.import_method == "custom" and args.operation == "import" and not args.key: + parser.error("--import-method custom requires --key to be specified") + + return args + +def get_user_inputs(): + """ + Gather all user inputs at the beginning of the interaction + + Returns: + dict: Dictionary containing all user inputs + """ + inputs = {} + + # Parse command line arguments + args = parse_arguments() + + # Determine operation mode (from args or interactive) + if args.operation: + inputs['operation'] = args.operation + print(f"\nUsing command-line specified operation: {inputs['operation'].upper()}") + else: + inputs['operation'] = select_operation() + print(f"\nSelected operation: {inputs['operation'].upper()}") + + # Determine key type (from args or interactive) + if args.key_type: + inputs['key_type'] = args.key_type + print(f"Using command-line specified key type: {inputs['key_type']}") + else: + inputs['key_type'] = select_key_type() + print(f"Selected key type: {inputs['key_type']}") + + # If import operation, get import method and key + if inputs['operation'] == 'import': + # Determine import method (from args or interactive) + if args.import_method: + inputs['import_method'] = args.import_method + print(f"Using command-line specified import method: {inputs['import_method']}") + else: + inputs['import_method'] = select_import_method() + print(f"Selected import method: {inputs['import_method']}") + + # Get key based on import method + if inputs['import_method'] == 'random': + key_type_desc = "AES-256" if inputs['key_type'] == 'AES_256' else "TDES" + print(f"Will generate a random {key_type_desc} key during execution") + inputs['plaintext_key_bytes'] = None # Will be generated during execution + else: # custom + # Check if key was provided via command line + if args.key: + try: + # Define expected length based on key type + if inputs['key_type'] == 'AES_256': + expected_length = 64 # 32 bytes = 64 hex chars + elif inputs['key_type'] == 'TDES_2KEY': + expected_length = 32 # 16 bytes = 32 hex chars + elif inputs['key_type'] == 'TDES_3KEY': + expected_length = 48 # 24 bytes = 48 hex chars + + # Validate the key format + if len(args.key) != expected_length or not all(c in '0123456789ABCDEFabcdef' for c in args.key): + print(f"❌ Invalid key format. Key must be {expected_length} hexadecimal characters for {inputs['key_type']}.") + sys.exit(1) + + inputs['plaintext_key_bytes'] = binascii.unhexlify(args.key.upper()) + print(f"Using command-line provided key: {args.key.upper()}") + except binascii.Error: + print("❌ Invalid hexadecimal format in provided key.") + sys.exit(1) + else: + print("Please enter your custom key:") + inputs['plaintext_key_bytes'] = get_custom_key(inputs['key_type']) + print(f"Using custom key: {binascii.hexlify(inputs['plaintext_key_bytes']).decode().upper()}") + + return inputs + +def main(): + """Main function to orchestrate the application flow""" + print("Starting AWS Payment Cryptography and Local CA Integration") + + # Gather all user inputs at the beginning + inputs = get_user_inputs() + operation = inputs['operation'] + + print("\n=== Starting Execution ===") + + # Step 1: Create Local CA and import the public CA key into Payment Cryptography + print("\n--- Step 1: Create Local CA and import public CA key ---") + ca_id = find_or_create_local_ca() + ca_certificate = get_ca_certificate(ca_id) + ca_key_arn = import_ca_key_to_apc(ca_certificate) + print(f"CA ID: {ca_id}") + print(f"CA Key ARN in Payment Cryptography: {ca_key_arn}") + + # Step 2: Locally create an ECDH KeyPair + print("\n--- Step 2: Locally create an ECDH KeyPair ---") + private_key, public_key = CryptoUtils.generate_ecdh_key_pair() + print("Local ECDH KeyPair created") + + # Step 3: Sign the local KeyPair with Local CA + print("\n--- Step 3: Sign the local KeyPair with Local CA ---") + # Generate CSR + csr = CryptoUtils.generate_certificate_signing_request(private_key) + csr_pem = csr.public_bytes(serialization.Encoding.PEM).decode('utf-8') + + # Sign the CSR with Local CA + certificate, chain = CryptoUtils.sign_with_private_ca( + ca_id, + csr_pem, + {'Value': 7, 'Type': 'DAYS'} + ) + print("Certificate signed by Private CA") + + # Step 4: Create an ECDH KeyPair in AWS Payment Cryptography + print("\n--- Step 4: Create an ECDH KeyPair in AWS Payment Cryptography ---") + server_ecdh_key_arn = create_ecdh_key_pair_in_payment_crypto() + print(f"Server ECDH KeyPair created with ARN: {server_ecdh_key_arn}") + + # Step 5: Get the server's public key certificate + print("\n--- Step 5: Get server's public key certificate ---") + response = payment_crypto_client.get_public_key_certificate( + KeyIdentifier=server_ecdh_key_arn + ) + server_certificate = response['KeyCertificate'] + print("Retrieved server certificate for ECDH key exchange") + + # Step 6: Use static shared information for key derivation + print("\n--- Step 6: Use static shared information for key derivation ---") + # Using a hardcoded static secret (32 bytes) + shared_info = binascii.unhexlify("0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF") + print(f"Using static shared information: {binascii.hexlify(shared_info).decode()}") + + # Step 7: Derive shared secret for key wrapping (common for both import and export) + print("\n--- Step 7: Derive shared secret for key wrapping ---") + shared_secret = CryptoUtils.generate_ecc_symmetric_key_client( + server_certificate, + private_key, + shared_info + ) + print("Shared secret derived successfully") + key_type = inputs['key_type'] + if operation == 'export': + # Export flow + print("\n=== EXPORT OPERATION ===") + + # Step 8: Create a data encryption key in AWS Payment Cryptography based on selected key type + print(f"\n--- Step 8: Create a {key_type} data encryption key in AWS Payment Cryptography ---") + aes_key_arn = create_data_encryption_key(key_type) + print(f"{key_type} data encryption key created with ARN: {aes_key_arn}") + + # Step 9: Export the AES-256 key wrapped under ECDH key using TR31 + print("\n--- Step 9: Export the AES-256 key wrapped under ECDH key using TR31 ---") + tr31_keyblock = export_aes_key_under_tr31( + aes_key_arn, + ca_key_arn, + server_ecdh_key_arn, + shared_info, + certificate + ) + print(f"TR31 Keyblock: {tr31_keyblock}") + + # Step 10: Unwrap the key locally + print("\n--- Step 10: Unwrap the exported key locally ---") + unwrapped_key = unwrap_tr31(tr31_keyblock, shared_secret) + print(f"Unwrapped key: {unwrapped_key}") + + # Store for KCV verification + unwrapped_key_bytes = binascii.unhexlify(unwrapped_key) + aes_key_arn = aes_key_arn + + elif operation == 'import': + # Import flow + print("\n=== IMPORT OPERATION ===") + + # Step 8: Get key to import (random or custom) + print("\n--- Step 8: Get key to import ---") + + import_method = inputs['import_method'] + + if import_method == 'random': + # Generate random key based on key type + + if key_type == 'AES_256': + print("Generating random AES-256 key...") + plaintext_key_bytes = secrets.token_bytes(32) # 32 bytes = 256 bits + elif key_type == 'TDES_2KEY': + print("Generating random Triple DES 2-Key...") + plaintext_key_bytes = secrets.token_bytes(16) # 16 bytes = 128 bits + elif key_type == 'TDES_3KEY': + print("Generating random Triple DES 3-Key...") + plaintext_key_bytes = secrets.token_bytes(24) # 24 bytes = 192 bits + else: + raise ValueError(f"Unsupported key type: {key_type}") + + print(f"Generated plaintext key: {binascii.hexlify(plaintext_key_bytes).decode().upper()}") + else: # custom + plaintext_key_bytes = inputs['plaintext_key_bytes'] + print(f"Using provided custom key: {binascii.hexlify(plaintext_key_bytes).decode().upper()}") + + # Step 9: Wrap key using TR31 format + print("\n--- Step 9: Wrap key using TR31 format ---") + # Set algorithm based on key type + if key_type == 'AES_256': + algorithm = 'A' # AES + else: # TDES keys + algorithm = 'T' # Triple DES + + # Construct TR31 header for the key to be imported + header = construct_tr31_header( + algorithm=algorithm, + export_mode='E', + key_type='D0', # Symmetric Data Encryption Key + mode_of_use='B', + version_id='D' + ) + wrapped_key = psec.tr31.wrap( + kbpk=shared_secret, + header=header, + key=plaintext_key_bytes + ).upper() + print(f"TR31 wrapped key block: {wrapped_key}") + + # Step 10: Import the wrapped key into AWS Payment Cryptography + print("\n--- Step 10: Import the wrapped key into AWS Payment Cryptography ---") + imported_key_arn = import_key_under_tr31_ecdh( + ca_key_arn, + server_ecdh_key_arn, + shared_info, + certificate, + wrapped_key + + ) + print(f"Key successfully imported with ARN: {imported_key_arn}") + + # Store for KCV verification + aes_key_arn = imported_key_arn + unwrapped_key_bytes = plaintext_key_bytes + + # Step 11: Calculate and verify Key Check Value (KCV) + print("\n--- Step 11: Calculate and verify Key Check Value (KCV) ---") + + # Calculate KCV for the key based on key type + if key_type == 'AES_256': + # For AES keys, use CMAC + cmac = CMAC(algorithms.AES(unwrapped_key_bytes)) + cmac.update(b'\x00' * 16) # Zero block + kcv = cmac.finalize()[:3] # First 3 bytes + else: # TDES keys + # For TDES keys, encrypt a zero block and take first 3 bytes + if key_type == 'TDES_2KEY': + # For 2-key TDES, duplicate the first 8 bytes to make it 24 bytes + if len(unwrapped_key_bytes) == 16: # 16 bytes = 128 bits + tdes_key = unwrapped_key_bytes + unwrapped_key_bytes[:8] + else: + tdes_key = unwrapped_key_bytes + else: # TDES_3KEY + tdes_key = unwrapped_key_bytes + + cipher = Cipher(algorithms.TripleDES(tdes_key), modes.ECB()) + encryptor = cipher.encryptor() + kcv = encryptor.update(b'\x00' * 8)[:3] # Encrypt zero block, take first 3 bytes + + key_check_value = binascii.hexlify(kcv).decode().upper() + print(f"Calculated KCV for key: {key_check_value}") + + # Get the KCV from AWS Payment Cryptography + aes_key_info = payment_crypto_client.get_key(KeyIdentifier=aes_key_arn) + aws_kcv = aes_key_info['Key']['KeyCheckValue'] + print(f"AWS Payment Cryptography KCV: {aws_kcv}") + + # Verify KCV match + if aws_kcv == key_check_value: + print("✅ KCV values match! Key operation was successful.") + else: + print("❌ KCV values do not match. There might be an issue with the key operation.") + + # Step 12: Delete the ECDH key pair from AWS Payment Cryptography + print("\n--- Step 12: Deleting ECDH key pair from AWS Payment Cryptography (used as KEK) ---") + try: + payment_crypto_client.delete_key(KeyIdentifier=server_ecdh_key_arn) + print(f"Successfully deleted ECDH key pair (KEK) with ARN: {server_ecdh_key_arn}") + except Exception as e: + print(f"Warning: Failed to delete ECDH key pair: {e}") + + print("\nApplication completed successfully!") + + # At the end of the execution, print the relevant outputs (ARN and Plaintext key) + + print("\n=== OUTPUTS ===") + print(f"Key ARN: {aes_key_arn}") + print(f"Plaintext Key (hex): {binascii.hexlify(unwrapped_key_bytes).decode().upper()}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/key-import-export/ecdh-tr31-import-export/requirements.txt b/key-import-export/ecdh-tr31-import-export/requirements.txt new file mode 100644 index 0000000..5be68f4 --- /dev/null +++ b/key-import-export/ecdh-tr31-import-export/requirements.txt @@ -0,0 +1,5 @@ +boto3>=1.28.0 +cryptography>=41.0.0 +psec>=0.3.0 +inquirer>=3.1.3 +pycryptodome>=3.18.0 \ No newline at end of file diff --git a/key-import-export/ecdh-tr31-import-export/tr31.py b/key-import-export/ecdh-tr31-import-export/tr31.py new file mode 100644 index 0000000..d3b71f8 --- /dev/null +++ b/key-import-export/ecdh-tr31-import-export/tr31.py @@ -0,0 +1,32 @@ +import psec +import binascii +import boto3 + + +def construct_tr31_header(algorithm, export_mode, key_type, mode_of_use, version_id='B', version_number='00'): + header = psec.tr31.Header( + version_id=version_id, + key_usage=key_type, + algorithm=algorithm, + mode_of_use=mode_of_use, + version_num=version_number, + exportability=export_mode + ) + + return header + + +def unwrap_tr31(key_block, wrapping_key): + """ + Unwraps a TR-31 key block using the provided wrapping key + + Args: + key_block (str): TR-31 key block to unwrap + wrapping_key (bytes): Key used to unwrap the TR-31 key block + + Returns: + str: Unwrapped key in hex format + """ + header, exported_plaintext_key_bytes = psec.tr31.unwrap(kbpk=wrapping_key, key_block=key_block) + exported_plaintext_kek_hex = binascii.hexlify(exported_plaintext_key_bytes).decode('utf-8').upper() + return exported_plaintext_kek_hex