-
Notifications
You must be signed in to change notification settings - Fork 28
Expand file tree
/
Copy pathdhcpConfig.py
More file actions
155 lines (121 loc) · 5.85 KB
/
dhcpConfig.py
File metadata and controls
155 lines (121 loc) · 5.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import ipaddress
from dataclasses import dataclass, field
from typing import Tuple
from logger import logger
import common
from clustersConfig import NodeConfig
import sys
import host
# Netmask that subnet_config_from_host_config() applies to every host address
# when deriving the enclosing subnet declaration.
DEFAULT_NETMASK = "255.255.255.0"
# Default destination of DhcpConfigFile.write_to_file().
DHCPD_CONFIG_PATH = "/etc/dhcp/dhcpd.conf"
# NOTE(review): not referenced anywhere in the visible part of this module;
# presumably the location for a backup of the original config -- confirm.
DHCPD_CONFIG_BACKUP_PATH = "/etc/dhcp/dhcpd.conf.cda-backup"
# Text emitted as a "# ..." comment line above every generated subnet/host entry.
CDA_TAG = "Generated by CDA"
@dataclass
class DhcpdSubnetConfig:
    """Values needed to render one dhcpd ``subnet`` declaration."""

    # Network address and mask that open the declaration.
    subnet: str
    netmask: str
    # First and last address of the ``range`` statement.
    range_start: str
    range_end: str
    broadcast_address: str
    routers: str
    # Rendered comma-separated in ``option domain-name-servers``.
    dns_servers: list[str]
    # Rendered space-separated inside the quoted ``option domain-name`` value.
    domain_names: list[str] = field(default_factory=lambda: ["redhat.com", "anl.eng.bos2.dc.redhat.com"])
    ntp_servers: str = "clock.redhat.com"

    def to_string(self) -> str:
        """Return the subnet declaration as text, preceded by the CDA tag comment."""
        name_servers = ", ".join(self.dns_servers)
        domains = " ".join(self.domain_names)
        pieces = [
            f"# {CDA_TAG}\n",
            f"subnet {self.subnet} netmask {self.netmask} {{\n",
            f" range {self.range_start} {self.range_end};\n",
            f" option domain-name-servers {name_servers};\n",
            f" option routers {self.routers};\n",
            f" option broadcast-address {self.broadcast_address};\n",
            f" option domain-name \"{domains}\";\n",
            f" option ntp-servers {self.ntp_servers};\n",
            f"}}\n",
        ]
        return "".join(pieces)
@dataclass
class DhcpdHostConfig:
    """Values needed to render one dhcpd ``host`` declaration."""

    # Label that follows the ``host`` keyword.
    entry_name: str
    # Value of the ``hardware ethernet`` statement.
    hardware_ethernet: str
    # Value of the ``fixed-address`` statement.
    fixed_address: str
    # Value emitted for ``option host-name``.
    dhcp_option: str

    def to_string(self) -> str:
        """Return the host declaration as text, preceded by the CDA tag comment."""
        pieces = (
            f"# {CDA_TAG}\n",
            f"host {self.entry_name} {{\n",
            f" hardware ethernet {self.hardware_ethernet};\n",
            f" fixed-address {self.fixed_address};\n",
            f" option host-name {self.dhcp_option};\n",
            f"}}\n",
        )
        return "".join(pieces)
class DhcpConfigFile:
    """Collects subnet and host declarations and renders them as dhcpd config text.

    NOTE(review): both lists below are class attributes and no method rebinds
    them, so every instance appends to the same two lists. Entries therefore
    accumulate across instances within one process -- confirm this sharing is
    intended before converting them to per-instance state.
    """

    _subnet_configs: list[DhcpdSubnetConfig] = []
    _host_configs: list[DhcpdHostConfig] = []

    def _get_subnets_str(self) -> list[str]:
        """Return every stored subnet in CIDR notation."""
        return [_convert_to_cidr(cfg.subnet, cfg.netmask) for cfg in self._subnet_configs]

    def _add_subnet_from_dhcpdsubnetconfig(self, subnet: DhcpdSubnetConfig) -> None:
        """Store an already-built subnet declaration."""
        self._subnet_configs.append(subnet)

    def _add_host_from_dhcpdhostconfig(self, host_config: DhcpdHostConfig) -> None:
        """Store an already-built host declaration."""
        self._host_configs.append(host_config)

    def add_host(self, hostname: str, hardware_ethernet: str, fixed_address: str) -> None:
        """Register a host entry and, when no stored subnet covers it, its subnet.

        The entry is named ``host<N>`` where N is the number of host entries
        stored before this call.
        """
        entry = DhcpdHostConfig(
            entry_name=f"host{len(self._host_configs)}",
            hardware_ethernet=hardware_ethernet,
            fixed_address=fixed_address,
            dhcp_option=hostname,
        )
        # Built up front, even if an existing subnet turns out to cover the host.
        candidate_subnet = subnet_config_from_host_config(entry)

        already_covered = any(common.ip_in_subnet(entry.fixed_address, cidr) for cidr in self._get_subnets_str())
        if not already_covered:
            logger.debug(f"Subnet config for {entry.fixed_address} does not exist, adding this")
            self._add_subnet_from_dhcpdsubnetconfig(candidate_subnet)
        else:
            logger.debug(f"Subnet config for {entry.fixed_address} already exists at {DHCPD_CONFIG_PATH}")

        self._add_host_from_dhcpdhostconfig(entry)

    def to_string(self) -> str:
        """Render all subnet declarations followed by all host declarations."""
        rendered = [cfg.to_string() for cfg in self._subnet_configs]
        rendered.extend(cfg.to_string() for cfg in self._host_configs)
        return "".join(rendered)

    def write_to_file(self, file_path: str = DHCPD_CONFIG_PATH) -> None:
        """Overwrite ``file_path`` with the rendered configuration."""
        with open(file_path, "w") as out:
            out.write(self.to_string())
def get_subnet_ip(ipv4_address: str, subnet_mask: str) -> str:
    """Return the network address of the subnet that contains ``ipv4_address``.

    Host bits in ``ipv4_address`` are allowed (``strict=False``) and are masked off.
    """
    enclosing = ipaddress.ip_network(f"{ipv4_address}/{subnet_mask}", strict=False)
    return enclosing.network_address.compressed
def get_subnet_range(ipv4_address: str, subnet_mask: str) -> Tuple[str, str]:
    """Return the first and last assignable addresses of the enclosing subnet.

    Args:
        ipv4_address: Any address inside the subnet; host bits are allowed.
        subnet_mask: Netmask (or prefix length) of the subnet.

    Returns:
        ``(range_start, range_end)`` as strings. For ordinary subnets this is
        the network address + 1 and the broadcast address - 1. For /31 and /32
        masks there is no separate network/broadcast address to exclude, so the
        full span of the network is returned instead of an inverted or
        out-of-network range.

    Raises:
        ValueError: If the address/mask pair is not a valid network.

    NOTE(review): for ordinary subnets ``range_start`` is the same address that
    get_router_ip() returns, so the range includes the router address.
    """
    network = ipaddress.ip_network(f"{ipv4_address}/{subnet_mask}", strict=False)
    range_start = network.network_address
    range_end = network.broadcast_address
    # Only trim the network and broadcast addresses when the prefix leaves room
    # for them (i.e. at least two host bits).
    if network.prefixlen < network.max_prefixlen - 1:
        range_start += 1
        range_end -= 1
    return str(range_start), str(range_end)
def get_router_ip(ipv4_address: str, subnet_mask: str) -> str:
    """Return the address immediately after the subnet's network address.

    This module uses that address as the ``routers`` value of the subnet.
    """
    enclosing = ipaddress.ip_network(f"{ipv4_address}/{subnet_mask}", strict=False)
    return str(enclosing.network_address + 1)
def dns_servers(lh: host.Host) -> list[str]:
    """Return the IPv4 DNS servers reported by ``nmcli`` on ``lh``.

    Runs ``nmcli -g IP4.DNS device show`` (no device is named, so the output is
    not limited to one interface), splits it on whitespace and on ``|``, and
    keeps every non-empty piece in the order it appears.
    """
    output = lh.run("nmcli -g IP4.DNS device show").out
    servers: list[str] = []
    for token in output.split():
        servers.extend(piece for piece in token.strip().split("|") if piece)
    return servers
def subnet_config_from_host_config(hc: DhcpdHostConfig) -> DhcpdSubnetConfig:
    """Build the subnet declaration that encloses ``hc.fixed_address``.

    The subnet is always derived with DEFAULT_NETMASK; the DNS servers are
    queried from the local host.
    """
    mask = DEFAULT_NETMASK
    address = hc.fixed_address
    network_ip = get_subnet_ip(address, mask)
    first, last = get_subnet_range(address, mask)
    enclosing = ipaddress.ip_network(f"{address}/{mask}", strict=False)
    return DhcpdSubnetConfig(
        subnet=network_ip,
        netmask=mask,
        range_start=first,
        range_end=last,
        broadcast_address=str(enclosing.broadcast_address),
        routers=get_router_ip(address, mask),
        dns_servers=dns_servers(host.LocalHost()),
    )
def _convert_to_cidr(ipv4_address: str, subnet_mask: str) -> str:
network = ipaddress.ip_network(f"{ipv4_address}/{subnet_mask}", strict=False)
return str(network)
def configure_dhcpd(node: NodeConfig) -> None:
    """Write dhcpd entries for ``node`` and restart the dhcpd service.

    One host entry is added per ``;``-separated MAC in ``node.mac``; all of them
    use ``node.name`` and ``node.ip``. The process exits with -1 if
    ``systemctl restart dhcpd`` returns a non-zero code.
    """
    logger.info("Configuring dhcpd entry")

    config_file = DhcpConfigFile()
    for mac_address in node.mac.split(";"):
        config_file.add_host(
            hostname=node.name,
            hardware_ethernet=mac_address,
            fixed_address=str(node.ip),
        )
    config_file.write_to_file()

    local = host.LocalHost()
    restart = local.run("systemctl restart dhcpd")
    if restart.returncode == 0:
        return
    logger.error(f"Failed to restart dhcpd with err: {restart.err}")
    sys.exit(-1)