|
| 1 | +"""DNS audit — scan domains for common misconfigurations.""" |
| 2 | + |
| 3 | +import sys |
| 4 | + |
| 5 | +from infomaniak_cli.api import api_request |
| 6 | +from infomaniak_cli.config import get_account_id, get_token |
| 7 | +from infomaniak_cli.output import bold, cyan, dim, green, output_json, red, yellow |
| 8 | + |
| 9 | + |
| 10 | +def _has_record(records, rtype, source=None, substring=None): |
| 11 | + """Check if records contain a specific type/source/substring match.""" |
| 12 | + for r in records: |
| 13 | + if r.get("type") != rtype: |
| 14 | + continue |
| 15 | + if source is not None: |
| 16 | + rec_source = r.get("source", "@") |
| 17 | + if rec_source == ".": |
| 18 | + rec_source = "@" |
| 19 | + if rec_source != source: |
| 20 | + continue |
| 21 | + if substring is not None and substring.lower() not in r.get("target", "").lower(): |
| 22 | + continue |
| 23 | + return True |
| 24 | + return False |
| 25 | + |
| 26 | + |
| 27 | +def _get_targets(records, rtype, source=None): |
| 28 | + """Get all targets for a given type/source.""" |
| 29 | + targets = [] |
| 30 | + for r in records: |
| 31 | + if r.get("type") != rtype: |
| 32 | + continue |
| 33 | + if source is not None: |
| 34 | + rec_source = r.get("source", "@") |
| 35 | + if rec_source == ".": |
| 36 | + rec_source = "@" |
| 37 | + if rec_source != source: |
| 38 | + continue |
| 39 | + targets.append(r.get("target", "")) |
| 40 | + return targets |
| 41 | + |
| 42 | + |
| 43 | +def cmd_dns_audit(args): |
| 44 | + """Audit DNS records across all domains for common issues.""" |
| 45 | + token = get_token() |
| 46 | + account_id = get_account_id(token) |
| 47 | + |
| 48 | + # Get all domains |
| 49 | + data = api_request("GET", f"/1/domain/account/{account_id}", token) |
| 50 | + domains = data.get("data", []) |
| 51 | + |
| 52 | + if not domains: |
| 53 | + print(f" {dim('No domains found.')}") |
| 54 | + return |
| 55 | + |
| 56 | + domain_filter = args.domain if hasattr(args, "domain") and args.domain else None |
| 57 | + if domain_filter: |
| 58 | + domains = [d for d in domains if d.get("customer_name") == domain_filter] |
| 59 | + if not domains: |
| 60 | + print(f" {red('Domain not found:')} {domain_filter}") |
| 61 | + sys.exit(1) |
| 62 | + |
| 63 | + all_issues = [] |
| 64 | + all_results = [] |
| 65 | + total_checked = 0 |
| 66 | + |
| 67 | + print(f"\n {bold('DNS Audit')}") |
| 68 | + print(f" {dim('────────')}\n") |
| 69 | + print(f" Scanning {len(domains)} domain(s)...\n") |
| 70 | + |
| 71 | + for d in domains: |
| 72 | + domain_name = d.get("customer_name", "") |
| 73 | + try: |
| 74 | + rdata = api_request("GET", f"/2/zones/{domain_name}/records", token) |
| 75 | + except SystemExit: |
| 76 | + continue |
| 77 | + |
| 78 | + records = rdata.get("data", []) |
| 79 | + total_checked += 1 |
| 80 | + domain_issues = [] |
| 81 | + |
| 82 | + # Check 1: SPF record |
| 83 | + spf_targets = _get_targets(records, "TXT", "@") |
| 84 | + has_spf = any("v=spf1" in t.lower() for t in spf_targets) |
| 85 | + if not has_spf: |
| 86 | + domain_issues.append(("missing_spf", "No SPF record found", "Email spoofing protection")) |
| 87 | + |
| 88 | + # Check 2: DMARC record |
| 89 | + dmarc_sources = [r.get("source", "@") for r in records if r.get("type") == "TXT"] |
| 90 | + has_dmarc = _has_record(records, "TXT", "_dmarc") |
| 91 | + if not has_dmarc: |
| 92 | + domain_issues.append(("missing_dmarc", "No DMARC record found", "Email authentication policy")) |
| 93 | + |
| 94 | + # Check 3: DKIM (check common selectors) |
| 95 | + has_dkim = False |
| 96 | + for r in records: |
| 97 | + source = r.get("source", "") |
| 98 | + if r.get("type") == "TXT" and "._domainkey" in source: |
| 99 | + has_dkim = True |
| 100 | + break |
| 101 | + if r.get("type") == "CNAME" and "._domainkey" in source: |
| 102 | + has_dkim = True |
| 103 | + break |
| 104 | + if not has_dkim: |
| 105 | + domain_issues.append(("missing_dkim", "No DKIM record found", "Email signing verification")) |
| 106 | + |
| 107 | + # Check 4: MX record (if domain has mail hosting) |
| 108 | + has_mx = _has_record(records, "MX") |
| 109 | + a_targets = _get_targets(records, "A", "@") |
| 110 | + if not has_mx and a_targets: |
| 111 | + domain_issues.append(("missing_mx", "No MX record — mail won't be delivered", "Mail delivery")) |
| 112 | + |
| 113 | + # Check 5: Multiple SPF records (invalid per RFC) |
| 114 | + spf_count = sum(1 for t in spf_targets if "v=spf1" in t.lower()) |
| 115 | + if spf_count > 1: |
| 116 | + domain_issues.append(("multiple_spf", f"Multiple SPF records found ({spf_count})", "RFC violation — only one SPF allowed")) |
| 117 | + |
| 118 | + # Check 6: Low TTL on root records |
| 119 | + for r in records: |
| 120 | + source = r.get("source", "@") |
| 121 | + if source in ("@", ".") and r.get("type") in ("A", "AAAA", "MX"): |
| 122 | + ttl = r.get("ttl", 3600) |
| 123 | + if ttl < 60: |
| 124 | + domain_issues.append(("low_ttl", f"Very low TTL ({ttl}s) on {r.get('type')} record", "May cause excessive DNS queries")) |
| 125 | + break |
| 126 | + |
| 127 | + # Check 7: CNAME at root (invalid per RFC) |
| 128 | + has_root_cname = _has_record(records, "CNAME", "@") |
| 129 | + if has_root_cname: |
| 130 | + domain_issues.append(("root_cname", "CNAME record at root (@)", "RFC violation — may break MX/NS")) |
| 131 | + |
| 132 | + result = { |
| 133 | + "domain": domain_name, |
| 134 | + "records": len(records), |
| 135 | + "issues": domain_issues, |
| 136 | + } |
| 137 | + all_results.append(result) |
| 138 | + |
| 139 | + if domain_issues: |
| 140 | + all_issues.extend([(domain_name, *issue) for issue in domain_issues]) |
| 141 | + |
| 142 | + if getattr(args, "json", False): |
| 143 | + output_json(all_results) |
| 144 | + |
| 145 | + # Summary |
| 146 | + clean = sum(1 for r in all_results if not r["issues"]) |
| 147 | + with_issues = sum(1 for r in all_results if r["issues"]) |
| 148 | + |
| 149 | + if all_issues: |
| 150 | + # Group by issue type |
| 151 | + issue_types = {} |
| 152 | + for domain_name, code, desc, hint in all_issues: |
| 153 | + if code not in issue_types: |
| 154 | + issue_types[code] = [] |
| 155 | + issue_types[code].append((domain_name, desc, hint)) |
| 156 | + |
| 157 | + for code, issues in sorted(issue_types.items()): |
| 158 | + hint = issues[0][2] |
| 159 | + desc = issues[0][1] |
| 160 | + print(f" {yellow('!')} {bold(desc)} {dim(f'— {hint}')}") |
| 161 | + for domain_name, _, _ in issues: |
| 162 | + print(f" {dim('→')} {domain_name}") |
| 163 | + print() |
| 164 | + |
| 165 | + print(f" {bold('Summary')}: {total_checked} domains scanned") |
| 166 | + if clean: |
| 167 | + print(f" {green(str(clean))} clean") |
| 168 | + if with_issues: |
| 169 | + print(f" {yellow(str(with_issues))} with issues ({len(all_issues)} total findings)") |
| 170 | + if not all_issues: |
| 171 | + print(f" {green('All domains passed all checks.')}") |
| 172 | + print() |
0 commit comments