diff --git a/README.md b/README.md index 3781ac5..ee78ff5 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,8 @@ An IP address must be in one of the whitelisted ranges for a response to be retu `NIPIO_BLACKLIST`: A space-separated list of description=ip blacklisted pairs. Example: `some_description=10.0.0.1 other_description=10.0.0.2`. +`NIPIO_CAA`: A space-separated list of description=value pairs for CAA `issue` records returned for whitelisted IPs. Example: `letsencrypt=letsencrypt.org`. + This is useful if you're creating your own [Dockerfile](Dockerfile). ## Troubleshooting diff --git a/nipio/backend.conf b/nipio/backend.conf index 818019f..43b2a67 100644 --- a/nipio/backend.conf +++ b/nipio/backend.conf @@ -50,3 +50,11 @@ private_net_192_168 = 192.168.0.0/16 # blacklisted ips (optional) [blacklist] some_description = 10.0.0.1 + + +# CAA `issue` records to include for whitelisted IPs. +[caa] +# To limit to Lets Encrypt only: +# letsencrypt=letsencrypt.org +# To block all CAs from issuing certificates: +# deny_all=; diff --git a/nipio/backend.py b/nipio/backend.py index 43436f6..c23229a 100755 --- a/nipio/backend.py +++ b/nipio/backend.py @@ -55,7 +55,7 @@ def _get_env_splitted( values = environment_value.split(linesep) result: List[Tuple[str, str]] = [] for value in values: - parts = value.split(pairsep, 2) + parts = value.split(pairsep, 1) result.append((parts[0], parts[1])) return result else: @@ -111,6 +111,7 @@ class DynamicBackend: name_servers whitelisted_ranges blacklisted_ips + caa bits auth @@ -129,6 +130,8 @@ class DynamicBackend: NIPIO_WHITELIST -- A space-separated list of description=range pairs to whitelist. The range should be in CIDR format. NIPIO_BLACKLIST -- A space-separated list of description=ip blacklisted pairs. + NIPIO_CAA -- A space-separated list of description=value pairs for CAA `issue` + records returned for whitelisted IPs NIPIO_AUTH -- Indicates whether this response is authoritative, this is for DNSSEC. NIPIO_BITS -- Scopebits indicates how many bits from the subnet provided in the question. @@ -150,6 +153,7 @@ def __init__(self) -> None: self.name_servers: Dict[str, str] = {} self.whitelisted_ranges: List[IPv4Network] = [] self.blacklisted_ips: List[str] = [] + self.caa: List[str] = [] self.bits = "0" self.auth = "1" @@ -202,6 +206,13 @@ def configure(self, config_filename: str = _get_default_config_file()) -> None: ): self.blacklisted_ips.append(entry[1]) + if "NIPIO_CAA" in os.environ or config.has_section("caa"): + for entry in _get_env_splitted( + "NIPIO_CAA", + config.items("caa") if config.has_section("caa") else [], + ): + self.caa.append(entry[1]) + _log(f"Name servers: {self.name_servers}") _log(f"ID: {self.id}") _log(f"TTL: {self.ttl}") @@ -210,6 +221,7 @@ def configure(self, config_filename: str = _get_default_config_file()) -> None: _log(f"Domain: {self.domain}") _log(f"Whitelisted IP ranges: {[str(r) for r in self.whitelisted_ranges]}") _log(f"Blacklisted IPs: {self.blacklisted_ips}") + _log(f"CAA: {self.caa}") def run(self) -> None: """Run the pipe backend. @@ -246,22 +258,27 @@ def run(self) -> None: qname = cmd[1].lower() qtype = cmd[3] - if (qtype == "A" or qtype == "ANY") and qname.endswith(self.domain): + if qtype in ("ANY", "A", "CAA") and qname.endswith(self.domain): if qname == self.domain: - self.handle_self(self.domain) + self.handle_self(qtype, self.domain) elif qname in self.name_servers: - self.handle_nameservers(qname) + self.handle_nameservers(qtype, qname) else: - self.handle_subdomains(qname) + self.handle_subdomains(qtype, qname) elif qtype == "SOA" and qname.endswith(self.domain): self.handle_soa(qname) else: self.handle_unknown(qtype, qname) + self.write_end() + def write_end(self) -> None: _write("END") - def handle_self(self, name: str) -> None: + def handle_self(self, qtype: str, name: str) -> None: + if qtype not in ("ANY", "A"): + return + _write( "DATA", self.bits, @@ -274,9 +291,8 @@ def handle_self(self, name: str) -> None: self.ip_address, ) self.write_name_servers(name) - _write("END") - def handle_subdomains(self, qname: str) -> None: + def handle_subdomains(self, qtype: str, qname: str) -> None: subdomain = qname[0 : qname.find(self.domain) - 1] subparts = self._split_subdomain(subdomain) @@ -305,7 +321,10 @@ def handle_subdomains(self, qname: str) -> None: self.handle_blacklisted(ip_address) return - self.handle_resolved(ip_address, qname) + if qtype in ("ANY", "A"): + self.handle_resolved(ip_address, qname) + if qtype in ("ANY", "CAA"): + self.handle_caa(qname) def handle_resolved(self, address: IPv4Address, qname: str) -> None: _write( @@ -320,12 +339,29 @@ def handle_resolved(self, address: IPv4Address, qname: str) -> None: str(address), ) self.write_name_servers(qname) - _write("END") - def handle_nameservers(self, qname: str) -> None: + def handle_caa(self, qname: str) -> None: + for value in self.caa: + _write( + "DATA", + self.bits, + self.auth, + qname, + "IN", + "CAA", + self.ttl, + self.id, + "0", + "issue", + '"%s"' % value, + ) + + def handle_nameservers(self, qtype: str, qname: str) -> None: + if qtype not in ("ANY", "A"): + return + ip = self.name_servers[qname] _write("DATA", self.bits, self.auth, qname, "IN", "A", self.ttl, self.id, ip) - _write("END") def write_name_servers(self, qname: str) -> None: for name_server in self.name_servers: @@ -353,23 +389,18 @@ def handle_soa(self, qname: str) -> None: self.id, self.soa, ) - _write("END") def handle_unknown(self, qtype: str, qname: str) -> None: _write("LOG", f"Unknown type: {qtype}, domain: {qname}") - _write("END") def handle_not_whitelisted(self, ip_address: IPv4Address) -> None: _write("LOG", f"Not Whitelisted: {ip_address}") - _write("END") def handle_blacklisted(self, ip_address: IPv4Address) -> None: _write("LOG", f"Blacklisted: {ip_address}") - _write("END") def handle_invalid_ip(self, ip_address: str) -> None: _write("LOG", f"Invalid IP address: {ip_address}") - _write("END") def _split_subdomain(self, subdomain: str) -> List[str]: match = re.search("(?:^|.*[.-])([0-9A-Fa-f]{8})$", subdomain) diff --git a/nipio_tests/backend_test.conf b/nipio_tests/backend_test.conf index 64fb6ae..0ecccb4 100644 --- a/nipio_tests/backend_test.conf +++ b/nipio_tests/backend_test.conf @@ -47,3 +47,8 @@ private_net = 192.168.0.0/16 # blacklist [blacklist] some_description = 10.0.0.100 + + +# CAA +[caa] +letsencrypt = letsencrypt.org;validationmethods=http-01 diff --git a/nipio_tests/backend_test.py b/nipio_tests/backend_test.py index bdbf18c..8635d30 100644 --- a/nipio_tests/backend_test.py +++ b/nipio_tests/backend_test.py @@ -128,6 +128,19 @@ def test_backend_with_empty_whitelist_responds_to_ANY_request_for_valid_ip( "22", "ns2.nip.io.test", ], + [ + "DATA", + "0", + "1", + "subdomain.10.0.10.1.nip.io.test", + "IN", + "CAA", + "200", + "22", + "0", + "issue", + '"letsencrypt.org;validationmethods=http-01"', + ], ) def test_backend_with_empty_whitelist_responds_to_A_request_for_valid_ip( @@ -216,6 +229,19 @@ def test_backend_responds_to_ANY_request_with_valid_ip(self) -> None: "22", "ns2.nip.io.test", ], + [ + "DATA", + "0", + "1", + "subdomain.127.0.0.1.nip.io.test", + "IN", + "CAA", + "200", + "22", + "0", + "issue", + '"letsencrypt.org;validationmethods=http-01"', + ], ) def test_backend_responds_to_A_request_with_valid_ip(self) -> None: @@ -261,6 +287,29 @@ def test_backend_responds_to_A_request_with_valid_ip(self) -> None: ], ) + def test_backend_responds_to_CAA_request(self) -> None: + self._send_commands( + ["Q", "subdomain.127.0.0.1.nip.io.test", "IN", "CAA", "1", "127.0.0.1"] + ) + + self._run_backend() + + self._assert_expected_responses( + [ + "DATA", + "0", + "1", + "subdomain.127.0.0.1.nip.io.test", + "IN", + "CAA", + "200", + "22", + "0", + "issue", + '"letsencrypt.org;validationmethods=http-01"', + ], + ) + def test_backend_responds_to_ANY_request_with_valid_ip_separated_by_dashes( self, ) -> None: @@ -304,6 +353,19 @@ def test_backend_responds_to_ANY_request_with_valid_ip_separated_by_dashes( "22", "ns2.nip.io.test", ], + [ + "DATA", + "0", + "1", + "subdomain-127-0-0-1.nip.io.test", + "IN", + "CAA", + "200", + "22", + "0", + "issue", + '"letsencrypt.org;validationmethods=http-01"', + ], ) def test_backend_responds_to_A_request_with_valid_ip_separated_by_dashes( @@ -812,6 +874,9 @@ def test_configure_with_env_lists_config(self) -> None: os.environ[ "NIPIO_BLACKLIST" ] = "black_listed=10.0.0.111 black_listed2=10.0.0.112" + os.environ[ + "NIPIO_CAA" + ] = "letsencrypt=letsencrypt.org;validationmethods=http-01" backend = self._configure_backend(filename="backend_test_no_lists.conf") assert_that(backend.whitelisted_ranges).is_equal_to( @@ -820,6 +885,9 @@ def test_configure_with_env_lists_config(self) -> None: ] ) assert_that(backend.blacklisted_ips).is_equal_to(["10.0.0.111", "10.0.0.112"]) + assert_that(backend.caa).is_equal_to( + ["letsencrypt.org;validationmethods=http-01"] + ) def test_configure_with_config_missing_lists(self) -> None: backend = self._configure_backend(filename="backend_test_no_lists.conf") @@ -897,6 +965,7 @@ def _create_backend() -> DynamicBackend: ipaddress.IPv4Network("222.173.190.239/32"), ] backend.blacklisted_ips = ["127.0.0.2"] + backend.caa = ["letsencrypt.org;validationmethods=http-01"] return backend @staticmethod