Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ An IP address must be in one of the whitelisted ranges for a response to be retu

`NIPIO_BLACKLIST`: A space-separated list of description=ip blacklisted pairs. Example: `some_description=10.0.0.1 other_description=10.0.0.2`.

`NIPIO_CAA`: A space-separated list of description=value pairs for CAA `issue` records returned for whitelisted IPs. Example: `letsencrypt=letsencrypt.org`.

This is useful if you're creating your own [Dockerfile](Dockerfile).

## Troubleshooting
Expand Down
8 changes: 8 additions & 0 deletions nipio/backend.conf
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,11 @@ private_net_192_168 = 192.168.0.0/16
# blacklisted ips (optional)
[blacklist]
some_description = 10.0.0.1


# CAA `issue` records to include for whitelisted IPs.
[caa]
# To limit to Lets Encrypt only:
# letsencrypt=letsencrypt.org
# To block all CAs from issuing certificates:
# deny_all=;
65 changes: 48 additions & 17 deletions nipio/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def _get_env_splitted(
values = environment_value.split(linesep)
result: List[Tuple[str, str]] = []
for value in values:
parts = value.split(pairsep, 2)
parts = value.split(pairsep, 1)
result.append((parts[0], parts[1]))
return result
else:
Expand Down Expand Up @@ -111,6 +111,7 @@ class DynamicBackend:
name_servers
whitelisted_ranges
blacklisted_ips
caa
bits
auth

Expand All @@ -129,6 +130,8 @@ class DynamicBackend:
NIPIO_WHITELIST -- A space-separated list of description=range pairs to whitelist.
The range should be in CIDR format.
NIPIO_BLACKLIST -- A space-separated list of description=ip blacklisted pairs.
NIPIO_CAA -- A space-separated list of description=value pairs for CAA `issue`
records returned for whitelisted IPs
NIPIO_AUTH -- Indicates whether this response is authoritative, this is for DNSSEC.
NIPIO_BITS -- Scopebits indicates how many bits from the subnet provided in
the question.
Expand All @@ -150,6 +153,7 @@ def __init__(self) -> None:
self.name_servers: Dict[str, str] = {}
self.whitelisted_ranges: List[IPv4Network] = []
self.blacklisted_ips: List[str] = []
self.caa: List[str] = []
self.bits = "0"
self.auth = "1"

Expand Down Expand Up @@ -202,6 +206,13 @@ def configure(self, config_filename: str = _get_default_config_file()) -> None:
):
self.blacklisted_ips.append(entry[1])

if "NIPIO_CAA" in os.environ or config.has_section("caa"):
for entry in _get_env_splitted(
"NIPIO_CAA",
config.items("caa") if config.has_section("caa") else [],
):
self.caa.append(entry[1])

_log(f"Name servers: {self.name_servers}")
_log(f"ID: {self.id}")
_log(f"TTL: {self.ttl}")
Expand All @@ -210,6 +221,7 @@ def configure(self, config_filename: str = _get_default_config_file()) -> None:
_log(f"Domain: {self.domain}")
_log(f"Whitelisted IP ranges: {[str(r) for r in self.whitelisted_ranges]}")
_log(f"Blacklisted IPs: {self.blacklisted_ips}")
_log(f"CAA: {self.caa}")

def run(self) -> None:
"""Run the pipe backend.
Expand Down Expand Up @@ -246,22 +258,27 @@ def run(self) -> None:
qname = cmd[1].lower()
qtype = cmd[3]

if (qtype == "A" or qtype == "ANY") and qname.endswith(self.domain):
if qtype in ("ANY", "A", "CAA") and qname.endswith(self.domain):
if qname == self.domain:
self.handle_self(self.domain)
self.handle_self(qtype, self.domain)
elif qname in self.name_servers:
self.handle_nameservers(qname)
self.handle_nameservers(qtype, qname)
else:
self.handle_subdomains(qname)
self.handle_subdomains(qtype, qname)
elif qtype == "SOA" and qname.endswith(self.domain):
self.handle_soa(qname)
else:
self.handle_unknown(qtype, qname)

self.write_end()

def write_end(self) -> None:
_write("END")

def handle_self(self, name: str) -> None:
def handle_self(self, qtype: str, name: str) -> None:
if qtype not in ("ANY", "A"):
return

_write(
"DATA",
self.bits,
Expand All @@ -274,9 +291,8 @@ def handle_self(self, name: str) -> None:
self.ip_address,
)
self.write_name_servers(name)
_write("END")

def handle_subdomains(self, qname: str) -> None:
def handle_subdomains(self, qtype: str, qname: str) -> None:
subdomain = qname[0 : qname.find(self.domain) - 1]

subparts = self._split_subdomain(subdomain)
Expand Down Expand Up @@ -305,7 +321,10 @@ def handle_subdomains(self, qname: str) -> None:
self.handle_blacklisted(ip_address)
return

self.handle_resolved(ip_address, qname)
if qtype in ("ANY", "A"):
self.handle_resolved(ip_address, qname)
if qtype in ("ANY", "CAA"):
self.handle_caa(qname)

def handle_resolved(self, address: IPv4Address, qname: str) -> None:
_write(
Expand All @@ -320,12 +339,29 @@ def handle_resolved(self, address: IPv4Address, qname: str) -> None:
str(address),
)
self.write_name_servers(qname)
_write("END")

def handle_nameservers(self, qname: str) -> None:
def handle_caa(self, qname: str) -> None:
for value in self.caa:
_write(
"DATA",
self.bits,
self.auth,
qname,
"IN",
"CAA",
self.ttl,
self.id,
"0",
"issue",
'"%s"' % value,
)

def handle_nameservers(self, qtype: str, qname: str) -> None:
if qtype not in ("ANY", "A"):
return

ip = self.name_servers[qname]
_write("DATA", self.bits, self.auth, qname, "IN", "A", self.ttl, self.id, ip)
_write("END")

def write_name_servers(self, qname: str) -> None:
for name_server in self.name_servers:
Expand Down Expand Up @@ -353,23 +389,18 @@ def handle_soa(self, qname: str) -> None:
self.id,
self.soa,
)
_write("END")

def handle_unknown(self, qtype: str, qname: str) -> None:
_write("LOG", f"Unknown type: {qtype}, domain: {qname}")
_write("END")

def handle_not_whitelisted(self, ip_address: IPv4Address) -> None:
_write("LOG", f"Not Whitelisted: {ip_address}")
_write("END")

def handle_blacklisted(self, ip_address: IPv4Address) -> None:
_write("LOG", f"Blacklisted: {ip_address}")
_write("END")

def handle_invalid_ip(self, ip_address: str) -> None:
_write("LOG", f"Invalid IP address: {ip_address}")
_write("END")

def _split_subdomain(self, subdomain: str) -> List[str]:
match = re.search("(?:^|.*[.-])([0-9A-Fa-f]{8})$", subdomain)
Expand Down
5 changes: 5 additions & 0 deletions nipio_tests/backend_test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,8 @@ private_net = 192.168.0.0/16
# blacklist
[blacklist]
some_description = 10.0.0.100


# CAA
[caa]
letsencrypt = letsencrypt.org;validationmethods=http-01
69 changes: 69 additions & 0 deletions nipio_tests/backend_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,19 @@ def test_backend_with_empty_whitelist_responds_to_ANY_request_for_valid_ip(
"22",
"ns2.nip.io.test",
],
[
"DATA",
"0",
"1",
"subdomain.10.0.10.1.nip.io.test",
"IN",
"CAA",
"200",
"22",
"0",
"issue",
'"letsencrypt.org;validationmethods=http-01"',
],
)

def test_backend_with_empty_whitelist_responds_to_A_request_for_valid_ip(
Expand Down Expand Up @@ -216,6 +229,19 @@ def test_backend_responds_to_ANY_request_with_valid_ip(self) -> None:
"22",
"ns2.nip.io.test",
],
[
"DATA",
"0",
"1",
"subdomain.127.0.0.1.nip.io.test",
"IN",
"CAA",
"200",
"22",
"0",
"issue",
'"letsencrypt.org;validationmethods=http-01"',
],
)

def test_backend_responds_to_A_request_with_valid_ip(self) -> None:
Expand Down Expand Up @@ -261,6 +287,29 @@ def test_backend_responds_to_A_request_with_valid_ip(self) -> None:
],
)

def test_backend_responds_to_CAA_request(self) -> None:
self._send_commands(
["Q", "subdomain.127.0.0.1.nip.io.test", "IN", "CAA", "1", "127.0.0.1"]
)

self._run_backend()

self._assert_expected_responses(
[
"DATA",
"0",
"1",
"subdomain.127.0.0.1.nip.io.test",
"IN",
"CAA",
"200",
"22",
"0",
"issue",
'"letsencrypt.org;validationmethods=http-01"',
],
)

def test_backend_responds_to_ANY_request_with_valid_ip_separated_by_dashes(
self,
) -> None:
Expand Down Expand Up @@ -304,6 +353,19 @@ def test_backend_responds_to_ANY_request_with_valid_ip_separated_by_dashes(
"22",
"ns2.nip.io.test",
],
[
"DATA",
"0",
"1",
"subdomain-127-0-0-1.nip.io.test",
"IN",
"CAA",
"200",
"22",
"0",
"issue",
'"letsencrypt.org;validationmethods=http-01"',
],
)

def test_backend_responds_to_A_request_with_valid_ip_separated_by_dashes(
Expand Down Expand Up @@ -812,6 +874,9 @@ def test_configure_with_env_lists_config(self) -> None:
os.environ[
"NIPIO_BLACKLIST"
] = "black_listed=10.0.0.111 black_listed2=10.0.0.112"
os.environ[
"NIPIO_CAA"
] = "letsencrypt=letsencrypt.org;validationmethods=http-01"
backend = self._configure_backend(filename="backend_test_no_lists.conf")

assert_that(backend.whitelisted_ranges).is_equal_to(
Expand All @@ -820,6 +885,9 @@ def test_configure_with_env_lists_config(self) -> None:
]
)
assert_that(backend.blacklisted_ips).is_equal_to(["10.0.0.111", "10.0.0.112"])
assert_that(backend.caa).is_equal_to(
["letsencrypt.org;validationmethods=http-01"]
)

def test_configure_with_config_missing_lists(self) -> None:
backend = self._configure_backend(filename="backend_test_no_lists.conf")
Expand Down Expand Up @@ -897,6 +965,7 @@ def _create_backend() -> DynamicBackend:
ipaddress.IPv4Network("222.173.190.239/32"),
]
backend.blacklisted_ips = ["127.0.0.2"]
backend.caa = ["letsencrypt.org;validationmethods=http-01"]
return backend

@staticmethod
Expand Down
Loading