Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions POLICIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ This tool support the following policies:
monitoring the active connection count.
* [s3_inactive](cloud_governance/policy/aws/s3_inactive.py): Identify the empty s3 buckets, causing the resource quota
issues.
* [unused_access_key](cloud_governance/policy/aws/unused_access_key.py): Identify user with unused active access key, causing the security issue
* [empty_roles](cloud_governance/policy/aws/empty_roles.py): Identify the empty roles that do not have any attached
policies to them.
* [ebs_in_use](cloud_governance/policy/aws/ebs_in_use.py): list in use volumes.
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ List of Policies:
- zombie_snapshots
- unused_nat_gateway
- s3_inactive
- unused_access_key
- empty_roles
- tag_resources
- tag_iam_user
Expand Down
157 changes: 157 additions & 0 deletions cloud_governance/common/clouds/aws/iam/iam_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
from cloud_governance.common.clouds.aws.utils.common_methods import get_boto3_client
from cloud_governance.common.clouds.aws.utils.utils import Utils
from cloud_governance.common.logger.init_logger import logger
from datetime import datetime, timezone


class IAMOperations:

ACCESS_KEY_LABEL_MAP = {"access key 1": 0, "access key 2": 1}

def __init__(self, iam_client=None):
self.iam_client = iam_client if iam_client else get_boto3_client('iam')
self.utils = Utils()
Expand Down Expand Up @@ -158,3 +161,157 @@ def untag_role(self, role_name: str, tags: list):
return True
except Exception as err:
raise err

def tag_user(self, user_name: str, tags: list):
"""
This method tags the IAM user.
:param user_name: The name of the IAM user to tag.
:param tags: A list of tags to associate with the user.
:return: True if tagging is successful, otherwise raises an exception.
"""
try:
self.iam_client.tag_user(UserName=user_name, Tags=tags)
return True
except Exception as err:
raise err

def get_iam_users_access_keys(self):
"""
Retrieves IAM users and summarizes:
- Access key status (active/inactive)
- Access key age in days
- Access key last used in days (or "N/A" if never used)
- Tags (as a list of dictionaries)
- Most recent key usage: last_activity_days
- IAM client region (global context, since IAM is non-regional)
- IAM user unique ID: ResourceId

Returns:
dict: {
"username": {
"Access key 1": [status, age_days, last_used_days],
"Access key 2": [...],
"last_activity_days": int or "N/A",
"tags": [{"Key": "tag_key", "Value": "tag_value"}, ...],
"region": "us-east-1",
"ResourceId": "AIDAEXAMPLEUSERID"
},
...
}
"""
result = {}
now = datetime.now(timezone.utc)
region_name = self.iam_client.meta.region_name or "global"

paginator = self.iam_client.get_paginator('list_users')
for page in paginator.paginate():
for user in page['Users']:
username = user['UserName']
result[username] = {}
# Access keys
access_keys = self.iam_client.list_access_keys(UserName=username)['AccessKeyMetadata']
for idx, key in enumerate(access_keys, start=1):
label = f"Access key {idx}"
status = key['Status'].lower()
age_days = (now - key['CreateDate']).days

# Get access key last used
try:
response = self.iam_client.get_access_key_last_used(AccessKeyId=key['AccessKeyId'])
last_used_date = response.get('AccessKeyLastUsed', {}).get('LastUsedDate')
if last_used_date:
last_used_days = (now - last_used_date).days
else:
last_used_days = "N/A"
except Exception:
last_used_days = "N/A"

result[username][label] = {'label': label, 'status': status, 'age_days': age_days, 'last_activity_days': last_used_days if last_used_days is not None else "N/A"}

# Tags as list of dicts
try:
tag_response = self.iam_client.list_user_tags(UserName=username)
tags = tag_response.get('Tags', [])
except Exception:
tags = []

result[username]["tags"] = tags
result[username]["region"] = region_name
result[username]["ResourceId"] = user.get('UserId') # <-- Unique ID

return result

def has_active_access_keys(self, username: str, access_key_label: str = None) -> bool:
"""
Checks if the given IAM user has any active access keys.
Optionally filters by access key label ("Access Key 1" or "Access Key 2").

Args:
username (str): IAM user name
access_key_label (str): Label to filter access keys ("Access Key 1"/"Access Key 2")

Returns:
bool: True if any access key is active (and matches the label if provided), False otherwise
"""
try:
keys = self.iam_client.list_access_keys(UserName=username)['AccessKeyMetadata']
except Exception as e:
logger.error(f"Failed to list access keys for user '{username}': {e}")
return False

# Sort keys by CreateDate ascending (oldest first)
keys.sort(key=lambda k: k['CreateDate'])

if access_key_label:
idx = self.ACCESS_KEY_LABEL_MAP.get(access_key_label.lower())
if idx is None or idx >= len(keys):
return False
return keys[idx].get('Status') == 'Active'

return any(k.get('Status') == 'Active' for k in keys)

def deactivate_user_access_key(self, username: str, **kwargs):
"""
Deactivates the specified access key for the given IAM user.

Args:
username (str): IAM user name
access_key_label (str): Access Key 1 or Access Key 2 (case-insensitive)
"""
access_key_label = kwargs.get('access_key_label', '').lower()
if not access_key_label:
logger.warning("No access key label provided for deactivation.")
return

try:
access_keys = self.iam_client.list_access_keys(UserName=username)['AccessKeyMetadata']
except Exception as e:
logger.error(f"Failed to list access keys for user '{username}': {e}")
return

# Sort keys by CreateDate ascending (oldest first) for consistent indexing
access_keys.sort(key=lambda k: k['CreateDate'])

idx = self.ACCESS_KEY_LABEL_MAP.get(access_key_label)
if idx is None or idx >= len(access_keys):
logger.warning(f"Access key label '{access_key_label}' not found for user '{username}'")
return

key_to_deactivate = access_keys[idx]
access_key_id = key_to_deactivate['AccessKeyId']
current_status = key_to_deactivate['Status'].lower()

if current_status == 'active':
try:
self.iam_client.update_access_key(
UserName=username,
AccessKeyId=access_key_id,
Status='Inactive'
)
logger.info(f"Access key '{access_key_id}' deactivated for user '{username}'")
except Exception as e:
logger.error(f"Failed to deactivate access key '{access_key_id}' for user '{username}': {e}")
else:
logger.info(f"Access key '{access_key_id}' is already inactive for user '{username}'")

logger.info(f"Access key deactivation processed for user '{username}'.")
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class PolicyEsMetaData(dict):
launch_time: str = ''
running_days: int = ''
create_date: str = ''
age_days: int = ''
last_activity_days: int = ''

def __post_init__(self):
"""
Expand Down
2 changes: 2 additions & 0 deletions cloud_governance/common/utils/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
EC2_NAMESPACE = 'AWS/EC2'
CLOUDWATCH_METRICS_AVAILABLE_DAYS = 14
AWS_DEFAULT_GLOBAL_REGION = 'us-east-1'
UNUSED_ACCESS_KEY_DAYS = 90
UNUSED_ACCESS_KEY_MAX_DAY = 1000

# X86 to Graviton
GRAVITON_MAPPINGS = {
Expand Down
4 changes: 2 additions & 2 deletions cloud_governance/main/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def __init__(self):
'ip_unattached', 'unused_nat_gateway',
'instance_idle',
'ec2_stop', 'ebs_in_use', 'database_idle',
's3_inactive',
's3_inactive', 'unused_access_key',
'empty_roles',
'zombie_snapshots', 'skipped_resources',
'monthly_report', 'optimize_resources_report']
Expand Down Expand Up @@ -277,7 +277,7 @@ def __init__(self):
self._environment_variables_dict['POLICY_ACTIONS_DAYS'] = literal_eval(
EnvironmentVariables.get_env('POLICY_ACTIONS_DAYS', '[]'))
self._environment_variables_dict['DEFAULT_ADMINS'] = literal_eval(
EnvironmentVariables.get_env('DEFAULT_ADMINS', '[]'))
EnvironmentVariables.get_env('DEFAULT_ADMINS', '["yinsong@redhat.com", "ebattat@redhat.com"]'))
self._environment_variables_dict['KERBEROS_USERS'] = literal_eval(
EnvironmentVariables.get_env('KERBEROS_USERS', '[]'))
self._environment_variables_dict['POLICIES_TO_ALERT'] = literal_eval(
Expand Down
2 changes: 1 addition & 1 deletion cloud_governance/main/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ATHENA_ACCOUNT_SECRET_KEY: ""

non_cluster_policies: [ 'instance_run', 'unattached_volume', 'cluster_run',
'ip_unattached', 'unused_nat_gateway', 'instance_idle',
'ec2_stop', 'ebs_in_use', 'database_idle', 's3_inactive',
'ec2_stop', 'ebs_in_use', 'database_idle', 's3_inactive', 'unused_access_key',
'empty_roles', 'tag_resources', 'cost_usage_reports',
'zombie_snapshots', 'skipped_resources',
'monthly_report', 'optimize_resources_report' ]
Expand Down
2 changes: 1 addition & 1 deletion cloud_governance/main/main_oerations/main_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def run(self):
# @Todo support for all the aws policies, currently supports ec2_run as urgent requirement
if self._policy in policies and self._policy in ["instance_run", "unattached_volume", "cluster_run",
"ip_unattached", "unused_nat_gateway", "instance_idle",
"zombie_snapshots", "database_idle", "s3_inactive",
"zombie_snapshots", "database_idle", "s3_inactive", "unused_access_key",
"empty_roles", "tag_resources", "cost_usage_reports"]:
source = policy_type
if Utils.equal_ignore_case(policy_type, self._public_cloud_name):
Expand Down
3 changes: 2 additions & 1 deletion cloud_governance/policy/aws/monthly_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def policy_description(self, policy_name: str):
'ip_unattached': 'Delete all the elastic_ips that are unused',
'unused_nat_gateway': ' Delete all unused nat gateways',
'zombie_snapshots': 'Delete all the snapshots which the AMI does not use',
's3_inactive': 'Delete the empty buckets which don’t have any content.',
's3_inactive': 'Delete the empty buckets which don’t have any content',
'unused_access_key': 'Deactivate user access keys that are still active but have not been used',
'empty_roles': 'Delete the empty role which does\'t have any policies',
'zombie_cluster_resource': 'Delete up the cluster resources which are not deleted while cleaning the cluster'
}
Expand Down
55 changes: 55 additions & 0 deletions cloud_governance/policy/aws/unused_access_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from cloud_governance.common.utils.configs import UNUSED_ACCESS_KEY_DAYS, UNUSED_ACCESS_KEY_MAX_DAY
from cloud_governance.policy.helpers.aws.aws_policy_operations import AWSPolicyOperations


class UnusedAccessKey(AWSPolicyOperations):
RESOURCE_ACTION = "DeActivate"

def __init__(self):
super().__init__()

def run_policy_operations(self):
"""
This method returns a list of users with at least one active access key whose last used date is greater than UNUSED_ACCESS_KEY_DAYS
:return:
:rtype:
"""
unused_access_keys = []
iam_users_access_keys = self._get_iam_users_access_keys()
for username, user_data in iam_users_access_keys.items():
for access_key_label, access_key_data in user_data.items():
if 'access key' in access_key_label.lower():
last_activity_days = access_key_data['last_activity_days']
age_days = access_key_data['age_days']
# if access key last_activity_days is "N/A", use age_days
if last_activity_days == "N/A":
last_activity_days = age_days
region = user_data['region']
user_name = username
tags = user_data.get('Tags', [])
cleanup_result = False
cleanup_days = 0
if int(last_activity_days) >= UNUSED_ACCESS_KEY_DAYS and self._has_active_access_keys(user_name, access_key_label) and self.get_skip_policy_value(tags=tags) not in ('NOTDELETE', 'SKIP'):
cleanup_days = self.get_clean_up_days_count(tags=tags)
cleanup_result = self.verify_and_delete_resource(resource_id=user_name, tags=tags,
clean_up_days=cleanup_days, access_key_label=access_key_label)
resource_data = self._get_es_schema(resource_id=user_name,
user=self.get_tag_name_from_tags(tags=tags, tag_name='User'),
skip_policy=self.get_skip_policy_value(tags=tags),
cleanup_days=cleanup_days,
dry_run=self._dry_run,
name=user_name,
region=region,
cleanup_result=str(cleanup_result),
resource_action=self.RESOURCE_ACTION,
cloud_name=self._cloud_name,
resource_type='UnusedAccessKey',
resource_state='Active',
age_days=age_days,
last_activity_days=last_activity_days,
unit_price=0)
unused_access_keys.append(resource_data)
if not cleanup_result:
self.update_resource_day_count_tag(resource_id=user_name, cleanup_days=cleanup_days, tags=tags)

return unused_access_keys
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ def __update_delete_days(self, policy_es_data: list):
if days >= days_to_take_action:
delete_date = datetime.utcnow().date().__str__()
alert_user = True
if record.get('policy') in ['empty_roles', 's3_inactive']:
# Cross region policies
if record.get('policy') in ['empty_roles', 's3_inactive', 'unused_access_key']:
record['RegionName'] = 'us-east-1'
if Utils.equal_ignore_case(dry_run, 'yes'):
record['DeleteDate'] = 'dry_run=yes'
Expand Down
6 changes: 3 additions & 3 deletions cloud_governance/policy/helpers/abstract_policy_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def get_skip_policy_value(self, tags: Union[list, dict]) -> str:
return 'NA'

@abstractmethod
def _delete_resource(self, resource_id: str):
def _delete_resource(self, resource_id: str, **kwargs):
"""
This method deletes the resource
:param resource_id:
Expand Down Expand Up @@ -144,7 +144,7 @@ def verify_and_delete_resource(self, resource_id: str, tags: Union[list, dict],
:rtype:
"""
if self._resource_id == resource_id and self._force_delete and self._dry_run == 'no':
self._delete_resource(resource_id=resource_id)
self._delete_resource(resource_id=resource_id, **kwargs)
return True
if not days_to_delete_resource:
days_to_delete_resource = self._days_to_take_action
Expand All @@ -157,7 +157,7 @@ def verify_and_delete_resource(self, resource_id: str, tags: Union[list, dict],
if clean_up_days >= days_to_delete_resource:
if self._dry_run == 'no':
if self.get_skip_policy_value(tags=tags) not in ('NOTDELETE', 'SKIP'):
self._delete_resource(resource_id=resource_id)
self._delete_resource(resource_id=resource_id, **kwargs)
cleanup_resources = True
return cleanup_resources

Expand Down
Loading