From ab5414a0df3a32c1939db137caba5226bd71c3bc Mon Sep 17 00:00:00 2001 From: Haripriya Bendapudi Date: Mon, 19 May 2025 22:28:52 +0000 Subject: [PATCH] Adding support for new resource types in check-no-public-access api. --- README.md | 7 + cfn_policy_validator/canonical_user_id.py | 2 +- .../cloud_trail_attributes.py | 79 +++ .../parsers/resource/api_gateway.py | 51 ++ .../parsers/resource/backup.py | 50 ++ .../parsers/resource/cloud_trail.py | 76 +++ .../parsers/resource/code_artifact.py | 51 ++ .../parsers/resource/parser.py | 15 +- cfn_policy_validator/parsers/resource/s3.py | 46 ++ .../parsers/resource/s3_express.py | 64 ++ .../parsers/utils/arn_generator.py | 20 +- .../parsers/utils/cfn_to_arn_map.json | 25 + .../utils/intrinsic_functions/__init__.py | 6 + .../fn_get_att_evaluator.py | 46 +- .../intrinsic_functions/ref_evaluator.py | 24 +- cfn_policy_validator/rest_api_attributes.py | 58 ++ .../test_api_gateway_rest_api_policy.py | 283 ++++++++ .../test_backup_backup_vault_policy.py | 233 +++++++ .../test_cloud_trail_resource_policy.py | 620 ++++++++++++++++++ .../test_code_artifact_domain_policy.py | 252 +++++++ .../test_s3_express_access_point_policy.py | 300 +++++++++ .../test_s3_tables_table_bucket_policy.py | 267 ++++++++ cfn_policy_validator/tests/test_cli.py | 4 +- ..._s3_multi_region_access_point_validator.py | 24 +- .../validation/policy_analysis.py | 9 +- cfn_policy_validator/version.py | 2 +- test_files/public_access_test.yml | 144 ++++ 27 files changed, 2739 insertions(+), 19 deletions(-) create mode 100644 cfn_policy_validator/cloud_trail_attributes.py create mode 100644 cfn_policy_validator/parsers/resource/api_gateway.py create mode 100644 cfn_policy_validator/parsers/resource/backup.py create mode 100644 cfn_policy_validator/parsers/resource/cloud_trail.py create mode 100644 cfn_policy_validator/parsers/resource/code_artifact.py create mode 100644 cfn_policy_validator/parsers/resource/s3_express.py create mode 100644 cfn_policy_validator/rest_api_attributes.py create mode 100644 cfn_policy_validator/tests/parsers_tests/resource_tests/test_api_gateway_rest_api_policy.py create mode 100644 cfn_policy_validator/tests/parsers_tests/resource_tests/test_backup_backup_vault_policy.py create mode 100644 cfn_policy_validator/tests/parsers_tests/resource_tests/test_cloud_trail_resource_policy.py create mode 100644 cfn_policy_validator/tests/parsers_tests/resource_tests/test_code_artifact_domain_policy.py create mode 100644 cfn_policy_validator/tests/parsers_tests/resource_tests/test_s3_express_access_point_policy.py create mode 100644 cfn_policy_validator/tests/parsers_tests/resource_tests/test_s3_tables_table_bucket_policy.py diff --git a/README.md b/README.md index 8794c84..5323c33 100644 --- a/README.md +++ b/README.md @@ -185,6 +185,13 @@ Parses IAM identity-based and resource-based policies from AWS CloudFormation te | AWS::SNS::TopicPolicy | x | | x | | AWS::SecretsManager::ResourcePolicy | x | | x | | AWS::IAM::Role (trust policy) | x | x | x | +| AWS::S3Tables::TableBucket | | | x | +| AWS::ApiGateway::RestApi | | | x | +| AWS::CodeArtifact::Domain | | | x | +| AWS::Backup::BackupVault | | | x | +| AWS::CloudTrail::Dashboard | | | x | +| AWS::CloudTrail::EventDataStore | | | x | +| AWS::S3Express::AccessPoint | | | x | ### Intrinsic function and Pseudo parameter support diff --git a/cfn_policy_validator/canonical_user_id.py b/cfn_policy_validator/canonical_user_id.py index 7dde2c2..e7983dd 100644 --- a/cfn_policy_validator/canonical_user_id.py +++ b/cfn_policy_validator/canonical_user_id.py @@ -11,7 +11,7 @@ # Resolution of the canonical user in an account which is a possible principal value for a policy and also # used when evaluating S3 bucket ACLs. -def get_canonical_user(region): +def get_canonical_user(region, logical_name_of_resource=None, resource=None): global canonical_user_id if canonical_user_id is not None: return canonical_user_id diff --git a/cfn_policy_validator/cloud_trail_attributes.py b/cfn_policy_validator/cloud_trail_attributes.py new file mode 100644 index 0000000..a584fea --- /dev/null +++ b/cfn_policy_validator/cloud_trail_attributes.py @@ -0,0 +1,79 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from cfn_policy_validator import client +from cfn_policy_validator.application_error import ApplicationError + +def get_dashboard_created_time(region, resource_name, resource=None): + return get_dashboard_attribute(region, resource_name, 'CreatedTimestamp') + +def get_dashboard_status(region, resource_name, resource=None): + return get_dashboard_attribute(region, resource_name, 'Status') + +def get_dashboard_type(region, resource_name, resource=None): + return get_dashboard_attribute(region, resource_name, 'Type') + +def get_dashboard_updated_time(region, resource_name, resource=None): + return get_dashboard_attribute(region, resource_name, 'UpdatedTimestamp') + +def get_eventdatastore_arn(arn_pattern, resource_name, resource, visited_nodes, region): + return get_eventdatastore_arn_from_client(region, resource_name) + +def get_eventdatastore_created_time(region, resource_name, resource=None): + return get_eventdatastore_attribute(region, resource_name, 'CreatedTimestamp') + +def get_eventdatastore_status(region, resource_name, resource=None): + return get_eventdatastore_attribute(region, resource_name, 'Status') + +def get_eventdatastore_updated_time(region, resource_name, resource=None): + return get_eventdatastore_attribute(region, resource_name, 'UpdatedTimestamp') + + +def get_dashboard_attribute(region, resource_name, attribute): + supported_attributes = ['Type', 'CreatedTimestamp', 'Status', 'UpdatedTimestamp'] + cloudtrail_client = client.build('cloudtrail', region) + try: + if attribute not in supported_attributes: + raise ApplicationError(f"Attribute {attribute} is not supported. Supported attributes are {supported_attributes}") + response = cloudtrail_client.get_dashboard( + DashboardId=resource_name + ) + return response[attribute] + except Exception as e: + raise ApplicationError(f"Error: {e}") + +def get_eventdatastore_arn_from_client(region, resource_name): + cloudtrail_client = client.build('cloudtrail', region) + next_token = None + client_config = { + 'MaxResults': 25 + } + + while True: + if next_token: + client_config['NextToken'] = nextToken + response = cloudtrail_client.list_event_data_stores(**client_config) + for eventdatastore in response['EventDataStores']: + if eventdatastore['Name'] == resource_name: + return eventdatastore['EventDataStoreArn'] + nextToken = response.get('NextToken') + if not nextToken: + break + raise ApplicationError(f"CloudTrail Event Data Store {resource_name} not found") + + +def get_eventdatastore_attribute(region, resource_name, attribute): + cloudtrail_client = client.build('cloudtrail', region) + supported_attributes = ['CreatedTimestamp', 'Status', 'UpdatedTimestamp'] + + if attribute not in supported_attributes: + raise ApplicationError(f"Attribute {attribute} is not supported. Supported attributes are {supported_attributes}") + + arn=get_eventdatastore_arn_from_client(region, resource_name) + event_data_store_response = cloudtrail_client.get_event_data_store(EventDataStore=arn) + ret = event_data_store_response[attribute] + return ret + + \ No newline at end of file diff --git a/cfn_policy_validator/parsers/resource/api_gateway.py b/cfn_policy_validator/parsers/resource/api_gateway.py new file mode 100644 index 0000000..3aa1efb --- /dev/null +++ b/cfn_policy_validator/parsers/resource/api_gateway.py @@ -0,0 +1,51 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from cfn_policy_validator import ApplicationError +from cfn_policy_validator.parsers.output import Policy, Resource + +class ApiGatewayRestApiPolicyParser: + """ AWS::ApiGateway::RestApi + """ + + def __init__(self): + self.rest_api_policies = [] + + def parse(self, _, resource): + evaluated_resource = resource.eval(rest_api_policy_schema) + properties = evaluated_resource['Properties'] + + policy_document = properties.get('Policy') + if policy_document is None: + # we don't need to parse resources that don't have policies and policy is optional + return + name = properties['Name'] + + policy = Policy('Policy', policy_document) + resource = Resource(name, 'AWS::ApiGateway::RestApi', policy) + + self.rest_api_policies.append(resource) + + def get_policies(self): + return self.rest_api_policies + +rest_api_policy_schema = { + 'type': 'object', + 'properties': { + 'Properties': { + 'type': 'object', + 'properties': { + 'Policy': { + 'type': 'object' + }, + 'Name': { + 'type': 'string' + } + }, + 'required': ['Name'] + } + }, + 'required': ['Properties'] +} \ No newline at end of file diff --git a/cfn_policy_validator/parsers/resource/backup.py b/cfn_policy_validator/parsers/resource/backup.py new file mode 100644 index 0000000..eb346d0 --- /dev/null +++ b/cfn_policy_validator/parsers/resource/backup.py @@ -0,0 +1,50 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from cfn_policy_validator import ApplicationError +from cfn_policy_validator.parsers.output import Policy, Resource + +class BackupBackupVaultPolicyParser: + """ AWS::Backup::BackupVault + """ + + def __init__(self): + self.backup_vault_policies = [] + + def parse(self, _, resource): + evaluated_resource = resource.eval(backup_vault_policy_schema) + properties = evaluated_resource['Properties'] + + policy_document = properties.get('AccessPolicy') + if policy_document is None: + # we don't need to parse resources that don't have policies and policy is optional + return + name = properties['BackupVaultName'] + + policy = Policy('AccessPolicy', policy_document) + resource = Resource(name, 'AWS::Backup::BackupVault', policy) + self.backup_vault_policies.append(resource) + + def get_policies(self): + return self.backup_vault_policies + +backup_vault_policy_schema = { + 'type': 'object', + 'properties': { + 'Properties': { + 'type': 'object', + 'properties': { + 'AccessPolicy': { + 'type': 'object' + }, + 'BackupVaultName': { + 'type': 'string' + } + }, + 'required': ['BackupVaultName'] + } + }, + 'required': ['Properties'] +} \ No newline at end of file diff --git a/cfn_policy_validator/parsers/resource/cloud_trail.py b/cfn_policy_validator/parsers/resource/cloud_trail.py new file mode 100644 index 0000000..5c7d22a --- /dev/null +++ b/cfn_policy_validator/parsers/resource/cloud_trail.py @@ -0,0 +1,76 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from cfn_policy_validator import ApplicationError +from cfn_policy_validator.parsers.output import Policy, Resource +import re +from typing import Tuple, Optional + +class CloudTrailResourcePolicyParser: + """ AWS::CloudTrail::ResourcePolicy + """ + + def __init__(self): + self.resource_policies = [] + + @staticmethod + def extract_cloudtrail_resource_info(arn) -> Optional[Tuple[str, str]]: + """ + Extract both the resource type and resource name from a CloudTrail ARN. + + Args: + arn (str): The CloudTrail ARN to parse + + Returns: + Tuple[str, str] or None: A tuple containing (resource_type, resource_name) if match found, + or None if no match + """ + # Pattern captures both resource type and resource name + pattern = r'arn:aws:cloudtrail:[^:]*:[^:]*:([^/]+)/([^/]+)' + match = re.match(pattern, arn) + + if match: + resource_type = match.group(1) # Extract resource type + resource_name = match.group(2) # Extract resource name + return resource_type, resource_name + + return None + + def parse(self, _, resource): + evaluated_resource = resource.eval(resource_policy_schema) + properties = evaluated_resource['Properties'] + + policy_document = properties['ResourcePolicy'] + resource, name = self.extract_cloudtrail_resource_info(properties['ResourceArn']) + supported_resource_types = {'dashboard': 'AWS::CloudTrail::Dashboard', 'eventdatastore':'AWS::CloudTrail::EventDataStore'} + resource_type = supported_resource_types.get(resource) + if resource_type is None: + raise ApplicationError(f"Unsupported resource type {resource}") + policy = Policy('ResourcePolicy', policy_document) + resource = Resource(name, resource_type, policy) + + self.resource_policies.append(resource) + + def get_policies(self): + return self.resource_policies + +resource_policy_schema = { + 'type': 'object', + 'properties': { + 'Properties': { + 'type': 'object', + 'properties': { + 'ResourcePolicy': { + 'type': 'object' + }, + 'ResourceArn': { + 'type': 'string' + } + }, + 'required': ['ResourcePolicy', 'ResourceArn'] + } + }, + 'required': ['Properties'] +} \ No newline at end of file diff --git a/cfn_policy_validator/parsers/resource/code_artifact.py b/cfn_policy_validator/parsers/resource/code_artifact.py new file mode 100644 index 0000000..9c5b3a0 --- /dev/null +++ b/cfn_policy_validator/parsers/resource/code_artifact.py @@ -0,0 +1,51 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from cfn_policy_validator import ApplicationError +from cfn_policy_validator.parsers.output import Policy, Resource + +class CodeArtifactDomainPolicyParser: + """ AWS::CodeArtifact::Domain + """ + + def __init__(self): + self.code_artifact_policies = [] + + def parse(self, _, resource): + evaluated_resource = resource.eval(code_artifact_policy_schema) + properties = evaluated_resource['Properties'] + + policy_document = properties.get('PermissionsPolicyDocument') + if policy_document is None: + # we don't need to parse resources that don't have policies and policy is optional + return + name = properties['DomainName'] + + policy = Policy('PermissionsPolicyDocument', policy_document) + resource = Resource(name, 'AWS::CodeArtifact::Domain', policy) + + self.code_artifact_policies.append(resource) + + def get_policies(self): + return self.code_artifact_policies + +code_artifact_policy_schema = { + 'type': 'object', + 'properties': { + 'Properties': { + 'type': 'object', + 'properties': { + 'PermissionsPolicyDocument': { + 'type': 'object' + }, + 'DomainName': { + 'type': 'string' + } + }, + 'required': ['DomainName'] + } + }, + 'required': ['Properties'] +} \ No newline at end of file diff --git a/cfn_policy_validator/parsers/resource/parser.py b/cfn_policy_validator/parsers/resource/parser.py index 9a35986..9cd0568 100644 --- a/cfn_policy_validator/parsers/resource/parser.py +++ b/cfn_policy_validator/parsers/resource/parser.py @@ -6,11 +6,16 @@ from cfn_policy_validator.parsers.resource.kms import KmsKeyPolicyParser from cfn_policy_validator.parsers.resource.s3 import S3BucketPolicyParser, S3AccessPointPolicyParser, \ - S3MultiRegionAccessPointPolicyParser, S3BucketAclParser + S3MultiRegionAccessPointPolicyParser, S3BucketAclParser, S3TableBucketPolicyParser from cfn_policy_validator.parsers.resource.sns import SnsTopicPolicyParser from cfn_policy_validator.parsers.resource.sqs import SqsQueuePolicyParser from cfn_policy_validator.parsers.resource.lambda_aws import LambdaPermissionPolicyParser, LambdaLayerVersionPermissionParser from cfn_policy_validator.parsers.resource.secrets_manager import SecretsManagerPolicyParser +from cfn_policy_validator.parsers.resource.api_gateway import ApiGatewayRestApiPolicyParser +from cfn_policy_validator.parsers.resource.code_artifact import CodeArtifactDomainPolicyParser +from cfn_policy_validator.parsers.resource.cloud_trail import CloudTrailResourcePolicyParser +from cfn_policy_validator.parsers.resource.s3_express import S3ExpressAccessPointPolicyParser +from cfn_policy_validator.parsers.resource.backup import BackupBackupVaultPolicyParser from cfn_policy_validator.parsers.utils.topological_sorter import TopologicalSorter @@ -35,12 +40,18 @@ def parse(cls, template, account_config, excluded_resource_types={}): 'AWS::S3::MultiRegionAccessPointPolicy': S3MultiRegionAccessPointPolicyParser(), 'AWS::S3::Bucket': S3BucketAclParser(), 'AWS::S3::BucketPolicy': S3BucketPolicyParser(), + 'AWS::S3Tables::TableBucketPolicy': S3TableBucketPolicyParser(), 'AWS::SQS::QueuePolicy': SqsQueuePolicyParser(), 'AWS::SNS::TopicPolicy': SnsTopicPolicyParser(), 'AWS::KMS::Key': KmsKeyPolicyParser(), 'AWS::Lambda::Permission': LambdaPermissionPolicyParser(account_config), 'AWS::Lambda::LayerVersionPermission': LambdaLayerVersionPermissionParser(account_config.partition), - 'AWS::SecretsManager::ResourcePolicy': SecretsManagerPolicyParser() + 'AWS::SecretsManager::ResourcePolicy': SecretsManagerPolicyParser(), + 'AWS::ApiGateway::RestApi': ApiGatewayRestApiPolicyParser(), + 'AWS::CodeArtifact::Domain' : CodeArtifactDomainPolicyParser(), + 'AWS::CloudTrail::ResourcePolicy' : CloudTrailResourcePolicyParser(), + 'AWS::S3Express::AccessPoint' : S3ExpressAccessPointPolicyParser(), + 'AWS::Backup::BackupVault' : BackupBackupVaultPolicyParser() } invoked_parsers = set() diff --git a/cfn_policy_validator/parsers/resource/s3.py b/cfn_policy_validator/parsers/resource/s3.py index c134ea1..7286983 100644 --- a/cfn_policy_validator/parsers/resource/s3.py +++ b/cfn_policy_validator/parsers/resource/s3.py @@ -7,6 +7,52 @@ from cfn_policy_validator.parsers.resource import get_parser_of_type +class S3TableBucketPolicyParser: + """ AWS::S3Tables::TableBucketPolicy + """ + def __init__(self): + self.table_bucket_policies = [] + + def parse(self, _, resource): + evaluated_resource = resource.eval(table_bucket_policy_schema) + properties = evaluated_resource['Properties'] + + policy_document = properties['ResourcePolicy'] + table_bucket_arn = properties['TableBucketARN'] + + try: + table_bucket_name = table_bucket_arn.split(":bucket/", 1)[1] + except IndexError: + raise ApplicationError(f'Invalid value for {resource.ancestors_as_string()}.Properties.TableBucketARN. Must be a valid TableBucket ARN. TableBucketARN value: {table_bucket_arn}') + + + policy = Policy('TableBucketPolicy', policy_document) + resource = Resource(table_bucket_name, 'AWS::S3Tables::TableBucket', policy) + + self.table_bucket_policies.append(resource) + + def get_policies(self): + return self.table_bucket_policies + +table_bucket_policy_schema = { + 'type': 'object', + 'properties': { + 'Properties': { + 'type': 'object', + 'properties': { + 'ResourcePolicy': { + 'type': 'object' + }, + 'TableBucketARN': { + 'type': 'string' + } + }, + 'required': ['ResourcePolicy', 'TableBucketARN'] + } + }, + 'required': ['Properties'] +} + class S3BucketPolicyParser: """ AWS::S3::BucketPolicy """ diff --git a/cfn_policy_validator/parsers/resource/s3_express.py b/cfn_policy_validator/parsers/resource/s3_express.py new file mode 100644 index 0000000..d148adc --- /dev/null +++ b/cfn_policy_validator/parsers/resource/s3_express.py @@ -0,0 +1,64 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from cfn_policy_validator import ApplicationError +from cfn_policy_validator.parsers.output import Policy, Resource +import logging + +LOGGER = logging.getLogger("cfn-policy-validator") + +class S3ExpressAccessPointPolicyParser: + """ AWS::S3Express::AccessPoint + """ + + def __init__(self): + self.access_point_policies = [] + + def parse(self, resourceName, resource): + evaluated_resource = resource.eval(s3_express_access_point_policy_schema) + properties = evaluated_resource['Properties'] + + policy_document = properties.get('Policy') + if policy_document is None: + # we don't need to parse resources that don't have policies and policy is optional + return + + name = properties.get('Name', resourceName) + policy = Policy('Policy', policy_document) + vpc_id = properties.get('VpcConfiguration', {}).get('VpcId') + + configuration = None + if vpc_id is not None: + configuration = { + 'VpcId': vpc_id + } + + resource = Resource(name, 'AWS::S3Express::AccessPoint', policy, configuration) + + self.access_point_policies.append(resource) + + def get_policies(self): + return self.access_point_policies + +s3_express_access_point_policy_schema = { + 'type': 'object', + 'properties': { + 'Properties': { + 'type': 'object', + 'properties': { + 'Policy': { + 'type': 'object' + }, + 'Name': { + 'type': 'string' + }, + 'VpcConfiguration': { + 'type': 'object' + } + } + } + }, + 'required': ['Properties'] +} \ No newline at end of file diff --git a/cfn_policy_validator/parsers/utils/arn_generator.py b/cfn_policy_validator/parsers/utils/arn_generator.py index 25d7a9f..3716ff1 100644 --- a/cfn_policy_validator/parsers/utils/arn_generator.py +++ b/cfn_policy_validator/parsers/utils/arn_generator.py @@ -10,6 +10,7 @@ from cfn_policy_validator.parsers.utils.arn_generator_schemas import iam_role_schema, iam_user_schema, \ elbv2_load_balancer_schema, elbv2_listener_schema, elbv2_target_group_schema, network_firewall_rulegroup_schema, \ iam_managed_policy_schema +from cfn_policy_validator.cloud_trail_attributes import get_eventdatastore_arn class ArnGenerator: @@ -54,6 +55,9 @@ def __init__(self, account_config): 'AWS::NetworkFirewall::RuleGroup': { 'Ref': generate_network_firewall_rule_group, 'RuleGroupArn': generate_network_firewall_rule_group + }, + 'AWS::CloudTrail::EventDataStore': { + 'EventDataStoreArn': get_eventdatastore_arn } } @@ -93,7 +97,7 @@ def try_generate_arn(self, resource_name, resource, attribute_or_ref, visited_no # but have different ARNs custom_generator = self.custom_generators.get(cfn_type, {}).get(attribute_or_ref) if custom_generator is not None: - arn_pattern = custom_generator(arn_pattern, resource_name, resource, visited_nodes) + arn_pattern = custom_generator(arn_pattern, resource_name, resource, visited_nodes, self.account_config.region) # match any variable (anything not Partition, Account, Region) within brackets and replace with the resource name # e.g. arn:aws:..:${SomeResourceName} -> arn:aws:..:ResourceName @@ -109,7 +113,7 @@ def callback(match): # AWS::IAM::Role # include the path in the Role ARN which will be helpful for analysis -def generate_role_arn(arn_pattern, resource_name, resource, visited_nodes): +def generate_role_arn(arn_pattern, resource_name, resource, visited_nodes, region): evaluated_resource = resource.eval(iam_role_schema, visited_nodes) properties = evaluated_resource['Properties'] @@ -125,7 +129,7 @@ def generate_role_arn(arn_pattern, resource_name, resource, visited_nodes): # AWS::IAM::User # include the path in the User ARN which will be helpful for analysis -def generate_user_arn(arn_pattern, resource_name, resource, visited_nodes): +def generate_user_arn(arn_pattern, resource_name, resource, visited_nodes, region): evaluated_resource = resource.eval(iam_user_schema, visited_nodes) properties = evaluated_resource.get('Properties', {}) @@ -138,7 +142,7 @@ def generate_user_arn(arn_pattern, resource_name, resource, visited_nodes): return arn_pattern.replace("${UserNameWithPath}", path + name) -def generate_managed_policy_arn(arn_pattern, resource_name, resource, visited_nodes): +def generate_managed_policy_arn(arn_pattern, resource_name, resource, visited_nodes, region): evaluated_resource = resource.eval(iam_managed_policy_schema, visited_nodes) properties = evaluated_resource['Properties'] @@ -152,7 +156,7 @@ def generate_managed_policy_arn(arn_pattern, resource_name, resource, visited_no # Multiple load balancers share the same CFN resources, but have different ARNs depending on load balancer type # AWS::ElasticLoadBalancingV2::LoadBalancer -def generate_elbv2_load_balancer_arn(arn_pattern, _, resource, visited_nodes): +def generate_elbv2_load_balancer_arn(arn_pattern, _, resource, visited_nodes, region): evaluated_resource = resource.eval(elbv2_load_balancer_schema, visited_nodes) properties = evaluated_resource.get('Properties', {}) @@ -169,7 +173,7 @@ def generate_elbv2_load_balancer_arn(arn_pattern, _, resource, visited_nodes): # AWS::ElasticLoadBalancingV2::Listener -def generate_elbv2_listener_arn(arn_pattern, _, resource, visited_nodes): +def generate_elbv2_listener_arn(arn_pattern, _, resource, visited_nodes, region): evaluated_resource = resource.eval(elbv2_listener_schema, visited_nodes) properties = evaluated_resource['Properties'] @@ -185,7 +189,7 @@ def generate_elbv2_listener_arn(arn_pattern, _, resource, visited_nodes): # AWS::ElasticLoadBalancingV2::TargetGroup -def generate_elbv2_target_group_load_balancer_arn(arn_pattern, _, resource, visited_nodes): +def generate_elbv2_target_group_load_balancer_arn(arn_pattern, _, resource, visited_nodes, region): evaluated_resource = resource.eval(elbv2_target_group_schema, visited_nodes) properties = evaluated_resource.get('Properties', {}) @@ -205,7 +209,7 @@ def generate_elbv2_target_group_load_balancer_arn(arn_pattern, _, resource, visi # AWS::NetworkFirewall::RuleGroup # network firewall rule groups can be either stateful or stateless and have different ARNs depending on the type -def generate_network_firewall_rule_group(arn_pattern, _, resource, visited_nodes): +def generate_network_firewall_rule_group(arn_pattern, _, resource, visited_nodes, region): evaluated_resource = resource.eval(network_firewall_rulegroup_schema, visited_nodes) properties = evaluated_resource['Properties'] diff --git a/cfn_policy_validator/parsers/utils/cfn_to_arn_map.json b/cfn_policy_validator/parsers/utils/cfn_to_arn_map.json index 7daf68f..c607fae 100644 --- a/cfn_policy_validator/parsers/utils/cfn_to_arn_map.json +++ b/cfn_policy_validator/parsers/utils/cfn_to_arn_map.json @@ -1605,6 +1605,13 @@ "ResourceType": "eventdatastore" } }, + "Dashboard": { + "DashboardArn": { + "Value": "arn:${Partition}:cloudtrail:${Region}:${Account}:dashboard/${DashboardStoreId}", + "ServicePrefix": "cloudtrail", + "ResourceType": "dashboard" + } + }, "ResourcePolicy": { "Ref": null }, @@ -2098,6 +2105,24 @@ } } }, + "S3Tables": { + "TableBucket": { + "TableBucketARN": { + "Value": "arn:${Partition}:s3tables:${Region}:${Account}:bucket/${BucketName}", + "ServicePrefix": "s3tables", + "ResourceType": "bucket" + } + } + }, + "S3Express": { + "AccessPoint": { + "Arn": { + "Value": "arn:${Partition}:s3express:${Region}:${Account}:accesspoint/${AccessPointName}", + "ServicePrefix": "s3express", + "ResourceType": "accesspoint" + } + } + }, "Cognito": { "UserPool": { "Arn": { diff --git a/cfn_policy_validator/parsers/utils/intrinsic_functions/__init__.py b/cfn_policy_validator/parsers/utils/intrinsic_functions/__init__.py index 31638b3..fc86f48 100644 --- a/cfn_policy_validator/parsers/utils/intrinsic_functions/__init__.py +++ b/cfn_policy_validator/parsers/utils/intrinsic_functions/__init__.py @@ -8,4 +8,10 @@ 'AWS::IAM::Group': 'GroupName', 'AWS::SQS::Queue': 'QueueName', 'AWS::SSM::Parameter': 'Name', + 'AWS::Backup::BackupVault': 'BackupVaultName', + 'AWS::S3Tables::TableBucket': 'TableBucketName', + 'AWS::ApiGateway::RestApi': 'Name', + 'AWS::CloudTrail::EventDataStore': 'Name', + 'AWS::CloudTrail::Dashboard': 'Name', + 'AWS::S3Express::AccessPoint': 'Name' } diff --git a/cfn_policy_validator/parsers/utils/intrinsic_functions/fn_get_att_evaluator.py b/cfn_policy_validator/parsers/utils/intrinsic_functions/fn_get_att_evaluator.py index 56fc512..320aad3 100644 --- a/cfn_policy_validator/parsers/utils/intrinsic_functions/fn_get_att_evaluator.py +++ b/cfn_policy_validator/parsers/utils/intrinsic_functions/fn_get_att_evaluator.py @@ -3,6 +3,9 @@ SPDX-License-Identifier: MIT-0 """ from cfn_policy_validator.canonical_user_id import get_canonical_user +from cfn_policy_validator.rest_api_attributes import get_rest_api_id, get_root_resource_id +from cfn_policy_validator.cloud_trail_attributes import get_dashboard_created_time, get_dashboard_status, get_dashboard_type, get_dashboard_updated_time +from cfn_policy_validator.cloud_trail_attributes import get_eventdatastore_created_time, get_eventdatastore_status, get_eventdatastore_updated_time from cfn_policy_validator.application_error import ApplicationError from cfn_policy_validator.cfn_tools.schema_validator import validate_schema from cfn_policy_validator.parsers.utils.cycle_detection import validate_no_cycle @@ -22,6 +25,28 @@ def __init__(self, resources, arn_generator, node_evaluator, region): self.custom_get_att_evals = { 'AWS::CloudFront::CloudFrontOriginAccessIdentity': { 'S3CanonicalUserId': get_canonical_user + }, + 'AWS::ApiGateway::RestApi': { + 'RestApiId': get_rest_api_id, + 'RootResourceId': get_root_resource_id + }, + 'AWS::CloudTrail::Dashboard': { + 'CreatedTimestamp': get_dashboard_created_time, + 'Status': get_dashboard_status, + 'Type': get_dashboard_type, + 'UpdatedTimestamp': get_dashboard_updated_time + }, + 'AWS::CloudTrail::EventDataStore': { + 'CreatedTimestamp': get_eventdatastore_created_time, + 'Status': get_eventdatastore_status, + 'UpdatedTimestamp': get_eventdatastore_updated_time + }, + 'AWS::CodeArtifact::Domain': { + 'Name': self.get_code_artifact_name, + 'Owner': self.get_code_artifact_owner + }, + 'AWS::S3Express::AccessPoint': { + 'NetworkOrigin': self.evaluate_network_origin } } @@ -61,7 +86,7 @@ def evaluate(self, get_att_lookup, visited_nodes=None): # IAM policies (canonical username) custom_get_att_eval = self.custom_get_att_evals.get(resource['Type'], {}).get(attribute_name) if custom_get_att_eval is not None: - return custom_get_att_eval(self.region) + return custom_get_att_eval(self.region, resource_name, resource) # For calls to GetAtt that are not ARNs, try to find a property with the same name. This is a last resort and # should probably not occur often. We expect to almost always see GetAtt used for ARNs in the context of an @@ -78,6 +103,23 @@ def evaluate(self, get_att_lookup, visited_nodes=None): # there are many return types for GetAtt, so it's the caller's responsibility to validate expected type return self.node_evaluator.eval(property_value, visited_nodes=visited_nodes) + def get_code_artifact_name(self, region, resource_name, resource): + properties = resource.get('Properties',[]) + return properties.get('DomainName') + + def get_code_artifact_owner(self, region, resource_name, resource): + arn = self.arn_generator.try_generate_arn(resource_name, resource, 'Arn',visited_nodes=None) + parts = arn.split(':') + if len(parts) >= 5: + return parts[4] + return None + + def evaluate_network_origin(self, region, resource_name, resource): + properties = resource.get('Properties', []) + if properties.get('VpcConfiguration'): + return 'VPC' + return 'Internet' + get_att_schema = { 'type': 'array', @@ -88,3 +130,5 @@ def evaluate(self, get_att_lookup, visited_nodes=None): ], 'additionalItems': False } + + diff --git a/cfn_policy_validator/parsers/utils/intrinsic_functions/ref_evaluator.py b/cfn_policy_validator/parsers/utils/intrinsic_functions/ref_evaluator.py index 5a5a6b1..5bbe7fa 100644 --- a/cfn_policy_validator/parsers/utils/intrinsic_functions/ref_evaluator.py +++ b/cfn_policy_validator/parsers/utils/intrinsic_functions/ref_evaluator.py @@ -6,6 +6,7 @@ import os from cfn_policy_validator.application_error import ApplicationError +from cfn_policy_validator.rest_api_attributes import get_rest_api_id from cfn_policy_validator.cfn_tools.schema_validator import validate_schema from cfn_policy_validator.parsers.utils.cycle_detection import validate_no_cycle from cfn_policy_validator.parsers.utils.intrinsic_functions import name_hints @@ -30,7 +31,8 @@ def __init__(self, resources, arn_generator, parameters, parameter_values, accou # some resources require custom evaluation logic, but we should only need to care about this for resources that # have resource policies that we want to parse self.custom_ref_evals = { - 'AWS::SQS::Queue': evaluate_sqs_queue_ref + 'AWS::SQS::Queue': evaluate_sqs_queue_ref, + 'AWS::ApiGateway::RestApi': evaluate_api_gateway_rest_api_ref, } def evaluate(self, resource_logical_name_or_param, visited_nodes=None): @@ -123,6 +125,26 @@ def evaluate(self, resource_logical_name_or_param, visited_nodes=None): } +def evaluate_api_gateway_rest_api_ref(resource_name, api_gateway_rest_api_resource, account_config, visited_nodes): + evaluated_resource = api_gateway_rest_api_resource.eval(rest_api_schema, visited_nodes) + properties = evaluated_resource.get('Properties', {}) + name = properties.get('Name', resource_name) + return get_rest_api_id(account_config.region, name) + +rest_api_schema = { + 'type': 'object', + 'properties': { + 'Properties': { + 'type': 'object', + 'properties': { + 'Name': { + 'type': 'string' + } + } + } + } +} + # for SQS, we need to know that Ref returns a queue URL. This queue URL is used to link the queue's policy to the queue def evaluate_sqs_queue_ref(resource_name, sqs_queue_resource, account_config, visited_nodes): evaluated_resource = sqs_queue_resource.eval(sqs_queue_schema, visited_nodes) diff --git a/cfn_policy_validator/rest_api_attributes.py b/cfn_policy_validator/rest_api_attributes.py new file mode 100644 index 0000000..7461476 --- /dev/null +++ b/cfn_policy_validator/rest_api_attributes.py @@ -0,0 +1,58 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" + +from cfn_policy_validator import client +from cfn_policy_validator.application_error import ApplicationError +from collections import OrderedDict + +# Cache for REST API attributes with a maximum size +MAX_CACHE_SIZE = 100 +api_cache = OrderedDict() # Format: {rest_api_name: {'id': 'api_id', 'rootResourceId': 'root_id'}} + + +# Resolution of the rest api id. +def get_rest_api_id(region, rest_api_name, resource=None): + if rest_api_name in api_cache: + # Move to end to mark as recently used + api_data = api_cache.pop(rest_api_name) + api_cache[rest_api_name] = api_data + return api_cache[rest_api_name]['id'] + + get_attributes(region, rest_api_name) + return api_cache[rest_api_name]['id'] + +def get_root_resource_id(region, rest_api_name, resource=None): + if rest_api_name in api_cache: + # Move to end to mark as recently used + api_data = api_cache.pop(rest_api_name) + api_cache[rest_api_name] = api_data + return api_cache[rest_api_name]['rootResourceId'] + + get_attributes(region, rest_api_name) + return api_cache[rest_api_name]['rootResourceId'] + + +def get_attributes(region, rest_api_name): + apigateway_client = client.build('apigateway', region) + paginator = apigateway_client.get_paginator('get_rest_apis') + + pagination_config={ + 'limit': 1 + } + + for page in paginator.paginate(**pagination_config): + for item in page.get('items', []): + if item.get('name') == rest_api_name: + # If cache is full, remove the least recently used item + if len(api_cache) >= MAX_CACHE_SIZE: + api_cache.popitem(last=False) + + api_cache[rest_api_name] = { + 'id': item['id'], + 'rootResourceId': item['rootResourceId'] + } + return + + raise ApplicationError(f'No rest api found with logical name: {rest_api_name}') diff --git a/cfn_policy_validator/tests/parsers_tests/resource_tests/test_api_gateway_rest_api_policy.py b/cfn_policy_validator/tests/parsers_tests/resource_tests/test_api_gateway_rest_api_policy.py new file mode 100644 index 0000000..3262b2f --- /dev/null +++ b/cfn_policy_validator/tests/parsers_tests/resource_tests/test_api_gateway_rest_api_policy.py @@ -0,0 +1,283 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" +import copy +import unittest + +from cfn_policy_validator.parsers.resource.parser import ResourceParser +from cfn_policy_validator.tests.parsers_tests import mock_node_evaluator_setup +from cfn_policy_validator import client +from cfn_policy_validator.tests.utils import required_property_error, load, account_config, expected_type_error, \ + load_resources +from cfn_policy_validator.application_error import ApplicationError +from cfn_policy_validator.tests.boto_mocks import BotoResponse, get_test_mode + + +api_gateway_policy_with_no_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Allow', + 'Action': 'execute-api:Invoke', + 'Resource': 'arn:aws:execute-api:us-east-1:123456789012:api123/*/GET/', + 'Principal': '*', + 'Condition': { + 'IpAddress': { + 'aws:SourceIp': '192.0.2.0/24' + } + } + } + ] +} + + +api_gateway_policy_with_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Deny', + 'Action': 'execute-api:Invoke', + 'Sid': {"Fn::Join": ["", ["Policy-for-", {"Fn::GetAtt": ["MyRestApi", "RestApiId"]}, "-", {"Fn::GetAtt": ["MyRestApi", "RootResourceId"]}]]}, + 'Resource': [ + {"Fn::Sub": "arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${MyRestApi}/*/*/*"}, + {"Fn::Sub": "arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${MyRestApi}/*/GET/"} + ], + 'Principal': '*', + 'Condition': { + 'NotIpAddress': { + 'aws:SourceIp': [ + "192.0.2.0/24", + "198.51.100.0/24" + ] + } + } + } + ] +} + + +class WhenParsingAnApiGatewayRestApiPolicyAndValidatingSchema(unittest.TestCase): + @mock_node_evaluator_setup() + def test_with_no_properties(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::ApiGateway::RestApi' + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('Properties', 'Resources.ResourceA'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_no_name(self): + template = load({ + 'Resources': { + 'ResourceA': { + 'Type': 'AWS::ApiGateway::RestApi', + 'Properties': { + 'Policy': copy.deepcopy(api_gateway_policy_with_no_reference) + } + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('Name', 'Resources.ResourceA.Properties'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_name_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::ApiGateway::RestApi', + 'Properties': { + 'Name': ['MyRestApi'], + 'Policy': copy.deepcopy(api_gateway_policy_with_no_reference) + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.Name', 'string', "['MyRestApi']"), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_policy_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::ApiGateway::RestApi', + 'Properties': { + 'Name': 'MyRestApi', + 'Policy': ['Invalid'] + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.Policy', 'object', "['Invalid']"), + str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_unsupported_function_in_unused_property(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::ApiGateway::RestApi', + 'Properties': { + 'Name': 'MyRestApi', + 'Policy': copy.deepcopy(api_gateway_policy_with_no_reference), + 'UnusedProperty': {"Fn::GetAZs": {"Ref": "AWS::Region"}} + } + } + }) + + ResourceParser.parse(template, account_config) + + self.assertTrue(True, 'Should not raise error.') + + @mock_node_evaluator_setup() + def test_with_ref_to_parameter_in_unused_property(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::ApiGateway::RestApi', + 'Properties': { + 'Name': 'MyRestApi', + 'Policy': copy.deepcopy(api_gateway_policy_with_no_reference), + 'UnusedProperty': {'Ref': 'SomeProperty'} + } + } + }) + + ResourceParser.parse(template, account_config) + + self.assertTrue(True, 'Should not raise error.') + + +class WhenParsingAnApiGatewayRestApiPolicy(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_a_resource(self): + template = load({ + 'Resources': { + 'ResourceA': { + 'Type': 'AWS::ApiGateway::RestApi', + 'Properties': { + 'Name': 'MyRestApi', + 'Policy': copy.deepcopy(api_gateway_policy_with_no_reference) + } + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("MyRestApi", resource.ResourceName) + self.assertEqual('AWS::ApiGateway::RestApi', resource.ResourceType) + + self.assertEqual('Policy', resource.Policy.Name) + self.assertEqual(api_gateway_policy_with_no_reference, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + +class WhenParsingAnApiGatewayRestApiPolicyWithReferencesInEachField(unittest.TestCase): + rest_api_id = "a1bcdef2gh" + root_resource_id="qqfd3vd4gd" + + def setUp(self): + if get_test_mode() == "OFFLINE": + return + # Create a real API Gateway for testing + self.apigateway_client = client.build('apigateway', account_config.region) + + # Create a test API + response = self.apigateway_client.create_rest_api( + name='MyCustomRestApi', + description='Test API for policy validator' + ) + self.rest_api_id = response['id'] + self.root_resource_id = response['rootResourceId'] + + def tearDown(self): + if get_test_mode() == "OFFLINE": + return + # Clean up the test API + if hasattr(self, 'rest_api_id'): + self.apigateway_client.delete_rest_api( + restApiId=self.rest_api_id + ) + + + # this is a test to ensure that each field is being evaluated for references in a rest api + @mock_node_evaluator_setup( + apigateway=[ + BotoResponse( + method='get_rest_apis', + service_response= { + 'items': [ + { + 'id': rest_api_id, + 'name': 'MyCustomRestApi', + 'rootResourceId': root_resource_id + } + ] + }, + expected_params=None + ) + ] + ) + def test_returns_a_resource_with_references_resolved(self): + template = load_resources({ + 'MyRestApi': { + 'Type': 'AWS::ApiGateway::RestApi', + 'Properties': { + 'Name': 'MyCustomRestApi' + } + }, + 'ResourceA': { + 'Type': 'AWS::ApiGateway::RestApi', + 'Properties': { + 'Name': {"Ref": "MyRestApi"}, + 'Policy': copy.deepcopy(api_gateway_policy_with_reference) + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual(self.rest_api_id, resource.ResourceName) + self.assertEqual('AWS::ApiGateway::RestApi', resource.ResourceType) + + expected_policy = copy.deepcopy(api_gateway_policy_with_reference) + expected_policy['Statement'][0]['Resource'] = [ + f'arn:aws:execute-api:{account_config.region}:{account_config.account_id}:{self.rest_api_id}/*/*/*', + f'arn:aws:execute-api:{account_config.region}:{account_config.account_id}:{self.rest_api_id}/*/GET/' + ] + expected_policy['Statement'][0]['Sid'] = f'Policy-for-{self.rest_api_id}-{self.root_resource_id}' + self.assertEqual('Policy', resource.Policy.Name) + self.assertEqual(expected_policy, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + +class WhenParsingAnApiGatewayRestApiWithNoPolicy(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_no_resources(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::ApiGateway::RestApi', + 'Properties': { + 'Name': 'MyRestApi' + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 0) \ No newline at end of file diff --git a/cfn_policy_validator/tests/parsers_tests/resource_tests/test_backup_backup_vault_policy.py b/cfn_policy_validator/tests/parsers_tests/resource_tests/test_backup_backup_vault_policy.py new file mode 100644 index 0000000..bfbb80b --- /dev/null +++ b/cfn_policy_validator/tests/parsers_tests/resource_tests/test_backup_backup_vault_policy.py @@ -0,0 +1,233 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" +import copy +import unittest + +from cfn_policy_validator.parsers.resource.parser import ResourceParser +from cfn_policy_validator.tests.parsers_tests import mock_node_evaluator_setup + +from cfn_policy_validator.tests.utils import required_property_error, load, account_config, expected_type_error, \ + load_resources +from cfn_policy_validator.application_error import ApplicationError + + +backup_policy_with_no_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Allow', + 'Action': 'backup:*', + 'Resource': 'arn:aws:backup:us-east-1:123456789012:backup-vault:MyTestVault', + 'Principal': '*', + 'Condition': { + 'ArnEquals': { + 'aws:PrincipalArn': [ + "arn:aws:iam::123456789012:role/MyTestRoleArn" + ] + } + } + } + ] +} + + +backup_policy_with_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Deny', + 'Action': 'backup:*', + 'Resource': [ + {"Fn::GetAtt": ["MyBackupVault", "BackupVaultArn"]}, + {"Fn::Sub": 'arn:aws:backup:::backup-vault:${MyBackupVault}'} + ], + 'Principal': '*', + 'Condition': { + 'ArnNotEquals': { + 'aws:PrincipalArn': [ + "arn:aws:iam::123456789012:role/MyTestRoleArn" + ] + } + } + } + ] +} + +class WhenParsingABackupVaultPolicyAndValidatingSchema(unittest.TestCase): + @mock_node_evaluator_setup() + def test_with_no_properties(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::Backup::BackupVault' + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('Properties', 'Resources.ResourceA'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_no_backup_vault_name(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::Backup::BackupVault', + 'Properties': { + 'AccessPolicy': copy.deepcopy(backup_policy_with_no_reference) + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('BackupVaultName', 'Resources.ResourceA.Properties'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_backup_vault_name_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::Backup::BackupVault', + 'Properties': { + 'BackupVaultName': ['MyVault'], + 'AccessPolicy': copy.deepcopy(backup_policy_with_no_reference) + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.BackupVaultName', 'string', "['MyVault']"), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_access_policy_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::Backup::BackupVault', + 'Properties': { + 'BackupVaultName': 'MyVault', + 'AccessPolicy': ['Invalid'] + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.AccessPolicy', 'object', "['Invalid']"), + str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_unsupported_function_in_unused_property(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::Backup::BackupVault', + 'Properties': { + 'BackupVaultName': 'MyVault', + 'AccessPolicy': copy.deepcopy(backup_policy_with_no_reference), + 'UnusedProperty': {"Fn::GetAZs": {"Ref": "AWS::Region"}} + } + } + }) + + ResourceParser.parse(template, account_config) + + self.assertTrue(True, 'Should not raise error.') + + @mock_node_evaluator_setup() + def test_with_ref_to_parameter_in_unused_property(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::Backup::BackupVault', + 'Properties': { + 'BackupVaultName': 'MyVault', + 'AccessPolicy': copy.deepcopy(backup_policy_with_no_reference), + 'UnusedProperty': {'Ref': 'SomeProperty'} + } + } + }) + + ResourceParser.parse(template, account_config) + + self.assertTrue(True, 'Should not raise error.') + + +class WhenParsingABackupVaultPolicy(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_a_resource(self): + template = load_resources({ + 'TestVault': { + 'Type': 'AWS::Backup::BackupVault', + 'Properties': { + 'BackupVaultName': 'MyVault', + 'AccessPolicy': copy.deepcopy(backup_policy_with_no_reference) + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("MyVault", resource.ResourceName) + self.assertEqual('AWS::Backup::BackupVault', resource.ResourceType) + + self.assertEqual('AccessPolicy', resource.Policy.Name) + self.assertEqual(backup_policy_with_no_reference, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + +class WhenParsingABackupVaultPolicyWithReferencesInEachField(unittest.TestCase): + # this is a test to ensure that each field is being evaluated for references in a backup vault + @mock_node_evaluator_setup() + def test_returns_a_resource_with_references_resolved(self): + template = load_resources({ + 'MyBackupVault': { + 'Type': 'AWS::Backup::BackupVault', + 'Properties': { + 'BackupVaultName': 'MyCustomVaultName' + } + }, + 'ResourceA': { + 'Type': 'AWS::Backup::BackupVault', + 'Properties': { + 'BackupVaultName': {'Ref': 'MyBackupVault'}, + 'AccessPolicy': copy.deepcopy(backup_policy_with_reference) + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("MyCustomVaultName", resource.ResourceName) + self.assertEqual('AWS::Backup::BackupVault', resource.ResourceType) + + expected_policy = copy.deepcopy(backup_policy_with_reference) + expected_policy['Statement'][0]['Resource'] = [ + f'arn:aws:backup:{account_config.region}:{account_config.account_id}:backup-vault:MyCustomVaultName', + 'arn:aws:backup:::backup-vault:MyCustomVaultName' + ] + self.assertEqual('AccessPolicy', resource.Policy.Name) + self.assertEqual(expected_policy, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + +class WhenParsingABackupVaultWithNoAccessPolicy(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_no_resources(self): + template = load_resources({ + 'TestVault': { + 'Type': 'AWS::Backup::BackupVault', + 'Properties': { + 'BackupVaultName': 'MyVault' + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 0) \ No newline at end of file diff --git a/cfn_policy_validator/tests/parsers_tests/resource_tests/test_cloud_trail_resource_policy.py b/cfn_policy_validator/tests/parsers_tests/resource_tests/test_cloud_trail_resource_policy.py new file mode 100644 index 0000000..60787a2 --- /dev/null +++ b/cfn_policy_validator/tests/parsers_tests/resource_tests/test_cloud_trail_resource_policy.py @@ -0,0 +1,620 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" +import copy +import unittest +from datetime import datetime + +from cfn_policy_validator import client +from cfn_policy_validator.parsers.resource.parser import ResourceParser +from cfn_policy_validator.tests.parsers_tests import mock_node_evaluator_setup + +from cfn_policy_validator.tests.utils import required_property_error, load, account_config, expected_type_error, \ + load_resources +from cfn_policy_validator.application_error import ApplicationError +from cfn_policy_validator.tests.boto_mocks import BotoResponse, get_test_mode + +dashboard_policy_with_no_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Allow', + 'Action': 'cloudtrail:StartDashboardRefresh', + 'Resource': 'arn:aws:cloudtrail:us-east-1:123456789012:dashboard/MyTestDashboard', + 'Principal': '*', + 'Condition': { + 'ArnEquals': { + 'aws:PrincipalArn': [ + "arn:aws:iam::971691587463:role/MyTestRoleArn" + ] + } + } + } + ] +} + + +dashboard_policy_with_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Allow', + 'Action': 'cloudtrail:StartDashboardRefresh', + 'Resource': [ + {"Fn::GetAtt": ["MyDashboard", "DashboardArn"]}, + {"Fn::Sub": 'arn:aws:cloudtrail::${AWS::AccountId}:dashboard/${MyDashboard}'} + ], + 'Principal': '*', + 'Condition': { + 'StringEquals': { + 'aws:ResourceTag/DashboardStatus': {'Fn::GetAtt': ['MyDashboard', 'Status']}, + 'aws:ResourceTag/DashboardType': {'Fn::GetAtt': ['MyDashboard', 'Type']} + }, + 'DateGreaterThanEquals': { + 'aws:TokenIssueTime': {'Fn::GetAtt': ['MyDashboard', 'CreatedTimestamp']} + }, + 'DateLessThanEquals': { + 'aws:TokenIssueTime': {'Fn::GetAtt': ['MyDashboard', 'UpdatedTimestamp']} + } + } + } + ] +} + + +eventdatastore_policy_with_no_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Allow', + 'Action': [ + 'cloudtrail:StartQuery', + 'cloudtrail:GetQueryResults' + ], + 'Resource': 'arn:aws:cloudtrail:us-east-1:123456789012:eventdatastore/MyTestEventDataStore', + 'Principal': '*', + 'Condition': { + 'ArnEquals': { + 'aws:PrincipalArn': [ + "arn:aws:iam::971691587463:role/MyTestRoleArn" + ] + } + } + } + ] +} + +eventdatastore_policy_with_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Allow', + 'Action': 'cloudtrail:StartQuery', + 'Resource': [ + {"Fn::GetAtt": ["MyEventDataStore", "EventDataStoreArn"]} + ], + 'Principal': '*', + 'Condition': { + 'StringEquals': { + "aws:ResourceTag/EventDataStoreStatus": [ + {'Fn::GetAtt': ['MyEventDataStore', 'Status']} + ] + }, + 'DateGreaterThanEquals': { + "aws:TokenIssueTime": [ + {'Fn::GetAtt': ['MyEventDataStore', 'CreatedTimestamp']} + ] + }, + 'DateLessThanEquals': { + "aws:TokenIssueTime": [ + {'Fn::GetAtt': ['MyEventDataStore', 'UpdatedTimestamp']} + ] + } + } + } + ] +} + + +class WhenParsingACloudTrailResourcePolicyAndValidatingSchema(unittest.TestCase): + @mock_node_evaluator_setup() + def test_with_no_properties(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::CloudTrail::ResourcePolicy' + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('Properties', 'Resources.ResourceA'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_no_resource_arn(self): + template = load({ + 'Resources': { + 'ResourceA': { + 'Type': 'AWS::CloudTrail::ResourcePolicy', + 'Properties': { + 'ResourcePolicy': copy.deepcopy(dashboard_policy_with_no_reference) + } + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('ResourceArn', 'Resources.ResourceA.Properties'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_resource_arn_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::CloudTrail::ResourcePolicy', + 'Properties': { + 'ResourceArn': ['arn:aws:cloudtrail:us-east-1:123456789012:dashboard/MyDashboard'], + 'ResourcePolicy': copy.deepcopy(dashboard_policy_with_no_reference) + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.ResourceArn', 'string', "['arn:aws:cloudtrail:us-east-1:123456789012:dashboard/MyDashboard']"), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_no_resource_policy(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::CloudTrail::ResourcePolicy', + 'Properties': { + 'ResourceArn': 'arn:aws:cloudtrail:us-east-1:123456789012:dashboard/MyDashboard' + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('ResourcePolicy', 'Resources.ResourceA.Properties'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_resource_policy_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::CloudTrail::ResourcePolicy', + 'Properties': { + 'ResourceArn': 'arn:aws:cloudtrail:us-east-1:123456789012:dashboard/MyDashboard', + 'ResourcePolicy': ['Invalid'] + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.ResourcePolicy', 'object', "['Invalid']"), + str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_unsupported_function_in_unused_property(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::CloudTrail::ResourcePolicy', + 'Properties': { + 'ResourceArn': 'arn:aws:cloudtrail:us-east-1:123456789012:dashboard/MyDashboard', + 'ResourcePolicy': copy.deepcopy(dashboard_policy_with_no_reference), + 'UnusedProperty': {"Fn::GetAZs": {"Ref": "AWS::Region"}} + } + } + }) + + ResourceParser.parse(template, account_config) + + self.assertTrue(True, 'Should not raise error.') + + @mock_node_evaluator_setup() + def test_with_ref_to_parameter_in_unused_property(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::CloudTrail::ResourcePolicy', + 'Properties': { + 'ResourceArn': 'arn:aws:cloudtrail:us-east-1:123456789012:dashboard/MyDashboard', + 'ResourcePolicy': copy.deepcopy(dashboard_policy_with_no_reference), + 'UnusedProperty': {'Ref': 'SomeProperty'} + } + } + }) + + ResourceParser.parse(template, account_config) + + self.assertTrue(True, 'Should not raise error.') + + +class WhenParsingACloudTrailDashboardPolicy(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_a_resource(self): + template = load({ + 'Resources': { + 'ResourceA': { + 'Type': 'AWS::CloudTrail::ResourcePolicy', + 'Properties': { + 'ResourceArn': 'arn:aws:cloudtrail:us-east-1:123456789012:dashboard/MyDashboard', + 'ResourcePolicy': copy.deepcopy(dashboard_policy_with_no_reference) + } + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("MyDashboard", resource.ResourceName) + self.assertEqual('AWS::CloudTrail::Dashboard', resource.ResourceType) + + self.assertEqual('ResourcePolicy', resource.Policy.Name) + self.assertEqual(dashboard_policy_with_no_reference, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + +class WhenParsingACloudTrailPolicyWithReferencesInEachField(unittest.TestCase): + # this is a test to ensure that each field is being evaluated for references in a dashboard + import random + + dashboardCreatedTimestamp = datetime(2025, 5, 15, 0, 27, 21, 914000) + dashboardUpdatedTimestamp = datetime(2025, 5, 15, 5, 0, 2, 184000) + dashboardType = 'CUSTOM' + dashboardStatus = 'UPDATED' + dashboardARN = f'arn:aws:cloudtrail:{account_config.region}:{account_config.account_id}:dashboard/MyCustomDashboardName' + + eventStoreRandNum = random.randint(1, 100) + eventDataStoreName = 'MyCustomEventDataStoreName-' + str(eventStoreRandNum) + eventStoreCreatedTimestamp = datetime(2025, 5, 15, 0, 27, 21, 914000) + eventStoreStatus = 'ENABLED' + eventStoreUpdatedTimestamp = datetime(2025, 5, 15, 5, 0, 2, 184000) + eventStoreId = 'ba4c2eed-6713-40cc-8fae-a7b2fb05897f' + eventStoreArn = f'arn:aws:cloudtrail:{account_config.region}:{account_config.account_id}:eventdatastore/{eventStoreId}' + + @classmethod + def setUpClass(cls): + if get_test_mode() == "OFFLINE": + return + cls.dashboard_created = False + cls.eventstore_created = False + try: + cloud_trail_client = client.build('cloudtrail', account_config.region) + + # Create an EventDataStore + response = cloud_trail_client.create_event_data_store( + Name=cls.eventDataStoreName, + TerminationProtectionEnabled=False) + cls.eventStoreArn = response['EventDataStoreArn'] + cls.eventStoreId = response['EventDataStoreArn'].split('/')[-1] + cls.eventStoreCreatedTimestamp = response['CreatedTimestamp'] + cls.eventStoreUpdatedTimestamp = response['UpdatedTimestamp'] + cls.eventStoreStatus = response['Status'] + cls.eventstore_created = True + print(f'Created EventDataStore with the following attributes: Name: {cls.eventDataStoreName}, EventDataStoreARN: {cls.eventStoreArn}'+ + f'EventDataStoreId: {cls.eventStoreId}, EventDataStoreStatus: {cls.eventStoreStatus}, CreatedTimeStamp: {cls.eventStoreCreatedTimestamp}' + + f'UpdatedTimeStamp: {cls.eventStoreUpdatedTimestamp}' + ) + + # Create a Dashboard + response = cloud_trail_client.create_dashboard( + Name='MyCustomDashboardName', + TerminationProtectionEnabled=False) + cls.dashboardARN = response['DashboardArn'] + cls.dashboardType = response['Type'] + response = cloud_trail_client.get_dashboard( + DashboardId='MyCustomDashboardName') + cls.dashboardStatus = response['Status'] + cls.dashboardCreatedTimestamp = response['CreatedTimestamp'] + cls.dashboardUpdatedTimestamp = response['UpdatedTimestamp'] + cls.dashboard_created = True + print(f'Created Dashboard with the following attributes: DashboardARN: {cls.dashboardARN}'+ + f'DashboardStatus: {cls.dashboardStatus}, CreatedTimeStamp: {cls.dashboardCreatedTimestamp}' + + f'UpdatedTimeStamp: {cls.dashboardUpdatedTimestamp}' + ) + except Exception as e: + print(f"Error in setUpClass: {str(e)}") + # Let the exception propagate after we've recorded what was created + raise + @classmethod + def tearDownClass(cls): + if get_test_mode() == "OFFLINE": + return + cloud_trail_client = client.build('cloudtrail', account_config.region) + if hasattr(cls, 'dashboard_created') and cls.dashboard_created: + try: + # Clean up the Dashboard + cloud_trail_client.delete_dashboard( + DashboardId='MyCustomDashboardName') + print(f'Cleaned up Dashboard: MyCustomDashboardName') + except Exception as e: + print(f"Error deleting dashboard: {str(e)}") + + if hasattr(cls, 'eventstore_created') and cls.eventstore_created and cls.eventStoreArn: + try: + # Clean up the EventDataStore + cloud_trail_client.delete_event_data_store( + EventDataStore=cls.eventStoreArn) + print(f'Cleaned up EventDataStore: {cls.eventDataStoreName}') + except Exception as e: + print(f"Error deleting event data store: {str(e)}") + + @mock_node_evaluator_setup( + cloudtrail=[ + BotoResponse( + method='get_dashboard', + service_response= { + 'DashboardArn': dashboardARN, + 'Type': dashboardType, + 'Status': dashboardStatus, + 'CreatedTimestamp': dashboardCreatedTimestamp, + 'UpdatedTimestamp': dashboardUpdatedTimestamp + }, + expected_params={ + 'DashboardId': 'MyCustomDashboardName' + } + ), + BotoResponse( + method='get_dashboard', + service_response= { + 'DashboardArn': dashboardARN, + 'Type': dashboardType, + 'Status': dashboardStatus, + 'CreatedTimestamp': dashboardCreatedTimestamp, + 'UpdatedTimestamp': dashboardUpdatedTimestamp + }, + expected_params={ + 'DashboardId': 'MyCustomDashboardName' + } + ), + BotoResponse( + method='get_dashboard', + service_response= { + 'DashboardArn': dashboardARN, + 'Type': dashboardType, + 'Status': dashboardStatus, + 'CreatedTimestamp': dashboardCreatedTimestamp, + 'UpdatedTimestamp': dashboardUpdatedTimestamp + }, + expected_params={ + 'DashboardId': 'MyCustomDashboardName' + } + ), + BotoResponse( + method='get_dashboard', + service_response= { + 'DashboardArn': dashboardARN, + 'Type': dashboardType, + 'Status': dashboardStatus, + 'CreatedTimestamp': dashboardCreatedTimestamp, + 'UpdatedTimestamp': dashboardUpdatedTimestamp + }, + expected_params={ + 'DashboardId': 'MyCustomDashboardName' + } + ) + ] + + ) + def test_returns_a_dashboard_resource_with_references_resolved(self): + template = load_resources({ + 'MyDashboard': { + 'Type': 'AWS::CloudTrail::Dashboard', + 'Properties': { + 'Name': 'MyCustomDashboardName' + } + }, + 'ResourceA': { + 'Type': 'AWS::CloudTrail::ResourcePolicy', + 'Properties': { + 'ResourceArn': {'Fn::GetAtt': ['MyDashboard', 'DashboardArn']}, + 'ResourcePolicy': copy.deepcopy(dashboard_policy_with_reference) + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("MyCustomDashboardName", resource.ResourceName) + self.assertEqual('AWS::CloudTrail::Dashboard', resource.ResourceType) + + expected_policy = copy.deepcopy(dashboard_policy_with_reference) + expected_policy['Statement'][0]['Resource'] = [ + f'arn:aws:cloudtrail:{account_config.region}:{account_config.account_id}:dashboard/MyCustomDashboardName', + f'arn:aws:cloudtrail::{account_config.account_id}:dashboard/MyCustomDashboardName' + ] + expected_policy['Statement'][0]['Condition']['StringEquals'] = { + 'aws:ResourceTag/DashboardStatus': self.dashboardStatus, + 'aws:ResourceTag/DashboardType': self.dashboardType + } + expected_policy['Statement'][0]['Condition']['DateGreaterThanEquals'] = { + 'aws:TokenIssueTime': self.dashboardCreatedTimestamp + } + expected_policy['Statement'][0]['Condition']['DateLessThanEquals'] = { + 'aws:TokenIssueTime': self.dashboardUpdatedTimestamp + } + self.assertEqual('ResourcePolicy', resource.Policy.Name) + self.assertEqual(expected_policy, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + + @mock_node_evaluator_setup( + cloudtrail=[ + BotoResponse( + method='list_event_data_stores', + service_response= { + 'EventDataStores': [ + { + 'EventDataStoreArn': eventStoreArn, + 'Name': eventDataStoreName + } + ] + }, + expected_params=None + ), + BotoResponse( + method='list_event_data_stores', + service_response= { + 'EventDataStores': [ + { + 'EventDataStoreArn': eventStoreArn, + 'Name': eventDataStoreName + } + ] + }, + expected_params=None + ), + BotoResponse( + method='list_event_data_stores', + service_response= { + 'EventDataStores': [ + { + 'EventDataStoreArn': eventStoreArn, + 'Name': eventDataStoreName + } + ] + }, + expected_params=None + ), + BotoResponse( + method='get_event_data_store', + service_response= { + 'EventDataStoreArn': eventStoreArn, + 'Name': eventDataStoreName, + 'Status': eventStoreStatus, + 'CreatedTimestamp': eventStoreCreatedTimestamp, + 'UpdatedTimestamp': eventStoreUpdatedTimestamp + }, + expected_params={ + 'EventDataStore': eventStoreArn + } + ), + BotoResponse( + method='list_event_data_stores', + service_response= { + 'EventDataStores': [ + { + 'EventDataStoreArn': eventStoreArn, + 'Name': eventDataStoreName + } + ] + }, + expected_params=None + ), + BotoResponse( + method='get_event_data_store', + service_response= { + 'EventDataStoreArn': eventStoreArn, + 'Name': eventDataStoreName, + 'Status': eventStoreStatus, + 'CreatedTimestamp': eventStoreCreatedTimestamp, + 'UpdatedTimestamp': eventStoreUpdatedTimestamp + }, + expected_params={ + 'EventDataStore': eventStoreArn + } + ), + BotoResponse( + method='list_event_data_stores', + service_response= { + 'EventDataStores': [ + { + 'EventDataStoreArn': eventStoreArn, + 'Name': eventDataStoreName + } + ] + }, + expected_params=None + ), + BotoResponse( + method='get_event_data_store', + service_response= { + 'EventDataStoreArn': eventStoreArn, + 'Name': eventDataStoreName, + 'Status': eventStoreStatus, + 'CreatedTimestamp': eventStoreCreatedTimestamp, + 'UpdatedTimestamp': eventStoreUpdatedTimestamp + }, + expected_params={ + 'EventDataStore': eventStoreArn + } + ) + ] + ) + def test_returns_an_event_ds_resource_with_references_resolved(self): + template = load_resources({ + 'MyEventDataStore': { + 'Type': 'AWS::CloudTrail::EventDataStore', + 'Properties': { + 'Name': self.eventDataStoreName + } + }, + 'ResourceA': { + 'Type': 'AWS::CloudTrail::ResourcePolicy', + 'Properties': { + 'ResourceArn': {'Fn::GetAtt': ['MyEventDataStore', 'EventDataStoreArn']}, + 'ResourcePolicy': copy.deepcopy(eventdatastore_policy_with_reference) + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual(self.eventStoreId, resource.ResourceName) + self.assertEqual('AWS::CloudTrail::EventDataStore', resource.ResourceType) + + expected_policy = copy.deepcopy(eventdatastore_policy_with_reference) + expected_policy['Statement'][0]['Resource'] = [ + self.eventStoreArn + ] + expected_policy['Statement'][0]['Condition']['StringEquals'] = { + # Status moves from CREATED -> ENABLED + 'aws:ResourceTag/EventDataStoreStatus': ['ENABLED'] + } + expected_policy['Statement'][0]['Condition']['DateGreaterThanEquals'] = { + 'aws:TokenIssueTime': [self.eventStoreCreatedTimestamp] + } + expected_policy['Statement'][0]['Condition']['DateLessThanEquals'] = { + 'aws:TokenIssueTime': [self.eventStoreUpdatedTimestamp] + } + self.assertEqual('ResourcePolicy', resource.Policy.Name) + self.assertEqual(expected_policy, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + +class WhenParsingACloudTrailEventDataStorePolicy(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_a_resource(self): + template = load({ + 'Resources': { + 'ResourceA': { + 'Type': 'AWS::CloudTrail::ResourcePolicy', + 'Properties': { + 'ResourceArn': 'arn:aws:cloudtrail:us-east-1:123456789012:eventdatastore/MyEventDataStore', + 'ResourcePolicy': copy.deepcopy(eventdatastore_policy_with_no_reference) + } + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("MyEventDataStore", resource.ResourceName) + self.assertEqual('AWS::CloudTrail::EventDataStore', resource.ResourceType) + + self.assertEqual('ResourcePolicy', resource.Policy.Name) + self.assertEqual(eventdatastore_policy_with_no_reference, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) \ No newline at end of file diff --git a/cfn_policy_validator/tests/parsers_tests/resource_tests/test_code_artifact_domain_policy.py b/cfn_policy_validator/tests/parsers_tests/resource_tests/test_code_artifact_domain_policy.py new file mode 100644 index 0000000..e79a597 --- /dev/null +++ b/cfn_policy_validator/tests/parsers_tests/resource_tests/test_code_artifact_domain_policy.py @@ -0,0 +1,252 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" +import copy +import unittest + +from cfn_policy_validator.parsers.resource.parser import ResourceParser +from cfn_policy_validator.tests.parsers_tests import mock_node_evaluator_setup + +from cfn_policy_validator.tests.utils import required_property_error, load, account_config, expected_type_error, \ + load_resources +from cfn_policy_validator.application_error import ApplicationError + + +code_artifact_policy_with_no_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Allow', + 'Action': 'codeartifact:*', + 'Resource': 'arn:aws:codeartifact:us-east-1:123456789012:domain/MyTestDomain', + 'Principal': '*', + 'Condition': { + 'ArnEquals': { + 'aws:PrincipalArn': [ + "arn:aws:iam::123456789012:role/MyTestRoleArn" + ] + } + } + } + ] +} + + +code_artifact_policy_with_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Deny', + 'Action': 'codeartifact:*', + 'Sid': {"Fn::Join": ["", ["Policy-for-", {"Fn::GetAtt": ["MyDomain", "Name"]}]]}, + 'Resource': [ + {"Fn::GetAtt": ["MyDomain", "Arn"]} + ], + 'Principal': '*', + 'Condition': { + 'ArnNotEquals': { + 'aws:PrincipalArn': [ + {"Fn::Join": ["", ["arn:aws:iam::", {"Fn::GetAtt": ["MyDomain", "Owner"]}, ":role/MyTestRoleArn"]]} + ] + } + } + }, + { + "Effect": "Allow", + "Principal": '*', + "Action": "kms:Decrypt", + "Resource": [ + {"Fn::GetAtt": ["MyDomain", "EncryptionKey"]} + ] + } + ] +} + + +class WhenParsingACodeArtifactDomainPolicyAndValidatingSchema(unittest.TestCase): + @mock_node_evaluator_setup() + def test_with_no_properties(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::CodeArtifact::Domain' + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('Properties', 'Resources.ResourceA'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_no_domain_name(self): + template = load({ + 'Resources': { + 'ResourceA': { + 'Type': 'AWS::CodeArtifact::Domain', + 'Properties': { + 'PermissionsPolicyDocument': copy.deepcopy(code_artifact_policy_with_no_reference) + } + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('DomainName', 'Resources.ResourceA.Properties'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_domain_name_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::CodeArtifact::Domain', + 'Properties': { + 'DomainName': ['MyDomain'], + 'PermissionsPolicyDocument': copy.deepcopy(code_artifact_policy_with_no_reference) + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.DomainName', 'string', "['MyDomain']"), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_permissions_policy_document_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::CodeArtifact::Domain', + 'Properties': { + 'DomainName': 'MyDomain', + 'PermissionsPolicyDocument': ['Invalid'] + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.PermissionsPolicyDocument', 'object', "['Invalid']"), + str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_unsupported_function_in_unused_property(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::CodeArtifact::Domain', + 'Properties': { + 'DomainName': 'MyDomain', + 'PermissionsPolicyDocument': copy.deepcopy(code_artifact_policy_with_no_reference), + 'UnusedProperty': {"Fn::GetAZs": {"Ref": "AWS::Region"}} + } + } + }) + + ResourceParser.parse(template, account_config) + + self.assertTrue(True, 'Should not raise error.') + + @mock_node_evaluator_setup() + def test_with_ref_to_parameter_in_unused_property(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::CodeArtifact::Domain', + 'Properties': { + 'DomainName': 'MyDomain', + 'PermissionsPolicyDocument': copy.deepcopy(code_artifact_policy_with_no_reference), + 'UnusedProperty': {'Ref': 'SomeProperty'} + } + } + }) + + ResourceParser.parse(template, account_config) + + self.assertTrue(True, 'Should not raise error.') + + +class WhenParsingACodeArtifactDomainPolicy(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_a_resource(self): + template = load({ + 'Resources': { + 'ResourceA': { + 'Type': 'AWS::CodeArtifact::Domain', + 'Properties': { + 'DomainName': 'MyDomain', + 'PermissionsPolicyDocument': copy.deepcopy(code_artifact_policy_with_no_reference) + } + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("MyDomain", resource.ResourceName) + self.assertEqual('AWS::CodeArtifact::Domain', resource.ResourceType) + + self.assertEqual('PermissionsPolicyDocument', resource.Policy.Name) + self.assertEqual(code_artifact_policy_with_no_reference, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + +class WhenParsingACodeArtifactDomainPolicyWithReferencesInEachField(unittest.TestCase): + # this is a test to ensure that each field is being evaluated for references in a domain + @mock_node_evaluator_setup() + def test_returns_a_resource_with_references_resolved(self): + template = load_resources({ + 'MyDomain': { + 'Type': 'AWS::CodeArtifact::Domain', + 'Properties': { + 'DomainName': 'MyCustomDomainName', + 'EncryptionKey': 'arn:aws:kms:us-west-2:123456789012:key/12345678-9abc-def1-2345-6789abcdef12' + } + }, + 'ResourceA': { + 'Type': 'AWS::CodeArtifact::Domain', + 'Properties': { + 'DomainName': {'Ref': 'MyDomain'}, + 'PermissionsPolicyDocument': copy.deepcopy(code_artifact_policy_with_reference) + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual(f'arn:aws:codeartifact:{account_config.region}:{account_config.account_id}:domain/MyDomain', resource.ResourceName) + self.assertEqual('AWS::CodeArtifact::Domain', resource.ResourceType) + + expected_policy = copy.deepcopy(code_artifact_policy_with_reference) + expected_policy['Statement'][0]['Sid'] = f'Policy-for-MyCustomDomainName' + expected_policy['Statement'][0]['Resource'] = [ + f'arn:aws:codeartifact:{account_config.region}:{account_config.account_id}:domain/MyDomain' + ] + expected_policy['Statement'][0]['Condition']['ArnNotEquals']['aws:PrincipalArn'][0] = f'arn:aws:iam::{account_config.account_id}:role/MyTestRoleArn' + expected_policy['Statement'][1]['Resource'] = [ + f'arn:aws:kms:us-west-2:123456789012:key/12345678-9abc-def1-2345-6789abcdef12' + ] + self.assertEqual('PermissionsPolicyDocument', resource.Policy.Name) + self.assertEqual(expected_policy, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + +class WhenParsingACodeArtifactDomainWithNoPermissionsPolicy(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_no_resources(self): + template = load({ + 'Resources': { + 'TestDomain': { + 'Type': 'AWS::CodeArtifact::Domain', + 'Properties': { + 'DomainName': 'MyDomain' + } + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 0) diff --git a/cfn_policy_validator/tests/parsers_tests/resource_tests/test_s3_express_access_point_policy.py b/cfn_policy_validator/tests/parsers_tests/resource_tests/test_s3_express_access_point_policy.py new file mode 100644 index 0000000..f517ce9 --- /dev/null +++ b/cfn_policy_validator/tests/parsers_tests/resource_tests/test_s3_express_access_point_policy.py @@ -0,0 +1,300 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" +import copy +import unittest + +from cfn_policy_validator.parsers.resource.parser import ResourceParser +from cfn_policy_validator.tests.parsers_tests import mock_node_evaluator_setup + +from cfn_policy_validator.tests.utils import required_property_error, load, account_config, expected_type_error, \ + load_resources +from cfn_policy_validator.application_error import ApplicationError + + +s3_express_policy_with_no_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Allow', + 'Action': 's3express:*', + 'Resource': 'arn:aws:s3express:us-east-1:123456789012:accesspoint/MyTestAccessPoint', + 'Principal': '*', + 'Condition': { + 'ArnEquals': { + 'aws:PrincipalArn': [ + "arn:aws:iam::971691587463:role/MyTestRoleArn" + ] + } + } + } + ] +} + + +s3_express_policy_with_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Deny', + 'Action': 's3express:*', + 'Resource': [ + {"Fn::GetAtt": ["MyAccessPoint", "Arn"]}, + {"Fn::Sub": 'arn:aws:s3express:${AWS::Region}:${AWS::AccountId}:accesspoint/${MyAccessPoint}'} + ], + 'Principal': '*', + 'Condition': { + 'StringEquals': { + 's3express:AccessPointNetworkOrigin': {"Fn::GetAtt": ["MyAccessPoint", "NetworkOrigin"]} + } + } + } + ] +} + + +class WhenParsingAnS3ExpressAccessPointPolicyAndValidatingSchema(unittest.TestCase): + @mock_node_evaluator_setup() + def test_with_no_properties(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Express::AccessPoint' + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('Properties', 'Resources.ResourceA'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_name_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Express::AccessPoint', + 'Properties': { + 'Name': ['MyAccessPoint'], + 'Policy': copy.deepcopy(s3_express_policy_with_no_reference) + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.Name', 'string', "['MyAccessPoint']"), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_policy_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Express::AccessPoint', + 'Properties': { + 'Name': 'MyAccessPoint', + 'Policy': ['Invalid'] + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.Policy', 'object', "['Invalid']"), + str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_vpc_configuration_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Express::AccessPoint', + 'Properties': { + 'Name': 'MyAccessPoint', + 'Policy': copy.deepcopy(s3_express_policy_with_no_reference), + 'VpcConfiguration': ['Invalid'] + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.VpcConfiguration', 'object', "['Invalid']"), + str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_unsupported_function_in_unused_property(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Express::AccessPoint', + 'Properties': { + 'Name': 'MyAccessPoint', + 'Policy': copy.deepcopy(s3_express_policy_with_no_reference), + 'UnusedProperty': {"Fn::GetAZs": {"Ref": "AWS::Region"}} + } + } + }) + + ResourceParser.parse(template, account_config) + + self.assertTrue(True, 'Should not raise error.') + + @mock_node_evaluator_setup() + def test_with_ref_to_parameter_in_unused_property(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Express::AccessPoint', + 'Properties': { + 'Name': 'MyAccessPoint', + 'Policy': copy.deepcopy(s3_express_policy_with_no_reference), + 'UnusedProperty': {'Ref': 'SomeProperty'} + } + } + }) + + ResourceParser.parse(template, account_config) + + self.assertTrue(True, 'Should not raise error.') + + +class WhenParsingAnS3ExpressAccessPointPolicy(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_a_resource(self): + template = load({ + 'Resources': { + 'ResourceA': { + 'Type': 'AWS::S3Express::AccessPoint', + 'Properties': { + 'Name': 'MyAccessPoint', + 'Policy': copy.deepcopy(s3_express_policy_with_no_reference) + } + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("MyAccessPoint", resource.ResourceName) + self.assertEqual('AWS::S3Express::AccessPoint', resource.ResourceType) + + self.assertEqual('Policy', resource.Policy.Name) + self.assertEqual(s3_express_policy_with_no_reference, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + +class WhenParsingAnS3ExpressAccessPointPolicyWithReferencesInEachField(unittest.TestCase): + # this is a test to ensure that each field is being evaluated for references in an access point + @mock_node_evaluator_setup() + def test_returns_a_resource_with_references_resolved(self): + template = load_resources({ + 'MyAccessPoint': { + 'Type': 'AWS::S3Express::AccessPoint', + 'Properties': { + 'Name': 'MyCustomAccessPointName' + } + }, + 'ResourceA': { + 'Type': 'AWS::S3Express::AccessPoint', + 'Properties': { + 'Name': {'Ref': 'MyAccessPoint'}, + 'Policy': copy.deepcopy(s3_express_policy_with_reference), + 'VpcConfiguration': { + 'VpcId': 'vpc-0a53287fa4EXAMPLE' + } + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("MyCustomAccessPointName", resource.ResourceName) + self.assertEqual('AWS::S3Express::AccessPoint', resource.ResourceType) + + expected_policy = copy.deepcopy(s3_express_policy_with_reference) + expected_policy['Statement'][0]['Resource'] = [ + f'arn:aws:s3express:{account_config.region}:{account_config.account_id}:accesspoint/MyCustomAccessPointName', + f'arn:aws:s3express:{account_config.region}:{account_config.account_id}:accesspoint/MyCustomAccessPointName' + ] + expected_policy['Statement'][0]['Condition']['StringEquals'] = { + 's3express:AccessPointNetworkOrigin': 'Internet' + } + self.assertEqual('Policy', resource.Policy.Name) + self.assertEqual(expected_policy, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + self.assertIsNotNone(resource.Configuration) + self.assertEqual('vpc-0a53287fa4EXAMPLE', resource.Configuration['VpcId']) + + +class WhenParsingAnS3ExpressAccessPointPolicyWithAnImplicitAccessPointName(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_a_resource(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Express::AccessPoint', + 'Properties': { + 'Policy': copy.deepcopy(s3_express_policy_with_no_reference) + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("ResourceA", resource.ResourceName) + self.assertEqual('AWS::S3Express::AccessPoint', resource.ResourceType) + + expected_policy = s3_express_policy_with_no_reference + self.assertEqual('Policy', resource.Policy.Name) + self.assertEqual(expected_policy, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + +class WhenParsingAnS3ExpressAccessPointAndThereIsNoPolicy(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_no_resources(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Express::AccessPoint', + 'Properties': { + 'Name': 'MyAccessPoint' + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 0) + + +class WhenParsingAnS3ExpressAccessPointAndThereIsNoVpcConfiguration(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_resource_without_metadata(self): + template = load({ + 'Resources': { + 'ResourceA': { + 'Type': 'AWS::S3Express::AccessPoint', + 'Properties': { + 'Name': 'MyAccessPoint', + 'Policy': copy.deepcopy(s3_express_policy_with_no_reference) + } + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("MyAccessPoint", resource.ResourceName) + self.assertEqual('AWS::S3Express::AccessPoint', resource.ResourceType) + + self.assertEqual('Policy', resource.Policy.Name) + self.assertEqual(s3_express_policy_with_no_reference, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + self.assertEqual(0, len(resource.Configuration)) \ No newline at end of file diff --git a/cfn_policy_validator/tests/parsers_tests/resource_tests/test_s3_tables_table_bucket_policy.py b/cfn_policy_validator/tests/parsers_tests/resource_tests/test_s3_tables_table_bucket_policy.py new file mode 100644 index 0000000..e0b8b26 --- /dev/null +++ b/cfn_policy_validator/tests/parsers_tests/resource_tests/test_s3_tables_table_bucket_policy.py @@ -0,0 +1,267 @@ +""" +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +SPDX-License-Identifier: MIT-0 +""" +import copy +import unittest + +from cfn_policy_validator.parsers.resource.parser import ResourceParser +from cfn_policy_validator.tests.parsers_tests import mock_node_evaluator_setup + +from cfn_policy_validator.tests.utils import required_property_error, load, account_config, expected_type_error, \ + load_resources +from cfn_policy_validator.application_error import ApplicationError + + +table_bucket_policy_with_no_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Allow', + 'Action': 's3:*', + 'Resource': 'arn:aws:s3tables:::bucket/MyTestTableBucket', + 'Principal': '*', + 'Condition': { + 'ArnEquals': { + 'aws:PrincipalArn': [ + "arn:aws:iam::971691587463:role/MyTestRoleArn" + ] + } + } + } + ] +} + + +table_bucket_policy_with_reference = { + 'Version': '2012-10-17', + 'Statement': [ + { + 'Effect': 'Deny', + 'Action': 's3:*', + 'Sid': {"Fn::Join": ["", ["Policy-For-", {"Ref": "MyTableBucket"}]]}, + 'Resource': [ + {"Fn::GetAtt": ["MyTableBucket", "TableBucketARN"]}, + {"Fn::Sub": 'arn:aws:s3tables:::bucket/${MyTableBucket}/*'} + ], + 'Principal': '*', + 'Condition': { + 'ArnNotEquals': { + 'aws:PrincipalArn': [ + "arn:aws:iam::971691587463:role/MyTestRoleArn" + ] + } + } + } + ] +} + + +class WhenParsingAnS3TablesTableBucketPolicyAndValidatingSchema(unittest.TestCase): + @mock_node_evaluator_setup() + def test_with_no_properties(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Tables::TableBucketPolicy' + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('Properties', 'Resources.ResourceA'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_no_table_bucket_arn(self): + template = load({ + 'Resources': { + 'ResourceA': { + 'Type': 'AWS::S3Tables::TableBucketPolicy', + 'Properties': { + 'ResourcePolicy': copy.deepcopy(table_bucket_policy_with_no_reference) + } + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('TableBucketARN', 'Resources.ResourceA.Properties'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_table_bucket_arn_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Tables::TableBucketPolicy', + 'Properties': { + 'TableBucketARN': ['arn:aws:s3tables:::bucket/MyTableBucket'], + 'ResourcePolicy': copy.deepcopy(table_bucket_policy_with_no_reference) + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.TableBucketARN', 'string', "['arn:aws:s3tables:::bucket/MyTableBucket']"), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_no_resource_policy(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Tables::TableBucketPolicy', + 'Properties': { + 'TableBucketARN': 'arn:aws:s3tables:::bucket/MyTableBucket' + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(required_property_error('ResourcePolicy', 'Resources.ResourceA.Properties'), str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_invalid_resource_policy_type(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Tables::TableBucketPolicy', + 'Properties': { + 'TableBucketARN': 'arn:aws:s3tables:::bucket/MyTableBucket', + 'ResourcePolicy': ['Invalid'] + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual(expected_type_error('Resources.ResourceA.Properties.ResourcePolicy', 'object', "['Invalid']"), + str(cm.exception)) + + @mock_node_evaluator_setup() + def test_with_unsupported_function_in_unused_property(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Tables::TableBucketPolicy', + 'Properties': { + 'TableBucketARN': 'arn:aws:s3tables:::bucket/MyTableBucket', + 'ResourcePolicy': copy.deepcopy(table_bucket_policy_with_no_reference), + 'UnusedProperty': {"Fn::GetAZs": {"Ref": "AWS::Region"}} + } + } + }) + + ResourceParser.parse(template, account_config) + + self.assertTrue(True, 'Should not raise error.') + + @mock_node_evaluator_setup() + def test_with_ref_to_parameter_in_unused_property(self): + template = load_resources({ + 'ResourceA': { + 'Type': 'AWS::S3Tables::TableBucketPolicy', + 'Properties': { + 'TableBucketARN': 'arn:aws:s3tables:::bucket/MyTableBucket', + 'ResourcePolicy': copy.deepcopy(table_bucket_policy_with_no_reference), + 'UnusedProperty': {'Ref': 'SomeProperty'} + } + } + }) + + ResourceParser.parse(template, account_config) + + self.assertTrue(True, 'Should not raise error.') + + +class WhenParsingAnS3TablesTableBucketPolicy(unittest.TestCase): + @mock_node_evaluator_setup() + def test_returns_a_resource(self): + template = load({ + 'Resources': { + 'TestTableBucket': { + 'Type': 'AWS::S3Tables::TableBucket', + 'Properties': { + 'TableBucketName': 'MyTableBucket' + } + }, + 'ResourceA': { + 'Type': 'AWS::S3Tables::TableBucketPolicy', + 'Properties': { + 'TableBucketARN': 'arn:aws:s3tables:::bucket/MyTableBucket', + 'ResourcePolicy': copy.deepcopy(table_bucket_policy_with_no_reference) + } + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("MyTableBucket", resource.ResourceName) + self.assertEqual('AWS::S3Tables::TableBucket', resource.ResourceType) + + self.assertEqual('TableBucketPolicy', resource.Policy.Name) + self.assertEqual(table_bucket_policy_with_no_reference, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + +class WhenParsingAnS3TablesTableBucketPolicyWithReferencesInEachField(unittest.TestCase): + # this is a test to ensure that each field is being evaluated for references in a table bucket + @mock_node_evaluator_setup() + def test_returns_a_resource_with_references_resolved(self): + template = load_resources({ + 'MyTableBucket': { + 'Type': 'AWS::S3Tables::TableBucket', + 'Properties': { + 'TableBucketName': 'MyCustomTableBucketName' + } + }, + 'ResourceA': { + 'Type': 'AWS::S3Tables::TableBucketPolicy', + 'Properties': { + 'TableBucketARN': {'Fn::GetAtt': ['MyTableBucket', 'TableBucketARN']}, + 'ResourcePolicy': copy.deepcopy(table_bucket_policy_with_reference) + } + } + }) + + resources = ResourceParser.parse(template, account_config) + self.assertEqual(len(resources), 1) + + resource = resources[0] + self.assertEqual("MyCustomTableBucketName", resource.ResourceName) + self.assertEqual('AWS::S3Tables::TableBucket', resource.ResourceType) + + expected_policy = copy.deepcopy(table_bucket_policy_with_reference) + expected_policy['Statement'][0]['Resource'] = [ + f'arn:aws:s3tables:{account_config.region}:{account_config.account_id}:bucket/MyCustomTableBucketName', + f'arn:aws:s3tables:::bucket/MyCustomTableBucketName/*' + ] + expected_policy['Statement'][0]['Sid'] = f'Policy-For-MyCustomTableBucketName' + self.assertEqual('TableBucketPolicy', resource.Policy.Name) + self.assertEqual(expected_policy, resource.Policy.Policy) + self.assertEqual('/', resource.Policy.Path) + + +class WhenParsingAnS3TablesTableBucketPolicyWithInvalidARN(unittest.TestCase): + @mock_node_evaluator_setup() + def test_raises_error_for_invalid_arn(self): + template = load({ + 'Resources': { + 'ResourceA': { + 'Type': 'AWS::S3Tables::TableBucketPolicy', + 'Properties': { + 'TableBucketARN': 'arn:aws:s3tables:::invalid-arn', + 'ResourcePolicy': copy.deepcopy(table_bucket_policy_with_no_reference) + } + } + } + }) + + with self.assertRaises(ApplicationError) as cm: + ResourceParser.parse(template, account_config) + + self.assertEqual("Invalid value for Resources.ResourceA.Properties.TableBucketARN. Must be a valid TableBucket ARN. TableBucketARN value: arn:aws:s3tables:::invalid-arn", str(cm.exception)) \ No newline at end of file diff --git a/cfn_policy_validator/tests/test_cli.py b/cfn_policy_validator/tests/test_cli.py index 626aa93..1405d36 100644 --- a/cfn_policy_validator/tests/test_cli.py +++ b/cfn_policy_validator/tests/test_cli.py @@ -182,7 +182,7 @@ def test_prints_report(self): self.assertEqual(0, len(self.output['NonBlockingFindings'])) - self.assertEqual(3, len(self.output['BlockingFindings'])) + self.assertEqual(9, len(self.output['BlockingFindings'])) self.assert_error('SECURITY_WARNING', 'policy-analysis-CheckNoPublicAccess', 'MyKey', 'KeyPolicy') self.assert_error('SECURITY_WARNING', 'policy-analysis-CheckNoPublicAccess', 'testBucket', 'BucketPolicy') self.assert_error('SECURITY_WARNING', 'policy-analysis-CheckNoPublicAccess', 'CodePipelineServiceRole', 'TrustPolicy') @@ -216,7 +216,7 @@ def test_prints_report(self): self.assertEqual(0, len(self.output['NonBlockingFindings'])) - self.assertEqual(1, len(self.output['BlockingFindings'])) + self.assertEqual(7, len(self.output['BlockingFindings'])) self.assert_error('SECURITY_WARNING', 'policy-analysis-CheckNoPublicAccess', 'MyKey', 'KeyPolicy') class WhenParsingArgumentsForVersion(unittest.TestCase): diff --git a/cfn_policy_validator/tests/validation_tests/test_s3_multi_region_access_point_validator.py b/cfn_policy_validator/tests/validation_tests/test_s3_multi_region_access_point_validator.py index 642e69b..39d2e10 100644 --- a/cfn_policy_validator/tests/validation_tests/test_s3_multi_region_access_point_validator.py +++ b/cfn_policy_validator/tests/validation_tests/test_s3_multi_region_access_point_validator.py @@ -255,6 +255,14 @@ def test_with_s3_multi_region_access_point_policy_with_statement_of_invalid_type ) @mock_access_analyzer_resource_setup( + MockInvalidAccessPreviewSetup( + code='MISSING_RESOURCE', + custom_validate_policy_type='AWS::S3::MultiRegionAccessPoint' + ), + MockInvalidAccessPreviewSetup( + code='MISSING_RESOURCE', + custom_validate_policy_type='AWS::S3::MultiRegionAccessPoint' + ), MockInvalidAccessPreviewSetup(custom_validate_policy_type='AWS::S3::MultiRegionAccessPoint'), MockInvalidAccessPreviewSetup(custom_validate_policy_type='AWS::S3::MultiRegionAccessPoint') ) @@ -272,15 +280,27 @@ def test_with_s3_multi_region_access_point_policy_with_no_resource(self): ) findings = validate_parser_output(self.output) - self.assert_has_findings(findings, errors=2) + self.assert_has_findings(findings, errors=4) self.assert_finding_is_equal( actual_finding=findings.errors[0], expected_policy_name='policy1', expected_resource_name='resource1', - expected_code='FAILED_ACCESS_PREVIEW_CREATION' + expected_code='MISSING_RESOURCE' ) self.assert_finding_is_equal( actual_finding=findings.errors[1], + expected_policy_name='policy1', + expected_resource_name='resource1', + expected_code='FAILED_ACCESS_PREVIEW_CREATION' + ) + self.assert_finding_is_equal( + actual_finding=findings.errors[2], + expected_policy_name='policy2', + expected_resource_name='resource2', + expected_code='MISSING_RESOURCE' + ) + self.assert_finding_is_equal( + actual_finding=findings.errors[3], expected_policy_name='policy2', expected_resource_name='resource2', expected_code='FAILED_ACCESS_PREVIEW_CREATION' diff --git a/cfn_policy_validator/validation/policy_analysis.py b/cfn_policy_validator/validation/policy_analysis.py index 3e6e8c7..7f3ac90 100644 --- a/cfn_policy_validator/validation/policy_analysis.py +++ b/cfn_policy_validator/validation/policy_analysis.py @@ -31,7 +31,14 @@ "AWS::SecretsManager::Secret", "AWS::SNS::Topic", "AWS::SQS::Queue", - "AWS::IAM::AssumeRolePolicyDocument" + "AWS::IAM::AssumeRolePolicyDocument", + "AWS::S3Tables::TableBucket", + "AWS::ApiGateway::RestApi", + "AWS::CodeArtifact::Domain", + "AWS::CloudTrail::Dashboard", + "AWS::CloudTrail::EventDataStore", + "AWS::S3Express::AccessPoint", + "AWS::Backup::BackupVault" } ASSUME_ROLE_POLICY_TYPE = "AWS::IAM::AssumeRolePolicyDocument" diff --git a/cfn_policy_validator/version.py b/cfn_policy_validator/version.py index 8063d18..3d76e76 100644 --- a/cfn_policy_validator/version.py +++ b/cfn_policy_validator/version.py @@ -2,4 +2,4 @@ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 """ -__version__ = '0.0.35' +__version__ = '0.0.36' diff --git a/test_files/public_access_test.yml b/test_files/public_access_test.yml index 02015cb..f5c79c8 100644 --- a/test_files/public_access_test.yml +++ b/test_files/public_access_test.yml @@ -82,6 +82,150 @@ Resources: Principal: AWS: - "*" + + ArtifactTableBucket: + Type: AWS::S3Tables::TableBucket + Properties : { + EncryptionConfiguration : { + KMSKeyArn: "arn:aws:kms:us-east-1:1122333:key/testKey1", + SSEAlgorithm: "AES256" + }, + TableBucketName : testTableBucket, + UnreferencedFileRemoval : { + NoncurrentDays : 10, + Status : Enabled, + UnreferencedDays : 50 + } + } + + ArtifactTableBucketPolicy: + Type: AWS::S3Tables::TableBucketPolicy + Properties : + TableBucketARN : arn:aws:s3tables:us-east-2:111122223333:bucket/testTableBucket, + ResourcePolicy: + Version: '2012-10-17' + Statement: + - Action: + - 's3:GetObject' + Effect: Allow + Resource: !Join + - '' + - - 'arn:aws:s3tables:us-east-2::bucket/' + - !Ref ArtifactTableBucket + - /* + Principal: + AWS: + - "*" + MyRestApi: + Type: AWS::ApiGateway::RestApi + Properties : + Body: + OpenAPI specification: null + Description: A test API + Name: MyRestAPI + Policy: + Version: '2012-10-17' + Statement: + - Action: + - 'execute-api:Invoke' + Effect: Allow + Resource: + - "arn:aws:execute-api:*:*:*/*/*/*" + Principal: + AWS: + - "*" + + MyCodeArtifactDomain: + Type: 'AWS::CodeArtifact::Domain' + Properties: + DomainName: "my-domain" + EncryptionKey: arn:aws:kms:us-west-2:123456789012:key/12345678-9abc-def1-2345-6789abcdef12 + PermissionsPolicyDocument: + Version: 2012-10-17 + Statement: + - Action: + - codeartifact:ReadFromRepository + - codeartifact:DescribePackageVersion + - codeartifact:DescribeRepository + - codeartifact:GetPackageVersionReadme + Effect: Allow + Principal: + AWS: "*" + Resource: "*" + + MyCloudTrailDashboardResourcePolicy: + Type: AWS::CloudTrail::ResourcePolicy + Properties: + ResourceArn: "arn:aws:cloudtrail:us-east-1:01234567890:dashboard/exampleDash" + ResourcePolicy: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + AWS: "*" + Action: cloudtrail:StartDashboardRefresh + + MyCloudTrailEventDataStoreResourcePolicy: + Type: AWS::CloudTrail::ResourcePolicy + Properties: + ResourceArn: "arn:aws:cloudtrail:us-east-2:123456789012:eventdatastore/EXAMPLE-f852-4e8f-8bd1-bcf6cEXAMPLE" + ResourcePolicy: + Version: 2012-10-17 + Statement: + - Effect: Allow + Sid: "policy1" + Principal: + AWS: "*" + Action: + - cloudtrail:StartQuery + - cloudtrail:GetEventDataStore + - cloudtrail:GetQueryResults + Resource: "arn:aws:cloudtrail:us-east-1:555555555555:eventdatastore/example80-699f-4045-a7d2-730dbf313ccf" + + MyS3ExpressAccessPoint: + Type: AWS::S3Express::AccessPoint + Properties: + Name: "MyS3ExpressAccessPoint" + Bucket: !Ref ArtifactBucket + PublicAccessBlockConfiguration: + BlockPublicAcls: false + BlockPublicPolicy: false + IgnorePublicAcls: false + RestrictPublicBuckets: false + Policy: + Version: 2012-10-17 + Statement: + - Action: + - 's3:GetObject' + - 's3:PutObject' + - 's3:DeleteObject' + Effect: Allow + Resource: "*" + Principal: + AWS: + - "*" + MyBackupVault: + Type: AWS::Backup::BackupVault + Properties: + BackupVaultName: "MyCustomVaultName" + + NewBackupVault: + Type: AWS::Backup::BackupVault + Properties: + BackupVaultName: !Ref MyBackupVault + AccessPolicy: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + AWS: "*" + Action: + - backup:UpdateRecoveryPointLifecycle, + - backup:DescribeRecoveryPoint, + - backup:DeleteRecoveryPoint, + - backup:GetRecoveryPointRestoreMetadata, + - backup:StartRestoreJob + Resource: "*" CodePipelineServiceRole: Type: AWS::IAM::Role