-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathionos_dyndns.py
More file actions
192 lines (156 loc) · 6.17 KB
/
ionos_dyndns.py
File metadata and controls
192 lines (156 loc) · 6.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#!/usr/bin/env python3
# Author: Lázaro Blanc
# GitHub: https://github.com/lazaroblanc
# Usage example: ./ionos_dyndns.py --A --AAAA --api-prefix <api_prefix> --api-secret <api_secret>
import subprocess # Getting IPv6 address from systools instead of via web API
import re # Just regex stuff
import requests # Talk to the API
import json # Work with the API responses
from argparse import ArgumentParser, RawDescriptionHelpFormatter # Commandline argument parsing
import sys
import logging
logging.basicConfig(stream=sys.stdout, format="%(asctime)s %(message)s", datefmt="%B %d %H:%M:%S", level=logging.INFO)
def parse_cmdline_args():
argparser = ArgumentParser(
description="Create and update DNS records for this host using IONOS' API to use as a sort of DynDNS (for example via a cronjob).",
epilog="Author: Lázaro Blanc\nGitHub: https://github.com/lazaroblanc",
formatter_class=RawDescriptionHelpFormatter
)
argparser.add_argument(
"-4", "--A",
default=False,
action="store_const",
const=True,
help="Create/Update A record"
)
argparser.add_argument(
"-6", "--AAAA",
default=False,
action="store_const",
const=True,
help="Create/Update AAAA record"
)
argparser.add_argument(
"-i", "--interface",
default="eth0",
action="store",
metavar="",
help="Interface name for determining the public IPv6 address (Default: eth0)"
)
argparser.add_argument(
"-H", "--fqdn",
default=subprocess.getoutput(f"hostname -f"),
action="store",
metavar="",
help="Host's FQDN (Default: hostname -f)"
)
argparser.add_argument(
"--api-prefix",
required=True,
action="store",
metavar="",
help="API key publicprefix"
)
argparser.add_argument(
"--api-secret",
required=True,
action="store",
metavar="",
help="API key secret"
)
args = argparser.parse_args()
if not args.A and not args.AAAA:
argparser.print_help()
exit()
return args
# Map args to global variables
args = parse_cmdline_args()
fqdn = args.fqdn.lower()
api_url = "https://api.hosting.ionos.com/dns/v1/zones"
api_key_publicprefix = args.api_prefix
api_key_secret = args.api_secret
api_headers = {
"accept": "application/json",
"X-API-Key": f"{api_key_publicprefix}.{api_key_secret}"
}
interface = args.interface
def main():
domain = get_domain_from_fqdn(fqdn)
zone = get_zone(domain)
all_records = get_all_records_for_fqdn(zone["id"], fqdn)
records_to_create = []
records_to_update = []
# Code duplication. Could use some refactoring but I don't have time for this atm
if args.A:
ipv4_address = get_ipv4_address()
logging.info("Public IPv4: " + ipv4_address)
if filter_records_by_type(all_records, "A"):
if filter_records_by_type(all_records, "A")[0]["content"] == ipv4_address:
logging.info("A record is up-to-date")
else:
logging.info("A record is outdated")
records_to_update.append(
new_record(fqdn, "A", ipv4_address, 60))
else:
logging.info("No A record found")
records_to_create.append(new_record(fqdn, "A", ipv4_address, 60))
# Good god this is ugly. I hate myself for writing this. This really needs refactoring...
if args.AAAA:
ipv6_address = get_ipv6_address(interface)
if ipv6_address != "":
logging.info("Public IPv6: " + ipv6_address)
if filter_records_by_type(all_records, "AAAA"):
if filter_records_by_type(all_records, "AAAA")[0]["content"] == ipv6_address:
logging.info("AAAA record is up-to-date")
else:
logging.info("AAAA record is outdated")
records_to_update.append(new_record(
fqdn, "AAAA", ipv6_address, 60))
else:
logging.info("No AAAA record found")
records_to_create.append(new_record(
fqdn, "AAAA", ipv6_address, 60))
else:
logging.info("Could not find a public IPv6 address on this system")
if (len(records_to_create) > 0):
post_new_records(zone["id"], records_to_create)
if (len(records_to_update) > 0):
patch_records(zone["id"], records_to_update)
def get_domain_from_fqdn(fqdn):
# Place the hyphen at the start of the character class to avoid misinterpretation
regex = r"(?:(?:[\w-]+)\.)?([\w-]+\.\w+)$"
match = re.search(regex, fqdn, re.IGNORECASE)
return match.group(1) if match else None
def get_ipv4_address():
return requests.request("GET", "https://api4.ipify.org").text
def get_ipv6_address(interface_name):
ip_output = subprocess.getoutput(f"ip -6 -o address show dev {interface_name} scope global | grep --invert temporary | grep --invert mngtmpaddr")
if ip_output != "":
ip_output_regex = r"(?:inet6)(?:\s+)(.+)(?:\/\d{1,3})"
return re.search(ip_output_regex, ip_output, re.IGNORECASE).group(1)
else:
return ""
def get_zone(domain):
response = json.loads(requests.request(
"GET", api_url, headers=api_headers).text)
return list(filter(lambda zone: zone['name'] == domain, response))[0]
def get_all_records_for_fqdn(zone_id, host):
url = f"{api_url}/{zone_id}"
records = json.loads(requests.request("GET", url, headers=api_headers).text)['records']
return list(filter(lambda record: record['name'] == host, records))
def filter_records_by_type(records, type):
return list(filter(lambda record: record['type'] == type, records))
def new_record(name, record_type, ip_address, ttl):
return {
"name": name,
"type": record_type,
"content": ip_address,
"ttl": ttl
}
def post_new_records(zone_id, records):
url = f"{api_url}/{zone_id}/records"
return requests.request("POST", url, headers=api_headers, json=records)
def patch_records(zone_id, records):
url = f"{api_url}/{zone_id}"
return requests.request("PATCH", url, headers=api_headers, json=records)
main()