1+ # A wrapper script to Change ownership of Service Catalog provisioned products.
2+ # This script supports changing ownership on one product at a time.
3+ # More info at IT-4483
4+ #
5+ # Usage:
6+ # AWS_PROFILE=my-aws-profile AWS_DEFAULT_REGION=us-east-1 pipenv run change_owner --help
7+ #
8+ # Example to change bucket owner:
9+ # pipenv run change_owner \
10+ # --ProvisionedProductId pp-j6npiwvn72hjg \
11+ # --NewOwnerArn arn:aws:sts::999999999999:assumed-role/ServiceCatalogEndusers/1234567 \
12+ # --BucketName MYBUCKET
13+ # Example to change EC2 instance owner:
14+ # pipenv run change_owner \
15+ # --ProvisionedProductId pp-j6npiwvn72hjg \
16+ # --NewOwnerArn arn:aws:sts::999999999999:assumed-role/ServiceCatalogEndusers/1234567 \
17+ # --InstanceId i-0c1d159b27a027a33
18+ # Example to change scheduled jobs owner:
19+ # pipenv run change_owner \
20+ # --ProvisionedProductId pp-j6npiwvn72hjg \
21+ # --NewOwnerArn arn:aws:sts::999999999999:assumed-role/ServiceCatalogEndusers/1234567 \
22+ # --StackId 'arn:aws:cloudformation:us-east-1:999999999999:stack/SC-999999999999-pp-dtstzmysqf36i/f3a44380-9418-11f0-930e-1204b39d8e69'
23+
24+ import argparse
25+ import json
26+ import boto3
27+ import os
28+ import re
29+ import sys
30+ import set_tags .utils as utils
31+ import set_tags .set_bucket_tags as set_bucket_tags
32+ import set_tags .set_instance_tags as set_instance_tags
33+ import set_tags .set_batch_tags as set_batch_tags
34+
35+
36+ def get_args ():
37+ """
38+ Parse command line arguments.
39+ """
40+ parser = argparse .ArgumentParser (
41+ description = "Change Service Catalog provisioned product ownership."
42+ )
43+
44+ # Required argument
45+ parser .add_argument (
46+ "--ProvisionedProductId" ,
47+ help = "The identifier of the provisioned product (required)" ,
48+ required = True
49+ )
50+ parser .add_argument (
51+ "--NewOwnerArn" ,
52+ help = "The ARN of the new owner (required)" ,
53+ required = True
54+ )
55+
56+ # Optional arguments
57+ parser .add_argument (
58+ "--StackId" ,
59+ help = "The CloudFormation stack ID for batch scheduled jobs" ,
60+ default = None
61+ )
62+ parser .add_argument (
63+ "--BucketName" ,
64+ help = "The S3 bucket name" ,
65+ default = None
66+ )
67+ parser .add_argument (
68+ "--InstanceId" ,
69+ help = "The EC2 instance ID" ,
70+ default = None
71+ )
72+
73+ return parser .parse_args ()
74+
75+ def get_account_and_user_id_from_arn (arn : str ):
76+ """
77+ Extracts the account ID and user ID from an assumed role ARN.
78+
79+ Parameters:
80+ - arn: str → the ARN to parse (must be of form
81+ arn:aws:sts::<account_id>:assumed-role/ServiceCatalogEndusers/<user_id>)
82+
83+ Returns:
84+ - Tuple (account_id: str, user_id: str)
85+ """
86+ match = re .match (
87+ r'arn:aws:sts::(\d+):assumed-role/ServiceCatalogEndusers/(\d+)' ,
88+ arn
89+ )
90+ if not match :
91+ raise ValueError (f"ARN { arn } is not in the expected format" )
92+
93+ return match .groups ()
94+
95+
96+ def update_bucket_principal_arn (bucket_name : str , target_user_id : str , new_assumed_role_arn : str ):
97+ """
98+ Updates the Principal ARNs in an S3 bucket policy by replacing all ARNs
99+ that contain a specific target user ID with a new assumed-role ARN.
100+
101+ Parameters:
102+ ----------
103+ bucket_name : str
104+ The name of the S3 bucket whose policy will be updated.
105+ target_user_id : str
106+ The user ID within the assumed-role ARN that should be replaced.
107+ Only ARNs containing this user ID will be updated.
108+ new_assumed_role_arn : str
109+ The new assumed-role ARN to replace the target ARN(s) with.
110+ Must be in the format:
111+ 'arn:aws:sts::<account_id>:assumed-role/ServiceCatalogEndusers/<user_id>'
112+ """
113+ s3 = boto3 .client ("s3" )
114+
115+ try :
116+ # Fetch current bucket policy
117+ response = s3 .get_bucket_policy (Bucket = bucket_name )
118+ policy = json .loads (response ["Policy" ])
119+
120+ updated = False
121+
122+ for statement in policy .get ("Statement" , []):
123+ aws_principals = statement .get ("Principal" , {}).get ("AWS" , [])
124+
125+ # Ensure we have a list for consistency
126+ if isinstance (aws_principals , str ):
127+ aws_principals = [aws_principals ]
128+
129+ new_list = []
130+ for arn in aws_principals :
131+ # Replace only if ARN contains the target user_id
132+ if f"/{ target_user_id } " in arn :
133+ new_list .append (new_assumed_role_arn )
134+ updated = True
135+ else :
136+ new_list .append (arn )
137+
138+ if new_list :
139+ statement ["Principal" ]["AWS" ] = new_list
140+
141+ if not updated :
142+ print (f"No ARN found with user_id { target_user_id } ." )
143+ return
144+
145+ # Apply updated policy
146+ s3 .put_bucket_policy (Bucket = bucket_name , Policy = json .dumps (policy ))
147+ print (f"Bucket policy updated successfully for { bucket_name } ." )
148+
149+ except s3 .exceptions .NoSuchBucketPolicy :
150+ print (f"No policy found for bucket { bucket_name } ." )
151+ except Exception as e :
152+ print (f"Error updating bucket policy: { e } " )
153+
154+
155+ def main ():
156+ args = get_args ()
157+ new_owner_arn = args .NewOwnerArn
158+ new_account_id , new_user_id = get_account_and_user_id_from_arn (new_owner_arn )
159+
160+ # Execute a Service catalog change owner action
161+ sc_client = boto3 .client ("servicecatalog" )
162+ print (f"Executing Service Catalog change owner action for product { args .ProvisionedProductId } to new owner { new_owner_arn } " )
163+ response = sc_client .update_provisioned_product_properties (
164+ ProvisionedProductId = args .ProvisionedProductId ,
165+ ProvisionedProductProperties = {
166+ "OWNER" : new_owner_arn
167+ }
168+ )
169+ print (f"Service Catalog change owner response: { response } " )
170+
171+ # Update tags for service catalog products
172+ os .environ ["TEAM_TO_ROLE_ARN_MAP_PARAM_NAME" ] = "/service-catalog/TeamToRoleArnMap"
173+ if args .StackId :
174+ print (f"StackId: { args .StackId } " )
175+ event = {"StackId" : args .StackId }
176+ try :
177+ # monkey patch to always return the Synapse owner id from
178+ # the user supplied OwnerArn
179+ utils .get_synapse_owner_id = lambda tags : new_user_id
180+
181+ set_batch_tags .create_or_update (event , None )
182+ print ("Batch tags updated successfully." )
183+ except Exception as e :
184+ print (f"Failed to update batch: { e } " )
185+ sys .exit (1 )
186+ if args .BucketName :
187+ bucket_name = args .BucketName
188+ print (f"Update tags on bucket: { bucket_name } " )
189+ event = {"ResourceProperties" :{"BucketName" : bucket_name }}
190+ try :
191+ # Get existing synapse user id
192+ bucket_tags = set_bucket_tags .get_bucket_tags (bucket_name )
193+ existing_user_id = utils .get_synapse_owner_id (bucket_tags )
194+
195+ # monkey patch to always return the Synapse owner id from
196+ # the user supplied OwnerArn
197+ utils .get_synapse_owner_id = lambda tags : new_user_id
198+
199+ set_bucket_tags .create_or_update (event , None )
200+ print ("Bucket tags updated successfully." )
201+
202+ # Update the bucket policy to allow new owner access
203+ update_bucket_principal_arn (bucket_name , existing_user_id , new_owner_arn )
204+ except Exception as e :
205+ print (f"Failed to update bucket: { e } " )
206+ sys .exit (1 )
207+ if args .InstanceId :
208+ print (f"InstanceId: { args .InstanceId } " )
209+ event = {"ResourceProperties" :{"InstanceId" : args .InstanceId }}
210+ try :
211+ # monkey patch to always return the Synapse owner id from
212+ # the user supplied OwnerArn
213+ utils .get_synapse_owner_id = lambda tags : new_user_id
214+
215+ set_instance_tags .create_or_update (event , None )
216+ print ("Instance tags updated successfully." )
217+ except Exception as e :
218+ print (f"Failed to update instance: { e } " )
219+ sys .exit (1 )
220+
221+ if __name__ == "__main__" :
222+ main ()
0 commit comments