-
Notifications
You must be signed in to change notification settings - Fork 24
Adding support for new resource types in check-no-public-access api. #49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| """ | ||
| Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| SPDX-License-Identifier: MIT-0 | ||
| """ | ||
|
|
||
| from cfn_policy_validator import client | ||
| from cfn_policy_validator.application_error import ApplicationError | ||
|
|
||
| def get_dashboard_created_time(region, resource_name, resource=None): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Q: why resource as parameter if it's anyway never going to be used?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the method call in cfn_policy_validator/parsers/utils/intrinsic_functions/fn_get_att_evaluator.py is from a map of resource types to appropriate method. For one of them resource is needed. Hence, ended up adding to all. |
||
| return get_dashboard_attribute(region, resource_name, 'CreatedTimestamp') | ||
|
|
||
| def get_dashboard_status(region, resource_name, resource=None): | ||
| return get_dashboard_attribute(region, resource_name, 'Status') | ||
|
|
||
| def get_dashboard_type(region, resource_name, resource=None): | ||
| return get_dashboard_attribute(region, resource_name, 'Type') | ||
|
|
||
| def get_dashboard_updated_time(region, resource_name, resource=None): | ||
| return get_dashboard_attribute(region, resource_name, 'UpdatedTimestamp') | ||
|
|
||
| def get_eventdatastore_arn(arn_pattern, resource_name, resource, visited_nodes, region): | ||
| return get_eventdatastore_arn_from_client(region, resource_name) | ||
|
|
||
| def get_eventdatastore_created_time(region, resource_name, resource=None): | ||
| return get_eventdatastore_attribute(region, resource_name, 'CreatedTimestamp') | ||
|
|
||
| def get_eventdatastore_status(region, resource_name, resource=None): | ||
| return get_eventdatastore_attribute(region, resource_name, 'Status') | ||
|
|
||
| def get_eventdatastore_updated_time(region, resource_name, resource=None): | ||
| return get_eventdatastore_attribute(region, resource_name, 'UpdatedTimestamp') | ||
|
|
||
|
|
||
| def get_dashboard_attribute(region, resource_name, attribute): | ||
| supported_attributes = ['Type', 'CreatedTimestamp', 'Status', 'UpdatedTimestamp'] | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's define it as an enum, instead of copy-pasting the literal string values. |
||
| cloudtrail_client = client.build('cloudtrail', region) | ||
| try: | ||
| if attribute not in supported_attributes: | ||
| raise ApplicationError(f"Attribute {attribute} is not supported. Supported attributes are {supported_attributes}") | ||
| response = cloudtrail_client.get_dashboard( | ||
| DashboardId=resource_name | ||
| ) | ||
| return response[attribute] | ||
| except Exception as e: | ||
| raise ApplicationError(f"Error: {e}") | ||
|
|
||
| def get_eventdatastore_arn_from_client(region, resource_name): | ||
| cloudtrail_client = client.build('cloudtrail', region) | ||
| next_token = None | ||
| client_config = { | ||
| 'MaxResults': 25 | ||
| } | ||
|
|
||
| while True: | ||
| if next_token: | ||
| client_config['NextToken'] = nextToken | ||
| response = cloudtrail_client.list_event_data_stores(**client_config) | ||
| for eventdatastore in response['EventDataStores']: | ||
| if eventdatastore['Name'] == resource_name: | ||
| return eventdatastore['EventDataStoreArn'] | ||
| nextToken = response.get('NextToken') | ||
| if not nextToken: | ||
| break | ||
| raise ApplicationError(f"CloudTrail Event Data Store {resource_name} not found") | ||
|
|
||
|
|
||
| def get_eventdatastore_attribute(region, resource_name, attribute): | ||
| cloudtrail_client = client.build('cloudtrail', region) | ||
| supported_attributes = ['CreatedTimestamp', 'Status', 'UpdatedTimestamp'] | ||
|
|
||
| if attribute not in supported_attributes: | ||
| raise ApplicationError(f"Attribute {attribute} is not supported. Supported attributes are {supported_attributes}") | ||
|
|
||
| arn=get_eventdatastore_arn_from_client(region, resource_name) | ||
| event_data_store_response = cloudtrail_client.get_event_data_store(EventDataStore=arn) | ||
| ret = event_data_store_response[attribute] | ||
| return ret | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| """ | ||
| Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| SPDX-License-Identifier: MIT-0 | ||
| """ | ||
|
|
||
| from cfn_policy_validator import ApplicationError | ||
| from cfn_policy_validator.parsers.output import Policy, Resource | ||
|
|
||
| class ApiGatewayRestApiPolicyParser: | ||
| """ AWS::ApiGateway::RestApi | ||
| """ | ||
|
|
||
| def __init__(self): | ||
| self.rest_api_policies = [] | ||
|
|
||
| def parse(self, _, resource): | ||
| evaluated_resource = resource.eval(rest_api_policy_schema) | ||
| properties = evaluated_resource['Properties'] | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. probably in a separate PR, but i think we will be benefited from integrating with https://github.com/keleshev/schema, rather than defining our own json based schema here |
||
|
|
||
| policy_document = properties.get('Policy') | ||
| if policy_document is None: | ||
| # we don't need to parse resources that don't have policies and policy is optional | ||
| return | ||
| name = properties['Name'] | ||
|
|
||
| policy = Policy('Policy', policy_document) | ||
| resource = Resource(name, 'AWS::ApiGateway::RestApi', policy) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we have a enum for all the different type of resource types like |
||
|
|
||
| self.rest_api_policies.append(resource) | ||
|
|
||
| def get_policies(self): | ||
| return self.rest_api_policies | ||
|
|
||
| rest_api_policy_schema = { | ||
| 'type': 'object', | ||
| 'properties': { | ||
| 'Properties': { | ||
| 'type': 'object', | ||
| 'properties': { | ||
| 'Policy': { | ||
| 'type': 'object' | ||
| }, | ||
| 'Name': { | ||
| 'type': 'string' | ||
| } | ||
| }, | ||
| 'required': ['Name'] | ||
| } | ||
| }, | ||
| 'required': ['Properties'] | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| """ | ||
| Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| SPDX-License-Identifier: MIT-0 | ||
| """ | ||
|
|
||
| from cfn_policy_validator import ApplicationError | ||
| from cfn_policy_validator.parsers.output import Policy, Resource | ||
|
|
||
| class BackupBackupVaultPolicyParser: | ||
| """ AWS::Backup::BackupVault | ||
| """ | ||
|
|
||
| def __init__(self): | ||
| self.backup_vault_policies = [] | ||
|
|
||
| def parse(self, _, resource): | ||
| evaluated_resource = resource.eval(backup_vault_policy_schema) | ||
| properties = evaluated_resource['Properties'] | ||
|
|
||
| policy_document = properties.get('AccessPolicy') | ||
| if policy_document is None: | ||
| # we don't need to parse resources that don't have policies and policy is optional | ||
| return | ||
| name = properties['BackupVaultName'] | ||
|
|
||
| policy = Policy('AccessPolicy', policy_document) | ||
| resource = Resource(name, 'AWS::Backup::BackupVault', policy) | ||
| self.backup_vault_policies.append(resource) | ||
|
|
||
| def get_policies(self): | ||
| return self.backup_vault_policies | ||
|
|
||
| backup_vault_policy_schema = { | ||
| 'type': 'object', | ||
| 'properties': { | ||
| 'Properties': { | ||
| 'type': 'object', | ||
| 'properties': { | ||
| 'AccessPolicy': { | ||
| 'type': 'object' | ||
| }, | ||
| 'BackupVaultName': { | ||
| 'type': 'string' | ||
| } | ||
| }, | ||
| 'required': ['BackupVaultName'] | ||
| } | ||
| }, | ||
| 'required': ['Properties'] | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| """ | ||
| Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| SPDX-License-Identifier: MIT-0 | ||
| """ | ||
|
|
||
| from cfn_policy_validator import ApplicationError | ||
| from cfn_policy_validator.parsers.output import Policy, Resource | ||
| import re | ||
| from typing import Tuple, Optional | ||
|
|
||
| class CloudTrailResourcePolicyParser: | ||
| """ AWS::CloudTrail::ResourcePolicy | ||
| """ | ||
|
|
||
| def __init__(self): | ||
| self.resource_policies = [] | ||
|
|
||
| @staticmethod | ||
| def extract_cloudtrail_resource_info(arn) -> Optional[Tuple[str, str]]: | ||
| """ | ||
| Extract both the resource type and resource name from a CloudTrail ARN. | ||
|
|
||
| Args: | ||
| arn (str): The CloudTrail ARN to parse | ||
|
|
||
| Returns: | ||
| Tuple[str, str] or None: A tuple containing (resource_type, resource_name) if match found, | ||
| or None if no match | ||
| """ | ||
| # Pattern captures both resource type and resource name | ||
| pattern = r'arn:aws:cloudtrail:[^:]*:[^:]*:([^/]+)/([^/]+)' | ||
| match = re.match(pattern, arn) | ||
|
|
||
| if match: | ||
| resource_type = match.group(1) # Extract resource type | ||
| resource_name = match.group(2) # Extract resource name | ||
| return resource_type, resource_name | ||
|
|
||
| return None | ||
|
|
||
| def parse(self, _, resource): | ||
| evaluated_resource = resource.eval(resource_policy_schema) | ||
| properties = evaluated_resource['Properties'] | ||
|
|
||
| policy_document = properties['ResourcePolicy'] | ||
| resource, name = self.extract_cloudtrail_resource_info(properties['ResourceArn']) | ||
| supported_resource_types = {'dashboard': 'AWS::CloudTrail::Dashboard', 'eventdatastore':'AWS::CloudTrail::EventDataStore'} | ||
| resource_type = supported_resource_types.get(resource) | ||
| if resource_type is None: | ||
| raise ApplicationError(f"Unsupported resource type {resource}") | ||
| policy = Policy('ResourcePolicy', policy_document) | ||
| resource = Resource(name, resource_type, policy) | ||
|
|
||
| self.resource_policies.append(resource) | ||
|
|
||
| def get_policies(self): | ||
| return self.resource_policies | ||
|
|
||
| resource_policy_schema = { | ||
| 'type': 'object', | ||
| 'properties': { | ||
| 'Properties': { | ||
| 'type': 'object', | ||
| 'properties': { | ||
| 'ResourcePolicy': { | ||
| 'type': 'object' | ||
| }, | ||
| 'ResourceArn': { | ||
| 'type': 'string' | ||
| } | ||
| }, | ||
| 'required': ['ResourcePolicy', 'ResourceArn'] | ||
| } | ||
| }, | ||
| 'required': ['Properties'] | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| """ | ||
| Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| SPDX-License-Identifier: MIT-0 | ||
| """ | ||
|
|
||
| from cfn_policy_validator import ApplicationError | ||
| from cfn_policy_validator.parsers.output import Policy, Resource | ||
|
|
||
| class CodeArtifactDomainPolicyParser: | ||
| """ AWS::CodeArtifact::Domain | ||
| """ | ||
|
|
||
| def __init__(self): | ||
| self.code_artifact_policies = [] | ||
|
|
||
| def parse(self, _, resource): | ||
| evaluated_resource = resource.eval(code_artifact_policy_schema) | ||
| properties = evaluated_resource['Properties'] | ||
|
|
||
| policy_document = properties.get('PermissionsPolicyDocument') | ||
| if policy_document is None: | ||
| # we don't need to parse resources that don't have policies and policy is optional | ||
| return | ||
| name = properties['DomainName'] | ||
|
|
||
| policy = Policy('PermissionsPolicyDocument', policy_document) | ||
| resource = Resource(name, 'AWS::CodeArtifact::Domain', policy) | ||
|
|
||
| self.code_artifact_policies.append(resource) | ||
|
|
||
| def get_policies(self): | ||
| return self.code_artifact_policies | ||
|
|
||
| code_artifact_policy_schema = { | ||
| 'type': 'object', | ||
| 'properties': { | ||
| 'Properties': { | ||
| 'type': 'object', | ||
| 'properties': { | ||
| 'PermissionsPolicyDocument': { | ||
| 'type': 'object' | ||
| }, | ||
| 'DomainName': { | ||
| 'type': 'string' | ||
| } | ||
| }, | ||
| 'required': ['DomainName'] | ||
| } | ||
| }, | ||
| 'required': ['Properties'] | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
xmeans supported?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes