Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions cloud_governance/common/clouds/aws/cloudtrail/cloudtrail_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,195 @@ def get_username_by_instance_id_and_time(self, resource_id: str, resource_type:
username, event = self.__get_user_by_resource_id(start_time, end_time, resource_id, resource_type, event_type)
return self.__check_filter_username(username, event)

def get_username_from_resource_events(self, resource_id: str, iam_users: list,
start_time: datetime = None, end_time: datetime = None,
exclude_users: set = None):
"""
Fallback username lookup: search ALL CloudTrail events for a resource
and return the first username matching a known IAM user.
Handles managed services (ROSA, EKS, etc.) where RunInstances is
performed by a service account but subsequent events (CreateTags, etc.)
may be performed by the actual user via SSO or IAM credentials.
@param resource_id: The AWS resource ID to look up
@param iam_users: List of known IAM usernames to validate against
@param start_time: Optional search window start (defaults to LOOKBACK_DAYS ago)
@param end_time: Optional search window end (defaults to now)
@param exclude_users: Optional set of usernames to skip (automation accounts)
@return: matching IAM username or empty string
"""
if exclude_users is None:
exclude_users = set()
try:
if not start_time:
start_time = datetime.now() - timedelta(days=self.LOOKBACK_DAYS)
if not end_time:
end_time = datetime.now()
responses = self.get_full_responses(
StartTime=start_time, EndTime=end_time,
LookupAttributes=[{
'AttributeKey': 'ResourceName',
'AttributeValue': resource_id
}])
for event in responses:
username, _ = self.__check_event_is_assumed_role(
event.get('CloudTrailEvent', ''))
if username and username in iam_users and username not in exclude_users:
logger.info(f'Found username {username} from {event.get("EventName")} '
f'event on resource {resource_id}')
return username
event_username = event.get('Username', '')
if event_username and event_username in iam_users and event_username not in exclude_users:
logger.info(f'Found username {event_username} from {event.get("EventName")} '
f'event on resource {resource_id}')
return event_username
return ''
except Exception as err:
logger.error(f'Error in get_username_from_resource_events: {err}')
return ''

def __get_username_from_role_cloudtrail(self, role: dict, iam_users: list):
"""
Look up CloudTrail CreateRole event for a given IAM role and return
the username if it matches a known IAM user.
"""
role_name = role['RoleName']
role_arn = role['Arn']
create_date = role.get('CreateDate')
if not create_date:
return ''
end_time = create_date + timedelta(seconds=self.SEARCH_SECONDS)
start_time = create_date - timedelta(seconds=self.SEARCH_SECONDS)
responses = self.__get_cloudtrail_responses(
start_time=start_time, end_time=end_time,
resource_arn=role_arn)
for event in responses:
if event.get('EventName') == 'CreateRole':
username, _ = self.__check_event_is_assumed_role(
event.get('CloudTrailEvent', ''))
if username and username in iam_users:
logger.info(f'Found cluster owner {username} from CreateRole '
f'event on role {role_name}')
return username
event_username = event.get('Username', '')
if event_username and event_username in iam_users:
logger.info(f'Found cluster owner {event_username} from CreateRole '
f'event on role {role_name}')
return event_username
return ''

def __get_username_from_create_role_events(self, start_time: datetime,
end_time: datetime,
iam_users: list):
"""
Search CloudTrail for CreateRole events of OpenShift operator roles
within a time window. Handles the case where roles were deleted from
IAM but creation events still exist in CloudTrail (90-day retention).
"""
try:
events = []
kwargs = {
'LookupAttributes': [{
'AttributeKey': 'EventName',
'AttributeValue': 'CreateRole'
}],
'StartTime': start_time, 'EndTime': end_time,
'MaxResults': 50
}
while True:
response = self.__global_cloudtrail.lookup_events(**kwargs)
events.extend(response.get('Events', []))
next_token = response.get('NextToken')
if not next_token:
break
kwargs['NextToken'] = next_token
for event in events:
resources = event.get('Resources', [])
role_names = [r.get('ResourceName', '') for r in resources
if r.get('ResourceType') == 'AWS::IAM::Role']
if not any('openshift' in n.lower() for n in role_names):
continue
username, _ = self.__check_event_is_assumed_role(
event.get('CloudTrailEvent', ''))
if username and username in iam_users:
logger.info(f'Found cluster owner {username} from '
f'CloudTrail CreateRole event (deleted role)')
return username
event_username = event.get('Username', '')
if event_username and event_username in iam_users:
logger.info(f'Found cluster owner {event_username} from '
f'CloudTrail CreateRole event (deleted role)')
return event_username
except Exception as err:
logger.error(f'Error searching CreateRole events: {err}')
return ''

def get_username_from_cluster_role(self, cluster_id: str, iam_users: list,
launch_time: datetime = None):
"""
Trace a cluster's ownership through its IAM roles.
Uses three strategies:
1. Direct match: find roles whose names contain the cluster ID
(works for IPI/UPI clusters)
2. Temporal match: find ROSA/OpenShift operator roles created
shortly before the cluster's instance launch time
(works for ROSA STS where roles use cluster name, not infra ID)
3. CloudTrail fallback: search CreateRole events directly for
OpenShift roles in the time window (handles deleted roles)
Works for both SSO (AssumedRole) and IAM user identities.
@param cluster_id: The cluster infra ID (e.g. 'u0s3a7y7o5y9c6w-x56mc')
@param iam_users: List of known IAM usernames to validate against
@param launch_time: Instance launch time for temporal role matching
@return: matching IAM username or empty string
"""
ROSA_ROLE_WINDOW_SECONDS = 21600 # 6 hours before launch
try:
paginator = self.__iam_client.get_paginator('list_roles')
roles = []
for page in paginator.paginate(PaginationConfig={'PageSize': 1000}):
roles.extend(page.get('Roles', []))
cluster_roles = [r for r in roles if cluster_id in r.get('RoleName', '')]
if cluster_roles:
for role in cluster_roles:
username = self.__get_username_from_role_cloudtrail(role, iam_users)
if username:
return username

if not launch_time:
return ''
candidate_roles = []
for role in roles:
role_name = role.get('RoleName', '')
if 'openshift' not in role_name.lower():
continue
create_date = role.get('CreateDate')
if not create_date:
continue
launch_naive = launch_time.replace(tzinfo=None)
create_naive = create_date.replace(tzinfo=None)
diff_seconds = (launch_naive - create_naive).total_seconds()
if 0 <= diff_seconds <= ROSA_ROLE_WINDOW_SECONDS:
candidate_roles.append(role)
if candidate_roles:
candidate_roles.sort(key=lambda r: r.get('CreateDate', ''), reverse=True)
resolved_users = set()
for role in candidate_roles[:6]:
username = self.__get_username_from_role_cloudtrail(role, iam_users)
if username:
resolved_users.add(username)
if len(resolved_users) == 1:
return resolved_users.pop()

ct_start = launch_time - timedelta(seconds=ROSA_ROLE_WINDOW_SECONDS)
username = self.__get_username_from_create_role_events(
start_time=ct_start, end_time=launch_time,
iam_users=iam_users)
if username:
return username
return ''
except Exception as err:
logger.error(f'Error in get_username_from_cluster_role: {err}')
return ''

def get_stop_time(self, resource_id: str, event_name: str):
"""
This method return the time of when instance is stopped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from cloud_governance.common.clouds.aws.ec2.ec2_operations import EC2Operations
from cloud_governance.common.clouds.aws.iam.iam_operations import IAMOperations
from cloud_governance.common.clouds.aws.utils.utils import Utils
from cloud_governance.common.logger.init_logger import logger


class TagClusterOperations:
Expand All @@ -29,9 +30,25 @@ def __init__(self, region: str, input_tags: dict = None, cluster_name: str = No
self.cluster_name = cluster_name
self.input_tags = input_tags
self.cloudtrail = CloudTrailOperations(region_name='us-east-1')
self._get_username_from_instance_id_and_time = CloudTrailOperations(region_name=region).get_username_by_instance_id_and_time
self._regional_cloudtrail = CloudTrailOperations(region_name=region)
self._get_username_from_instance_id_and_time = self._regional_cloudtrail.get_username_by_instance_id_and_time
self.dry_run = dry_run
self.iam_users = self.iam_operations.get_iam_users_list()
self._automation_user = self._get_automation_username()

@staticmethod
def _get_automation_username():
"""
Identify the current caller (the automation/service account running
this policy) so it can be excluded from CloudTrail fallback searches.
"""
try:
sts = boto3.client('sts')
identity = sts.get_caller_identity()
arn = identity.get('Arn', '')
return arn.split('/')[-1] if '/' in arn else ''
except Exception:
return ''

def _input_tags_list_builder(self):
"""
Expand Down Expand Up @@ -89,12 +106,61 @@ def get_user_name_from_name_tag(self, tags: list):
return user
return None

def get_username(self, start_time: datetime, resource_id: str, resource_type: str, tags: list):
def get_username(self, start_time: datetime, resource_id: str, resource_type: str,
tags: list, cluster_id: str = ''):
"""
This method returns the username
This method returns the username using multiple strategies:
1. Check existing tags (User tag, Name tag)
2. CloudTrail - resource creation event (RunInstances, etc.)
3. CloudTrail - any event on this resource by a known IAM user
(handles managed services like ROSA where service accounts create
resources but user events like CreateTags may exist)
4. CloudTrail - trace cluster IAM roles to find who created them
(handles ROSA/OSD where instances are created by service accounts
but IAM roles are created by the user)
:return:
"""
exclude = {self._automation_user} if self._automation_user else set()
iam_username = self.get_user_name_from_name_tag(tags=tags)
if not iam_username:
return self._get_username_from_instance_id_and_time(start_time=start_time, resource_id=resource_id, resource_type=resource_type)
ct_username = self._get_username_from_instance_id_and_time(
start_time=start_time, resource_id=resource_id,
resource_type=resource_type)
if ct_username == 'AutoScaling':
return ct_username
if ct_username and ct_username in self.iam_users and ct_username not in exclude:
return ct_username
if cluster_id:
iam_username = self.cloudtrail.get_username_from_cluster_role(
cluster_id=cluster_id, iam_users=self.iam_users,
launch_time=start_time)
if iam_username:
logger.info(f'Resolved cluster owner {iam_username} via IAM '
f'role lookup for cluster {cluster_id}')
return iam_username
iam_username = self._regional_cloudtrail.get_username_from_resource_events(
resource_id=resource_id, iam_users=self.iam_users,
start_time=start_time, exclude_users=exclude)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return iam_username

def get_username_from_cluster_instances(self, resources: list, cluster_name: str):
"""
Search all instances in a cluster for a valid User tag.
If any instance already has a tagged owner, return that username.
Handles managed services (ROSA, EKS) where CloudTrail cannot attribute
the creator - if at least one instance was tagged, propagate to siblings.
@param resources: list of instance groups from the cluster
@param cluster_name: the cluster identifier to match
@return: username string or None
"""
for instance_group in resources:
for item in instance_group:
tags = item.get('Tags', [])
for tag in tags:
if any(prefix in tag.get('Key', '') for prefix in self.cluster_prefix):
if cluster_name in tag.get('Key', ''):
user = self.ec2_operations.get_tag_value_from_tags(
tags=tags, tag_name='User')
if user and user != 'NA' and user in self.iam_users:
return user
return None
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,11 @@ def update_cluster_tags(self, resources: list):
else:
username = self.get_username(start_time=item.get('LaunchTime'),
resource_id=instance_id,
resource_type='AWS::EC2::Instance', tags=tags)
resource_type='AWS::EC2::Instance', tags=tags,
cluster_id=cluster_name)
if not username:
username = self.get_username_from_cluster_instances(
resources=resources, cluster_name=cluster_name)
if username:
if username == 'AutoScaling':
add_tags.extend(self._fill_na_tags(user=username))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ def __init__(self, region: str = 'us-east-2', dry_run: str = 'yes', input_tags:
self.ec2_operations = EC2Operations(region=region)
self.utils = Utils(region=region)
self.iam_users = self.iam_client.get_iam_users_list()
self._automation_user = self._get_automation_username()

@staticmethod
def _get_automation_username():
try:
sts = boto3.client('sts')
identity = sts.get_caller_identity()
arn = identity.get('Arn', '')
return arn.split('/')[-1] if '/' in arn else ''
except Exception:
return ''

def _get_instances_data(self, instance_id: str = ''):
"""
Expand Down Expand Up @@ -209,14 +220,27 @@ def get_user_name_from_name_tag(self, tags: list = None, resource_name: str = No

def get_username(self, start_time: datetime, resource_id: str, resource_type: str, tags: list, resource_name: str = '', end_time: datetime = None):
"""
This method returns the username
This method returns the username using multiple strategies:
1. Check existing tags (Name tag)
2. CloudTrail - resource creation event
3. CloudTrail - any event on this resource by a known IAM user
(handles managed services where service accounts create resources)
:return:
"""
iam_username = self.get_user_name_from_name_tag(tags=tags, resource_name=resource_name)
if not iam_username:
iam_username = self.get_user_name_from_name_tag(resource_name=resource_name)
if not iam_username:
return self._get_username_from_cloudtrail(start_time=start_time, resource_id=resource_id, resource_type=resource_type, end_time=end_time)
ct_username = self._get_username_from_cloudtrail(
start_time=start_time, resource_id=resource_id,
resource_type=resource_type, end_time=end_time)
exclude = {self._automation_user} if self._automation_user else set()
if ct_username and ct_username in self.iam_users and ct_username not in exclude:
return ct_username
iam_username = self.cloudtrail.get_username_from_resource_events(
resource_id=resource_id, iam_users=self.iam_users,
start_time=start_time, end_time=end_time,
exclude_users=exclude)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return iam_username

def validate_existing_tag(self, tags: list):
Expand Down
Loading
Loading