Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
vpc_cidr=config["VPC_CIDR"],
)

# Configure security group connections for the container port
network_stack.configure_security_group_connections(container_port=80)

ecs_stack = EcsStack(
scope=cdk_app,
construct_id=f"{STACK_NAME_PREFIX}-ecs",
Expand All @@ -41,6 +44,7 @@
scope=cdk_app,
construct_id=f"{STACK_NAME_PREFIX}-load-balancer",
vpc=network_stack.vpc,
alb_security_group=network_stack.alb_security_group,
)
load_balancer_stack.add_dependency(ecs_stack)

Expand All @@ -62,6 +66,7 @@
cluster=ecs_stack.cluster,
props=app_props,
load_balancer=load_balancer_stack.alb,
ecs_security_group=network_stack.ecs_security_group,
)

cdk_app.synth()
13 changes: 11 additions & 2 deletions src/load_balancer_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,21 @@ class LoadBalancerStack(cdk.Stack):
"""

def __init__(
self, scope: Construct, construct_id: str, vpc: ec2.Vpc, **kwargs
self,
scope: Construct,
construct_id: str,
vpc: ec2.Vpc,
alb_security_group: ec2.SecurityGroup,
**kwargs,
) -> None:
super().__init__(scope, construct_id, **kwargs)

self.alb = elbv2.ApplicationLoadBalancer(
self, "AppLoadBalancer", vpc=vpc, internet_facing=True
self,
"AppLoadBalancer",
vpc=vpc,
internet_facing=True,
security_group=alb_security_group,
)

# WAF to protect against common web attacks
Expand Down
69 changes: 68 additions & 1 deletion src/network_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class NetworkStack(cdk.Stack):
"""
Network for applications
Network for applications with security groups configured for least privilege access
"""

def __init__(self, scope: Construct, construct_id: str, vpc_cidr, **kwargs) -> None:
Expand All @@ -19,3 +19,70 @@ def __init__(self, scope: Construct, construct_id: str, vpc_cidr, **kwargs) -> N
self.vpc = ec2.Vpc(
self, "Vpc", max_azs=2, ip_addresses=ec2.IpAddresses.cidr(vpc_cidr)
)

# -------------------
# Create security groups with least privilege access
# -------------------

# ALB Security Group - allows traffic from internet on HTTP/HTTPS
self.alb_security_group = ec2.SecurityGroup(
self,
"AlbSecurityGroup",
vpc=self.vpc,
description="Security group for Application Load Balancer",
allow_all_outbound=False, # Restrict outbound traffic
)

# Allow HTTP from internet (required for public ALB)
self.alb_security_group.add_ingress_rule(
peer=ec2.Peer.ipv4("0.0.0.0/0"),
connection=ec2.Port.tcp(80),
description="Allow HTTP from internet",
)

# Allow HTTPS from internet (required for public ALB)
self.alb_security_group.add_ingress_rule(
peer=ec2.Peer.ipv4("0.0.0.0/0"),
connection=ec2.Port.tcp(443),
description="Allow HTTPS from internet",
)

# ECS Security Group - allows traffic only from ALB
self.ecs_security_group = ec2.SecurityGroup(
self,
"EcsSecurityGroup",
vpc=self.vpc,
description="Security group for ECS tasks",
allow_all_outbound=True, # ECS tasks need outbound for pulling images, etc.
)

def configure_security_group_connections(self, container_port: int) -> None:
"""
Configure security group connections between ALB and ECS for the specified container port.
This uses separate SecurityGroupEgress and SecurityGroupIngress resources to avoid circular dependencies.
"""
# Create explicit security group rules to avoid circular dependencies

# Allow ALB to send traffic to ECS containers
ec2.CfnSecurityGroupEgress(
self,
f"AlbToEcsEgress{container_port}",
group_id=self.alb_security_group.security_group_id,
ip_protocol="tcp",
from_port=container_port,
to_port=container_port,
destination_security_group_id=self.ecs_security_group.security_group_id,
description=f"Allow outbound to ECS containers on port {container_port}",
)

# Allow ECS containers to receive traffic from ALB
ec2.CfnSecurityGroupIngress(
self,
f"EcsFromAlbIngress{container_port}",
group_id=self.ecs_security_group.security_group_id,
ip_protocol="tcp",
from_port=container_port,
to_port=container_port,
source_security_group_id=self.alb_security_group.security_group_id,
description=f"Allow traffic from ALB on port {container_port}",
)
18 changes: 10 additions & 8 deletions src/service_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ def __init__(
vpc: ec2.Vpc,
cluster: ecs.Cluster,
props: ServiceProps,
ecs_security_group: ec2.SecurityGroup,
**kwargs,
) -> None:
super().__init__(scope, construct_id, **kwargs)

# Use the security group from the network stack
self.security_group = ecs_security_group

# allow containers default task access and s3 bucket access
task_role = iam.Role(
self,
Expand Down Expand Up @@ -126,11 +130,8 @@ def _get_secret(scope: Construct, id: str, name: str) -> sm.Secret:
health_check=props.container_healthcheck,
)

self.security_group = ec2.SecurityGroup(self, "SecurityGroup", vpc=vpc)
self.security_group.add_ingress_rule(
peer=ec2.Peer.ipv4("0.0.0.0/0"),
connection=ec2.Port.tcp(props.container_port),
)
# Note: Security group will be passed from the network stack
# to ensure least privilege access without cyclic dependencies

# attach ECS task to ECS cluster
self.service = ecs.FargateService(
Expand Down Expand Up @@ -222,13 +223,16 @@ def __init__(
cluster: ecs.Cluster,
props: ServiceProps,
load_balancer: elbv2.ApplicationLoadBalancer,
ecs_security_group: ec2.SecurityGroup,
certificate_arn: str = None,
health_check_path: str = "/",
health_check_interval: int = 1, # max is 5
enable_https: bool = False,
**kwargs,
) -> None:
super().__init__(scope, construct_id, vpc, cluster, props, **kwargs)
super().__init__(
scope, construct_id, vpc, cluster, props, ecs_security_group, **kwargs
)

if enable_https and not certificate_arn:
raise ValueError(
Expand All @@ -251,7 +255,6 @@ def __init__(
"HttpsListener",
load_balancer=load_balancer,
port=ALB_HTTPS_LISTENER_PORT,
open=True,
protocol=elbv2.ApplicationProtocol.HTTPS,
certificates=[self.cert],
)
Expand Down Expand Up @@ -294,7 +297,6 @@ def __init__(
"HttpListener",
load_balancer=load_balancer,
port=ALB_HTTP_LISTENER_PORT,
open=True,
protocol=elbv2.ApplicationProtocol.HTTP,
)

Expand Down
93 changes: 93 additions & 0 deletions tests/unit/test_load_balancer_stack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import aws_cdk as cdk
import aws_cdk.assertions as assertions

from src.network_stack import NetworkStack
from src.load_balancer_stack import LoadBalancerStack


def test_load_balancer_stack_created():
"""Test that the load balancer stack creates an ALB with proper security group."""
cdk_app = cdk.App()
vpc_cidr = "10.254.192.0/24"

# Create network stack with security groups
network_stack = NetworkStack(cdk_app, "NetworkStack", vpc_cidr=vpc_cidr)
network_stack.configure_security_group_connections(container_port=80)

# Create load balancer stack
load_balancer_stack = LoadBalancerStack(
scope=cdk_app,
construct_id="LoadBalancerStack",
vpc=network_stack.vpc,
alb_security_group=network_stack.alb_security_group,
)

template = assertions.Template.from_stack(load_balancer_stack)

# Check that ALB is created
template.has_resource_properties(
"AWS::ElasticLoadBalancingV2::LoadBalancer",
{
"Scheme": "internet-facing",
"Type": "application",
},
)

# Check that WAF WebACL is created
template.has_resource_properties(
"AWS::WAFv2::WebACL",
{
"Scope": "REGIONAL",
"DefaultAction": {"Allow": {}},
},
)

# Check that WAF association is created
template.has_resource("AWS::WAFv2::WebACLAssociation", {})


def test_load_balancer_waf_rules():
"""Test that WAF rules are properly configured."""
cdk_app = cdk.App()
vpc_cidr = "10.254.192.0/24"

network_stack = NetworkStack(cdk_app, "NetworkStack", vpc_cidr=vpc_cidr)
network_stack.configure_security_group_connections(container_port=80)

load_balancer_stack = LoadBalancerStack(
scope=cdk_app,
construct_id="LoadBalancerStack",
vpc=network_stack.vpc,
alb_security_group=network_stack.alb_security_group,
)

template = assertions.Template.from_stack(load_balancer_stack)

# Check that WebACL has the expected managed rule groups
template.has_resource_properties(
"AWS::WAFv2::WebACL",
{
"Rules": [
{
"Name": "AWSManagedRulesCommonRuleSet",
"Priority": 0,
"Statement": {
"ManagedRuleGroupStatement": {
"Name": "AWSManagedRulesCommonRuleSet",
"VendorName": "AWS",
}
},
},
{
"Name": "AWSManagedRulesKnownBadInputsRuleSet",
"Priority": 1,
"Statement": {
"ManagedRuleGroupStatement": {
"Name": "AWSManagedRulesKnownBadInputsRuleSet",
"VendorName": "AWS",
}
},
},
]
},
)
57 changes: 57 additions & 0 deletions tests/unit/test_network_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,60 @@ def test_vpc_created():
network = NetworkStack(app, "NetworkStack", vpc_cidr)
template = assertions.Template.from_stack(network)
template.has_resource_properties("AWS::EC2::VPC", {"CidrBlock": vpc_cidr})


def test_security_groups_created():
"""Test that ALB and ECS security groups are created with proper configurations."""
app = core.App()
vpc_cidr = "10.254.192.0/24"
network = NetworkStack(app, "NetworkStack", vpc_cidr)
template = assertions.Template.from_stack(network)

# Check that ALB security group is created
template.has_resource_properties(
"AWS::EC2::SecurityGroup",
{
"GroupDescription": "Security group for Application Load Balancer",
"SecurityGroupIngress": [
{
"CidrIp": "0.0.0.0/0",
"Description": "Allow HTTP from internet",
"FromPort": 80,
"IpProtocol": "tcp",
"ToPort": 80,
},
{
"CidrIp": "0.0.0.0/0",
"Description": "Allow HTTPS from internet",
"FromPort": 443,
"IpProtocol": "tcp",
"ToPort": 443,
},
],
},
)

# Check that ECS security group is created
template.has_resource_properties(
"AWS::EC2::SecurityGroup",
{
"GroupDescription": "Security group for ECS tasks",
},
)


def test_security_group_connections():
"""Test that security group connections are properly configured."""
app = core.App()
vpc_cidr = "10.254.192.0/24"
network = NetworkStack(app, "NetworkStack", vpc_cidr)

# Configure connections for port 80
network.configure_security_group_connections(container_port=80)

template = assertions.Template.from_stack(network)

# Verify that security group rules are created
# Note: The actual verification of cross-references between security groups
# requires more complex template assertions, but the basic structure should be present
template.resource_count_is("AWS::EC2::SecurityGroup", 2)
Loading
Loading