1
1
import copy
2
2
from spaceone .inventory .plugin .collector .lib import *
3
3
from ..base import ResourceManager
4
- from ...conf .cloud_service_conf import ASSET_URL , INSTANCE_FILTERS
4
+ from ...conf .cloud_service_conf import ASSET_URL , INSTANCE_FILTERS , DEFAULT_VULNERABLE_PORTS
5
+ from ...error .custom import ERROR_VULNERABLE_PORTS
5
6
6
7
7
8
class SecurityGroupManager (ResourceManager ):
@@ -35,6 +36,9 @@ def create_cloud_service_type(self):
35
36
def create_cloud_service (self , region , options , secret_data , schema ):
36
37
cloudtrail_resource_type = "AWS::EC2::SecurityGroup"
37
38
39
+ # If Port Filter Option Exist
40
+ vulnerable_ports = options .get ("vulnerable_ports" , DEFAULT_VULNERABLE_PORTS )
41
+
38
42
# Get default VPC
39
43
default_vpcs = self ._get_default_vpc ()
40
44
@@ -62,7 +66,7 @@ def create_cloud_service(self, region, options, secret_data, schema):
62
66
in_rule_copy = copy .deepcopy (in_rule )
63
67
inbound_rules .append (
64
68
self .custom_security_group_rule_info (
65
- in_rule_copy , _ip_range , "ip_ranges"
69
+ in_rule_copy , _ip_range , "ip_ranges" , vulnerable_ports
66
70
)
67
71
)
68
72
@@ -73,14 +77,15 @@ def create_cloud_service(self, region, options, secret_data, schema):
73
77
in_rule_copy ,
74
78
_user_group_pairs ,
75
79
"user_id_group_pairs" ,
80
+ vulnerable_ports ,
76
81
)
77
82
)
78
83
79
84
for _ip_v6_range in in_rule .get ("Ipv6Ranges" , []):
80
85
in_rule_copy = copy .deepcopy (in_rule )
81
86
inbound_rules .append (
82
87
self .custom_security_group_rule_info (
83
- in_rule_copy , _ip_v6_range , "ipv6_ranges"
88
+ in_rule_copy , _ip_v6_range , "ipv6_ranges" , vulnerable_ports
84
89
)
85
90
)
86
91
@@ -91,7 +96,7 @@ def create_cloud_service(self, region, options, secret_data, schema):
91
96
out_rule_copy = copy .deepcopy (out_rule )
92
97
outbound_rules .append (
93
98
self .custom_security_group_rule_info (
94
- out_rule_copy , _ip_range , "ip_ranges"
99
+ out_rule_copy , _ip_range , "ip_ranges" , vulnerable_ports
95
100
)
96
101
)
97
102
@@ -101,15 +106,15 @@ def create_cloud_service(self, region, options, secret_data, schema):
101
106
self .custom_security_group_rule_info (
102
107
out_rule_copy ,
103
108
_user_group_pairs ,
104
- "user_id_group_pairs" ,
109
+ "user_id_group_pairs" ,vulnerable_ports ,
105
110
)
106
111
)
107
112
108
113
for _ip_v6_range in out_rule .get ("Ipv6Ranges" , []):
109
114
out_rule_copy = copy .deepcopy (out_rule )
110
115
outbound_rules .append (
111
116
self .custom_security_group_rule_info (
112
- out_rule_copy , _ip_v6_range , "ipv6_ranges"
117
+ out_rule_copy , _ip_v6_range , "ipv6_ranges" , vulnerable_ports
113
118
)
114
119
)
115
120
@@ -160,16 +165,16 @@ def create_cloud_service(self, region, options, secret_data, schema):
160
165
region_name = region ,
161
166
)
162
167
163
- def custom_security_group_rule_info (self , raw_rule , remote , remote_type ):
168
+ def custom_security_group_rule_info (self , raw_rule , remote , remote_type , vulnerable_ports ):
169
+ protocol_display = self ._get_protocol_display (raw_rule .get ("IpProtocol" ))
164
170
raw_rule .update (
165
171
{
166
- "protocol_display" : self ._get_protocol_display (
167
- raw_rule .get ("IpProtocol" )
168
- ),
172
+ "protocol_display" : protocol_display ,
169
173
"port_display" : self ._get_port_display (raw_rule ),
170
174
"source_display" : self ._get_source_display (remote ),
171
175
"description_display" : self ._get_description_display (remote ),
172
176
remote_type : remote ,
177
+ "vulnerable_ports" : self ._get_vulnerable_ports (protocol_display , raw_rule , vulnerable_ports )
173
178
}
174
179
)
175
180
@@ -287,3 +292,23 @@ def get_instance_name_from_tags(instance):
287
292
return _tag .get ("Value" )
288
293
289
294
return ""
295
+
296
+ @staticmethod
297
+ def _get_vulnerable_ports (protocol_display : str , raw_rule : dict , vulnerable_ports : str ):
298
+ try :
299
+ if protocol_display == "ALL" :
300
+ return [int (port .strip ()) for port in vulnerable_ports .split (',' )]
301
+
302
+ to_port = raw_rule .get ("ToPort" )
303
+ from_port = raw_rule .get ("FromPort" )
304
+
305
+ if to_port is None or from_port is None :
306
+ return []
307
+
308
+ return [
309
+ int (port .strip ())
310
+ for port in vulnerable_ports .split (',' )
311
+ if from_port <= int (port .strip ()) <= to_port
312
+ ]
313
+ except ValueError :
314
+ raise ERROR_VULNERABLE_PORTS (vulnerable_ports )
0 commit comments