1+ import boto3
2+ import time
3+ from botocore .exceptions import ClientError
4+
5+ TAG_KEY = "APC_TR34"
6+ controlplane_client = boto3 .client ("payment-cryptography" )
7+ private_ca = boto3 .client ("acm-pca" )
8+
9+ def create_certificate_authority ():
10+ # create Certificate Authority for short-lived certificates
11+ ca_arn = private_ca .create_certificate_authority (
12+ CertificateAuthorityConfiguration = {
13+ 'KeyAlgorithm' : 'RSA_2048' ,
14+ 'SigningAlgorithm' : 'SHA256WITHRSA' ,
15+ 'Subject' : {
16+ 'Country' : 'US' ,
17+ 'Organization' : 'AWS Samples' ,
18+ 'OrganizationalUnit' : 'APC' ,
19+ 'State' : 'CA' ,
20+ 'CommonName' : 'AWS Samples' ,
21+ }
22+ },
23+ CertificateAuthorityType = 'ROOT' ,
24+ UsageMode = 'SHORT_LIVED_CERTIFICATE'
25+ )['CertificateAuthorityArn' ]
26+
27+ state = "CREATING"
28+ while state == "CREATING" :
29+ time .sleep (1 )
30+ state = private_ca .describe_certificate_authority (CertificateAuthorityArn = ca_arn )['CertificateAuthority' ][
31+ 'Status' ]
32+ print (state )
33+ return ca_arn
34+
35+ def create_private_ca ():
36+ print ("Creating AWS Private CA" )
37+ cert_authority_arn = create_certificate_authority ()
38+ print ("Newly created Private CA ARN: %s" % cert_authority_arn )
39+ # add tag to CA
40+ private_ca .tag_certificate_authority (
41+ CertificateAuthorityArn = cert_authority_arn ,
42+ Tags = [
43+ {
44+ 'Key' : TAG_KEY ,
45+ 'Value' : 'Sample'
46+ },
47+ ]
48+ )
49+ print ("Getting root CA CSR" )
50+ csr = private_ca .get_certificate_authority_csr (CertificateAuthorityArn = cert_authority_arn )['Csr' ]
51+ print ("self-signing root CA CSR" )
52+ certificate , chain = sign_with_private_ca (cert_authority_arn , csr , {
53+ 'Value' : 10 ,
54+ 'Type' : 'YEARS'
55+ }, template = 'arn:aws:acm-pca:::template/RootCACertificate/V1' )
56+ print ("Importing signed certificate as ROOT" )
57+ private_ca .import_certificate_authority_certificate (CertificateAuthorityArn = cert_authority_arn ,
58+ Certificate = certificate )
59+ print ("CA Setup complete" )
60+ return cert_authority_arn
61+
62+ def find_or_create_private_ca ():
63+ # find existing CA
64+ for ca in private_ca .list_certificate_authorities ()['CertificateAuthorities' ]:
65+ if ca ['Status' ] == 'ACTIVE' :
66+ # get ca tags
67+ tags = private_ca .list_tags (CertificateAuthorityArn = ca ['Arn' ])
68+ for tag in tags ['Tags' ]:
69+ if tag ['Key' ] == TAG_KEY :
70+ # if tag is present, use this CA
71+ return ca ['Arn' ]
72+ return create_private_ca ()
73+
74+ def issue_certificate (csr_content ):
75+ """
76+ Issues a certificate using AWS Private CA
77+
78+ Args:
79+ ca_arn (str): ARN of the private CA
80+ csr_file_path (str): Path to the CSR file
81+
82+ Returns:
83+ str: Certificate ARN if successful, None otherwise
84+ """
85+ try :
86+ # Create ACM PCA client
87+ acmpca_client = boto3 .client ('acm-pca' )
88+ ca_arn = find_or_create_private_ca ()
89+ # Request to issue certificate
90+ response = acmpca_client .issue_certificate (
91+ CertificateAuthorityArn = ca_arn ,
92+ Csr = csr_content ,
93+ SigningAlgorithm = 'SHA256WITHRSA' ,
94+ Validity = {
95+ 'Value' : 7 ,
96+ 'Type' : 'DAYS'
97+ },
98+ TemplateArn = 'arn:aws:acm-pca:::template/EndEntityCertificate/V1'
99+ )
100+
101+ certificate_arn = response ['CertificateArn' ]
102+
103+ # Wait for certificate to be issued
104+ waiter = acmpca_client .get_waiter ('certificate_issued' )
105+ waiter .wait (
106+ CertificateAuthorityArn = ca_arn ,
107+ CertificateArn = certificate_arn
108+ )
109+
110+ # Get the issued certificate
111+ response = acmpca_client .get_certificate (
112+ CertificateAuthorityArn = ca_arn ,
113+ CertificateArn = certificate_arn
114+ )
115+
116+ certificate = response ['Certificate' ]
117+ certificate_chain = response ['CertificateChain' ]
118+
119+ """ # Save the certificate and chain to files
120+ with open('certificate.pem', 'w') as f:
121+ f.write(certificate)
122+
123+ with open('certificate_chain.pem', 'w') as f:
124+ f.write(certificate_chain) """
125+
126+ return certificate , certificate_chain
127+
128+ except ClientError as e :
129+ print (f"Error issuing certificate: { e } " )
130+ return None
131+
132+ def sign_with_private_ca (ca_arn , csr , validity , template = "arn:aws:acm-pca:::template/EndEntityCertificate/V1" ):
133+ """
134+ Signs the client-side Key with AWS Private CA and returns the Certificate and Certificate Chain
135+ :param validity:
136+ :param ca_arn:
137+ :param csr: Certificate Signing Request
138+ :param template: Template ARN to use for the certificate
139+ :return:
140+ """
141+ client = boto3 .client ('acm-pca' )
142+ response = client .issue_certificate (
143+ CertificateAuthorityArn = ca_arn ,
144+ Csr = csr ,
145+ TemplateArn = template ,
146+ SigningAlgorithm = 'SHA256WITHRSA' ,
147+ Validity = validity
148+ )
149+ certificate_arn = response ['CertificateArn' ]
150+ time .sleep (0.5 )
151+
152+ while 1 :
153+ try :
154+ certificate_response = client .get_certificate (CertificateArn = certificate_arn ,
155+ CertificateAuthorityArn = ca_arn )
156+ if 'CertificateChain' in certificate_response :
157+ chain = certificate_response ['CertificateChain' ]
158+ else :
159+ chain = None
160+ return certificate_response ['Certificate' ], chain
161+ except client .exceptions .RequestInProgressException :
162+ time .sleep (0.1 )
163+
164+ """ def setup():
165+ ca_arn = find_or_create_private_ca()
166+ #ca_certificate = private_ca.get_certificate_authority_certificate(CertificateAuthorityArn=ca_arn)['Certificate']
167+ print("CA Certificate: %s" % ca_arn)
168+ return ca_arn, ca_arn
169+
170+ setup() """
0 commit comments