Skip to content

Commit 5ec4a1c

Browse files
feat(gcp): add cloudfunction_function_inside_vpc check (#11021)
Co-authored-by: Lydia Vilchez <lydiavilchezlopez@gmail.com>
1 parent bae74b8 commit 5ec4a1c

11 files changed

Lines changed: 469 additions & 0 deletions

File tree

prowler/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
1111
- `config_delegated_admin_and_org_aggregator_all_regions` check for AWS provider, verifying that AWS Config has a delegated administrator and an organization aggregator covering all AWS regions [(#11259)](https://github.com/prowler-cloud/prowler/pull/11259)
1212
- `sagemaker_clarify_exists` check for AWS provider [(#11211)](https://github.com/prowler-cloud/prowler/pull/11211)
1313
- `cloudsql_instance_high_availability_enabled` check for GCP provider, verifying Cloud SQL primary instances use `REGIONAL` availability for automatic zone failover [(#11024)](https://github.com/prowler-cloud/prowler/pull/11024)
14+
- `cloudfunction_function_inside_vpc` check for GCP provider, verifying Cloud Functions have a Serverless VPC Access connector for private egress [(#11021)](https://github.com/prowler-cloud/prowler/pull/11021)
1415
- `identity_storage_service_level_admins_scoped` check for OCI provider CIS 3.1 control 1.15, ensuring storage service-level administrators exclude delete permissions [(#11523)](https://github.com/prowler-cloud/prowler/pull/11523)
1516
- `cosmosdb_account_automatic_failover_enabled` check for Azure provider [(#11031)](https://github.com/prowler-cloud/prowler/pull/11031)
1617
- `cosmosdb_account_backup_policy_continuous` check for Azure provider [(#11032)](https://github.com/prowler-cloud/prowler/pull/11032)

prowler/providers/gcp/services/cloudfunction/__init__.py

Whitespace-only changes.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from prowler.providers.common.provider import Provider
2+
from prowler.providers.gcp.services.cloudfunction.cloudfunction_service import (
3+
CloudFunction,
4+
)
5+
6+
cloudfunction_client = CloudFunction(Provider.get_global_provider())

prowler/providers/gcp/services/cloudfunction/cloudfunction_function_inside_vpc/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
{
2+
"Provider": "gcp",
3+
"CheckID": "cloudfunction_function_inside_vpc",
4+
"CheckTitle": "Cloud Function is connected to a VPC network",
5+
"CheckType": [],
6+
"ServiceName": "cloudfunction",
7+
"SubServiceName": "",
8+
"ResourceIdTemplate": "",
9+
"Severity": "medium",
10+
"ResourceType": "cloudfunctions.googleapis.com/Function",
11+
"Description": "Cloud Functions are attached to a **Serverless VPC Access connector** so egress traffic is routed through a private VPC network instead of the public internet.\n\nThe evaluation reviews each function's network configuration to confirm that a connector is configured.",
12+
"Risk": "Without a VPC connector, Cloud Functions cannot privately reach internal resources such as `Cloud SQL`, `Memorystore`, or `GKE`, forcing those services to be exposed over public IPs. This expands the **attack surface**, weakens **confidentiality** of internal traffic, and breaks network segmentation controls required by most security frameworks.",
13+
"RelatedUrl": "",
14+
"AdditionalURLs": [
15+
"https://cloud.google.com/functions/docs/networking/connecting-vpc",
16+
"https://cloud.google.com/vpc/docs/serverless-vpc-access"
17+
],
18+
"Remediation": {
19+
"Code": {
20+
"CLI": "gcloud functions deploy <FUNCTION_NAME> --region=<REGION> --vpc-connector=projects/<PROJECT_ID>/locations/<REGION>/connectors/<CONNECTOR_NAME> --egress-settings=all-traffic",
21+
"NativeIaC": "",
22+
"Other": "1. In Google Cloud Console, go to Cloud Functions\n2. Select the function and click Edit\n3. Under Connections, select the VPC connector for your network\n4. Set Egress settings to route all traffic through the VPC connector\n5. Save and redeploy the function",
23+
"Terraform": "```hcl\nresource \"google_cloudfunctions2_function\" \"<example_resource_name>\" {\n name = \"<example_resource_name>\"\n location = \"us-central1\"\n\n service_config {\n vpc_connector = \"<example_resource_id>\" # Critical: routes egress through the VPC\n vpc_connector_egress_settings = \"ALL_TRAFFIC\"\n }\n}\n```"
24+
},
25+
"Recommendation": {
26+
"Text": "Apply **defense in depth** by routing Cloud Function egress through a **Serverless VPC Access connector** when the function must reach internal resources.\n\nScope each connector to **least privilege** subnets so functions cannot reach unintended endpoints.",
27+
"Url": "https://hub.prowler.com/check/cloudfunction_function_inside_vpc"
28+
}
29+
},
30+
"Categories": [
31+
"trust-boundaries"
32+
],
33+
"DependsOn": [],
34+
"RelatedTo": [
35+
"cloudfunction_function_not_publicly_accessible",
36+
"cloudsql_instance_public_ip",
37+
"compute_instance_public_ip"
38+
],
39+
"Notes": "A VPC connector must be created in the same region as the Cloud Function. This check only verifies that a connector is attached; it does not validate egress settings or connector configuration."
40+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from prowler.lib.check.models import Check, Check_Report_GCP
2+
from prowler.providers.gcp.services.cloudfunction.cloudfunction_client import (
3+
cloudfunction_client,
4+
)
5+
6+
7+
class cloudfunction_function_inside_vpc(Check):
8+
"""Check that Cloud Functions are attached to a Serverless VPC Access connector.
9+
10+
Verifies that each active Cloud Function has a `vpcConnector` configured so
11+
egress traffic flows through a private VPC network instead of the public
12+
internet. Functions in non-`ACTIVE` states are skipped because their network
13+
configuration is transient.
14+
"""
15+
16+
def execute(self) -> list[Check_Report_GCP]:
17+
"""Execute the VPC-connector check across all Cloud Functions.
18+
19+
Returns:
20+
A list of `Check_Report_GCP` findings, one per active Cloud
21+
Function. Status is `PASS` when a `vpc_connector` is set and `FAIL`
22+
otherwise.
23+
"""
24+
findings = []
25+
for function in cloudfunction_client.functions:
26+
if function.state != "ACTIVE":
27+
continue
28+
report = Check_Report_GCP(metadata=self.metadata(), resource=function)
29+
if function.vpc_connector:
30+
report.status = "PASS"
31+
report.status_extended = (
32+
f"Cloud Function {function.name} is connected to a VPC via "
33+
f"connector: {function.vpc_connector}."
34+
)
35+
else:
36+
report.status = "FAIL"
37+
report.status_extended = f"Cloud Function {function.name} is not connected to any VPC network."
38+
findings.append(report)
39+
return findings
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from typing import Optional
2+
3+
from pydantic.v1 import BaseModel
4+
5+
from prowler.lib.logger import logger
6+
from prowler.providers.gcp.config import DEFAULT_RETRY_ATTEMPTS
7+
from prowler.providers.gcp.gcp_provider import GcpProvider
8+
from prowler.providers.gcp.lib.service.service import GCPService
9+
10+
11+
class CloudFunction(GCPService):
12+
"""Cloud Functions v2 service client.
13+
14+
Enumerates Cloud Functions across every accessible project and region
15+
using the `cloudfunctions.googleapis.com` v2 API and exposes them through
16+
the `functions` attribute.
17+
"""
18+
19+
def __init__(self, provider: GcpProvider) -> None:
20+
"""Initialize the service and preload Cloud Functions."""
21+
super().__init__("cloudfunctions", provider, api_version="v2")
22+
self.functions = []
23+
self._get_functions()
24+
25+
def _get_functions(self) -> None:
26+
"""Fetch Cloud Functions for every project and location."""
27+
for project_id in self.project_ids:
28+
try:
29+
locations = self.client.projects().locations()
30+
locations_request = locations.list(name=f"projects/{project_id}")
31+
while locations_request is not None:
32+
locations_response = locations_request.execute(
33+
num_retries=DEFAULT_RETRY_ATTEMPTS
34+
)
35+
for location in locations_response.get("locations", []):
36+
location_id = location["locationId"]
37+
try:
38+
functions = locations.functions()
39+
request = functions.list(
40+
parent=f"projects/{project_id}/locations/{location_id}"
41+
)
42+
while request is not None:
43+
response = request.execute(
44+
num_retries=DEFAULT_RETRY_ATTEMPTS
45+
)
46+
for fn in response.get("functions", []):
47+
service_config = fn.get("serviceConfig", {})
48+
self.functions.append(
49+
Function(
50+
name=fn["name"].split("/")[-1],
51+
project_id=project_id,
52+
location=location_id,
53+
state=fn.get("state", "UNKNOWN"),
54+
vpc_connector=service_config.get(
55+
"vpcConnector"
56+
),
57+
)
58+
)
59+
request = functions.list_next(
60+
previous_request=request,
61+
previous_response=response,
62+
)
63+
except Exception as error:
64+
logger.error(
65+
f"{location_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
66+
)
67+
locations_request = locations.list_next(
68+
previous_request=locations_request,
69+
previous_response=locations_response,
70+
)
71+
except Exception as error:
72+
logger.error(
73+
f"{project_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
74+
)
75+
76+
77+
class Function(BaseModel):
78+
"""Cloud Function resource consumed by GCP checks."""
79+
80+
name: str
81+
project_id: str
82+
location: str
83+
state: str
84+
vpc_connector: Optional[str] = None

tests/providers/gcp/services/cloudfunction/__init__.py

Whitespace-only changes.

tests/providers/gcp/services/cloudfunction/cloudfunction_function_inside_vpc/__init__.py

Whitespace-only changes.
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
from unittest import mock
2+
3+
from tests.providers.gcp.gcp_fixtures import (
4+
GCP_PROJECT_ID,
5+
GCP_US_CENTER1_LOCATION,
6+
set_mocked_gcp_provider,
7+
)
8+
9+
_CHECK_PATH = (
10+
"prowler.providers.gcp.services.cloudfunction."
11+
"cloudfunction_function_inside_vpc.cloudfunction_function_inside_vpc"
12+
)
13+
_CLIENT_PATH = f"{_CHECK_PATH}.cloudfunction_client"
14+
15+
16+
class Test_cloudfunction_function_inside_vpc:
17+
def test_no_functions(self):
18+
cloudfunction_client = mock.MagicMock()
19+
20+
with (
21+
mock.patch(
22+
"prowler.providers.common.provider.Provider.get_global_provider",
23+
return_value=set_mocked_gcp_provider(),
24+
),
25+
mock.patch(
26+
_CLIENT_PATH,
27+
new=cloudfunction_client,
28+
),
29+
):
30+
from prowler.providers.gcp.services.cloudfunction.cloudfunction_function_inside_vpc.cloudfunction_function_inside_vpc import (
31+
cloudfunction_function_inside_vpc,
32+
)
33+
34+
cloudfunction_client.functions = []
35+
36+
check = cloudfunction_function_inside_vpc()
37+
result = check.execute()
38+
assert len(result) == 0
39+
40+
def test_function_with_vpc_connector(self):
41+
cloudfunction_client = mock.MagicMock()
42+
43+
with (
44+
mock.patch(
45+
"prowler.providers.common.provider.Provider.get_global_provider",
46+
return_value=set_mocked_gcp_provider(),
47+
),
48+
mock.patch(
49+
_CLIENT_PATH,
50+
new=cloudfunction_client,
51+
),
52+
):
53+
from prowler.providers.gcp.services.cloudfunction.cloudfunction_function_inside_vpc.cloudfunction_function_inside_vpc import (
54+
cloudfunction_function_inside_vpc,
55+
)
56+
from prowler.providers.gcp.services.cloudfunction.cloudfunction_service import (
57+
Function,
58+
)
59+
60+
connector = (
61+
f"projects/{GCP_PROJECT_ID}/locations/{GCP_US_CENTER1_LOCATION}"
62+
f"/connectors/my-connector"
63+
)
64+
cloudfunction_client.functions = [
65+
Function(
66+
name="fn-vpc",
67+
project_id=GCP_PROJECT_ID,
68+
location=GCP_US_CENTER1_LOCATION,
69+
state="ACTIVE",
70+
vpc_connector=connector,
71+
)
72+
]
73+
74+
check = cloudfunction_function_inside_vpc()
75+
result = check.execute()
76+
assert len(result) == 1
77+
assert result[0].status == "PASS"
78+
assert (
79+
result[0].status_extended
80+
== f"Cloud Function fn-vpc is connected to a VPC via connector: {connector}."
81+
)
82+
assert result[0].resource_id == "fn-vpc"
83+
assert result[0].resource_name == "fn-vpc"
84+
assert result[0].location == GCP_US_CENTER1_LOCATION
85+
assert result[0].project_id == GCP_PROJECT_ID
86+
87+
def test_function_without_vpc_connector(self):
88+
cloudfunction_client = mock.MagicMock()
89+
90+
with (
91+
mock.patch(
92+
"prowler.providers.common.provider.Provider.get_global_provider",
93+
return_value=set_mocked_gcp_provider(),
94+
),
95+
mock.patch(
96+
_CLIENT_PATH,
97+
new=cloudfunction_client,
98+
),
99+
):
100+
from prowler.providers.gcp.services.cloudfunction.cloudfunction_function_inside_vpc.cloudfunction_function_inside_vpc import (
101+
cloudfunction_function_inside_vpc,
102+
)
103+
from prowler.providers.gcp.services.cloudfunction.cloudfunction_service import (
104+
Function,
105+
)
106+
107+
cloudfunction_client.functions = [
108+
Function(
109+
name="fn-public",
110+
project_id=GCP_PROJECT_ID,
111+
location=GCP_US_CENTER1_LOCATION,
112+
state="ACTIVE",
113+
vpc_connector=None,
114+
)
115+
]
116+
117+
check = cloudfunction_function_inside_vpc()
118+
result = check.execute()
119+
assert len(result) == 1
120+
assert result[0].status == "FAIL"
121+
assert (
122+
result[0].status_extended
123+
== "Cloud Function fn-public is not connected to any VPC network."
124+
)
125+
assert result[0].resource_id == "fn-public"
126+
assert result[0].resource_name == "fn-public"
127+
assert result[0].location == GCP_US_CENTER1_LOCATION
128+
assert result[0].project_id == GCP_PROJECT_ID
129+
130+
def test_function_with_empty_vpc_connector(self):
131+
cloudfunction_client = mock.MagicMock()
132+
133+
with (
134+
mock.patch(
135+
"prowler.providers.common.provider.Provider.get_global_provider",
136+
return_value=set_mocked_gcp_provider(),
137+
),
138+
mock.patch(
139+
_CLIENT_PATH,
140+
new=cloudfunction_client,
141+
),
142+
):
143+
from prowler.providers.gcp.services.cloudfunction.cloudfunction_function_inside_vpc.cloudfunction_function_inside_vpc import (
144+
cloudfunction_function_inside_vpc,
145+
)
146+
from prowler.providers.gcp.services.cloudfunction.cloudfunction_service import (
147+
Function,
148+
)
149+
150+
cloudfunction_client.functions = [
151+
Function(
152+
name="fn-empty",
153+
project_id=GCP_PROJECT_ID,
154+
location=GCP_US_CENTER1_LOCATION,
155+
state="ACTIVE",
156+
vpc_connector="",
157+
)
158+
]
159+
160+
check = cloudfunction_function_inside_vpc()
161+
result = check.execute()
162+
assert len(result) == 1
163+
assert result[0].status == "FAIL"
164+
165+
def test_inactive_function_skipped(self):
166+
cloudfunction_client = mock.MagicMock()
167+
168+
with (
169+
mock.patch(
170+
"prowler.providers.common.provider.Provider.get_global_provider",
171+
return_value=set_mocked_gcp_provider(),
172+
),
173+
mock.patch(
174+
_CLIENT_PATH,
175+
new=cloudfunction_client,
176+
),
177+
):
178+
from prowler.providers.gcp.services.cloudfunction.cloudfunction_function_inside_vpc.cloudfunction_function_inside_vpc import (
179+
cloudfunction_function_inside_vpc,
180+
)
181+
from prowler.providers.gcp.services.cloudfunction.cloudfunction_service import (
182+
Function,
183+
)
184+
185+
cloudfunction_client.functions = [
186+
Function(
187+
name="fn-deploy",
188+
project_id=GCP_PROJECT_ID,
189+
location=GCP_US_CENTER1_LOCATION,
190+
state="DEPLOYING",
191+
vpc_connector=None,
192+
)
193+
]
194+
195+
check = cloudfunction_function_inside_vpc()
196+
result = check.execute()
197+
assert len(result) == 0

0 commit comments

Comments
 (0)