diff --git a/.gitignore b/.gitignore
index d6a70a72..d075f63a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -221,3 +221,6 @@ empty_test_environment_variables.py
cloudsensei/.env.txt
.vscode
env.yaml
+
+# macOS
+.DS_Store
diff --git a/cloud_governance/common/clouds/aws/iam/iam_operations.py b/cloud_governance/common/clouds/aws/iam/iam_operations.py
index 351456f5..d5685a56 100644
--- a/cloud_governance/common/clouds/aws/iam/iam_operations.py
+++ b/cloud_governance/common/clouds/aws/iam/iam_operations.py
@@ -175,6 +175,69 @@ def tag_user(self, user_name: str, tags: list):
except Exception as err:
raise err
+ def untag_user(self, user_name: str, tag_keys: list):
+ """
+ Removes the given tag keys from the IAM user.
+ :param user_name: The name of the IAM user.
+ :param tag_keys: List of tag key names to remove (e.g. ['UnusedAccessKey1InactiveDate']).
+ """
+ if not tag_keys:
+ return
+ try:
+ self.iam_client.untag_user(UserName=user_name, TagKeys=tag_keys)
+ logger.info(f"Untagged user '{user_name}': {tag_keys}")
+ except Exception as err:
+ logger.error(f"Failed to untag user '{user_name}': {err}")
+ raise err
+
+ def delete_user_access_key(
+ self,
+ username: str,
+ access_key_label: str,
+ remove_inactive_tag: bool = True,
+ access_key_id: str = None,
+ ):
+ """
+ Deletes the specified access key for the given IAM user. Optionally removes the
+ UnusedAccessKeyNInactiveDate tag (only when this policy had set it by deactivating the key).
+ :param username: IAM user name.
+ :param access_key_label: "Access key 1" or "Access key 2" (case-insensitive).
+ :param remove_inactive_tag: If True, remove UnusedAccessKeyNInactiveDate after delete.
+ Set False when the key was not deactivated by this policy (e.g. manually deactivated).
+ :param access_key_id: Optional. When provided, delete this key by ID instead of resolving by
+ label.
+ """
+ access_key_label_lower = (access_key_label or '').lower()
+ if not access_key_label_lower or 'access key' not in access_key_label_lower:
+ logger.warning("Invalid access key label for deletion.")
+ return
+ key_num = access_key_label_lower.split()[-1]
+ inactive_tag_key = f"UnusedAccessKey{key_num}InactiveDate"
+
+ if not access_key_id:
+ try:
+ access_keys = self.iam_client.list_access_keys(UserName=username)['AccessKeyMetadata']
+ except Exception as e:
+ logger.error(f"Failed to list access keys for user '{username}': {e}")
+ raise
+ access_keys.sort(key=lambda k: k['CreateDate'])
+ idx = self.ACCESS_KEY_LABEL_MAP.get(access_key_label_lower)
+ if idx is None or idx >= len(access_keys):
+ logger.warning(f"Access key label '{access_key_label}' not found for user '{username}'")
+ return
+ access_key_id = access_keys[idx]['AccessKeyId']
+
+ try:
+ self.iam_client.delete_access_key(UserName=username, AccessKeyId=access_key_id)
+ logger.info(f"Deleted access key '{access_key_id}' for user '{username}'")
+ except Exception as e:
+ logger.error(f"Failed to delete access key '{access_key_id}' for user '{username}': {e}")
+ raise
+ tag_keys_to_remove = [access_key_id]
+ if remove_inactive_tag:
+ tag_keys_to_remove.append(inactive_tag_key)
+ self.untag_user(username, tag_keys_to_remove)
+
def get_iam_users_access_keys(self):
"""
Retrieves IAM users and summarizes:
@@ -208,8 +271,8 @@ def get_iam_users_access_keys(self):
for user in page['Users']:
username = user['UserName']
result[username] = {}
- # Access keys
access_keys = self.iam_client.list_access_keys(UserName=username)['AccessKeyMetadata']
+ access_keys = sorted(access_keys, key=lambda k: k['CreateDate'])
for idx, key in enumerate(access_keys, start=1):
label = f"Access key {idx}"
status = key['Status'].lower()
@@ -229,7 +292,13 @@ def get_iam_users_access_keys(self):
logger.error(f"Failed to get last used date for access key")
last_used_days = None
- result[username][label] = {'label': label, 'status': status, 'age_days': age_days, 'last_activity_days': last_used_days}
+ result[username][label] = {
+ 'label': label,
+ 'status': status,
+ 'age_days': age_days,
+ 'last_activity_days': last_used_days,
+ 'access_key_id': key['AccessKeyId'],
+ }
# Tags as list of dicts
try:
@@ -312,6 +381,11 @@ def deactivate_user_access_key(self, username: str, **kwargs):
Status='Inactive'
)
logger.info(f"Access key '{access_key_id}' deactivated for user '{username}'")
+ # Tag the user so we only delete keys we deactivated (after DELETE_ACCESS_KEY_DAYS)
+ key_num = access_key_label.split()[-1]
+ inactive_tag_key = f"UnusedAccessKey{key_num}InactiveDate"
+ inactive_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+ self.tag_user(username, [{'Key': inactive_tag_key, 'Value': inactive_date}])
except Exception as e:
logger.error(f"Failed to deactivate access key '{access_key_id}' for user '{username}': {e}")
else:
diff --git a/cloud_governance/common/mails/mail_message.py b/cloud_governance/common/mails/mail_message.py
index ec31e94a..0db503ab 100644
--- a/cloud_governance/common/mails/mail_message.py
+++ b/cloud_governance/common/mails/mail_message.py
@@ -3,6 +3,7 @@
from jinja2 import Environment, FileSystemLoader
from cloud_governance.common.ldap.ldap_search import LdapSearch
+from cloud_governance.common.utils.configs import DELETE_ACCESS_KEY_DAYS
from cloud_governance.main.environment_variables import environment_variables
@@ -105,6 +106,27 @@ def iam_user_add_tags(self, name: str, user: str, spreadsheet_id: str):
Cloud-governance Team""".strip()
return subject, body
+ def _get_unused_access_key_alert_message(self):
+ """
+ Returns the custom message for unused_access_key policy in aggregated alerts.
+ """
+ return (
+ f"For the IAM access keys listed below: please rotate these access keys before they are "
+ f"automatically deactivated. Keys will be deactivated after the grace period if no action is taken. "
+ f"Keys older than {DELETE_ACCESS_KEY_DAYS} days (including deactivated ones) will be permanently deleted. "
+ "To avoid deactivation, create a new access key and update your applications, then deactivate or delete the old key."
+ )
+
+ def _get_delete_access_key_alert_message(self):
+ """
+ Returns the custom message for delete_access_key policy in aggregated alerts.
+ """
+ return (
+ f"Your IAM access key age has exceeded {DELETE_ACCESS_KEY_DAYS} days. "
+ "The key(s) listed below are in the deletion grace period and will be permanently deleted "
+ "if no action is taken. Please rotate or remove these keys before the grace period ends."
+ )
+
def aws_user_over_usage_cost(self, user: str, usage_cost: int, name: str, user_usage: int):
"""
This method send subject, body to over usage cost
@@ -517,5 +539,18 @@ def get_policy_alert_message(self, policy_data: list, user: str = ''):
template_loader = self.env_loader.get_template('policy_alert_agg_message.j2')
columns = ['User', 'PublicCloud', 'policy', 'RegionName', 'ResourceId', 'Name', 'DeleteDate']
context = {'records': policy_data, 'columns': columns, 'User': user, 'account': self.account, 'cloud_name': self.__public_cloud_name}
+ has_unused_access_key = any(
+ (r.get('policy') or r.get('Policy') or '').lower() == 'unused_access_key'
+ for r in (policy_data or [])
+ )
+ has_delete_access_key = any(
+ (r.get('policy') or r.get('Policy') or '').lower() == 'delete_access_key'
+ for r in (policy_data or [])
+ )
+ if has_unused_access_key:
+ context['unused_access_key_message'] = self._get_unused_access_key_alert_message()
+ if has_delete_access_key:
+ context['delete_access_key_days'] = DELETE_ACCESS_KEY_DAYS
+ context['delete_access_key_message'] = self._get_delete_access_key_alert_message()
body = template_loader.render(context)
return subject, body
diff --git a/cloud_governance/common/mails/templates/policy_alert_agg_message.j2 b/cloud_governance/common/mails/templates/policy_alert_agg_message.j2
index 2c749d5c..ae962de7 100644
--- a/cloud_governance/common/mails/templates/policy_alert_agg_message.j2
+++ b/cloud_governance/common/mails/templates/policy_alert_agg_message.j2
@@ -44,6 +44,12 @@
You can find below your unused resources in the {{ cloud_name }} account ({{ account }}).
If you want to keep them, please add "Policy=Not_Delete" or "Policy=skip" tag for each resource
+ {% if unused_access_key_message is defined and unused_access_key_message %}
+
Unused access keys: {{ unused_access_key_message }}
+ {% endif %}
+ {% if delete_access_key_message is defined and delete_access_key_message %}
+
Access keys pending deletion (inactive, key age > {{ delete_access_key_days }} days): {{ delete_access_key_message }}
+ {% endif %}
diff --git a/cloud_governance/common/utils/configs.py b/cloud_governance/common/utils/configs.py
index 2310efdf..d982d85c 100644
--- a/cloud_governance/common/utils/configs.py
+++ b/cloud_governance/common/utils/configs.py
@@ -23,7 +23,7 @@
CLOUDWATCH_METRICS_AVAILABLE_DAYS = 14
AWS_DEFAULT_GLOBAL_REGION = 'us-east-1'
UNUSED_ACCESS_KEY_DAYS = 90
-UNUSED_ACCESS_KEY_MAX_DAY = 1000
+DELETE_ACCESS_KEY_DAYS = 365
# X86 to Graviton
GRAVITON_MAPPINGS = {
diff --git a/cloud_governance/main/environment_variables.py b/cloud_governance/main/environment_variables.py
index 43f07871..f4cd6b72 100644
--- a/cloud_governance/main/environment_variables.py
+++ b/cloud_governance/main/environment_variables.py
@@ -98,7 +98,7 @@ def __init__(self):
'ip_unattached', 'unused_nat_gateway',
'instance_idle',
'ec2_stop', 'ebs_in_use', 'database_idle',
- 's3_inactive', 'unused_access_key',
+ 's3_inactive', 'unused_access_key', 'delete_access_key',
'empty_roles',
'zombie_snapshots', 'skipped_resources',
'monthly_report', 'optimize_resources_report']
@@ -121,6 +121,7 @@ def __init__(self):
self._environment_variables_dict['user_tags'] = EnvironmentVariables.get_env('user_tags', '')
self._environment_variables_dict['user_tag_operation'] = EnvironmentVariables.get_env('user_tag_operation', '')
self._environment_variables_dict['username'] = EnvironmentVariables.get_env('username', '')
+ self._environment_variables_dict['DELETE_INACTIVE_KEYS_WITHOUT_TAG'] = EnvironmentVariables.get_env('DELETE_INACTIVE_KEYS_WITHOUT_TAG', 'true').lower() == 'true'
self._environment_variables_dict['remove_tags'] = EnvironmentVariables.get_env('remove_tags', '')
self._environment_variables_dict['resource'] = EnvironmentVariables.get_env('resource', '')
self._environment_variables_dict['cluster_tag'] = EnvironmentVariables.get_env('cluster_tag', '')
diff --git a/cloud_governance/main/main_oerations/main_operations.py b/cloud_governance/main/main_oerations/main_operations.py
index b2ebbf48..bebc9d8d 100644
--- a/cloud_governance/main/main_oerations/main_operations.py
+++ b/cloud_governance/main/main_oerations/main_operations.py
@@ -43,7 +43,7 @@ def run(self):
if self._policy in policies and self._policy in ["instance_run", "unattached_volume", "cluster_run",
"ip_unattached", "unused_nat_gateway", "instance_idle",
"zombie_snapshots", "database_idle", "s3_inactive", "unused_access_key",
- "empty_roles", "tag_resources", "cost_usage_reports"]:
+ "delete_access_key", "empty_roles", "tag_resources", "cost_usage_reports"]:
source = policy_type
if Utils.equal_ignore_case(policy_type, self._public_cloud_name):
source = ''
diff --git a/cloud_governance/policy/aws/delete_access_key.py b/cloud_governance/policy/aws/delete_access_key.py
new file mode 100644
index 00000000..5c11165a
--- /dev/null
+++ b/cloud_governance/policy/aws/delete_access_key.py
@@ -0,0 +1,71 @@
+"""
+Policy to delete inactive IAM access keys older than DELETE_ACCESS_KEY_DAYS.
+"""
+from cloud_governance.common.utils.configs import DELETE_ACCESS_KEY_DAYS
+from cloud_governance.policy.helpers.aws.aws_policy_operations import AWSPolicyOperations
+
+
+class DeleteAccessKey(AWSPolicyOperations):
+ RESOURCE_ACTION = "Delete"
+
+ def run_policy_operations(self):
+ """
+ For inactive keys with age > DELETE_ACCESS_KEY_DAYS: apply a grace period
+ (deletion_grace_days = age_days - DELETE_ACCESS_KEY_DAYS, capped at DAYS_TO_TAKE_ACTION). During
+ grace period write to ES with cleanup_days 1..7 so send_aggregated_alerts sends
+ reminder emails. After grace period, delete the key.
+ """
+ result = []
+ days_to_take_action = int(self._days_to_take_action)
+ iam_users_access_keys = self._get_iam_users_access_keys()
+
+ for username, user_data in iam_users_access_keys.items():
+ tags = user_data.get('tags', user_data.get('Tags', []))
+ region = user_data['region']
+ user_name = username
+
+ for access_key_label, access_key_data in user_data.items():
+ if 'access key' not in access_key_label.lower():
+ continue
+ age_days = access_key_data.get('age_days')
+ status = (access_key_data.get('status') or '').lower()
+ if age_days is None or status != 'inactive':
+ continue
+ age_days = int(age_days)
+
+ key_num = access_key_label.split()[-1]
+ inactive_tag_key = f"UnusedAccessKey{key_num}InactiveDate"
+ inactive_date_str = self.get_tag_name_from_tags(tags=tags, tag_name=inactive_tag_key)
+ delete_all_inactive = self._environment_variables_dict.get('DELETE_INACTIVE_KEYS_WITHOUT_TAG', False)
+ if not (age_days > DELETE_ACCESS_KEY_DAYS and (inactive_date_str or delete_all_inactive)):
+ continue
+
+ deletion_grace_days = min(age_days - DELETE_ACCESS_KEY_DAYS, days_to_take_action)
+ cleanup_result = self.verify_and_delete_resource(
+ resource_id=user_name,
+ tags=tags,
+ clean_up_days=deletion_grace_days,
+ access_key_label=access_key_label,
+ access_key_id=access_key_data.get('access_key_id'),
+ remove_inactive_tag=bool(inactive_date_str),
+ )
+ resource_data = self._get_es_schema(
+ resource_id=user_name,
+ user=self.get_tag_name_from_tags(tags=tags, tag_name='User'),
+ skip_policy=self.get_skip_policy_value(tags=tags),
+ cleanup_days=deletion_grace_days,
+ dry_run=self._dry_run,
+ name=user_name,
+ region=region,
+ cleanup_result=str(cleanup_result),
+ resource_action=self.RESOURCE_ACTION,
+ cloud_name=self._cloud_name,
+ resource_type='UnusedAccessKey',
+ resource_state='Inactive',
+ age_days=age_days,
+ last_activity_days=access_key_data.get('last_activity_days'),
+ unit_price=0,
+ )
+ result.append(resource_data)
+
+ return result
diff --git a/cloud_governance/policy/aws/unused_access_key.py b/cloud_governance/policy/aws/unused_access_key.py
index e8803110..eb45ab72 100644
--- a/cloud_governance/policy/aws/unused_access_key.py
+++ b/cloud_governance/policy/aws/unused_access_key.py
@@ -1,52 +1,69 @@
-from cloud_governance.common.utils.configs import UNUSED_ACCESS_KEY_DAYS, UNUSED_ACCESS_KEY_MAX_DAY
+from cloud_governance.common.utils.configs import UNUSED_ACCESS_KEY_DAYS
from cloud_governance.policy.helpers.aws.aws_policy_operations import AWSPolicyOperations
class UnusedAccessKey(AWSPolicyOperations):
RESOURCE_ACTION = "DeActivate"
- def __init__(self):
- super().__init__()
-
def run_policy_operations(self):
"""
- This method returns a list of users with at least one active access key whose last used date is greater than UNUSED_ACCESS_KEY_DAYS
- :return:
- :rtype:
+ For key age >= UNUSED_ACCESS_KEY_DAYS (e.g. 90): apply a grace period (deactivation_grace_days
+ = age_days - UNUSED_ACCESS_KEY_DAYS, capped at DAYS_TO_TAKE_ACTION). During grace period
+ write to ES with cleanup_days 1..7 so send_aggregated_alerts sends reminder emails. After
+ grace period, deactivate the key.
"""
unused_access_keys = []
+ days_to_take_action = int(self._days_to_take_action)
iam_users_access_keys = self._get_iam_users_access_keys()
for username, user_data in iam_users_access_keys.items():
+ tags = user_data.get('tags', user_data.get('Tags', []))
+ region = user_data['region']
+ user_name = username
+
for access_key_label, access_key_data in user_data.items():
- if 'access key' in access_key_label.lower():
- last_activity_days = access_key_data['last_activity_days']
- age_days = access_key_data['age_days']
- region = user_data['region']
- user_name = username
- tags = user_data.get('Tags', [])
- cleanup_result = False
- cleanup_days = 0
- if last_activity_days and int(last_activity_days) >= UNUSED_ACCESS_KEY_DAYS and self._has_active_access_keys(user_name, access_key_label) and self.get_skip_policy_value(tags=tags) not in ('NOTDELETE', 'SKIP'):
- cleanup_days = self.get_clean_up_days_count(tags=tags)
- cleanup_result = self.verify_and_delete_resource(resource_id=user_name, tags=tags,
- clean_up_days=cleanup_days, access_key_label=access_key_label)
- resource_data = self._get_es_schema(resource_id=user_name,
- user=self.get_tag_name_from_tags(tags=tags, tag_name='User'),
- skip_policy=self.get_skip_policy_value(tags=tags),
- cleanup_days=cleanup_days,
- dry_run=self._dry_run,
- name=user_name,
- region=region,
- cleanup_result=str(cleanup_result),
- resource_action=self.RESOURCE_ACTION,
- cloud_name=self._cloud_name,
- resource_type='UnusedAccessKey',
- resource_state='Active',
- age_days=age_days,
- last_activity_days=last_activity_days,
- unit_price=0)
- unused_access_keys.append(resource_data)
- if not cleanup_result:
- self.update_resource_day_count_tag(resource_id=user_name, cleanup_days=cleanup_days, tags=tags)
+ if 'access key' not in access_key_label.lower():
+ continue
+ last_activity_days = access_key_data.get('last_activity_days')
+ age_days = access_key_data.get('age_days')
+ status = (access_key_data.get('status') or '').lower()
+ if age_days is None:
+ continue
+ age_days = int(age_days)
+
+ if status == 'inactive':
+ continue
+
+ if not self._has_active_access_keys(user_name, access_key_label):
+ continue
+ if self.get_skip_policy_value(tags=tags) in ('NOTDELETE', 'SKIP'):
+ continue
+
+ if age_days < UNUSED_ACCESS_KEY_DAYS:
+ continue
+ deactivation_grace_days = min(age_days - UNUSED_ACCESS_KEY_DAYS, days_to_take_action)
+ cleanup_result = self.verify_and_delete_resource(
+ resource_id=user_name,
+ tags=tags,
+ clean_up_days=deactivation_grace_days,
+ access_key_label=access_key_label,
+ )
+ resource_data = self._get_es_schema(
+ resource_id=user_name,
+ user=self.get_tag_name_from_tags(tags=tags, tag_name='User'),
+ skip_policy=self.get_skip_policy_value(tags=tags),
+ cleanup_days=deactivation_grace_days,
+ dry_run=self._dry_run,
+ name=user_name,
+ region=region,
+ cleanup_result=str(cleanup_result),
+ resource_action=self.RESOURCE_ACTION,
+ cloud_name=self._cloud_name,
+ resource_type='UnusedAccessKey',
+ resource_state='Active',
+ age_days=age_days,
+ last_activity_days=last_activity_days,
+ unit_price=0,
+ )
+ unused_access_keys.append(resource_data)
return unused_access_keys
diff --git a/cloud_governance/policy/common_policies/send_aggregated_alerts.py b/cloud_governance/policy/common_policies/send_aggregated_alerts.py
index acbf7764..1bc2fa07 100644
--- a/cloud_governance/policy/common_policies/send_aggregated_alerts.py
+++ b/cloud_governance/policy/common_policies/send_aggregated_alerts.py
@@ -80,20 +80,24 @@ def __get_es_data(self):
def __remove_duplicates(self, policy_es_data: list):
"""
- This method removes the duplicate data
- :return:
- :rtype:
+ This method removes the duplicate data.
+ For unused_access_key and delete_access_key, one user can have two keys, so we do not
+ deduplicate by ResourceId (username); for all other policies we keep one record per ResourceId.
"""
- if policy_es_data:
- df = pandas.DataFrame(policy_es_data)
- policy_col = 'policy' if 'policy' in df.columns else 'Policy'
- sort_col = policy_col if policy_col in df.columns else df.columns[0]
- df.sort_values(inplace=True, by=[sort_col])
- # Avoid fillna(value='') on numeric columns (pandas 2.x+ raises LossySetitemError)
- df = df.astype(object).fillna('')
- df.drop_duplicates(subset='ResourceId', inplace=True)
- return df.to_dict(orient="records")
- return policy_es_data
+ if not policy_es_data:
+ return policy_es_data
+ df = pandas.DataFrame(policy_es_data)
+ policy_col = 'policy' if 'policy' in df.columns else 'Policy'
+ sort_col = policy_col if policy_col in df.columns else df.columns[0]
+ df.sort_values(inplace=True, by=[sort_col])
+ # Avoid fillna(value='') on numeric columns (pandas 2.x+ raises LossySetitemError)
+ df = df.astype(object).fillna('')
+ access_key_policies = ['unused_access_key', 'delete_access_key']
+ mask = df[policy_col].str.lower().isin(access_key_policies)
+ df_other = df[~mask].drop_duplicates(subset='ResourceId', keep='first')
+ df_access_keys = df[mask]
+ df = pandas.concat([df_other, df_access_keys], ignore_index=True)
+ return df.to_dict(orient="records")
def __group_by_policy(self, policy_data: list):
"""
@@ -168,7 +172,7 @@ def __update_delete_days(self, policy_es_data: list):
delete_date = datetime.utcnow().date().__str__()
alert_user = True
# Cross region policies
- if record.get('policy') in ['empty_roles', 's3_inactive', 'unused_access_key']:
+ if record.get('policy') in ['empty_roles', 's3_inactive', 'unused_access_key', 'delete_access_key']:
record['RegionName'] = 'us-east-1'
if Utils.equal_ignore_case(dry_run, 'yes'):
record['DeleteDate'] = 'dry_run=yes'
diff --git a/cloud_governance/policy/helpers/aws/aws_policy_operations.py b/cloud_governance/policy/helpers/aws/aws_policy_operations.py
index 3d7af264..7b84ee9a 100644
--- a/cloud_governance/policy/helpers/aws/aws_policy_operations.py
+++ b/cloud_governance/policy/helpers/aws/aws_policy_operations.py
@@ -59,7 +59,16 @@ def _delete_resource(self, resource_id: str, **kwargs):
if self._policy == 's3_inactive':
self._s3_client.delete_bucket(Bucket=resource_id)
elif self._policy == 'unused_access_key':
+ action = "deactivated"
self._iam_operations.deactivate_user_access_key(username=resource_id, **kwargs)
+ elif self._policy == 'delete_access_key':
+ action = "deleted"
+ self._delete_inactive_access_key(
+ user_name=resource_id,
+ access_key_label=kwargs.get('access_key_label'),
+ remove_inactive_tag=kwargs.get('remove_inactive_tag', True),
+ access_key_id=kwargs.get('access_key_id'),
+ )
elif self._policy == 'empty_roles':
response = self._iam_operations.delete_role(role_name=resource_id)
elif self._policy == 'unattached_volume':
@@ -81,6 +90,27 @@ def _delete_resource(self, resource_id: str, **kwargs):
logger.error(f'Exception raised: {err}: {resource_id}')
raise err
+ def _delete_inactive_access_key(
+ self,
+ user_name: str,
+ access_key_label: str,
+ remove_inactive_tag: bool = True,
+ access_key_id: str = None,
+ ):
+ """
+ Deletes an access key that was previously deactivated by this policy (and tagged with
+ UnusedAccessKeyNInactiveDate). Used when the key has been inactive for more than
+ DELETE_ACCESS_KEY_DAYS.
+ :param remove_inactive_tag: If True, remove the UnusedAccessKeyNInactiveDate tag after delete.
+ Pass False when the key was not deactivated by this policy (e.g. DELETE_INACTIVE_KEYS_WITHOUT_TAG).
+ """
+ self._iam_operations.delete_user_access_key(
+ username=user_name,
+ access_key_label=access_key_label,
+ remove_inactive_tag=remove_inactive_tag,
+ access_key_id=access_key_id,
+ )
+
def __remove_tag_key_aws(self, tags: list):
"""
This method returns the tags that does not contain key startswith aws:
@@ -149,7 +179,7 @@ def update_resource_tags(self, tags: list, resource_id: str):
try:
if self._policy == 's3_inactive':
self._s3_client.put_bucket_tagging(Bucket=resource_id, Tagging={'TagSet': tags})
- elif self._policy == 'unused_access_key':
+ elif self._policy in ('unused_access_key', 'delete_access_key'):
self._iam_operations.tag_user(user_name=resource_id, tags=tags)
elif self._policy == 'empty_roles':
self._iam_operations.tag_role(role_name=resource_id, tags=tags)
diff --git a/iam/clouds/aws/CloudGovernanceInfra/CloudGovernanceDeletePolicy.json b/iam/clouds/aws/CloudGovernanceInfra/CloudGovernanceDeletePolicy.json
index a954dd3a..10122e2a 100644
--- a/iam/clouds/aws/CloudGovernanceInfra/CloudGovernanceDeletePolicy.json
+++ b/iam/clouds/aws/CloudGovernanceInfra/CloudGovernanceDeletePolicy.json
@@ -118,6 +118,7 @@
"Sid": "IAM",
"Effect": "Allow",
"Action": [
+ "iam:DeleteAccessKey",
"iam:DeleteInstanceProfile",
"iam:DeletePolicy",
"iam:DeleteRole",
@@ -136,6 +137,7 @@
"iam:ListRoles",
"iam:ListUserPolicies",
"iam:ListUsers",
+ "iam:ListUserTags",
"iam:RemoveRoleFromInstanceProfile",
"iam:TagRole",
"iam:TagUser",
diff --git a/iam/clouds/aws/CloudGovernanceInfra/CloudGovernanceReadPolicy.json b/iam/clouds/aws/CloudGovernanceInfra/CloudGovernanceReadPolicy.json
index 65d2f3ee..a732d617 100644
--- a/iam/clouds/aws/CloudGovernanceInfra/CloudGovernanceReadPolicy.json
+++ b/iam/clouds/aws/CloudGovernanceInfra/CloudGovernanceReadPolicy.json
@@ -95,6 +95,7 @@
"iam:ListRoles",
"iam:ListUserPolicies",
"iam:ListUsers",
+ "iam:ListUserTags",
"iam:TagRole",
"iam:TagUser",
"iam:UntagRole",
diff --git a/jenkins/clouds/aws/daily/policies/Jenkinsfile b/jenkins/clouds/aws/daily/policies/Jenkinsfile
index b6f677ad..c475de47 100644
--- a/jenkins/clouds/aws/daily/policies/Jenkinsfile
+++ b/jenkins/clouds/aws/daily/policies/Jenkinsfile
@@ -1,4 +1,4 @@
-accounts_list = ['perf-dept' : "", 'perfscale': "", 'psap': ""]
+accounts_list = ['perf-dept': '', 'perfscale': '', 'psap': '']
pipeline {
options {
disableConcurrentBuilds()
diff --git a/jenkins/clouds/aws/daily/policies/run_policies.py b/jenkins/clouds/aws/daily/policies/run_policies.py
index 1d968a3c..11c29522 100644
--- a/jenkins/clouds/aws/daily/policies/run_policies.py
+++ b/jenkins/clouds/aws/daily/policies/run_policies.py
@@ -47,7 +47,7 @@ def get_policies(file_type: str = '.py', exclude_policies: list = None):
exclude_global_cost_policies = ['cost_explorer', 'optimize_resources_report', 'monthly_report', 'cost_over_usage',
'skipped_resources', 'cost_explorer_payer_billings', 'cost_billing_reports',
'spot_savings_analysis', 'yearly_savings_report']
-GLOBAL_POLICIES = ["s3_inactive", "empty_roles", "unused_access_key"]
+GLOBAL_POLICIES = ["s3_inactive", "empty_roles", "unused_access_key", "delete_access_key"]
available_policies = get_policies(exclude_policies=exclude_global_cost_policies)
@@ -104,10 +104,10 @@ def run_policies(policies: list, dry_run: str = 'yes'):
for policy in policies:
container_env_dict.update({"AWS_DEFAULT_REGION": region, 'policy': policy})
container_cmd = ''
- if policy in ('empty_roles', 's3_inactive', 'unused_access_key') and region == 'us-east-1':
+ if policy in ('empty_roles', 's3_inactive', 'unused_access_key', 'delete_access_key') and region == 'us-east-1':
container_cmd = get_container_cmd(container_env_dict)
else:
- if policy not in ('empty_roles', 's3_inactive', 'unused_access_key'):
+ if policy not in ('empty_roles', 's3_inactive', 'unused_access_key', 'delete_access_key'):
container_cmd = get_container_cmd(container_env_dict)
if container_cmd:
run_cmd(container_cmd)
diff --git a/jenkins/tenant/aws/common/run_policies.py b/jenkins/tenant/aws/common/run_policies.py
index ff60bf1c..39aa55f1 100644
--- a/jenkins/tenant/aws/common/run_policies.py
+++ b/jenkins/tenant/aws/common/run_policies.py
@@ -24,7 +24,8 @@ def get_policies(file_type: str = '.py', exclude_policies: list = None):
exclude_policies = ['cost_explorer', 'optimize_resources_report', 'monthly_report', 'cost_over_usage',
'skipped_resources', 'cost_explorer_payer_billings', 'cost_billing_reports',
- 'spot_savings_analysis', 'yearly_savings_report']
+ 'spot_savings_analysis', 'yearly_savings_report',
+ 'delete_access_key']
available_policies = get_policies(exclude_policies=exclude_policies)
QUAY_CLOUD_GOVERNANCE_REPOSITORY = os.environ.get('QUAY_CLOUD_GOVERNANCE_REPOSITORY',
'quay.io/cloud-governance/cloud-governance')
@@ -77,6 +78,8 @@ def get_container_cmd(env_dict: dict):
policies_in_action = os.environ.get('POLICIES_IN_ACTION', [])
if isinstance(policies_in_action, str):
policies_in_action = literal_eval(policies_in_action)
+# POLICIES_IN_ACTION cannot run policies outside available_policies (e.g. tenant-excluded delete_access_key).
+policies_in_action = [p for p in policies_in_action if p in available_policies]
policies_not_action = list(set(available_policies) - set(policies_in_action))
regions = ['us-east-1', 'us-east-2', 'us-west-1', 'us-west-2', 'ap-south-1', 'eu-north-1', 'eu-west-3', 'eu-west-2',
diff --git a/setup.py b/setup.py
index 578790d9..d3309af1 100644
--- a/setup.py
+++ b/setup.py
@@ -72,7 +72,7 @@
'python-ldap==3.4.2', # prerequisite: sudo dnf install -y python39-devel openldap-devel gcc
'requests==2.32.2', # rest api & lambda
'retry==0.9.2',
- 'setuptools', # Requires for python3.12
+ 'setuptools', # CI: setuptools<82 for IBM sdist builds on 3.9
'SoftLayer==6.0.0', # IBM SoftLayer
'sphinx-rtd-theme==1.0.0', # readthedocs
'sphinx==5.0.0', # readthedocs
diff --git a/tests/unittest/cloud_governance/policy/aws/test_delete_access_key.py b/tests/unittest/cloud_governance/policy/aws/test_delete_access_key.py
new file mode 100644
index 00000000..b2875abd
--- /dev/null
+++ b/tests/unittest/cloud_governance/policy/aws/test_delete_access_key.py
@@ -0,0 +1,195 @@
+"""
+Unit tests for cloud_governance.policy.aws.delete_access_key.DeleteAccessKey.
+"""
+from unittest.mock import patch
+
+from moto import mock_ec2, mock_s3, mock_iam
+
+from cloud_governance.main.environment_variables import environment_variables
+from cloud_governance.policy.aws.delete_access_key import DeleteAccessKey
+from cloud_governance.common.utils.configs import DELETE_ACCESS_KEY_DAYS
+from tests.unittest.configs import DRY_RUN_YES, DRY_RUN_NO, AWS_DEFAULT_REGION, TEST_USER_NAME
+
+
+def _mock_iam_users_inactive_key(
+ age_days: int,
+ last_activity_days: int = 130,
+ with_inactive_tag: bool = True,
+ access_key_id: str = 'AKIAIOSFODNN7EXAMPLE',
+):
+ """Build mock IAM user with one inactive access key, as returned by get_iam_users_access_keys()."""
+ tags = [{'Key': 'User', 'Value': TEST_USER_NAME}]
+ if with_inactive_tag:
+ tags.append({'Key': 'UnusedAccessKey1InactiveDate', 'Value': '2024-01-15'})
+ return {
+ TEST_USER_NAME: {
+ 'Access key 1': {
+ 'label': 'Access key 1',
+ 'status': 'Inactive',
+ 'age_days': age_days,
+ 'last_activity_days': last_activity_days,
+ 'access_key_id': access_key_id,
+ },
+ 'tags': tags,
+ 'region': AWS_DEFAULT_REGION,
+ 'ResourceId': 'AIDAEXAMPLE',
+ }
+ }
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_delete_access_key_skips_active_keys():
+ """Only inactive keys are considered; active keys are skipped."""
+ environment_variables.environment_variables_dict['policy'] = 'delete_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+
+ mock_data = _mock_iam_users_inactive_key(age_days=DELETE_ACCESS_KEY_DAYS + 10, with_inactive_tag=True)
+ mock_data[TEST_USER_NAME]['Access key 1']['status'] = 'Active'
+ with patch.object(DeleteAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ policy = DeleteAccessKey()
+ result = policy.run_policy_operations()
+ assert len(result) == 0
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_delete_access_key_skips_when_age_at_or_below_threshold():
+ """Inactive keys with age_days <= DELETE_ACCESS_KEY_DAYS are skipped."""
+ environment_variables.environment_variables_dict['policy'] = 'delete_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+
+ mock_data = _mock_iam_users_inactive_key(age_days=DELETE_ACCESS_KEY_DAYS, with_inactive_tag=True)
+ with patch.object(DeleteAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ policy = DeleteAccessKey()
+ result = policy.run_policy_operations()
+ assert len(result) == 0
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_delete_access_key_includes_inactive_old_key_with_tag():
+ """Inactive key with age > DELETE_ACCESS_KEY_DAYS and UnusedAccessKey1InactiveDate tag is included."""
+ environment_variables.environment_variables_dict['policy'] = 'delete_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+
+ age_days = DELETE_ACCESS_KEY_DAYS + 5
+ mock_data = _mock_iam_users_inactive_key(age_days=age_days, with_inactive_tag=True)
+ with patch.object(DeleteAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ policy = DeleteAccessKey()
+ result = policy.run_policy_operations()
+ assert len(result) == 1
+ assert result[0]['ResourceId'] == TEST_USER_NAME
+ # ES schema stores cleanup_result in ResourceAction (dry_run=yes -> verify returns False)
+ assert result[0]['ResourceAction'] == 'False'
+ assert result[0]['ResourceType'] == 'UnusedAccessKey'
+ assert result[0]['ResourceState'] == 'Inactive'
+ assert result[0]['AgeDays'] == age_days
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_delete_access_key_skips_inactive_old_key_without_tag_unless_flag():
+ """Inactive key with age > threshold but no UnusedAccessKey1InactiveDate tag is skipped unless DELETE_INACTIVE_KEYS_WITHOUT_TAG."""
+ environment_variables.environment_variables_dict['policy'] = 'delete_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+ environment_variables.environment_variables_dict.pop('DELETE_INACTIVE_KEYS_WITHOUT_TAG', None)
+
+ mock_data = _mock_iam_users_inactive_key(age_days=DELETE_ACCESS_KEY_DAYS + 10, with_inactive_tag=False)
+ with patch.object(DeleteAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ policy = DeleteAccessKey()
+ result = policy.run_policy_operations()
+ assert len(result) == 0
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_delete_access_key_includes_inactive_old_key_without_tag_when_flag_set():
+ """When DELETE_INACTIVE_KEYS_WITHOUT_TAG is True, inactive keys over threshold are included even without tag."""
+ environment_variables.environment_variables_dict['policy'] = 'delete_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+ environment_variables.environment_variables_dict['DELETE_INACTIVE_KEYS_WITHOUT_TAG'] = True
+
+ age_days = DELETE_ACCESS_KEY_DAYS + 10
+ mock_data = _mock_iam_users_inactive_key(age_days=age_days, with_inactive_tag=False)
+ with patch.object(DeleteAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ policy = DeleteAccessKey()
+ result = policy.run_policy_operations()
+ assert len(result) == 1
+ assert result[0]['ResourceId'] == TEST_USER_NAME
+ # ES schema stores cleanup_result in ResourceAction (dry_run=yes -> verify returns False)
+ assert result[0]['ResourceAction'] == 'False'
+ assert result[0]['AgeDays'] == age_days
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_delete_access_key_empty_when_no_users():
+ """When no IAM users have access keys, result is empty."""
+ environment_variables.environment_variables_dict['policy'] = 'delete_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+
+ with patch.object(DeleteAccessKey, '_get_iam_users_access_keys', return_value={}):
+ policy = DeleteAccessKey()
+ result = policy.run_policy_operations()
+ assert len(result) == 0
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_delete_access_key_deletion_grace_days_capped():
+ """Deletion grace days is min(age_days - DELETE_ACCESS_KEY_DAYS, DAYS_TO_TAKE_ACTION)."""
+ environment_variables.environment_variables_dict['policy'] = 'delete_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+
+ age_days = DELETE_ACCESS_KEY_DAYS + 20
+ mock_data = _mock_iam_users_inactive_key(age_days=age_days, with_inactive_tag=True)
+ with patch.object(DeleteAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ with patch.object(DeleteAccessKey, 'verify_and_delete_resource', return_value=False) as mock_verify:
+ policy = DeleteAccessKey()
+ policy.run_policy_operations()
+ call_kwargs = mock_verify.call_args[1]
+ assert call_kwargs['clean_up_days'] == 7
+ assert call_kwargs['access_key_id'] == 'AKIAIOSFODNN7EXAMPLE'
+ assert call_kwargs['remove_inactive_tag'] is True
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_delete_access_key_remove_inactive_tag_false_when_no_tag():
+ """When key has no UnusedAccessKey1InactiveDate tag, verify_and_delete_resource is called with remove_inactive_tag=False."""
+ environment_variables.environment_variables_dict['policy'] = 'delete_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+ environment_variables.environment_variables_dict['DELETE_INACTIVE_KEYS_WITHOUT_TAG'] = True
+
+ mock_data = _mock_iam_users_inactive_key(age_days=DELETE_ACCESS_KEY_DAYS + 5, with_inactive_tag=False)
+ with patch.object(DeleteAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ with patch.object(DeleteAccessKey, 'verify_and_delete_resource', return_value=False) as mock_verify:
+ policy = DeleteAccessKey()
+ policy.run_policy_operations()
+ call_kwargs = mock_verify.call_args[1]
+ assert call_kwargs['remove_inactive_tag'] is False
diff --git a/tests/unittest/cloud_governance/policy/aws/test_unused_access_key.py b/tests/unittest/cloud_governance/policy/aws/test_unused_access_key.py
new file mode 100644
index 00000000..ee0c5333
--- /dev/null
+++ b/tests/unittest/cloud_governance/policy/aws/test_unused_access_key.py
@@ -0,0 +1,166 @@
+"""
+Unit tests for cloud_governance.policy.aws.unused_access_key.UnusedAccessKey.
+"""
+from unittest.mock import patch
+
+from moto import mock_ec2, mock_s3, mock_iam
+
+from cloud_governance.main.environment_variables import environment_variables
+from cloud_governance.policy.aws.unused_access_key import UnusedAccessKey
+from cloud_governance.common.utils.configs import UNUSED_ACCESS_KEY_DAYS
+from tests.unittest.configs import DRY_RUN_YES, DRY_RUN_NO, AWS_DEFAULT_REGION, TEST_USER_NAME
+
+
+def _mock_iam_users_access_keys(age_days: int, status: str = 'Active', last_activity_days: int = 100):
+ """Build mock IAM users access keys dict as returned by IAMOperations.get_iam_users_access_keys()."""
+ return {
+ TEST_USER_NAME: {
+ 'Access key 1': {
+ 'label': 'Access key 1',
+ 'status': status,
+ 'age_days': age_days,
+ 'last_activity_days': last_activity_days,
+ 'access_key_id': 'AKIAIOSFODNN7EXAMPLE',
+ },
+ 'tags': [{'Key': 'User', 'Value': TEST_USER_NAME}],
+ 'region': AWS_DEFAULT_REGION,
+ 'ResourceId': 'AIDAEXAMPLE',
+ }
+ }
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_unused_access_key_skips_when_age_below_threshold():
+ """Keys with age_days < UNUSED_ACCESS_KEY_DAYS are skipped (no deactivation)."""
+ environment_variables.environment_variables_dict['policy'] = 'unused_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_NO
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+
+ mock_data = _mock_iam_users_access_keys(age_days=UNUSED_ACCESS_KEY_DAYS - 1, status='Active')
+ with patch.object(UnusedAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ with patch.object(UnusedAccessKey, '_has_active_access_keys', return_value=True):
+ policy = UnusedAccessKey()
+ result = policy.run_policy_operations()
+ assert len(result) == 0
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_unused_access_key_includes_when_age_at_or_above_threshold():
+ """Keys with age_days >= UNUSED_ACCESS_KEY_DAYS are included for deactivation path."""
+ environment_variables.environment_variables_dict['policy'] = 'unused_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+
+ mock_data = _mock_iam_users_access_keys(age_days=UNUSED_ACCESS_KEY_DAYS, status='Active')
+ with patch.object(UnusedAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ with patch.object(UnusedAccessKey, '_has_active_access_keys', return_value=True):
+ policy = UnusedAccessKey()
+ result = policy.run_policy_operations()
+ assert len(result) == 1
+ assert result[0]['ResourceId'] == TEST_USER_NAME
+ # ES schema stores cleanup_result in ResourceAction (dry_run=yes -> verify returns False)
+ assert result[0]['ResourceAction'] == 'False'
+ assert result[0]['ResourceType'] == 'UnusedAccessKey'
+ assert result[0]['AgeDays'] == UNUSED_ACCESS_KEY_DAYS
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_unused_access_key_skips_inactive_keys():
+ """Keys with status 'Inactive' are skipped."""
+ environment_variables.environment_variables_dict['policy'] = 'unused_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+
+ mock_data = _mock_iam_users_access_keys(age_days=UNUSED_ACCESS_KEY_DAYS + 10, status='Inactive')
+ with patch.object(UnusedAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ with patch.object(UnusedAccessKey, '_has_active_access_keys', return_value=False):
+ policy = UnusedAccessKey()
+ result = policy.run_policy_operations()
+ assert len(result) == 0
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_unused_access_key_skips_when_skip_policy_tag():
+ """Users with Policy=notdelete or skip are skipped."""
+ environment_variables.environment_variables_dict['policy'] = 'unused_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+
+ mock_data = _mock_iam_users_access_keys(age_days=UNUSED_ACCESS_KEY_DAYS + 5, status='Active')
+ mock_data[TEST_USER_NAME]['tags'] = [
+ {'Key': 'User', 'Value': TEST_USER_NAME},
+ {'Key': 'Policy', 'Value': 'not-delete'},
+ ]
+ with patch.object(UnusedAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ with patch.object(UnusedAccessKey, '_has_active_access_keys', return_value=True):
+ policy = UnusedAccessKey()
+ result = policy.run_policy_operations()
+ assert len(result) == 0
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_unused_access_key_skips_when_no_active_keys():
+ """When _has_active_access_keys returns False for the key, it is skipped."""
+ environment_variables.environment_variables_dict['policy'] = 'unused_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+
+ mock_data = _mock_iam_users_access_keys(age_days=UNUSED_ACCESS_KEY_DAYS + 5, status='Active')
+ with patch.object(UnusedAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ with patch.object(UnusedAccessKey, '_has_active_access_keys', return_value=False):
+ policy = UnusedAccessKey()
+ result = policy.run_policy_operations()
+ assert len(result) == 0
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_unused_access_key_empty_when_no_users():
+ """When no IAM users have access keys, result is empty."""
+ environment_variables.environment_variables_dict['policy'] = 'unused_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+
+ with patch.object(UnusedAccessKey, '_get_iam_users_access_keys', return_value={}):
+ policy = UnusedAccessKey()
+ result = policy.run_policy_operations()
+ assert len(result) == 0
+
+
+@mock_ec2
+@mock_s3
+@mock_iam
+def test_unused_access_key_deactivation_grace_days_capped():
+ """Deactivation grace days is min(age_days - UNUSED_ACCESS_KEY_DAYS, DAYS_TO_TAKE_ACTION)."""
+ environment_variables.environment_variables_dict['policy'] = 'unused_access_key'
+ environment_variables.environment_variables_dict['dry_run'] = DRY_RUN_YES
+ environment_variables.environment_variables_dict['AWS_DEFAULT_REGION'] = AWS_DEFAULT_REGION
+ environment_variables.environment_variables_dict['DAYS_TO_TAKE_ACTION'] = 7
+
+ age_days = UNUSED_ACCESS_KEY_DAYS + 20
+ mock_data = _mock_iam_users_access_keys(age_days=age_days, status='Active')
+ with patch.object(UnusedAccessKey, '_get_iam_users_access_keys', return_value=mock_data):
+ with patch.object(UnusedAccessKey, '_has_active_access_keys', return_value=True):
+ with patch.object(UnusedAccessKey, 'verify_and_delete_resource', return_value=False) as mock_verify:
+ policy = UnusedAccessKey()
+ policy.run_policy_operations()
+ call_kwargs = mock_verify.call_args[1]
+ # grace = min(20, 7) = 7
+ assert call_kwargs['clean_up_days'] == 7