-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbind.py
More file actions
91 lines (71 loc) · 2.8 KB
/
bind.py
File metadata and controls
91 lines (71 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""BIND specific subclasses"""
import re
from collections.abc import Iterator
from subprocess import CompletedProcess
from typing import Final
from .base import InvokeError, SshZoneCommand, SshZoneSudoers
from .types import ZoneHandlerConf
class BindSudoers(SshZoneSudoers):
"""Pre-generate needed BIND sudoers rules"""
def _server_command_rules(self) -> list[str]:
rules: list[str] = []
for cmd in ["retransfer", "zonestatus"]:
rule = (
f"{self.login_user}\tALL=({self.service_user}) NOPASSWD: "
+ f"/usr/sbin/rndc {cmd} *"
)
rules.append(rule)
return rules
class BindCommand(SshZoneCommand):
"""Runs the actual commands, for BIND"""
def __init__(self, config: ZoneHandlerConf) -> None:
super().__init__(config)
self.rndc_prefix: Final[tuple[str, str, str]] = self.sudo_prefix + (
"/usr/sbin/rndc",
)
def __lookup(self, zone: str, failure: str) -> str | None:
zone_file: str | None = None
command = self.rndc_prefix + ("zonestatus", zone)
result: CompletedProcess[str] = self._runner(command, failure)
pattern = re.compile(r"^([^:]+): (.+)$")
for line in result.stdout.split("\n"):
matched = pattern.match(line)
if matched and matched.group(1) == "files":
zone_file = matched.group(2)
break
return zone_file
def _dump(self, zone: str) -> None:
lookup_failure = f'Failed to lookup zone file for zone "{zone}"'
zone_file: str | None = self.__lookup(zone, lookup_failure)
if not zone_file:
raise InvokeError(lookup_failure)
command = (
"/usr/bin/named-compilezone",
"-f",
"raw",
"-o",
"-",
zone,
zone_file,
)
run_failure = f'Failed to dump content of zone "{zone}"'
result: CompletedProcess[str] = self._runner(command, run_failure)
zone_content = result.stdout.rstrip()
print(zone_content)
@staticmethod
def _filter_logs(log_lines: list[str], zones: list[str]) -> Iterator[str]:
for line in log_lines:
for zone in zones:
if (
f"zone {zone}/IN" in line
or f"'retransfer {zone}'" in line
or f"'{zone}/AXFR/IN'" in line
or f"'{zone}/IN'" in line
or f"'{zone}'" in line
):
yield line
def _retransfer(self, zone: str) -> None:
failure = f'Failed to trigger retransfer of zone "{zone}"'
command = self.rndc_prefix + ("retransfer", zone)
self._runner(command, failure)
print(f'Triggering retransfer of zone "{zone}"')