From 2e62757955ceb469f15962258ea47af49ccad786 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Tue, 20 Feb 2024 16:45:26 +0100 Subject: [PATCH 01/31] Working on init method --- netutils/acl.py | 197 ++++++++++++++++++++++++++++-------------------- 1 file changed, 114 insertions(+), 83 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 88eeba66..b799f7ee 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -147,30 +147,48 @@ def _check_schema(data: t.Any, schema: t.Any, verify: bool) -> None: raise ValueError() +def get_attributes(cls): + result = { + name: attr for name, attr in cls.__dict__.items() + if not name.startswith("__") + and not callable(attr) + and not type(attr) is staticmethod + } + return result + + class ACLRule: """A class that helps you imagine an acl rule via methodologies.""" - attrs: t.List[str] = ["name", "src_ip", "src_zone", "dst_ip", "dst_port", "dst_zone", "action"] - permit: str = "permit" - deny: str = "deny" + name: t.Any = None + src_ip: t.Any = "some def value" + src_zone: t.Any = None + dst_ip: t.Any = None + dst_port: t.Any = None + dst_zone: t.Any = None + action: t.Any = None - input_data_verify: bool = False - input_data_schema: t.Any = INPUT_SCHEMA + class Meta: + permit: str = "permit" + deny: str = "deny" - result_data_verify: bool = False - result_data_schema: t.Any = RESULT_SCHEMA + input_data_verify: bool = False + input_data_schema: t.Any = INPUT_SCHEMA - matrix: t.Any = {} - matrix_enforced: bool = False - matrix_definition: t.Any = {} + result_data_verify: bool = False + result_data_schema: t.Any = RESULT_SCHEMA - dst_port_process: bool = True + matrix: t.Any = {} + matrix_enforced: bool = False + matrix_definition: t.Any = {} - order_validate: t.List[str] = [] - order_enforce: t.List[str] = [] - filter_same_ip: bool = True + dst_port_process: bool = True - def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disable=unused-argument + order_validate: t.List[str] = [] + order_enforce: t.List[str] = [] + filter_same_ip: bool = True + + def __init__(self, **kwargs): # pylint: disable=unused-argument """Initialize and load data. Args: @@ -179,7 +197,7 @@ def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disab Examples: >>> from netutils.acl import ACLRule >>> - >>> acl_data = dict( + >>> rule = ACLRule( ... name="Check no match", ... src_ip=["10.1.1.1"], ... dst_ip="172.16.0.10", @@ -187,42 +205,55 @@ def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disab ... action="permit", ... ) >>> - >>> rule = ACLRule(acl_data) >>> >>> rule.expanded_rules [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'action': 'permit'}] >>> """ - self.processed: t.Dict[str, str] = {} - self.data = data - self.load_data() + self.__load_data(kwargs=kwargs) - def load_data(self) -> None: + def __load_data(self, kwargs) -> None: """Load the data into the rule while verifying input data, result data, and processing data.""" - self.input_data_check() - for attr in self.attrs: - if not self.data.get(attr): - continue - if hasattr(self, f"process_{attr}"): - proccessor = getattr(self, f"process_{attr}") - _attr_data = proccessor(self.data[attr]) + # # Validate kwargs for class based definitions. + # for kk, kv in kwargs.items(): + # if kk not in get_attributes(self.__class__): + # raise ValueError("Invalid kwarg specified in init method.") + + # Ensure each class attr is in init kwargs. + for attr in get_attributes(self.__class__): + if attr not in kwargs: + kwargs[attr] = getattr(self, attr) + + # Store the init input + self._preprocessed_data = copy.deepcopy(kwargs) + self._processed_data = copy.deepcopy(self._preprocessed_data) + + # # Input check + # self.input_data_check() + + for attr in get_attributes(self.__class__): + processor_func = getattr(self, f"process_{attr}", None) # TODO(mzb): Consider callable and staticmethod. + if processor_func: + _attr_data = processor_func(self._processed_data[attr]) else: - _attr_data = self.data[attr] - self.processed[attr] = _attr_data + _attr_data = self._processed_data[attr] + + self._processed_data[attr] = _attr_data setattr(self, attr, _attr_data) - self.result_data_check() - self.validate() - self.expanded_rules = _cartesian_product(self.processed) - if self.filter_same_ip: - self.expanded_rules = [item for item in self.expanded_rules if item["dst_ip"] != item["src_ip"]] + + # self.result_data_check() + # self.validate() + # self.expanded_rules = _cartesian_product(self._processed) + # if self.filter_same_ip: + # self.expanded_rules = [item for item in self.expanded_rules if item["dst_ip"] != item["src_ip"]] def input_data_check(self) -> None: """Verify the input data against the specified JSONSchema or using a simple dictionary check.""" - return _check_schema(self.data, self.input_data_schema, self.input_data_verify) + return _check_schema(self._preprocessed_data, self.input_data_schema, self.input_data_verify) def result_data_check(self) -> None: """Verify the result data against the specified JSONSchema or using a simple dictionary check.""" - return _check_schema(self.processed, self.result_data_schema, self.result_data_verify) + return _check_schema(self._processed, self.result_data_schema, self.result_data_verify) def validate(self) -> t.Any: """Run through any method that startswith('validate_') and run that method.""" @@ -242,43 +273,43 @@ def validate(self) -> t.Any: results.extend(result) return results - def process_dst_port( - self, dst_port: t.Any - ) -> t.Union[t.List[str], None]: # pylint: disable=inconsistent-return-statements - """Convert port and protocol information. - - Method supports a single format of `{protocol}/{port}`, and will translate the - protocol for all IANA defined protocols. The port will be translated for TCP and - UDP ports only. For all other protocols should use port of 0, e.g. `ICMP/0` for ICMP - or `50/0` for ESP. Similarly, IANA defines the port mappings, while these are mostly - staying unchanged, but sourced from - https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv. - """ - output = [] - if not self.dst_port_process: - return None - if not isinstance(dst_port, list): - dst_port = [dst_port] - for item in dst_port: - protocol = item.split("/")[0] - port = item.split("/")[1] - if protocol.isalpha(): - if not PROTO_NAME_TO_NUM.get(protocol.upper()): - raise ValueError( - f"Protocol {protocol} was not found in netutils.protocol_mapper.PROTO_NAME_TO_NUM." - ) - protocol = PROTO_NAME_TO_NUM[protocol.upper()] - # test port[0] vs port, since dashes do not count, e.g. www-http - if int(protocol) == 6 and port[0].isalpha(): - if not TCP_NAME_TO_NUM.get(port.upper()): - raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.TCP_NAME_TO_NUM.") - port = TCP_NAME_TO_NUM[port.upper()] - if int(protocol) == 17 and port[0].isalpha(): - if not UDP_NAME_TO_NUM.get(port.upper()): - raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.UDP_NAME_TO_NUM.") - port = UDP_NAME_TO_NUM[port.upper()] - output.append(f"{protocol}/{port}") - return output + # def process_dst_port( + # self, dst_port: t.Any + # ) -> t.Union[t.List[str], None]: # pylint: disable=inconsistent-return-statements + # """Convert port and protocol information. + # + # Method supports a single format of `{protocol}/{port}`, and will translate the + # protocol for all IANA defined protocols. The port will be translated for TCP and + # UDP ports only. For all other protocols should use port of 0, e.g. `ICMP/0` for ICMP + # or `50/0` for ESP. Similarly, IANA defines the port mappings, while these are mostly + # staying unchanged, but sourced from + # https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv. + # """ + # output = [] + # if not self.dst_port_process: + # return None + # if not isinstance(dst_port, list): + # dst_port = [dst_port] + # for item in dst_port: + # protocol = item.split("/")[0] + # port = item.split("/")[1] + # if protocol.isalpha(): + # if not PROTO_NAME_TO_NUM.get(protocol.upper()): + # raise ValueError( + # f"Protocol {protocol} was not found in netutils.protocol_mapper.PROTO_NAME_TO_NUM." + # ) + # protocol = PROTO_NAME_TO_NUM[protocol.upper()] + # # test port[0] vs port, since dashes do not count, e.g. www-http + # if int(protocol) == 6 and port[0].isalpha(): + # if not TCP_NAME_TO_NUM.get(port.upper()): + # raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.TCP_NAME_TO_NUM.") + # port = TCP_NAME_TO_NUM[port.upper()] + # if int(protocol) == 17 and port[0].isalpha(): + # if not UDP_NAME_TO_NUM.get(port.upper()): + # raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.UDP_NAME_TO_NUM.") + # port = UDP_NAME_TO_NUM[port.upper()] + # output.append(f"{protocol}/{port}") + # return output def enforce(self) -> t.List[t.Dict[str, t.Any]]: """Run through any method that startswith('enforce_') and run that method. @@ -452,8 +483,8 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: break detailed_info = { "existing_rule_product": existing_rule, # pylint: disable=undefined-loop-variable - "match_rule": match_rule.processed, - "existing_rule": self.processed, + "match_rule": match_rule._processed, + "existing_rule": self._processed, } if rules_found[-1]: detailed_info["match_rule_product"] = rule @@ -474,13 +505,13 @@ def match(self, match_rule: "ACLRule") -> bool: details = self.match_details(match_rule) return not bool(details["rules_unmatched"]) - def __repr__(self) -> str: - """Set repr of the object to be sane.""" - output = [] - for attr in self.attrs: - if self.processed.get(attr): - output.append(f"{attr}: {self.processed[attr]}") - return ", ".join(output) + # def __repr__(self) -> str: + # """Set repr of the object to be sane.""" + # output = [] + # for attr in self.attrs: + # if self._processed.get(attr): + # output.append(f"{attr}: {self._processed[attr]}") + # return ", ".join(output) class ACLRules: From df3174ce402edd7b9f2e8bc1f789a4a3a0dbf696 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Tue, 20 Feb 2024 20:14:31 +0100 Subject: [PATCH 02/31] Working on init method --- netutils/acl.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index b799f7ee..838aaf82 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -214,10 +214,16 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument def __load_data(self, kwargs) -> None: """Load the data into the rule while verifying input data, result data, and processing data.""" - # # Validate kwargs for class based definitions. - # for kk, kv in kwargs.items(): - # if kk not in get_attributes(self.__class__): - # raise ValueError("Invalid kwarg specified in init method.") + # Remaining kwargs stored under ACRule.Meta + pop_kwargs = [] + for key, val in kwargs.items(): + if key not in get_attributes(self.__class__): + self.Meta.key = val + pop_kwargs.append(key) + + # Pop unneeded keys + for key in pop_kwargs: + kwargs.pop(key) # Ensure each class attr is in init kwargs. for attr in get_attributes(self.__class__): @@ -228,8 +234,8 @@ def __load_data(self, kwargs) -> None: self._preprocessed_data = copy.deepcopy(kwargs) self._processed_data = copy.deepcopy(self._preprocessed_data) - # # Input check - # self.input_data_check() + # Input check + self.input_data_check() for attr in get_attributes(self.__class__): processor_func = getattr(self, f"process_{attr}", None) # TODO(mzb): Consider callable and staticmethod. @@ -241,7 +247,7 @@ def __load_data(self, kwargs) -> None: self._processed_data[attr] = _attr_data setattr(self, attr, _attr_data) - # self.result_data_check() + self.result_data_check() # self.validate() # self.expanded_rules = _cartesian_product(self._processed) # if self.filter_same_ip: @@ -249,11 +255,11 @@ def __load_data(self, kwargs) -> None: def input_data_check(self) -> None: """Verify the input data against the specified JSONSchema or using a simple dictionary check.""" - return _check_schema(self._preprocessed_data, self.input_data_schema, self.input_data_verify) + return _check_schema(self._preprocessed_data, self.Meta.input_data_schema, self.Meta.input_data_verify) def result_data_check(self) -> None: """Verify the result data against the specified JSONSchema or using a simple dictionary check.""" - return _check_schema(self._processed, self.result_data_schema, self.result_data_verify) + return _check_schema(self._processed_data, self.Meta.result_data_schema, self.Meta.result_data_verify) def validate(self) -> t.Any: """Run through any method that startswith('validate_') and run that method.""" From 099861511fee74222707d678c6cc11f64f035598 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Wed, 21 Feb 2024 10:51:56 +0100 Subject: [PATCH 03/31] Working on init method --- docs/user/lib_use_cases_acl.md | 30 ++++++++++++++++++------------ netutils/acl.py | 30 +++++++++++++++--------------- tests/unit/test_acl.py | 2 +- 3 files changed, 34 insertions(+), 28 deletions(-) diff --git a/docs/user/lib_use_cases_acl.md b/docs/user/lib_use_cases_acl.md index 49cfe583..783fe1e7 100644 --- a/docs/user/lib_use_cases_acl.md +++ b/docs/user/lib_use_cases_acl.md @@ -280,25 +280,31 @@ class ExpandAddrGroups(ACLRule): Using the above object, we can test with: ```python ->>> rule_data = dict( -... name="Check allow", -... src_ip=["red", "blue", "10.4.4.4"], -... dst_ip=["white"], -... dst_port="6/www-http", -... action="permit", +>> > rule_data = dict( + ... +name = "Check allow", +... +src_ip = ["red", "blue", "10.4.4.4"], +... +dst_ip = ["white"], +... +dst_port = "6/www-http", +... +action = "permit", ... ) ->>> ->>> address_object_expanded = ExpandAddrGroups(rule_data) ->>> for item in address_object_expanded.expanded_rules: -... print(item) -... +>> > +>> > address_object_expanded = ExpandAddrGroups(rule_data) +>> > for item in address_object_expanded._expanded_rules: + ... +print(item) +... {'name': 'Check allow', 'src_ip': '10.1.1.1', 'dst_ip': '10.2.2.2', 'dst_port': '6/80', 'action': 'permit'} {'name': 'Check allow', 'src_ip': '10.2.2.2', 'dst_ip': '10.1.1.1', 'dst_port': '6/80', 'action': 'permit'} {'name': 'Check allow', 'src_ip': '10.3.3.3', 'dst_ip': '10.1.1.1', 'dst_port': '6/80', 'action': 'permit'} {'name': 'Check allow', 'src_ip': '10.3.3.3', 'dst_ip': '10.2.2.2', 'dst_port': '6/80', 'action': 'permit'} {'name': 'Check allow', 'src_ip': '10.4.4.4', 'dst_ip': '10.1.1.1', 'dst_port': '6/80', 'action': 'permit'} {'name': 'Check allow', 'src_ip': '10.4.4.4', 'dst_ip': '10.2.2.2', 'dst_port': '6/80', 'action': 'permit'} ->>> +>> > ``` In that example you can see how we expanded `red` -> 10.1.1.1", "10.2.2.2", "10.3.3.3" as an example. diff --git a/netutils/acl.py b/netutils/acl.py index 838aaf82..013adcd4 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -161,7 +161,7 @@ class ACLRule: """A class that helps you imagine an acl rule via methodologies.""" name: t.Any = None - src_ip: t.Any = "some def value" + src_ip: t.Any = None src_zone: t.Any = None dst_ip: t.Any = None dst_port: t.Any = None @@ -206,7 +206,7 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument ... ) >>> >>> - >>> rule.expanded_rules + >>> rule._expanded_rules [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'action': 'permit'}] >>> """ @@ -214,7 +214,7 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument def __load_data(self, kwargs) -> None: """Load the data into the rule while verifying input data, result data, and processing data.""" - # Remaining kwargs stored under ACRule.Meta + # Remaining kwargs stored under ACLRule.Meta pop_kwargs = [] for key, val in kwargs.items(): if key not in get_attributes(self.__class__): @@ -249,7 +249,7 @@ def __load_data(self, kwargs) -> None: self.result_data_check() # self.validate() - # self.expanded_rules = _cartesian_product(self._processed) + self._expanded_rules = _cartesian_product(self._processed) # if self.filter_same_ip: # self.expanded_rules = [item for item in self.expanded_rules if item["dst_ip"] != item["src_ip"]] @@ -263,8 +263,8 @@ def result_data_check(self) -> None: def validate(self) -> t.Any: """Run through any method that startswith('validate_') and run that method.""" - if self.order_validate: - method_order = self.order_validate + if self.Meta.order_validate: + method_order = self.Meta.order_validate else: method_order = dir(self) results = [] @@ -323,8 +323,8 @@ def enforce(self) -> t.List[t.Dict[str, t.Any]]: Returns: A list of dictionaries that explains the results of the enforcement. """ - if self.order_enforce: - method_order = self.order_enforce + if self.Meta.order_enforce: + method_order = self.Meta.order_enforce else: method_order = dir(self) results = [] @@ -345,14 +345,14 @@ def enforce_matrix(self) -> t.Union[t.List[t.Dict[str, t.Any]], None]: Returns: A list of dictionaries that explains the results of the matrix being enforced. """ - if not self.matrix_enforced: + if not self.Meta.matrix_enforced: return None - if not self.matrix: + if not self.Meta.matrix: raise ValueError("You must set a matrix dictionary to use the matrix feature.") - if not self.matrix_definition: + if not self.Meta.matrix_definition: raise ValueError("You must set a matrix definition dictionary to use the matrix feature.") actions = [] - for rule in self.expanded_rules: + for rule in self._expanded_rules: source = rule["src_ip"] destination = rule["dst_ip"] port = rule["dst_port"] @@ -466,11 +466,11 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: rules_unmatched: t.List[t.Dict[str, t.Any]] = [] rules_matched: t.List[t.Dict[str, t.Any]] = [] - if not match_rule.expanded_rules: + if not match_rule._expanded_rules: raise ValueError("There is no expanded rules to test against.") - for rule in match_rule.expanded_rules: + for rule in match_rule._expanded_rules: rules_found.append(False) - for existing_rule in self.expanded_rules: + for existing_rule in self._expanded_rules: missing = False for attr in attrs: # Examples of obj are match_rule.src_ip, match_rule.dst_port diff --git a/tests/unit/test_acl.py b/tests/unit/test_acl.py index 2e88a29c..1aea0655 100644 --- a/tests/unit/test_acl.py +++ b/tests/unit/test_acl.py @@ -372,4 +372,4 @@ def process_dst_ip(self, dst_ip): @pytest.mark.parametrize("data", add_group_check) def test_custom_address_group(data): obj = TestAddrGroups(data["sent"]) - assert obj.expanded_rules == data["received"] + assert obj._expanded_rules == data["received"] From 30fd4e3537805db32429ec117d0202ee62483ffa Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Wed, 21 Feb 2024 10:55:56 +0100 Subject: [PATCH 04/31] Working on init method --- netutils/acl.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 013adcd4..66a44ed8 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -248,10 +248,10 @@ def __load_data(self, kwargs) -> None: setattr(self, attr, _attr_data) self.result_data_check() - # self.validate() + self.validate() self._expanded_rules = _cartesian_product(self._processed) - # if self.filter_same_ip: - # self.expanded_rules = [item for item in self.expanded_rules if item["dst_ip"] != item["src_ip"]] + if self.Meta.filter_same_ip: + self._expanded_rules = [item for item in self._expanded_rules if item["dst_ip"] != item["src_ip"]] def input_data_check(self) -> None: """Verify the input data against the specified JSONSchema or using a simple dictionary check.""" @@ -359,14 +359,14 @@ def enforce_matrix(self) -> t.Union[t.List[t.Dict[str, t.Any]], None]: src_zone = "" dst_zone = "" as_tuple = (source, destination, port) - for zone, ips in self.matrix_definition.items(): + for zone, ips in self.Meta.matrix_definition.items(): if is_ip_within(source, ips): src_zone = zone if is_ip_within(destination, ips): dst_zone = zone - if port in self.matrix.get(src_zone, {}).get(dst_zone, {}).get("allow", []): + if port in self.Meta.matrix.get(src_zone, {}).get(dst_zone, {}).get("allow", []): actions.append({"obj": as_tuple, "action": "allow"}) - elif port in self.matrix.get(src_zone, {}).get(dst_zone, {}).get("notify", []): + elif port in self.Meta.matrix.get(src_zone, {}).get(dst_zone, {}).get("notify", []): actions.append({"obj": as_tuple, "action": "notify"}) else: actions.append({"obj": as_tuple, "action": "deny"}) @@ -489,8 +489,8 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: break detailed_info = { "existing_rule_product": existing_rule, # pylint: disable=undefined-loop-variable - "match_rule": match_rule._processed, - "existing_rule": self._processed, + "match_rule": match_rule._processed_data, + "existing_rule": self._processed_data, } if rules_found[-1]: detailed_info["match_rule_product"] = rule From 126d5bc618bd0ea514b7244d8936ad5e988110ce Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Thu, 22 Feb 2024 13:31:21 +0100 Subject: [PATCH 05/31] Working on init method --- netutils/acl.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 66a44ed8..292a802a 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -130,6 +130,8 @@ def _cartesian_product(data: t.Dict[str, str]) -> t.List[t.Dict[str, t.Any]]: keys.append(key) if isinstance(value, (str, int)): values.append([value]) + elif value is None: + values.append([None]) else: values.append(value) product = list(itertools.product(*values)) @@ -150,7 +152,7 @@ def _check_schema(data: t.Any, schema: t.Any, verify: bool) -> None: def get_attributes(cls): result = { name: attr for name, attr in cls.__dict__.items() - if not name.startswith("__") + if not name.startswith("_") and not callable(attr) and not type(attr) is staticmethod } @@ -218,7 +220,7 @@ def __load_data(self, kwargs) -> None: pop_kwargs = [] for key, val in kwargs.items(): if key not in get_attributes(self.__class__): - self.Meta.key = val + setattr(self.Meta, key, val) pop_kwargs.append(key) # Pop unneeded keys @@ -240,16 +242,14 @@ def __load_data(self, kwargs) -> None: for attr in get_attributes(self.__class__): processor_func = getattr(self, f"process_{attr}", None) # TODO(mzb): Consider callable and staticmethod. if processor_func: - _attr_data = processor_func(self._processed_data[attr]) - else: - _attr_data = self._processed_data[attr] + self._processed_data[attr] = processor_func(self._processed_data[attr]) - self._processed_data[attr] = _attr_data - setattr(self, attr, _attr_data) + if self._processed_data[attr] != self.attr: + setattr(self, attr, self._processed_data[attr]) self.result_data_check() self.validate() - self._expanded_rules = _cartesian_product(self._processed) + self._expanded_rules = _cartesian_product(self._processed_data) if self.Meta.filter_same_ip: self._expanded_rules = [item for item in self._expanded_rules if item["dst_ip"] != item["src_ip"]] @@ -523,7 +523,10 @@ def match(self, match_rule: "ACLRule") -> bool: class ACLRules: """Class to help match multiple ACLRule objects.""" - class_obj = ACLRule + rules: t.List[t.Any] = [] + + class Meta: + class_obj = ACLRule def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disable=unused-argument """Class to help match multiple ACLRule. @@ -531,14 +534,13 @@ def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disab Args: data: A list of `ACLRule` rules. """ - self.data: t.Any = data self.rules: t.List[t.Any] = [] self.load_data() def load_data(self) -> None: """Load the data for multiple rules.""" for item in self.data: - self.rules.append(self.class_obj(item)) + self.rules.append(self.Meta.class_obj(item)) def match(self, rule: ACLRule) -> str: """Check the rules loaded in `load_data` match against a new `rule`. @@ -550,9 +552,9 @@ def match(self, rule: ACLRule) -> str: The response from the rule that matched, or `deny` by default. """ for item in self.rules: - if item.match(self.class_obj(rule)): - return str(item.action) - return str(item.deny) # pylint: disable=undefined-loop-variable + if item.match(rule): # mzb: bugfix + return True # mzb: change to bool + return False # pylint: disable=undefined-loop-variable def match_details(self, rule: ACLRule) -> t.Any: """Verbosely check the rules loaded in `load_data` match against a new `rule`. @@ -565,5 +567,5 @@ def match_details(self, rule: ACLRule) -> t.Any: """ output = [] for item in self.rules: - output.append(item.match_details(self.class_obj(rule))) + output.append(item.match_details(rule)) # mzb: bugfix return output From ff94d4760740b80bcab9a7f33fd526149a8b5fed Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Thu, 22 Feb 2024 13:46:59 +0100 Subject: [PATCH 06/31] Fixinb black --- netutils/acl.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 292a802a..b7cc1b3b 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -151,10 +151,9 @@ def _check_schema(data: t.Any, schema: t.Any, verify: bool) -> None: def get_attributes(cls): result = { - name: attr for name, attr in cls.__dict__.items() - if not name.startswith("_") - and not callable(attr) - and not type(attr) is staticmethod + name: attr + for name, attr in cls.__dict__.items() + if not name.startswith("_") and not callable(attr) and not type(attr) is staticmethod } return result From b252742dedefa966129a8e652df0a0652e3324f3 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Thu, 22 Feb 2024 14:12:17 +0100 Subject: [PATCH 07/31] Fixinb black --- netutils/acl.py | 69 ++++++++++++------------------------------------- 1 file changed, 17 insertions(+), 52 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index b7cc1b3b..8432609c 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -2,8 +2,8 @@ import itertools import copy +import types import typing as t -from netutils.protocol_mapper import PROTO_NAME_TO_NUM, TCP_NAME_TO_NUM, UDP_NAME_TO_NUM from netutils.ip import is_ip_within try: @@ -150,10 +150,11 @@ def _check_schema(data: t.Any, schema: t.Any, verify: bool) -> None: def get_attributes(cls): + """Function that describes class attributes.""" result = { name: attr for name, attr in cls.__dict__.items() - if not name.startswith("_") and not callable(attr) and not type(attr) is staticmethod + if not name.startswith("_") and not callable(attr) and not isinstance(attr, types.FunctionType) } return result @@ -169,7 +170,7 @@ class ACLRule: dst_zone: t.Any = None action: t.Any = None - class Meta: + class Meta: # pylint: disable=too-few-public-methods permit: str = "permit" deny: str = "deny" @@ -241,10 +242,12 @@ def __load_data(self, kwargs) -> None: for attr in get_attributes(self.__class__): processor_func = getattr(self, f"process_{attr}", None) # TODO(mzb): Consider callable and staticmethod. if processor_func: - self._processed_data[attr] = processor_func(self._processed_data[attr]) + _attr_data = processor_func(self._processed_data[attr]) + else: + _attr_data = self._processed_data[attr] - if self._processed_data[attr] != self.attr: - setattr(self, attr, self._processed_data[attr]) + self._processed_data[attr] = _attr_data + setattr(self, attr, _attr_data) self.result_data_check() self.validate() @@ -278,44 +281,6 @@ def validate(self) -> t.Any: results.extend(result) return results - # def process_dst_port( - # self, dst_port: t.Any - # ) -> t.Union[t.List[str], None]: # pylint: disable=inconsistent-return-statements - # """Convert port and protocol information. - # - # Method supports a single format of `{protocol}/{port}`, and will translate the - # protocol for all IANA defined protocols. The port will be translated for TCP and - # UDP ports only. For all other protocols should use port of 0, e.g. `ICMP/0` for ICMP - # or `50/0` for ESP. Similarly, IANA defines the port mappings, while these are mostly - # staying unchanged, but sourced from - # https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv. - # """ - # output = [] - # if not self.dst_port_process: - # return None - # if not isinstance(dst_port, list): - # dst_port = [dst_port] - # for item in dst_port: - # protocol = item.split("/")[0] - # port = item.split("/")[1] - # if protocol.isalpha(): - # if not PROTO_NAME_TO_NUM.get(protocol.upper()): - # raise ValueError( - # f"Protocol {protocol} was not found in netutils.protocol_mapper.PROTO_NAME_TO_NUM." - # ) - # protocol = PROTO_NAME_TO_NUM[protocol.upper()] - # # test port[0] vs port, since dashes do not count, e.g. www-http - # if int(protocol) == 6 and port[0].isalpha(): - # if not TCP_NAME_TO_NUM.get(port.upper()): - # raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.TCP_NAME_TO_NUM.") - # port = TCP_NAME_TO_NUM[port.upper()] - # if int(protocol) == 17 and port[0].isalpha(): - # if not UDP_NAME_TO_NUM.get(port.upper()): - # raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.UDP_NAME_TO_NUM.") - # port = UDP_NAME_TO_NUM[port.upper()] - # output.append(f"{protocol}/{port}") - # return output - def enforce(self) -> t.List[t.Dict[str, t.Any]]: """Run through any method that startswith('enforce_') and run that method. @@ -465,9 +430,9 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: rules_unmatched: t.List[t.Dict[str, t.Any]] = [] rules_matched: t.List[t.Dict[str, t.Any]] = [] - if not match_rule._expanded_rules: + if not match_rule._expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test against.") - for rule in match_rule._expanded_rules: + for rule in match_rule._expanded_rules: # pylint: disable=protected-access rules_found.append(False) for existing_rule in self._expanded_rules: missing = False @@ -488,7 +453,7 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: break detailed_info = { "existing_rule_product": existing_rule, # pylint: disable=undefined-loop-variable - "match_rule": match_rule._processed_data, + "match_rule": match_rule._processed_data, # pylint: disable=protected-access "existing_rule": self._processed_data, } if rules_found[-1]: @@ -524,7 +489,7 @@ class ACLRules: rules: t.List[t.Any] = [] - class Meta: + class Meta: # pylint: disable=too-few-public-methods class_obj = ACLRule def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disable=unused-argument @@ -534,12 +499,12 @@ def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disab data: A list of `ACLRule` rules. """ self.rules: t.List[t.Any] = [] - self.load_data() + self.load_data(data=data) - def load_data(self) -> None: + def load_data(self, data) -> None: """Load the data for multiple rules.""" - for item in self.data: - self.rules.append(self.Meta.class_obj(item)) + for item in data: + self.rules.append(self.Meta.class_obj(**item)) def match(self, rule: ACLRule) -> str: """Check the rules loaded in `load_data` match against a new `rule`. From efa273bf69b7976a73ea28822e3215206ce697fe Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Fri, 23 Feb 2024 10:44:19 +0100 Subject: [PATCH 08/31] Updates --- netutils/acl.py | 69 +++++++++++++++++---- tests/unit/test_acl.py | 135 +++++++++++++++++++++++++++++------------ 2 files changed, 154 insertions(+), 50 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 8432609c..84e79c41 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -4,6 +4,7 @@ import copy import types import typing as t +from netutils.protocol_mapper import PROTO_NAME_TO_NUM, TCP_NAME_TO_NUM, UDP_NAME_TO_NUM from netutils.ip import is_ip_within try: @@ -16,8 +17,8 @@ INPUT_SCHEMA = { "type": "object", "properties": { - "name": {"type": "string"}, - "src_zone": {"type": ["string", "array"]}, + "name": {"type": ["string", "null"]}, + "src_zone": {"type": ["string", "array", "null"]}, "src_ip": {"$ref": "#/definitions/arrayOrIP"}, "dst_ip": {"$ref": "#/definitions/arrayOrIP"}, "dst_port": { @@ -35,8 +36,9 @@ }, ], }, - "dst_zone": {"type": ["string", "array"]}, + "dst_zone": {"type": ["string", "array", "null"]}, "action": {"type": "string"}, + "protocol": {"type": ["string", "null"]}, }, "definitions": { "ipv4": {"type": "string", "pattern": "^(?:\\d{1,3}\\.){3}\\d{1,3}$"}, @@ -149,12 +151,15 @@ def _check_schema(data: t.Any, schema: t.Any, verify: bool) -> None: raise ValueError() -def get_attributes(cls): +def get_attributes(obj): """Function that describes class attributes.""" result = { - name: attr - for name, attr in cls.__dict__.items() - if not name.startswith("_") and not callable(attr) and not isinstance(attr, types.FunctionType) + # name for name in dir(cls) + attr: getattr(obj, attr) + for attr in dir(obj) + if not attr.startswith("_") + and not callable(getattr(obj, attr)) + and not isinstance(getattr(obj, attr), types.FunctionType) } return result @@ -168,6 +173,7 @@ class ACLRule: dst_ip: t.Any = None dst_port: t.Any = None dst_zone: t.Any = None + protocol: t.Any = None action: t.Any = None class Meta: # pylint: disable=too-few-public-methods @@ -219,7 +225,7 @@ def __load_data(self, kwargs) -> None: # Remaining kwargs stored under ACLRule.Meta pop_kwargs = [] for key, val in kwargs.items(): - if key not in get_attributes(self.__class__): + if key not in get_attributes(self): setattr(self.Meta, key, val) pop_kwargs.append(key) @@ -228,7 +234,7 @@ def __load_data(self, kwargs) -> None: kwargs.pop(key) # Ensure each class attr is in init kwargs. - for attr in get_attributes(self.__class__): + for attr in get_attributes(self): if attr not in kwargs: kwargs[attr] = getattr(self, attr) @@ -239,7 +245,7 @@ def __load_data(self, kwargs) -> None: # Input check self.input_data_check() - for attr in get_attributes(self.__class__): + for attr in get_attributes(self): processor_func = getattr(self, f"process_{attr}", None) # TODO(mzb): Consider callable and staticmethod. if processor_func: _attr_data = processor_func(self._processed_data[attr]) @@ -251,8 +257,11 @@ def __load_data(self, kwargs) -> None: self.result_data_check() self.validate() - self._expanded_rules = _cartesian_product(self._processed_data) + self._expanded_rules = _cartesian_product(self._processed_data) # todo(mzb): fix nested if self.Meta.filter_same_ip: + print("mzb") + print(self._expanded_rules) + print(self) self._expanded_rules = [item for item in self._expanded_rules if item["dst_ip"] != item["src_ip"]] def input_data_check(self) -> None: @@ -281,6 +290,44 @@ def validate(self) -> t.Any: results.extend(result) return results + def process_dst_port( + self, dst_port: t.Any + ) -> t.Union[t.List[str], None]: # pylint: disable=inconsistent-return-statements + """Convert port and protocol information. + + Method supports a single format of `{protocol}/{port}`, and will translate the + protocol for all IANA defined protocols. The port will be translated for TCP and + UDP ports only. For all other protocols should use port of 0, e.g. `ICMP/0` for ICMP + or `50/0` for ESP. Similarly, IANA defines the port mappings, while these are mostly + staying unchanged, but sourced from + https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv. + """ + output = [] + if not self.Meta.dst_port_process: + return self.dst_port + if not isinstance(dst_port, list): + dst_port = [dst_port] + for item in dst_port: + protocol = item.split("/")[0] + port = item.split("/")[1] + if protocol.isalpha(): + if not PROTO_NAME_TO_NUM.get(protocol.upper()): + raise ValueError( + f"Protocol {protocol} was not found in netutils.protocol_mapper.PROTO_NAME_TO_NUM." + ) + protocol = PROTO_NAME_TO_NUM[protocol.upper()] + # test port[0] vs port, since dashes do not count, e.g. www-http + if int(protocol) == 6 and port[0].isalpha(): + if not TCP_NAME_TO_NUM.get(port.upper()): + raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.TCP_NAME_TO_NUM.") + port = TCP_NAME_TO_NUM[port.upper()] + if int(protocol) == 17 and port[0].isalpha(): + if not UDP_NAME_TO_NUM.get(port.upper()): + raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.UDP_NAME_TO_NUM.") + port = UDP_NAME_TO_NUM[port.upper()] + output.append(f"{protocol}/{port}") + return output + def enforce(self) -> t.List[t.Dict[str, t.Any]]: """Run through any method that startswith('enforce_') and run that method. diff --git a/tests/unit/test_acl.py b/tests/unit/test_acl.py index 1aea0655..b86ddd1b 100644 --- a/tests/unit/test_acl.py +++ b/tests/unit/test_acl.py @@ -14,7 +14,7 @@ dst_port="tcp/www-http", action="permit", ), - "received": "permit", + "received": True, }, { "sent": dict( @@ -24,7 +24,7 @@ dst_port="6/80", action="permit", ), - "received": "permit", + "received": True, }, { "sent": dict( @@ -34,7 +34,7 @@ dst_port="6/80", action="permit", ), - "received": "permit", + "received": True, }, { "sent": dict( @@ -44,7 +44,7 @@ dst_port="tcp/80", action="permit", ), - "received": "deny", + "received": False, }, { "sent": dict( @@ -54,7 +54,7 @@ dst_port="tcp/80", action="permit", ), - "received": "deny", + "received": False, }, { "sent": dict( @@ -64,7 +64,7 @@ dst_port="tcp/443", action="permit", ), - "received": "deny", + "received": False, }, ] @@ -216,34 +216,37 @@ } -class TestMatrix(acl.ACLRule): +class TestMatrixRule(acl.ACLRule): """ACLRule inherited class to test the matrix.""" - matrix = MATRIX - matrix_enforced = True - matrix_definition = IP_DEFINITIONS + class Meta(acl.ACLRule.Meta): # pylint: disable=too-few-public-methods + matrix = MATRIX + matrix_enforced = True + matrix_definition = IP_DEFINITIONS -class TestSchema(acl.ACLRule): +class TestSchemaRule(acl.ACLRule): """ACLRule inherited class to test the schema.""" - input_data_verify = True + class Meta(acl.ACLRule.Meta): # pylint: disable=too-few-public-methods + input_data_verify = True -class TestSchema2(acl.ACLRule): +class TestSchema2Rule(acl.ACLRule): """ACLRule inherited class alternate to test the schema.""" - result_data_verify = True + class Meta(acl.ACLRule.Meta): # pylint: disable=too-few-public-methods + result_data_verify = True @pytest.mark.parametrize("data", verify_acl) def test_verify_acl(data): - assert acl.ACLRules(acls).match(data["sent"]) == data["received"] + assert acl.ACLRules(acls).match(acl.ACLRule(**data["sent"])) == data["received"] @pytest.mark.parametrize("data", verify_matrix) def test_matrix(data): - assert TestMatrix(data["sent"]).enforce() == data["received"] + assert TestMatrixRule(**data["sent"]).enforce() == data["received"] @pytest.mark.parametrize("data", verify_schema) @@ -255,19 +258,19 @@ def test_schema(data): pass with pytest.raises(jsonschema.exceptions.ValidationError): - TestSchema(data["sent"]) + TestSchemaRule(**data["sent"]) def test_schema_not_enforced_when_option_not_set(): try: - acl.ACLRule(dict(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="tcp/80", action=100)) + acl.ACLRule(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="tcp/80", action=100) except Exception: # pylint: disable=broad-exception-caught assert False, "No error should have been raised" def test_schema_valid(): try: - TestSchema(dict(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="tcp/80", action="permit")) + TestSchemaRule(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="tcp/80", action="permit") except Exception: # pylint: disable=broad-exception-caught assert False, "No error should have been raised" @@ -281,12 +284,12 @@ def test_schema2(data): pass with pytest.raises(jsonschema.exceptions.ValidationError): - TestSchema2(data["sent"]).validate() + TestSchema2Rule(**data["sent"]).validate() def test_schema2_valid(): try: - TestSchema2(dict(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="tcp/80", action="permit")).validate() + TestSchema2Rule(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="tcp/80", action="permit").validate() except Exception: # pylint: disable=broad-exception-caught assert False, "No error should have been raised" @@ -294,13 +297,13 @@ def test_schema2_valid(): class TestAddrGroups(acl.ACLRule): """ACLRule inherited class alternate to test expansions.""" - address_groups = {"red": ["white", "blue"], "blue": ["cyan"], "yellow": ["orange"]} + def __init__(self, **kwargs): + self._address_groups = {"red": ["white", "blue"], "blue": ["cyan"], "yellow": ["orange"]} + self._addresses = {"white": ["10.1.1.1", "10.2.2.2"], "cyan": ["10.3.3.3"], "orange": ["10.4.4.4"]} - addresses = {"white": ["10.1.1.1", "10.2.2.2"], "cyan": ["10.3.3.3"], "orange": ["10.4.4.4"]} + self._flattened_addresses = self.flatten_addresses(self._address_groups, self._addresses) - def __init__(self, data, *args, **kwargs): - self.flattened_addresses = self.flatten_addresses(self.address_groups, self.addresses) - super().__init__(data, *args, **kwargs) + super().__init__(**kwargs) def flatten_addresses(self, address_groups, addresses): """Go through and get the addresses given potential address groups.""" @@ -320,7 +323,7 @@ def flatten_addresses(self, address_groups, addresses): if group != sub_group: flattened_addresses.setdefault(group, []).extend(ips) - return flattened_addresses + return flattened_addresses def process_ip(self, ip): """Test ability to expand IP for both source and destination.""" @@ -331,10 +334,10 @@ def process_ip(self, ip): for ip_name in ip: if not ip_name[0].isalpha(): output.append(ip_name) - elif self.addresses.get(ip_name): - output.extend(self.addresses[ip_name]) - elif self.flattened_addresses.get(ip_name): - output.extend(self.flattened_addresses[ip_name]) + elif self._addresses.get(ip_name): + output.extend(self._addresses[ip_name]) + elif self._flattened_addresses.get(ip_name): + output.extend(self._flattened_addresses[ip_name]) return sorted(list(set(output))) def process_src_ip(self, src_ip): @@ -358,12 +361,66 @@ def process_dst_ip(self, dst_ip): action="permit", ), "received": [ - {"action": "permit", "dst_ip": "10.2.2.2", "dst_port": "6/80", "name": "Check allow", "src_ip": "10.1.1.1"}, - {"action": "permit", "dst_ip": "10.1.1.1", "dst_port": "6/80", "name": "Check allow", "src_ip": "10.2.2.2"}, - {"action": "permit", "dst_ip": "10.1.1.1", "dst_port": "6/80", "name": "Check allow", "src_ip": "10.3.3.3"}, - {"action": "permit", "dst_ip": "10.2.2.2", "dst_port": "6/80", "name": "Check allow", "src_ip": "10.3.3.3"}, - {"action": "permit", "dst_ip": "10.1.1.1", "dst_port": "6/80", "name": "Check allow", "src_ip": "10.4.4.4"}, - {"action": "permit", "dst_ip": "10.2.2.2", "dst_port": "6/80", "name": "Check allow", "src_ip": "10.4.4.4"}, + { + "action": "permit", + "dst_ip": "10.2.2.2", + "dst_port": "6/80", + "name": "Check allow", + "src_ip": "10.1.1.1", + "protocol": None, + "src_zone": None, + "dst_zone": None, + }, + { + "action": "permit", + "dst_ip": "10.1.1.1", + "dst_port": "6/80", + "name": "Check allow", + "src_ip": "10.2.2.2", + "protocol": None, + "src_zone": None, + "dst_zone": None, + }, + { + "action": "permit", + "dst_ip": "10.1.1.1", + "dst_port": "6/80", + "name": "Check allow", + "src_ip": "10.3.3.3", + "protocol": None, + "src_zone": None, + "dst_zone": None, + }, + { + "action": "permit", + "dst_ip": "10.2.2.2", + "dst_port": "6/80", + "name": "Check allow", + "src_ip": "10.3.3.3", + "protocol": None, + "src_zone": None, + "dst_zone": None, + }, + { + "action": "permit", + "dst_ip": "10.1.1.1", + "dst_port": "6/80", + "name": "Check allow", + "src_ip": "10.4.4.4", + "protocol": None, + "src_zone": None, + "dst_zone": None, + }, + { + "action": "permit", + "dst_ip": "10.2.2.2", + "dst_port": "6/80", + "name": "Check allow", + "src_ip": "10.4.4.4", + "protocol": None, + "src_zone": None, + "dst_zone": None, + }, ], } ] @@ -371,5 +428,5 @@ def process_dst_ip(self, dst_ip): @pytest.mark.parametrize("data", add_group_check) def test_custom_address_group(data): - obj = TestAddrGroups(data["sent"]) - assert obj._expanded_rules == data["received"] + obj = TestAddrGroups(**data["sent"]) + assert obj._expanded_rules == data["received"] # pylint: disable=protected-access From f202f3d0d4dbc4581126244499dcf367e5f26d20 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Fri, 23 Feb 2024 10:47:27 +0100 Subject: [PATCH 09/31] Updates --- netutils/acl.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/netutils/acl.py b/netutils/acl.py index 84e79c41..f6c178e9 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -177,6 +177,7 @@ class ACLRule: action: t.Any = None class Meta: # pylint: disable=too-few-public-methods + """Default meta class.""" permit: str = "permit" deny: str = "deny" @@ -537,6 +538,7 @@ class ACLRules: rules: t.List[t.Any] = [] class Meta: # pylint: disable=too-few-public-methods + """Default meta class.""" class_obj = ACLRule def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disable=unused-argument From 38912b048019a646f1c0353eda56e15ebc55134d Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Fri, 23 Feb 2024 10:47:48 +0100 Subject: [PATCH 10/31] Updates --- netutils/acl.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/netutils/acl.py b/netutils/acl.py index f6c178e9..c5292267 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -178,6 +178,7 @@ class ACLRule: class Meta: # pylint: disable=too-few-public-methods """Default meta class.""" + permit: str = "permit" deny: str = "deny" @@ -539,6 +540,7 @@ class ACLRules: class Meta: # pylint: disable=too-few-public-methods """Default meta class.""" + class_obj = ACLRule def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disable=unused-argument From bad0a898972cd89fe3bdac8554e07daa75c81f2c Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Fri, 23 Feb 2024 10:50:50 +0100 Subject: [PATCH 11/31] Updates --- docs/user/lib_use_cases_acl.md | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/docs/user/lib_use_cases_acl.md b/docs/user/lib_use_cases_acl.md index 783fe1e7..49cfe583 100644 --- a/docs/user/lib_use_cases_acl.md +++ b/docs/user/lib_use_cases_acl.md @@ -280,31 +280,25 @@ class ExpandAddrGroups(ACLRule): Using the above object, we can test with: ```python ->> > rule_data = dict( - ... -name = "Check allow", -... -src_ip = ["red", "blue", "10.4.4.4"], -... -dst_ip = ["white"], -... -dst_port = "6/www-http", -... -action = "permit", +>>> rule_data = dict( +... name="Check allow", +... src_ip=["red", "blue", "10.4.4.4"], +... dst_ip=["white"], +... dst_port="6/www-http", +... action="permit", ... ) ->> > ->> > address_object_expanded = ExpandAddrGroups(rule_data) ->> > for item in address_object_expanded._expanded_rules: - ... -print(item) -... +>>> +>>> address_object_expanded = ExpandAddrGroups(rule_data) +>>> for item in address_object_expanded.expanded_rules: +... print(item) +... {'name': 'Check allow', 'src_ip': '10.1.1.1', 'dst_ip': '10.2.2.2', 'dst_port': '6/80', 'action': 'permit'} {'name': 'Check allow', 'src_ip': '10.2.2.2', 'dst_ip': '10.1.1.1', 'dst_port': '6/80', 'action': 'permit'} {'name': 'Check allow', 'src_ip': '10.3.3.3', 'dst_ip': '10.1.1.1', 'dst_port': '6/80', 'action': 'permit'} {'name': 'Check allow', 'src_ip': '10.3.3.3', 'dst_ip': '10.2.2.2', 'dst_port': '6/80', 'action': 'permit'} {'name': 'Check allow', 'src_ip': '10.4.4.4', 'dst_ip': '10.1.1.1', 'dst_port': '6/80', 'action': 'permit'} {'name': 'Check allow', 'src_ip': '10.4.4.4', 'dst_ip': '10.2.2.2', 'dst_port': '6/80', 'action': 'permit'} ->> > +>>> ``` In that example you can see how we expanded `red` -> 10.1.1.1", "10.2.2.2", "10.3.3.3" as an example. From 8af6483a110439969faf6108e35cc34c3263feaf Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Fri, 23 Feb 2024 10:55:03 +0100 Subject: [PATCH 12/31] Updates --- netutils/acl.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index c5292267..76ff9320 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -248,7 +248,7 @@ def __load_data(self, kwargs) -> None: self.input_data_check() for attr in get_attributes(self): - processor_func = getattr(self, f"process_{attr}", None) # TODO(mzb): Consider callable and staticmethod. + processor_func = getattr(self, f"process_{attr}", None) if processor_func: _attr_data = processor_func(self._processed_data[attr]) else: @@ -259,11 +259,8 @@ def __load_data(self, kwargs) -> None: self.result_data_check() self.validate() - self._expanded_rules = _cartesian_product(self._processed_data) # todo(mzb): fix nested + self._expanded_rules = _cartesian_product(self._processed_data) if self.Meta.filter_same_ip: - print("mzb") - print(self._expanded_rules) - print(self) self._expanded_rules = [item for item in self._expanded_rules if item["dst_ip"] != item["src_ip"]] def input_data_check(self) -> None: From 0e4d19efd1bed3b9b32a5fb4af9815f8ec367243 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Fri, 23 Feb 2024 11:05:36 +0100 Subject: [PATCH 13/31] Expanded rules property --- netutils/acl.py | 21 ++++++++++++++------- tests/unit/test_acl.py | 2 +- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 76ff9320..434ab935 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -216,7 +216,7 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument ... ) >>> >>> - >>> rule._expanded_rules + >>> rule.expanded_rules [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'action': 'permit'}] >>> """ @@ -259,9 +259,16 @@ def __load_data(self, kwargs) -> None: self.result_data_check() self.validate() - self._expanded_rules = _cartesian_product(self._processed_data) + + @property + def expanded_rules(self): + """Expanded rule property.""" + + _expanded_rules = _cartesian_product(self._processed_data) if self.Meta.filter_same_ip: - self._expanded_rules = [item for item in self._expanded_rules if item["dst_ip"] != item["src_ip"]] + _expanded_rules = [item for item in _expanded_rules if item["dst_ip"] != item["src_ip"]] + + return _expanded_rules def input_data_check(self) -> None: """Verify the input data against the specified JSONSchema or using a simple dictionary check.""" @@ -362,7 +369,7 @@ def enforce_matrix(self) -> t.Union[t.List[t.Dict[str, t.Any]], None]: if not self.Meta.matrix_definition: raise ValueError("You must set a matrix definition dictionary to use the matrix feature.") actions = [] - for rule in self._expanded_rules: + for rule in self.expanded_rules: source = rule["src_ip"] destination = rule["dst_ip"] port = rule["dst_port"] @@ -476,11 +483,11 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: rules_unmatched: t.List[t.Dict[str, t.Any]] = [] rules_matched: t.List[t.Dict[str, t.Any]] = [] - if not match_rule._expanded_rules: # pylint: disable=protected-access + if not match_rule.expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test against.") - for rule in match_rule._expanded_rules: # pylint: disable=protected-access + for rule in match_rule.expanded_rules: # pylint: disable=protected-access rules_found.append(False) - for existing_rule in self._expanded_rules: + for existing_rule in self.expanded_rules: missing = False for attr in attrs: # Examples of obj are match_rule.src_ip, match_rule.dst_port diff --git a/tests/unit/test_acl.py b/tests/unit/test_acl.py index b86ddd1b..e156280e 100644 --- a/tests/unit/test_acl.py +++ b/tests/unit/test_acl.py @@ -429,4 +429,4 @@ def process_dst_ip(self, dst_ip): @pytest.mark.parametrize("data", add_group_check) def test_custom_address_group(data): obj = TestAddrGroups(**data["sent"]) - assert obj._expanded_rules == data["received"] # pylint: disable=protected-access + assert obj.expanded_rules == data["received"] # pylint: disable=protected-access From 07858d8c65a56ab214ee5771e461f3ce9ae829a9 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Fri, 23 Feb 2024 11:06:36 +0100 Subject: [PATCH 14/31] Expanded rules property --- netutils/acl.py | 1 - 1 file changed, 1 deletion(-) diff --git a/netutils/acl.py b/netutils/acl.py index 434ab935..b4ffc670 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -263,7 +263,6 @@ def __load_data(self, kwargs) -> None: @property def expanded_rules(self): """Expanded rule property.""" - _expanded_rules = _cartesian_product(self._processed_data) if self.Meta.filter_same_ip: _expanded_rules = [item for item in _expanded_rules if item["dst_ip"] != item["src_ip"]] From 7a9e7e0e11229dbb8316fb8b9bff0872c33ae882 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Tue, 27 Feb 2024 09:53:18 +0100 Subject: [PATCH 15/31] Serializer --- netutils/acl.py | 24 ++++++++++++------------ tests/unit/test_acl.py | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index b4ffc670..558d5e10 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -192,7 +192,7 @@ class Meta: # pylint: disable=too-few-public-methods matrix_enforced: bool = False matrix_definition: t.Any = {} - dst_port_process: bool = True + dst_port_process: bool = False order_validate: t.List[str] = [] order_enforce: t.List[str] = [] @@ -216,7 +216,7 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument ... ) >>> >>> - >>> rule.expanded_rules + >>> rule._expanded_rules [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'action': 'permit'}] >>> """ @@ -261,7 +261,7 @@ def __load_data(self, kwargs) -> None: self.validate() @property - def expanded_rules(self): + def _expanded_rules(self): """Expanded rule property.""" _expanded_rules = _cartesian_product(self._processed_data) if self.Meta.filter_same_ip: @@ -368,7 +368,7 @@ def enforce_matrix(self) -> t.Union[t.List[t.Dict[str, t.Any]], None]: if not self.Meta.matrix_definition: raise ValueError("You must set a matrix definition dictionary to use the matrix feature.") actions = [] - for rule in self.expanded_rules: + for rule in self._expanded_rules: source = rule["src_ip"] destination = rule["dst_ip"] port = rule["dst_port"] @@ -482,11 +482,11 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: rules_unmatched: t.List[t.Dict[str, t.Any]] = [] rules_matched: t.List[t.Dict[str, t.Any]] = [] - if not match_rule.expanded_rules: # pylint: disable=protected-access + if not match_rule._expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test against.") - for rule in match_rule.expanded_rules: # pylint: disable=protected-access + for rule in match_rule._expanded_rules: # pylint: disable=protected-access rules_found.append(False) - for existing_rule in self.expanded_rules: + for existing_rule in self._expanded_rules: missing = False for attr in attrs: # Examples of obj are match_rule.src_ip, match_rule.dst_port @@ -529,11 +529,11 @@ def match(self, match_rule: "ACLRule") -> bool: # def __repr__(self) -> str: # """Set repr of the object to be sane.""" - # output = [] - # for attr in self.attrs: - # if self._processed.get(attr): - # output.append(f"{attr}: {self._processed[attr]}") - # return ", ".join(output) + # return {k: v for k, v in self.__dict__.items() if not k.startswith("_")} + + def serialize(self) -> str: + """Primitive Serializer.""" + return {k: v for k, v in self.__dict__.items() if not k.startswith("_")} class ACLRules: diff --git a/tests/unit/test_acl.py b/tests/unit/test_acl.py index e156280e..b86ddd1b 100644 --- a/tests/unit/test_acl.py +++ b/tests/unit/test_acl.py @@ -429,4 +429,4 @@ def process_dst_ip(self, dst_ip): @pytest.mark.parametrize("data", add_group_check) def test_custom_address_group(data): obj = TestAddrGroups(**data["sent"]) - assert obj.expanded_rules == data["received"] # pylint: disable=protected-access + assert obj._expanded_rules == data["received"] # pylint: disable=protected-access From d91c2864f01f45ebc5ca633afc1c2709648cef27 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Tue, 27 Feb 2024 11:21:29 +0100 Subject: [PATCH 16/31] Serializer --- netutils/acl.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/netutils/acl.py b/netutils/acl.py index 558d5e10..1f89aebe 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -560,6 +560,10 @@ def load_data(self, data) -> None: for item in data: self.rules.append(self.Meta.class_obj(**item)) + def serialize(self) -> str: + """Primitive Serializer.""" + return [rule.serialize() for rule in self.rules] + def match(self, rule: ACLRule) -> str: """Check the rules loaded in `load_data` match against a new `rule`. From 52d6f23abd91badcccb06aecfbe9e5f7f93a9442 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Tue, 27 Feb 2024 11:58:06 +0100 Subject: [PATCH 17/31] Improve match functions --- netutils/acl.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 1f89aebe..8a8ed386 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -410,7 +410,10 @@ def match_src_ip(self, existing_ip: str, check_ip: str) -> bool: Returns: True if `check_ip` is within the range of `existing_ip`, False otherwise. """ - return is_ip_within(check_ip, existing_ip) + if existing_ip == check_ip: # None cases + return True + else: + is_ip_within(check_ip, existing_ip) def match_src_zone(self, existing_src_zone: str, check_src_zone: str) -> bool: """Match the source zone for equality. @@ -434,7 +437,10 @@ def match_dst_ip(self, existing_ip: str, check_ip: str) -> bool: Returns: True if `check_ip` is within the range of `existing_ip`, False otherwise. """ - return is_ip_within(check_ip, existing_ip) + if existing_ip == check_ip: # None cases + return True + else: + return is_ip_within(check_ip, existing_ip) def match_dst_zone(self, existing_dst_zone: str, check_dst_zone: str) -> bool: """Match the destination zone for equality. @@ -484,6 +490,9 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: if not match_rule._expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test against.") + elif not self._expanded_rules: # pylint: disable=protected-access + raise ValueError("There is no expanded rules to test.") + for rule in match_rule._expanded_rules: # pylint: disable=protected-access rules_found.append(False) for existing_rule in self._expanded_rules: From 7c5ef0fe4703bdab531c0bf0deafbae6674d85b1 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Mon, 11 Mar 2024 13:45:44 +0100 Subject: [PATCH 18/31] Improve match functions --- netutils/acl.py | 98 ++++++++++++++++++++++++++----------------------- 1 file changed, 52 insertions(+), 46 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 8a8ed386..fc744eeb 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -220,9 +220,9 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'action': 'permit'}] >>> """ - self.__load_data(kwargs=kwargs) + self._load_data(kwargs=kwargs) - def __load_data(self, kwargs) -> None: + def _load_data(self, kwargs) -> None: """Load the data into the rule while verifying input data, result data, and processing data.""" # Remaining kwargs stored under ACLRule.Meta pop_kwargs = [] @@ -248,7 +248,7 @@ def __load_data(self, kwargs) -> None: self.input_data_check() for attr in get_attributes(self): - processor_func = getattr(self, f"process_{attr}", None) + processor_func = getattr(self, f"process_{attr}", None) # todo(mzb): remove special case for dst_port ! if processor_func: _attr_data = processor_func(self._processed_data[attr]) else: @@ -295,43 +295,43 @@ def validate(self) -> t.Any: results.extend(result) return results - def process_dst_port( - self, dst_port: t.Any - ) -> t.Union[t.List[str], None]: # pylint: disable=inconsistent-return-statements - """Convert port and protocol information. - - Method supports a single format of `{protocol}/{port}`, and will translate the - protocol for all IANA defined protocols. The port will be translated for TCP and - UDP ports only. For all other protocols should use port of 0, e.g. `ICMP/0` for ICMP - or `50/0` for ESP. Similarly, IANA defines the port mappings, while these are mostly - staying unchanged, but sourced from - https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv. - """ - output = [] - if not self.Meta.dst_port_process: - return self.dst_port - if not isinstance(dst_port, list): - dst_port = [dst_port] - for item in dst_port: - protocol = item.split("/")[0] - port = item.split("/")[1] - if protocol.isalpha(): - if not PROTO_NAME_TO_NUM.get(protocol.upper()): - raise ValueError( - f"Protocol {protocol} was not found in netutils.protocol_mapper.PROTO_NAME_TO_NUM." - ) - protocol = PROTO_NAME_TO_NUM[protocol.upper()] - # test port[0] vs port, since dashes do not count, e.g. www-http - if int(protocol) == 6 and port[0].isalpha(): - if not TCP_NAME_TO_NUM.get(port.upper()): - raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.TCP_NAME_TO_NUM.") - port = TCP_NAME_TO_NUM[port.upper()] - if int(protocol) == 17 and port[0].isalpha(): - if not UDP_NAME_TO_NUM.get(port.upper()): - raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.UDP_NAME_TO_NUM.") - port = UDP_NAME_TO_NUM[port.upper()] - output.append(f"{protocol}/{port}") - return output + # def process_dst_port( + # self, dst_port: t.Any + # ) -> t.Union[t.List[str], None]: # pylint: disable=inconsistent-return-statements + # """Convert port and protocol information. + # + # Method supports a single format of `{protocol}/{port}`, and will translate the + # protocol for all IANA defined protocols. The port will be translated for TCP and + # UDP ports only. For all other protocols should use port of 0, e.g. `ICMP/0` for ICMP + # or `50/0` for ESP. Similarly, IANA defines the port mappings, while these are mostly + # staying unchanged, but sourced from + # https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv. + # """ + # output = [] + # if not self.Meta.dst_port_process: + # return self.dst_port + # if not isinstance(dst_port, list): + # dst_port = [dst_port] + # for item in dst_port: + # protocol = item.split("/")[0] + # port = item.split("/")[1] + # if protocol.isalpha(): + # if not PROTO_NAME_TO_NUM.get(protocol.upper()): + # raise ValueError( + # f"Protocol {protocol} was not found in netutils.protocol_mapper.PROTO_NAME_TO_NUM." + # ) + # protocol = PROTO_NAME_TO_NUM[protocol.upper()] + # # test port[0] vs port, since dashes do not count, e.g. www-http + # if int(protocol) == 6 and port[0].isalpha(): + # if not TCP_NAME_TO_NUM.get(port.upper()): + # raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.TCP_NAME_TO_NUM.") + # port = TCP_NAME_TO_NUM[port.upper()] + # if int(protocol) == 17 and port[0].isalpha(): + # if not UDP_NAME_TO_NUM.get(port.upper()): + # raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.UDP_NAME_TO_NUM.") + # port = UDP_NAME_TO_NUM[port.upper()] + # output.append(f"{protocol}/{port}") + # return output def enforce(self) -> t.List[t.Dict[str, t.Any]]: """Run through any method that startswith('enforce_') and run that method. @@ -514,15 +514,21 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: break detailed_info = { "existing_rule_product": existing_rule, # pylint: disable=undefined-loop-variable - "match_rule": match_rule._processed_data, # pylint: disable=protected-access - "existing_rule": self._processed_data, + "match_rule": match_rule.serialize(), # pylint: disable=protected-access + "existing_rule": self.serialize(), # TODO(mzb): property? + "match_rule_product": rule, } if rules_found[-1]: - detailed_info["match_rule_product"] = rule rules_matched.append(detailed_info) else: + detailed_info["existing_rule_product"] = None rules_unmatched.append(detailed_info) - return {"rules_matched": rules_matched, "rules_unmatched": rules_unmatched} + + return { + "match": True if rules_matched and not rules_unmatched else False, + "rules_matched": rules_matched, + "rules_unmatched": rules_unmatched, + } def match(self, match_rule: "ACLRule") -> bool: """Simple boolean way of verifying match or not. @@ -540,7 +546,7 @@ def match(self, match_rule: "ACLRule") -> bool: # """Set repr of the object to be sane.""" # return {k: v for k, v in self.__dict__.items() if not k.startswith("_")} - def serialize(self) -> str: + def serialize(self) -> dict: """Primitive Serializer.""" return {k: v for k, v in self.__dict__.items() if not k.startswith("_")} @@ -569,7 +575,7 @@ def load_data(self, data) -> None: for item in data: self.rules.append(self.Meta.class_obj(**item)) - def serialize(self) -> str: + def serialize(self) -> list: """Primitive Serializer.""" return [rule.serialize() for rule in self.rules] From 3f2f198bd410ee1001e9276da8ffeed4b8809474 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Wed, 13 Mar 2024 10:43:21 +0100 Subject: [PATCH 19/31] private _get_attribute --- netutils/acl.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index fc744eeb..6c1dc2b9 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -151,7 +151,7 @@ def _check_schema(data: t.Any, schema: t.Any, verify: bool) -> None: raise ValueError() -def get_attributes(obj): +def _get_attributes(obj): """Function that describes class attributes.""" result = { # name for name in dir(cls) @@ -227,7 +227,7 @@ def _load_data(self, kwargs) -> None: # Remaining kwargs stored under ACLRule.Meta pop_kwargs = [] for key, val in kwargs.items(): - if key not in get_attributes(self): + if key not in _get_attributes(self): setattr(self.Meta, key, val) pop_kwargs.append(key) @@ -236,7 +236,7 @@ def _load_data(self, kwargs) -> None: kwargs.pop(key) # Ensure each class attr is in init kwargs. - for attr in get_attributes(self): + for attr in _get_attributes(self): if attr not in kwargs: kwargs[attr] = getattr(self, attr) @@ -247,7 +247,7 @@ def _load_data(self, kwargs) -> None: # Input check self.input_data_check() - for attr in get_attributes(self): + for attr in _get_attributes(self): processor_func = getattr(self, f"process_{attr}", None) # todo(mzb): remove special case for dst_port ! if processor_func: _attr_data = processor_func(self._processed_data[attr]) @@ -514,8 +514,8 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: break detailed_info = { "existing_rule_product": existing_rule, # pylint: disable=undefined-loop-variable - "match_rule": match_rule.serialize(), # pylint: disable=protected-access - "existing_rule": self.serialize(), # TODO(mzb): property? + # "match_rule": match_rule.serialize(), # pylint: disable=protected-access + # "existing_rule": self.serialize(), # TODO(mzb): property? "match_rule_product": rule, } if rules_found[-1]: @@ -526,8 +526,10 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: return { "match": True if rules_matched and not rules_unmatched else False, - "rules_matched": rules_matched, - "rules_unmatched": rules_unmatched, + "existing_rule": self.serialize(), + "match_rule": match_rule.serialize(), + "products_matched": rules_matched, + "products_unmatched": rules_unmatched, } def match(self, match_rule: "ACLRule") -> bool: From b54584edcfbf4852a4793cc37995303c19e467bb Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Wed, 13 Mar 2024 10:47:39 +0100 Subject: [PATCH 20/31] fixing review comments --- netutils/acl.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 6c1dc2b9..79f09d91 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -216,7 +216,7 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument ... ) >>> >>> - >>> rule._expanded_rules + >>> rule.expanded_rules [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'action': 'permit'}] >>> """ @@ -261,7 +261,7 @@ def _load_data(self, kwargs) -> None: self.validate() @property - def _expanded_rules(self): + def expanded_rules(self): """Expanded rule property.""" _expanded_rules = _cartesian_product(self._processed_data) if self.Meta.filter_same_ip: @@ -368,7 +368,7 @@ def enforce_matrix(self) -> t.Union[t.List[t.Dict[str, t.Any]], None]: if not self.Meta.matrix_definition: raise ValueError("You must set a matrix definition dictionary to use the matrix feature.") actions = [] - for rule in self._expanded_rules: + for rule in self.expanded_rules: source = rule["src_ip"] destination = rule["dst_ip"] port = rule["dst_port"] @@ -488,14 +488,14 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: rules_unmatched: t.List[t.Dict[str, t.Any]] = [] rules_matched: t.List[t.Dict[str, t.Any]] = [] - if not match_rule._expanded_rules: # pylint: disable=protected-access + if not match_rule.expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test against.") - elif not self._expanded_rules: # pylint: disable=protected-access + elif not self.expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test.") - for rule in match_rule._expanded_rules: # pylint: disable=protected-access + for rule in match_rule.expanded_rules: # pylint: disable=protected-access rules_found.append(False) - for existing_rule in self._expanded_rules: + for existing_rule in self.expanded_rules: missing = False for attr in attrs: # Examples of obj are match_rule.src_ip, match_rule.dst_port From f9e91e1d0dd1bfe8ca91876119a9a56131248e13 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Wed, 13 Mar 2024 11:22:01 +0100 Subject: [PATCH 21/31] fixing repr --- netutils/acl.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 79f09d91..4591ae78 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -514,8 +514,6 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: break detailed_info = { "existing_rule_product": existing_rule, # pylint: disable=undefined-loop-variable - # "match_rule": match_rule.serialize(), # pylint: disable=protected-access - # "existing_rule": self.serialize(), # TODO(mzb): property? "match_rule_product": rule, } if rules_found[-1]: @@ -544,9 +542,9 @@ def match(self, match_rule: "ACLRule") -> bool: details = self.match_details(match_rule) return not bool(details["rules_unmatched"]) - # def __repr__(self) -> str: - # """Set repr of the object to be sane.""" - # return {k: v for k, v in self.__dict__.items() if not k.startswith("_")} + def __repr__(self) -> str: + """Set repr of the object to be sane.""" + return self.name def serialize(self) -> dict: """Primitive Serializer.""" From ac70581be8a44a8c125e3f697704cf0370e0dcc3 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Wed, 13 Mar 2024 12:21:09 +0100 Subject: [PATCH 22/31] fixing example --- netutils/acl.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 4591ae78..b7801026 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -198,6 +198,11 @@ class Meta: # pylint: disable=too-few-public-methods order_enforce: t.List[str] = [] filter_same_ip: bool = True + class Data: # pylint: disable=too-few-public-methods + """Default data class.""" + + expanded_rules: t.Any = {} + def __init__(self, **kwargs): # pylint: disable=unused-argument """Initialize and load data. @@ -210,14 +215,17 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument >>> rule = ACLRule( ... name="Check no match", ... src_ip=["10.1.1.1"], + ... src_zone="internal", ... dst_ip="172.16.0.10", - ... dst_port="tcp/www-http", + ... dst_port="6/80", + ... dst_zone="external", + ... protocol='tcp', ... action="permit", ... ) >>> >>> - >>> rule.expanded_rules - [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'action': 'permit'}] + >>> rule.Data.expanded_rules + [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'src_zone': 'internal', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'dst_zone': 'external', 'protocol': 'tcp', 'action': 'permit'}] >>> """ self._load_data(kwargs=kwargs) @@ -259,15 +267,15 @@ def _load_data(self, kwargs) -> None: self.result_data_check() self.validate() + self._set_expanded_rules() - @property - def expanded_rules(self): + def _set_expanded_rules(self): """Expanded rule property.""" _expanded_rules = _cartesian_product(self._processed_data) if self.Meta.filter_same_ip: _expanded_rules = [item for item in _expanded_rules if item["dst_ip"] != item["src_ip"]] - return _expanded_rules + setattr(self.Data, "expanded_rules", _expanded_rules) def input_data_check(self) -> None: """Verify the input data against the specified JSONSchema or using a simple dictionary check.""" @@ -368,7 +376,7 @@ def enforce_matrix(self) -> t.Union[t.List[t.Dict[str, t.Any]], None]: if not self.Meta.matrix_definition: raise ValueError("You must set a matrix definition dictionary to use the matrix feature.") actions = [] - for rule in self.expanded_rules: + for rule in self.Data.expanded_rules: source = rule["src_ip"] destination = rule["dst_ip"] port = rule["dst_port"] @@ -488,14 +496,14 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: rules_unmatched: t.List[t.Dict[str, t.Any]] = [] rules_matched: t.List[t.Dict[str, t.Any]] = [] - if not match_rule.expanded_rules: # pylint: disable=protected-access + if not match_rule.Data.expanded_ruless: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test against.") - elif not self.expanded_rules: # pylint: disable=protected-access + elif not self.Data.expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test.") - for rule in match_rule.expanded_rules: # pylint: disable=protected-access + for rule in match_rule.Data.expanded_ruless: # pylint: disable=protected-access rules_found.append(False) - for existing_rule in self.expanded_rules: + for existing_rule in self.Data.expanded_rules: missing = False for attr in attrs: # Examples of obj are match_rule.src_ip, match_rule.dst_port From e530d097f2ff524500afc976b727d443419f0307 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Wed, 13 Mar 2024 18:03:12 +0100 Subject: [PATCH 23/31] refactoring detailed_match --- netutils/acl.py | 138 ++++++++++++++++++++++++----------------- tests/unit/test_acl.py | 44 ++++++------- 2 files changed, 102 insertions(+), 80 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index b7801026..868fb6cb 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -164,6 +164,20 @@ def _get_attributes(obj): return result +def _get_match_funcs(obj): + """Returns {'attr': match_attr_funct, ...} dict.""" + attrs = {} + for attr in dir(obj): + if attr.startswith("match_"): + match_name = attr[len("match_"):] # noqa: E203 + # When an attribute is not defined, can skip it + if not hasattr(match_rule, obj): + continue + attrs[match_name] = getattr(obj, attr) + + return attrs + + class ACLRule: """A class that helps you imagine an acl rule via methodologies.""" @@ -198,11 +212,6 @@ class Meta: # pylint: disable=too-few-public-methods order_enforce: t.List[str] = [] filter_same_ip: bool = True - class Data: # pylint: disable=too-few-public-methods - """Default data class.""" - - expanded_rules: t.Any = {} - def __init__(self, **kwargs): # pylint: disable=unused-argument """Initialize and load data. @@ -224,7 +233,7 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument ... ) >>> >>> - >>> rule.Data.expanded_rules + >>> rule._expanded_rules [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'src_zone': 'internal', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'dst_zone': 'external', 'protocol': 'tcp', 'action': 'permit'}] >>> """ @@ -270,12 +279,12 @@ def _load_data(self, kwargs) -> None: self._set_expanded_rules() def _set_expanded_rules(self): - """Expanded rule property.""" + """Expanded rule setter.""" _expanded_rules = _cartesian_product(self._processed_data) if self.Meta.filter_same_ip: _expanded_rules = [item for item in _expanded_rules if item["dst_ip"] != item["src_ip"]] - setattr(self.Data, "expanded_rules", _expanded_rules) + self._expanded_rules = _expanded_rules def input_data_check(self) -> None: """Verify the input data against the specified JSONSchema or using a simple dictionary check.""" @@ -376,7 +385,7 @@ def enforce_matrix(self) -> t.Union[t.List[t.Dict[str, t.Any]], None]: if not self.Meta.matrix_definition: raise ValueError("You must set a matrix definition dictionary to use the matrix feature.") actions = [] - for rule in self.Data.expanded_rules: + for rule in self._expanded_rules: source = rule["src_ip"] destination = rule["dst_ip"] port = rule["dst_port"] @@ -474,7 +483,7 @@ def match_dst_port(self, existing_port: str, check_port: str) -> bool: """ return existing_port == check_port - def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: disable=too-many-locals + def detailed_match(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: disable=too-many-locals """Verbose way of verifying match details. Args: @@ -483,61 +492,73 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: Returns: A dictionary with root keys of `rules_matched` and `rules_matched`. """ - attrs = [] - for name in dir(self): - if name.startswith("match_"): - obj_name = name[len("match_") :] # noqa: E203 - # When an attribute is not defined, can skip it - if not hasattr(match_rule, obj_name): - continue - attrs.append(obj_name) - rules_found: t.List[bool] = [] - rules_unmatched: t.List[t.Dict[str, t.Any]] = [] - rules_matched: t.List[t.Dict[str, t.Any]] = [] + products_matched: t.List[t.Dict[str, t.Any]] = [] + products_unmatched: t.List[t.Dict[str, t.Any]] = [] - if not match_rule.Data.expanded_ruless: # pylint: disable=protected-access + if not match_rule._expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test against.") - elif not self.Data.expanded_rules: # pylint: disable=protected-access + elif not self._expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test.") - for rule in match_rule.Data.expanded_ruless: # pylint: disable=protected-access - rules_found.append(False) - for existing_rule in self.Data.expanded_rules: - missing = False - for attr in attrs: - # Examples of obj are match_rule.src_ip, match_rule.dst_port - rule_value = rule[attr] - existing_value = existing_rule[attr] - # Examples of getter are self.match_src_ip, self.match_dst_port - getter = getattr(self, f"match_{attr}")(existing_value, rule_value) - if not getter and getter is not None: - missing = True - break - # If the loop gets through with each existing rule not flagging - # the `missing` value, we know everything was matched, and the rule has - # found a complete match, we can break out of the loop at this point. - if not missing: - rules_found[-1] = True - break - detailed_info = { - "existing_rule_product": existing_rule, # pylint: disable=undefined-loop-variable - "match_rule_product": rule, - } - if rules_found[-1]: - rules_matched.append(detailed_info) + for match_product in match_rule._expanded_rules: # pylint: disable=protected-access + for existing_product in self._expanded_rules: + comparison_results = [] + for attr_name, attr_func in _get_match_funcs(self).items(): + comparison_results.append(attr_func(existing_product[attr_name], match_product[attr_name])) + # We found match_product in existing_product (all matchers returned True) + if all(comparison_results): + products_matched.append(match_product) + break # No need to compare remaining existing products. Performance improvement. else: - detailed_info["existing_rule_product"] = None - rules_unmatched.append(detailed_info) + products_unmatched.append(match_product) return { - "match": True if rules_matched and not rules_unmatched else False, + "match": True if products_matched and not products_unmatched else False, "existing_rule": self.serialize(), "match_rule": match_rule.serialize(), - "products_matched": rules_matched, - "products_unmatched": rules_unmatched, + "products_matched": products_matched, + "products_unmatched": products_unmatched, } + + # for match_product in match_rule._expanded_rules: # pylint: disable=protected-access + # rules_found.append(False) + # for existing_product in self._expanded_rules: + # missing = False + # for attr in attrs: + # # Examples of obj are match_rule.src_ip, match_rule.dst_port + # rule_value = match_product[attr] + # existing_value = existing_product[attr] + # # Examples of getter are self.match_src_ip, self.match_dst_port + # getter = getattr(self, f"match_{attr}")(existing_value, rule_value) + # if not getter and getter is not None: + # missing = True + # break + # # If the loop gets through with each existing rule not flagging + # # the `missing` value, we know everything was matched, and the rule has + # # found a complete match, we can break out of the loop at this point. + # if not missing: + # rules_found[-1] = True + # break + # detailed_info = { + # "existing_rule_product": existing_product, # pylint: disable=undefined-loop-variable + # "match_rule_product": match_product, + # } + # if rules_found[-1]: + # products_matched.append(detailed_info) + # else: + # detailed_info["existing_rule_product"] = None + # products_unmatched.append(detailed_info) + + # return { + # "match": True if products_matched and not products_unmatched else False, + # "existing_rule": self.serialize(), + # "match_rule": match_rule.serialize(), + # "products_matched": products_matched, + # "products_unmatched": products_unmatched, + # } + def match(self, match_rule: "ACLRule") -> bool: """Simple boolean way of verifying match or not. @@ -547,8 +568,9 @@ def match(self, match_rule: "ACLRule") -> bool: Returns: A boolean if there was a full match or not. """ - details = self.match_details(match_rule) - return not bool(details["rules_unmatched"]) + details = self.detailed_match(match_rule) + + return details["match"] def __repr__(self) -> str: """Set repr of the object to be sane.""" @@ -601,16 +623,16 @@ def match(self, rule: ACLRule) -> str: return True # mzb: change to bool return False # pylint: disable=undefined-loop-variable - def match_details(self, rule: ACLRule) -> t.Any: + def detailed_match(self, rule: ACLRule) -> t.Any: """Verbosely check the rules loaded in `load_data` match against a new `rule`. Args: rule: The `ACLRule` rule to test against the list of `ACLRule`s loaded in initiation. Returns: - The details from the `ACLRule.match_details` results. + The details from the `ACLRule.detailed_match` results. """ output = [] for item in self.rules: - output.append(item.match_details(rule)) # mzb: bugfix + output.append(item.detailed_match(rule)) # mzb: bugfix return output diff --git a/tests/unit/test_acl.py b/tests/unit/test_acl.py index b86ddd1b..0be40845 100644 --- a/tests/unit/test_acl.py +++ b/tests/unit/test_acl.py @@ -11,7 +11,7 @@ name="Check multiple sources pass. Check conversion of non-alpha tcp, e.g. with a dash", src_ip=["192.168.1.10", "192.168.1.11", "192.168.1.15-192.168.1.20"], dst_ip="172.16.0.10", - dst_port="tcp/www-http", + dst_port="80", action="permit", ), "received": True, @@ -21,7 +21,7 @@ name="Check with number in port definition", src_ip="192.168.0.10", dst_ip="192.168.250.11", - dst_port="6/80", + dst_port="80", action="permit", ), "received": True, @@ -31,7 +31,7 @@ name="Check with subnets", src_ip="192.168.0.0/25", dst_ip="172.16.0.0/24", - dst_port="6/80", + dst_port="80", action="permit", ), "received": True, @@ -41,7 +41,7 @@ name="Test partial match on Source IP", src_ip=["192.168.1.10", "192.168.2.10"], dst_ip="172.16.0.11", - dst_port="tcp/80", + dst_port="80", action="permit", ), "received": False, @@ -51,7 +51,7 @@ name="Test an entry that is not found", src_ip="192.168.1.10", dst_ip="192.168.240.1", - dst_port="tcp/80", + dst_port="80", action="permit", ), "received": False, @@ -73,14 +73,14 @@ name="Allow to internal web", src_ip=["192.168.0.0/24", "10.0.0.0/16"], dst_ip=["172.16.0.0/16", "192.168.250.10-192.168.250.20"], - dst_port=["tcp/80", "udp/53"], + dst_port=["80", "53"], action="permit", ), dict( name="Allow to internal dns", src_ip=["192.168.1.0/24"], dst_ip=["172.16.0.0/16"], - dst_port=["tcp/80", "udp/53"], + dst_port=["80", "53"], action="permit", ), dict( @@ -115,7 +115,7 @@ dst_port="tcp/www-http", action="permit", ), - "received": [{"obj": ("10.1.100.5", "10.1.200.0", "6/80"), "action": "allow"}], + "received": [{"obj": ("10.1.100.5", "10.1.200.0", "80"), "action": "allow"}], }, { "sent": dict( @@ -142,12 +142,12 @@ name="Check not found and denied", src_ip=["10.1.100.5", "10.1.100.6"], dst_ip="10.1.200.0", - dst_port="tcp/80", + dst_port="80", action="permit", ), "received": [ - {"obj": ("10.1.100.5", "10.1.200.0", "6/80"), "action": "allow"}, - {"obj": ("10.1.100.6", "10.1.200.0", "6/80"), "action": "allow"}, + {"obj": ("10.1.100.5", "10.1.200.0", "80"), "action": "allow"}, + {"obj": ("10.1.100.6", "10.1.200.0", "80"), "action": "allow"}, ], }, { @@ -211,8 +211,8 @@ } MATRIX = { - "red": {"blue": {"allow": ["6/80", "6/443"], "notify": ["6/25"]}, "orange": {"allow": ["6/80"]}}, - "blue": {"red": {"allow": ["6/80"]}}, + "red": {"blue": {"allow": ["80", "443"], "notify": ["25"]}, "orange": {"allow": ["80"]}}, + "blue": {"red": {"allow": ["80"]}}, } @@ -263,14 +263,14 @@ def test_schema(data): def test_schema_not_enforced_when_option_not_set(): try: - acl.ACLRule(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="tcp/80", action=100) + acl.ACLRule(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="80", action=100) except Exception: # pylint: disable=broad-exception-caught assert False, "No error should have been raised" def test_schema_valid(): try: - TestSchemaRule(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="tcp/80", action="permit") + TestSchemaRule(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="80", action="permit") except Exception: # pylint: disable=broad-exception-caught assert False, "No error should have been raised" @@ -289,7 +289,7 @@ def test_schema2(data): def test_schema2_valid(): try: - TestSchema2Rule(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="tcp/80", action="permit").validate() + TestSchema2Rule(src_ip="10.1.1.1", dst_ip="10.2.2.2", dst_port="80", action="permit").validate() except Exception: # pylint: disable=broad-exception-caught assert False, "No error should have been raised" @@ -364,7 +364,7 @@ def process_dst_ip(self, dst_ip): { "action": "permit", "dst_ip": "10.2.2.2", - "dst_port": "6/80", + "dst_port": "80", "name": "Check allow", "src_ip": "10.1.1.1", "protocol": None, @@ -374,7 +374,7 @@ def process_dst_ip(self, dst_ip): { "action": "permit", "dst_ip": "10.1.1.1", - "dst_port": "6/80", + "dst_port": "80", "name": "Check allow", "src_ip": "10.2.2.2", "protocol": None, @@ -384,7 +384,7 @@ def process_dst_ip(self, dst_ip): { "action": "permit", "dst_ip": "10.1.1.1", - "dst_port": "6/80", + "dst_port": "80", "name": "Check allow", "src_ip": "10.3.3.3", "protocol": None, @@ -394,7 +394,7 @@ def process_dst_ip(self, dst_ip): { "action": "permit", "dst_ip": "10.2.2.2", - "dst_port": "6/80", + "dst_port": "80", "name": "Check allow", "src_ip": "10.3.3.3", "protocol": None, @@ -404,7 +404,7 @@ def process_dst_ip(self, dst_ip): { "action": "permit", "dst_ip": "10.1.1.1", - "dst_port": "6/80", + "dst_port": "80", "name": "Check allow", "src_ip": "10.4.4.4", "protocol": None, @@ -414,7 +414,7 @@ def process_dst_ip(self, dst_ip): { "action": "permit", "dst_ip": "10.2.2.2", - "dst_port": "6/80", + "dst_port": "80", "name": "Check allow", "src_ip": "10.4.4.4", "protocol": None, From 435963b2337726c096ee69af6830285cb556c9fc Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Wed, 13 Mar 2024 20:35:02 +0100 Subject: [PATCH 24/31] removing port processor --- netutils/acl.py | 117 ++++++++++------------------------------- tests/unit/test_acl.py | 32 +++++------ 2 files changed, 45 insertions(+), 104 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 868fb6cb..7b32fdeb 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -23,6 +23,9 @@ "dst_ip": {"$ref": "#/definitions/arrayOrIP"}, "dst_port": { "oneOf": [ + { + "type": "null" + }, { "$ref": "#/definitions/port", }, @@ -85,14 +88,26 @@ }, ], }, - "port": {"type": "string", "pattern": "^\\S+\\/\\S+$"}, + "port": { + "oneOf": [ + { + "type": "integer", + "minimum": 0, + "maximum": 65535, + }, + { + "type": "string", + "pattern": "^\d+$", + }, + ] + }, }, "required": [], } RESULT_SCHEMA = copy.deepcopy(INPUT_SCHEMA) -RESULT_SCHEMA["definitions"]["port"]["pattern"] = "^\\d+\\/\\d+$" # type: ignore +# RESULT_SCHEMA["definitions"]["port"]["pattern"] = "^\\d+\\/\\d+$" # type: ignore def _cartesian_product(data: t.Dict[str, str]) -> t.List[t.Dict[str, t.Any]]: @@ -167,13 +182,13 @@ def _get_attributes(obj): def _get_match_funcs(obj): """Returns {'attr': match_attr_funct, ...} dict.""" attrs = {} - for attr in dir(obj): - if attr.startswith("match_"): - match_name = attr[len("match_"):] # noqa: E203 + for attr_name in dir(obj): + if attr_name.startswith("match_") and attr_name not in ["match_details"]: + match_name = attr_name[len("match_"):] # noqa: E203 # When an attribute is not defined, can skip it - if not hasattr(match_rule, obj): + if not hasattr(obj, match_name): continue - attrs[match_name] = getattr(obj, attr) + attrs[match_name] = getattr(obj, attr_name) return attrs @@ -312,44 +327,6 @@ def validate(self) -> t.Any: results.extend(result) return results - # def process_dst_port( - # self, dst_port: t.Any - # ) -> t.Union[t.List[str], None]: # pylint: disable=inconsistent-return-statements - # """Convert port and protocol information. - # - # Method supports a single format of `{protocol}/{port}`, and will translate the - # protocol for all IANA defined protocols. The port will be translated for TCP and - # UDP ports only. For all other protocols should use port of 0, e.g. `ICMP/0` for ICMP - # or `50/0` for ESP. Similarly, IANA defines the port mappings, while these are mostly - # staying unchanged, but sourced from - # https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.csv. - # """ - # output = [] - # if not self.Meta.dst_port_process: - # return self.dst_port - # if not isinstance(dst_port, list): - # dst_port = [dst_port] - # for item in dst_port: - # protocol = item.split("/")[0] - # port = item.split("/")[1] - # if protocol.isalpha(): - # if not PROTO_NAME_TO_NUM.get(protocol.upper()): - # raise ValueError( - # f"Protocol {protocol} was not found in netutils.protocol_mapper.PROTO_NAME_TO_NUM." - # ) - # protocol = PROTO_NAME_TO_NUM[protocol.upper()] - # # test port[0] vs port, since dashes do not count, e.g. www-http - # if int(protocol) == 6 and port[0].isalpha(): - # if not TCP_NAME_TO_NUM.get(port.upper()): - # raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.TCP_NAME_TO_NUM.") - # port = TCP_NAME_TO_NUM[port.upper()] - # if int(protocol) == 17 and port[0].isalpha(): - # if not UDP_NAME_TO_NUM.get(port.upper()): - # raise ValueError(f"Port {port} was not found in netutils.protocol_mapper.UDP_NAME_TO_NUM.") - # port = UDP_NAME_TO_NUM[port.upper()] - # output.append(f"{protocol}/{port}") - # return output - def enforce(self) -> t.List[t.Dict[str, t.Any]]: """Run through any method that startswith('enforce_') and run that method. @@ -430,7 +407,7 @@ def match_src_ip(self, existing_ip: str, check_ip: str) -> bool: if existing_ip == check_ip: # None cases return True else: - is_ip_within(check_ip, existing_ip) + return is_ip_within(check_ip, existing_ip) def match_src_zone(self, existing_src_zone: str, check_src_zone: str) -> bool: """Match the source zone for equality. @@ -483,7 +460,7 @@ def match_dst_port(self, existing_port: str, check_port: str) -> bool: """ return existing_port == check_port - def detailed_match(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: disable=too-many-locals + def match_detail(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: disable=too-many-locals """Verbose way of verifying match details. Args: @@ -505,7 +482,9 @@ def detailed_match(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint for existing_product in self._expanded_rules: comparison_results = [] for attr_name, attr_func in _get_match_funcs(self).items(): - comparison_results.append(attr_func(existing_product[attr_name], match_product[attr_name])) + r= attr_func(existing_product[attr_name], match_product[attr_name]) + comparison_results.append(r) + print(f"matched {attr_name} by {attr_func}, with result {r}") # We found match_product in existing_product (all matchers returned True) if all(comparison_results): products_matched.append(match_product) @@ -521,44 +500,6 @@ def detailed_match(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint "products_unmatched": products_unmatched, } - - # for match_product in match_rule._expanded_rules: # pylint: disable=protected-access - # rules_found.append(False) - # for existing_product in self._expanded_rules: - # missing = False - # for attr in attrs: - # # Examples of obj are match_rule.src_ip, match_rule.dst_port - # rule_value = match_product[attr] - # existing_value = existing_product[attr] - # # Examples of getter are self.match_src_ip, self.match_dst_port - # getter = getattr(self, f"match_{attr}")(existing_value, rule_value) - # if not getter and getter is not None: - # missing = True - # break - # # If the loop gets through with each existing rule not flagging - # # the `missing` value, we know everything was matched, and the rule has - # # found a complete match, we can break out of the loop at this point. - # if not missing: - # rules_found[-1] = True - # break - # detailed_info = { - # "existing_rule_product": existing_product, # pylint: disable=undefined-loop-variable - # "match_rule_product": match_product, - # } - # if rules_found[-1]: - # products_matched.append(detailed_info) - # else: - # detailed_info["existing_rule_product"] = None - # products_unmatched.append(detailed_info) - - # return { - # "match": True if products_matched and not products_unmatched else False, - # "existing_rule": self.serialize(), - # "match_rule": match_rule.serialize(), - # "products_matched": products_matched, - # "products_unmatched": products_unmatched, - # } - def match(self, match_rule: "ACLRule") -> bool: """Simple boolean way of verifying match or not. @@ -568,7 +509,7 @@ def match(self, match_rule: "ACLRule") -> bool: Returns: A boolean if there was a full match or not. """ - details = self.detailed_match(match_rule) + details = self.match_detail(match_rule) return details["match"] @@ -634,5 +575,5 @@ def detailed_match(self, rule: ACLRule) -> t.Any: """ output = [] for item in self.rules: - output.append(item.detailed_match(rule)) # mzb: bugfix + output.append(item.match_detail(rule)) # mzb: bugfix return output diff --git a/tests/unit/test_acl.py b/tests/unit/test_acl.py index 0be40845..e1c42a2f 100644 --- a/tests/unit/test_acl.py +++ b/tests/unit/test_acl.py @@ -61,7 +61,7 @@ name="Test an action not permit or deny", src_ip="10.1.1.1", dst_ip="10.255.255.255", - dst_port="tcp/443", + dst_port="443", action="permit", ), "received": False, @@ -87,21 +87,21 @@ name="Allow to internal https", src_ip=["10.0.0.0/8"], dst_ip=["172.16.0.0/16"], - dst_port=["tcp/443"], + dst_port=["443"], action="deny", ), dict( name="Drop (not deny) this specfic packet", src_ip="10.1.1.1", dst_ip="10.255.255.255", - dst_port="tcp/443", + dst_port="443", action="drop", ), dict( name="Allow External DNS", src_ip=["0.0.0.0/0"], dst_ip=["8.8.8.8/32", "8.8.4.4/32"], - dst_port=["udp/53"], + dst_port=["53"], action="permit", ), ] @@ -112,7 +112,7 @@ name="Check allow", src_ip="10.1.100.5", dst_ip="10.1.200.0", - dst_port="tcp/www-http", + dst_port="80", action="permit", ), "received": [{"obj": ("10.1.100.5", "10.1.200.0", "80"), "action": "allow"}], @@ -122,20 +122,20 @@ name="Check Notify", src_ip="10.1.100.5", dst_ip="10.1.200.0", - dst_port="tcp/25", + dst_port="25", action="permit", ), - "received": [{"obj": ("10.1.100.5", "10.1.200.0", "6/25"), "action": "notify"}], + "received": [{"obj": ("10.1.100.5", "10.1.200.0", "25"), "action": "notify"}], }, { "sent": dict( name="Check not found and denied", src_ip="10.1.100.5", dst_ip="10.1.200.0", - dst_port="tcp/53", + dst_port="53", action="permit", ), - "received": [{"obj": ("10.1.100.5", "10.1.200.0", "6/53"), "action": "deny"}], + "received": [{"obj": ("10.1.100.5", "10.1.200.0", "53"), "action": "deny"}], }, { "sent": dict( @@ -155,10 +155,10 @@ name="Nothing found", src_ip="1.1.1.1", dst_ip="2.2.2.2", - dst_port="tcp/53", + dst_port="53", action="permit", ), - "received": [{"obj": ("1.1.1.1", "2.2.2.2", "6/53"), "action": "deny"}], + "received": [{"obj": ("1.1.1.1", "2.2.2.2", "53"), "action": "deny"}], }, ] @@ -168,7 +168,7 @@ name="Bad IP", src_ip="10.1.100.A", dst_ip="10.1.200.0", - dst_port="tcp/www-http", + dst_port="80", action="permit", ), }, @@ -177,7 +177,7 @@ name="Bad port", src_ip="10.1.100.5", dst_ip="10.1.200.0", - dst_port="tcp25", + dst_port="25", action="permit", ), }, @@ -186,7 +186,7 @@ name="Bad IP in list", src_ip=["10.1.100.5", "10.1.100.A"], dst_ip="10.1.200.0", - dst_port="tcp/25", + dst_port="25", action="permit", ), }, @@ -198,7 +198,7 @@ name="Check allow", src_ip="10.1.100.1", dst_ip="10.1.200.0", - dst_port="6/www-http", + dst_port="80", action=100, ), }, @@ -357,7 +357,7 @@ def process_dst_ip(self, dst_ip): name="Check allow", src_ip=["red", "blue", "10.4.4.4"], dst_ip=["white"], - dst_port="6/www-http", + dst_port="80", action="permit", ), "received": [ From 95852c0967df39f0cbd4c827b3d2f6ac2589d29c Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Wed, 13 Mar 2024 20:35:33 +0100 Subject: [PATCH 25/31] removing port processor --- netutils/acl.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 7b32fdeb..22e0a8f0 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -482,9 +482,7 @@ def match_detail(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: for existing_product in self._expanded_rules: comparison_results = [] for attr_name, attr_func in _get_match_funcs(self).items(): - r= attr_func(existing_product[attr_name], match_product[attr_name]) - comparison_results.append(r) - print(f"matched {attr_name} by {attr_func}, with result {r}") + comparison_results.append(rattr_func(existing_product[attr_name], match_product[attr_name])) # We found match_product in existing_product (all matchers returned True) if all(comparison_results): products_matched.append(match_product) From 79ba760bd9e1ef82c72dd27018926d88351f64e9 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Thu, 14 Mar 2024 09:12:53 +0100 Subject: [PATCH 26/31] improving match detail --- netutils/acl.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 22e0a8f0..88cb3441 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -480,13 +480,11 @@ def match_detail(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: for match_product in match_rule._expanded_rules: # pylint: disable=protected-access for existing_product in self._expanded_rules: - comparison_results = [] - for attr_name, attr_func in _get_match_funcs(self).items(): - comparison_results.append(rattr_func(existing_product[attr_name], match_product[attr_name])) - # We found match_product in existing_product (all matchers returned True) - if all(comparison_results): + # Break if we find match_product in existing_product (all matchers returned True) + if all([attr_func(existing_product[attr_name], match_product[attr_name]) for + attr_name, attr_func in _get_match_funcs(self).items()]): products_matched.append(match_product) - break # No need to compare remaining existing products. Performance improvement. + break # Do not compare remaining existing products. else: products_unmatched.append(match_product) From 5ca290f5b20dc4d5158f9dd1cee966327dc3079f Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Thu, 14 Mar 2024 13:28:49 +0100 Subject: [PATCH 27/31] fixing ci --- netutils/acl.py | 66 ++++++++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 88cb3441..4dcab055 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -4,7 +4,6 @@ import copy import types import typing as t -from netutils.protocol_mapper import PROTO_NAME_TO_NUM, TCP_NAME_TO_NUM, UDP_NAME_TO_NUM from netutils.ip import is_ip_within try: @@ -23,9 +22,7 @@ "dst_ip": {"$ref": "#/definitions/arrayOrIP"}, "dst_port": { "oneOf": [ - { - "type": "null" - }, + {"type": "null"}, { "$ref": "#/definitions/port", }, @@ -97,7 +94,7 @@ }, { "type": "string", - "pattern": "^\d+$", + "pattern": "^\\d+$", }, ] }, @@ -107,7 +104,6 @@ RESULT_SCHEMA = copy.deepcopy(INPUT_SCHEMA) -# RESULT_SCHEMA["definitions"]["port"]["pattern"] = "^\\d+\\/\\d+$" # type: ignore def _cartesian_product(data: t.Dict[str, str]) -> t.List[t.Dict[str, t.Any]]: @@ -166,10 +162,9 @@ def _check_schema(data: t.Any, schema: t.Any, verify: bool) -> None: raise ValueError() -def _get_attributes(obj): +def _get_attributes(obj: t.Any) -> dict[str, t.Any]: """Function that describes class attributes.""" result = { - # name for name in dir(cls) attr: getattr(obj, attr) for attr in dir(obj) if not attr.startswith("_") @@ -179,12 +174,12 @@ def _get_attributes(obj): return result -def _get_match_funcs(obj): +def _get_match_funcs(obj: t.Any) -> dict[str, t.Any]: """Returns {'attr': match_attr_funct, ...} dict.""" attrs = {} for attr_name in dir(obj): if attr_name.startswith("match_") and attr_name not in ["match_details"]: - match_name = attr_name[len("match_"):] # noqa: E203 + match_name = attr_name[len("match_") :] # noqa: E203 # When an attribute is not defined, can skip it if not hasattr(obj, match_name): continue @@ -227,7 +222,7 @@ class Meta: # pylint: disable=too-few-public-methods order_enforce: t.List[str] = [] filter_same_ip: bool = True - def __init__(self, **kwargs): # pylint: disable=unused-argument + def __init__(self, **kwargs: t.Any) -> None: # pylint: disable=unused-argument """Initialize and load data. Args: @@ -254,7 +249,7 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument """ self._load_data(kwargs=kwargs) - def _load_data(self, kwargs) -> None: + def _load_data(self, kwargs: dict[str, t.Any]) -> None: """Load the data into the rule while verifying input data, result data, and processing data.""" # Remaining kwargs stored under ACLRule.Meta pop_kwargs = [] @@ -280,7 +275,7 @@ def _load_data(self, kwargs) -> None: self.input_data_check() for attr in _get_attributes(self): - processor_func = getattr(self, f"process_{attr}", None) # todo(mzb): remove special case for dst_port ! + processor_func = getattr(self, f"process_{attr}", None) if processor_func: _attr_data = processor_func(self._processed_data[attr]) else: @@ -293,13 +288,13 @@ def _load_data(self, kwargs) -> None: self.validate() self._set_expanded_rules() - def _set_expanded_rules(self): + def _set_expanded_rules(self) -> None: """Expanded rule setter.""" _expanded_rules = _cartesian_product(self._processed_data) if self.Meta.filter_same_ip: _expanded_rules = [item for item in _expanded_rules if item["dst_ip"] != item["src_ip"]] - self._expanded_rules = _expanded_rules + self._expanded_rules = _expanded_rules # pylint: disable=attribute-defined-outside-init def input_data_check(self) -> None: """Verify the input data against the specified JSONSchema or using a simple dictionary check.""" @@ -406,8 +401,8 @@ def match_src_ip(self, existing_ip: str, check_ip: str) -> bool: """ if existing_ip == check_ip: # None cases return True - else: - return is_ip_within(check_ip, existing_ip) + + return is_ip_within(check_ip, existing_ip) def match_src_zone(self, existing_src_zone: str, check_src_zone: str) -> bool: """Match the source zone for equality. @@ -433,8 +428,8 @@ def match_dst_ip(self, existing_ip: str, check_ip: str) -> bool: """ if existing_ip == check_ip: # None cases return True - else: - return is_ip_within(check_ip, existing_ip) + + return is_ip_within(check_ip, existing_ip) def match_dst_zone(self, existing_dst_zone: str, check_dst_zone: str) -> bool: """Match the destination zone for equality. @@ -469,27 +464,29 @@ def match_detail(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: Returns: A dictionary with root keys of `rules_matched` and `rules_matched`. """ - products_matched: t.List[t.Dict[str, t.Any]] = [] products_unmatched: t.List[t.Dict[str, t.Any]] = [] if not match_rule._expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test against.") - elif not self._expanded_rules: # pylint: disable=protected-access + + if not self._expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test.") for match_product in match_rule._expanded_rules: # pylint: disable=protected-access for existing_product in self._expanded_rules: # Break if we find match_product in existing_product (all matchers returned True) - if all([attr_func(existing_product[attr_name], match_product[attr_name]) for - attr_name, attr_func in _get_match_funcs(self).items()]): + if all( + attr_func(existing_product[attr_name], match_product[attr_name]) + for attr_name, attr_func in _get_match_funcs(self).items() + ): products_matched.append(match_product) break # Do not compare remaining existing products. else: products_unmatched.append(match_product) return { - "match": True if products_matched and not products_unmatched else False, + "match": not bool(products_unmatched), "existing_rule": self.serialize(), "match_rule": match_rule.serialize(), "products_matched": products_matched, @@ -507,13 +504,13 @@ def match(self, match_rule: "ACLRule") -> bool: """ details = self.match_detail(match_rule) - return details["match"] + return details["match"] # type: ignore def __repr__(self) -> str: """Set repr of the object to be sane.""" - return self.name + return self.name or "Name is not set" - def serialize(self) -> dict: + def serialize(self) -> dict[str, t.Any]: """Primitive Serializer.""" return {k: v for k, v in self.__dict__.items() if not k.startswith("_")} @@ -537,16 +534,16 @@ def __init__(self, data: t.Any, *args: t.Any, **kwargs: t.Any): # pylint: disab self.rules: t.List[t.Any] = [] self.load_data(data=data) - def load_data(self, data) -> None: + def load_data(self, data: t.Any) -> None: """Load the data for multiple rules.""" for item in data: self.rules.append(self.Meta.class_obj(**item)) - def serialize(self) -> list: + def serialize(self) -> list[t.Any]: """Primitive Serializer.""" return [rule.serialize() for rule in self.rules] - def match(self, rule: ACLRule) -> str: + def match(self, rule: ACLRule) -> bool: """Check the rules loaded in `load_data` match against a new `rule`. Args: @@ -556,9 +553,10 @@ def match(self, rule: ACLRule) -> str: The response from the rule that matched, or `deny` by default. """ for item in self.rules: - if item.match(rule): # mzb: bugfix - return True # mzb: change to bool - return False # pylint: disable=undefined-loop-variable + if item.match(rule): + return True + + return False def detailed_match(self, rule: ACLRule) -> t.Any: """Verbosely check the rules loaded in `load_data` match against a new `rule`. @@ -571,5 +569,5 @@ def detailed_match(self, rule: ACLRule) -> t.Any: """ output = [] for item in self.rules: - output.append(item.match_detail(rule)) # mzb: bugfix + output.append(item.match_detail(rule)) return output From 68d834c595f0cffc1ecba8e4c6a964a57df95180 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Thu, 14 Mar 2024 13:34:20 +0100 Subject: [PATCH 28/31] match details name fix --- netutils/acl.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index 4dcab055..2210c1b0 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -455,7 +455,7 @@ def match_dst_port(self, existing_port: str, check_port: str) -> bool: """ return existing_port == check_port - def match_detail(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: disable=too-many-locals + def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: disable=too-many-locals """Verbose way of verifying match details. Args: @@ -502,7 +502,7 @@ def match(self, match_rule: "ACLRule") -> bool: Returns: A boolean if there was a full match or not. """ - details = self.match_detail(match_rule) + details = self.match_details(match_rule) return details["match"] # type: ignore @@ -569,5 +569,5 @@ def detailed_match(self, rule: ACLRule) -> t.Any: """ output = [] for item in self.rules: - output.append(item.match_detail(rule)) + output.append(item.match_details(rule)) return output From b5547a3e01de11e6d03d11c8abf271078a2ba954 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Fri, 15 Mar 2024 09:51:11 +0100 Subject: [PATCH 29/31] docs updates --- docs/user/lib_use_cases_acl.md | 41 +++++++++++++++++----------------- netutils/acl.py | 32 ++++++++++++++------------ tests/unit/test_acl.py | 4 ++-- 3 files changed, 40 insertions(+), 37 deletions(-) diff --git a/docs/user/lib_use_cases_acl.md b/docs/user/lib_use_cases_acl.md index 49cfe583..eece36f8 100644 --- a/docs/user/lib_use_cases_acl.md +++ b/docs/user/lib_use_cases_acl.md @@ -82,7 +82,7 @@ The `ACLRule` class at a high level: ### Initialization & Loading Data -The initialization process calls on the `load_data` method. This on a high level verifies schema of initial data, allows you to process data (e.g. convert tcp/https -> 80/443), expand data, determine Cartesian product (or permutations) of the firewall rule (traditionally 5-tuple), and verifies schema of result data. +`ACLRule` initialization process calls on the `__init__` method. This on a high level verifies schema of initial data, allows you to process data (e.g. convert tcp/https -> 80/443), expand data, determine Cartesian product (or permutations) of the firewall rule (traditionally 5-tuple), and verifies schema of result data. The Cartesian product (or permutations) is key to the functionality of other steps, this allows you to evaluate each rule based on the smallest view of the data, so pay close attention to those steps, as it is important to other methods as well. @@ -100,14 +100,13 @@ Many of validations will be based on IPs, but not all. Here you will find a written understanding of what is happening in the code: -- The init method takes in data and calls `load_data`. -- The method `load_data` processes the input data. +- The init method takes in data as key-value arguments and calls `_load_data`. +- The method `_load_data` processes the input data. - The `input_data_check` method is called and verifies the input data based on the specified JSON schema. - This is controlled by the `input_data_verify` attribute and schema defined in `input_data_schema`. - - For each `self.attrs`, a method name matching `f"process_{attr}"`, (e.g. `process_src_ip()`) is called. + - For each `self.attrs`, a method name matching `f"process_{attr}"`, (e.g. `process_src_ip()`) is called for `src_ip` attribute. - This allows you to inherit from and provide your own custom processes to convert, expand, or otherwise modify data before being evaluated. - - The `process_dst_port` method processes the `dst_port` attribute by converting the protocol and port information, it is enabled by default but controlled with the `dst_port_process` attribute. - - Both a dictionary `self.processed` and attributes (e.g. self.action, self.src_ip, etc.) are created. + - Dictionaries `self._preprocessed_data`, `self._processed_data` and attributes (e.g. `self.action`, `self.src_ip`, etc.) are created. - The `result_data_check` method verifies the processed data based on the specified JSON schema. - This is controlled by the `result_data_verify` attribute which is disabled by default. - The `validate` method validating the rule using a series of custom methods starting with `validate_` prefixes. @@ -146,7 +145,7 @@ While not accurate in all use cases it would be best practice to run any of your ### Match & Match Details -The `match_details` method provides a verbose way of verifying match details between two ACL rule's, the `match` method uses `match_details` and provides a boolean if there are any rules in `rules_unmatched` which would tell you if you had a full match or not. We will only review in detail the `match_details`. +The `match_details` method provides a verbose way of verifying match details between two ACL rule's, the `match` method uses `match_details` and provides a boolean if there are any rules in `products_unmatched` which would tell you if you had a full match or not. We will only review in detail the `match_details`. Here you will find a written understanding of what is happening in the code: @@ -155,13 +154,13 @@ Here you will find a written understanding of what is happening in the code: - This allows you to inherit from and provide your own custom equality check or verify with your business logic. - You do not need to have a `f"match_{attr}"` method for every attr, description as example would not be a good candidate to match on. - Equality checks are done on `src_zone`, `dst_zone`, `action`, and `port` by default. - - An `is_ip_within` check is done with for `src_ip` and `dst_ip` by default. + - Equality checks and an `is_ip_within` check is done with for `src_ip` and `dst_ip` by default. - In the process, details are provided for and returned: - - `rules_matched` - Root key that is a list of dictionaries of rules that matched. - - `rules_unmatched` - Root key that is a list of dictionaries of rules that did not match. - - `existing_rule_product` - The original expanded_rule that existed in this item. + - `match` - Root key, of type `bool`, that indicates whether a rule was matched. - `existing_rule` - The full original rule (not expanded_rule) that existed. - - `match_rule` - The full original rule that tested against, only shown in `rules_matched` root key. + - `match_rule` - The full original rule that tested against. + - `products_matched` - Root key that is a list of dictionaries of match products that matched. + - `products_unmatched` - Root key that is a list of dictionaries of match products that did not match. This data could help you to understand what matched, why it matched, and other metadata. This detail data can be used to in `ACLRules` to aggregate and ask more interesting questions. @@ -183,21 +182,21 @@ Here we can test if a rule is matched via the existing ruleset. We can leverage **Simple Example** ```python ->>> from netutils.acl import ACLRules +>>> from netutils.acl import ACLRules, ACLRule >>> >>> existing_acls = [ ... dict( ... name="Allow to internal web", ... src_ip=["192.168.0.0/24", "10.0.0.0/16"], ... dst_ip=["172.16.0.0/16", "192.168.250.10-192.168.250.20"], -... dst_port=["tcp/80", "udp/53"], +... dst_port=["80", "53"], ... action="permit", ... ), ... dict( ... name="Allow to internal dns", ... src_ip=["192.168.1.0/24"], ... dst_ip=["172.16.0.0/16"], -... dst_port=["tcp/80", "udp/53"], +... dst_port=["80", "53"], ... action="permit", ... ) ... ] @@ -206,24 +205,24 @@ Here we can test if a rule is matched via the existing ruleset. We can leverage ... name="Check multiple sources pass", ... src_ip=["192.168.1.10", "192.168.1.11", "192.168.1.15-192.168.1.20"], ... dst_ip="172.16.0.10", -... dst_port="tcp/www-http", +... dst_port="80", ... action="permit", ... ) >>> ->>> ACLRules(existing_acls).match(new_acl_match) -'permit' +>>> ACLRules(existing_acls).match(ACLRule(**new_acl_match)) +True >>> >>> >>> new_acl_non_match = dict( ... name="Check no match", ... src_ip=["10.1.1.1"], ... dst_ip="172.16.0.10", -... dst_port="tcp/www-http", +... dst_port="80", ... action="permit", ... ) >>> ->>> ACLRules(existing_acls).match(new_acl_non_match) -'deny' +>>> ACLRules(existing_acls).match(ACLRule(**new_acl_non_match)) +False >>> ``` diff --git a/netutils/acl.py b/netutils/acl.py index 2210c1b0..f1f3a515 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -162,7 +162,7 @@ def _check_schema(data: t.Any, schema: t.Any, verify: bool) -> None: raise ValueError() -def _get_attributes(obj: t.Any) -> dict[str, t.Any]: +def _get_attributes(obj: t.Any) -> t.Dict[str, t.Any]: """Function that describes class attributes.""" result = { attr: getattr(obj, attr) @@ -174,7 +174,7 @@ def _get_attributes(obj: t.Any) -> dict[str, t.Any]: return result -def _get_match_funcs(obj: t.Any) -> dict[str, t.Any]: +def _get_match_funcs(obj: t.Any) -> t.Dict[str, t.Any]: """Returns {'attr': match_attr_funct, ...} dict.""" attrs = {} for attr_name in dir(obj): @@ -243,13 +243,13 @@ def __init__(self, **kwargs: t.Any) -> None: # pylint: disable=unused-argument ... ) >>> >>> - >>> rule._expanded_rules + >>> rule.expanded_rules [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'src_zone': 'internal', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'dst_zone': 'external', 'protocol': 'tcp', 'action': 'permit'}] >>> """ self._load_data(kwargs=kwargs) - def _load_data(self, kwargs: dict[str, t.Any]) -> None: + def _load_data(self, kwargs: t.Dict[str, t.Any]) -> None: """Load the data into the rule while verifying input data, result data, and processing data.""" # Remaining kwargs stored under ACLRule.Meta pop_kwargs = [] @@ -292,9 +292,13 @@ def _set_expanded_rules(self) -> None: """Expanded rule setter.""" _expanded_rules = _cartesian_product(self._processed_data) if self.Meta.filter_same_ip: - _expanded_rules = [item for item in _expanded_rules if item["dst_ip"] != item["src_ip"]] + _expanded_rules = [ + item + for item in _expanded_rules + if (item["dst_ip"] != item["src_ip"]) or (item["dst_ip"] is None and item["src_ip"] is None) + ] - self._expanded_rules = _expanded_rules # pylint: disable=attribute-defined-outside-init + self.expanded_rules = _expanded_rules # pylint: disable=attribute-defined-outside-init def input_data_check(self) -> None: """Verify the input data against the specified JSONSchema or using a simple dictionary check.""" @@ -357,7 +361,7 @@ def enforce_matrix(self) -> t.Union[t.List[t.Dict[str, t.Any]], None]: if not self.Meta.matrix_definition: raise ValueError("You must set a matrix definition dictionary to use the matrix feature.") actions = [] - for rule in self._expanded_rules: + for rule in self.expanded_rules: source = rule["src_ip"] destination = rule["dst_ip"] port = rule["dst_port"] @@ -467,14 +471,14 @@ def match_details(self, match_rule: "ACLRule") -> t.Dict[str, t.Any]: # pylint: products_matched: t.List[t.Dict[str, t.Any]] = [] products_unmatched: t.List[t.Dict[str, t.Any]] = [] - if not match_rule._expanded_rules: # pylint: disable=protected-access + if not match_rule.expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test against.") - if not self._expanded_rules: # pylint: disable=protected-access + if not self.expanded_rules: # pylint: disable=protected-access raise ValueError("There is no expanded rules to test.") - for match_product in match_rule._expanded_rules: # pylint: disable=protected-access - for existing_product in self._expanded_rules: + for match_product in match_rule.expanded_rules: # pylint: disable=protected-access + for existing_product in self.expanded_rules: # Break if we find match_product in existing_product (all matchers returned True) if all( attr_func(existing_product[attr_name], match_product[attr_name]) @@ -510,9 +514,9 @@ def __repr__(self) -> str: """Set repr of the object to be sane.""" return self.name or "Name is not set" - def serialize(self) -> dict[str, t.Any]: + def serialize(self) -> t.Dict[str, t.Any]: """Primitive Serializer.""" - return {k: v for k, v in self.__dict__.items() if not k.startswith("_")} + return {k: v for k, v in self.__dict__.items() if not (k.startswith("_") or k == "expanded_rules")} class ACLRules: @@ -539,7 +543,7 @@ def load_data(self, data: t.Any) -> None: for item in data: self.rules.append(self.Meta.class_obj(**item)) - def serialize(self) -> list[t.Any]: + def serialize(self) -> t.List[t.Any]: """Primitive Serializer.""" return [rule.serialize() for rule in self.rules] diff --git a/tests/unit/test_acl.py b/tests/unit/test_acl.py index e1c42a2f..24bae7e9 100644 --- a/tests/unit/test_acl.py +++ b/tests/unit/test_acl.py @@ -177,7 +177,7 @@ name="Bad port", src_ip="10.1.100.5", dst_ip="10.1.200.0", - dst_port="25", + dst_port="5o0", action="permit", ), }, @@ -429,4 +429,4 @@ def process_dst_ip(self, dst_ip): @pytest.mark.parametrize("data", add_group_check) def test_custom_address_group(data): obj = TestAddrGroups(**data["sent"]) - assert obj._expanded_rules == data["received"] # pylint: disable=protected-access + assert obj.expanded_rules == data["received"] # pylint: disable=protected-access From d81a7c2b6b1867ab6766c5b79a099c879f6a83a9 Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Fri, 15 Mar 2024 10:23:48 +0100 Subject: [PATCH 30/31] Fix func name --- netutils/acl.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index f1f3a515..ea4de28d 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -562,14 +562,14 @@ def match(self, rule: ACLRule) -> bool: return False - def detailed_match(self, rule: ACLRule) -> t.Any: + def match_details(self, rule: ACLRule) -> t.Any: """Verbosely check the rules loaded in `load_data` match against a new `rule`. Args: rule: The `ACLRule` rule to test against the list of `ACLRule`s loaded in initiation. Returns: - The details from the `ACLRule.detailed_match` results. + The details from the `ACLRule.match_detail` results. """ output = [] for item in self.rules: From 5ee0a05db3c79e61d3743f74c38d2dcbd2212d7e Mon Sep 17 00:00:00 2001 From: Marek Zbroch Date: Fri, 15 Mar 2024 10:34:43 +0100 Subject: [PATCH 31/31] Fix example --- netutils/acl.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/netutils/acl.py b/netutils/acl.py index ea4de28d..316cb671 100644 --- a/netutils/acl.py +++ b/netutils/acl.py @@ -236,7 +236,7 @@ def __init__(self, **kwargs: t.Any) -> None: # pylint: disable=unused-argument ... src_ip=["10.1.1.1"], ... src_zone="internal", ... dst_ip="172.16.0.10", - ... dst_port="6/80", + ... dst_port="80", ... dst_zone="external", ... protocol='tcp', ... action="permit", @@ -244,7 +244,7 @@ def __init__(self, **kwargs: t.Any) -> None: # pylint: disable=unused-argument >>> >>> >>> rule.expanded_rules - [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'src_zone': 'internal', 'dst_ip': '172.16.0.10', 'dst_port': '6/80', 'dst_zone': 'external', 'protocol': 'tcp', 'action': 'permit'}] + [{'name': 'Check no match', 'src_ip': '10.1.1.1', 'src_zone': 'internal', 'dst_ip': '172.16.0.10', 'dst_port': '80', 'dst_zone': 'external', 'protocol': 'tcp', 'action': 'permit'}] >>> """ self._load_data(kwargs=kwargs) @@ -569,7 +569,7 @@ def match_details(self, rule: ACLRule) -> t.Any: rule: The `ACLRule` rule to test against the list of `ACLRule`s loaded in initiation. Returns: - The details from the `ACLRule.match_detail` results. + The details from the `ACLRule.match_details` results. """ output = [] for item in self.rules: