Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Check subdomains for subdomain takeovers and other DNS tomfoolery
![License](https://img.shields.io/badge/license-GPLv3-f126ea.svg)
[![tests](https://github.com/blacklanternsecurity/baddns/actions/workflows/tests.yaml/badge.svg)](https://github.com/blacklanternsecurity/baddns/actions/workflows/tests.yaml)
[![codecov](https://codecov.io/gh/blacklanternsecurity/baddns/branch/main/graph/badge.svg)](https://codecov.io/gh/blacklanternsecurity/baddns)
[![PyPI](https://img.shields.io/pypi/v/baddns)](https://pypi.org/project/baddns)
[![Pypi Downloads](https://img.shields.io/pypi/dm/baddns)](https://pypi.org/project/baddns)

<p align="left"><img width="300" height="300" src="https://github.com/blacklanternsecurity/baddns/assets/24899338/2ca1fe25-e834-4df8-8b02-8bf8f60f6e31"></p>
Expand Down
2 changes: 1 addition & 1 deletion baddns/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.3.0"
__version__ = "2.4.0"
14 changes: 13 additions & 1 deletion baddns/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(
self.custom_nameservers = custom_nameservers
self.parent_class = kwargs.get("parent_class", "self")
self.cli = cli
self.disable_negative_signatures = kwargs.get("disable_negative_signatures", False)

# hook to allow external manipulation of target assignment
def set_target(self, target):
Expand Down Expand Up @@ -53,4 +54,15 @@ async def cleanup(self):


def get_all_modules(*args, **kwargs):
return [m for m in BadDNS_base.__subclasses__()]
seen = []

def _walk(cls):
for sub in cls.__subclasses__():
# Only concrete modules have a `name` class attribute; intermediate
# bases like BadDNS_email_base do not.
if getattr(sub, "name", None) and sub not in seen:
seen.append(sub)
_walk(sub)

_walk(BadDNS_base)
return seen
18 changes: 18 additions & 0 deletions baddns/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ async def execute_module(
direct_mode=False,
min_confidence=None,
min_severity=None,
disable_mx_gate=False,
disable_negative_signatures=False,
):
findings = None
try:
Expand All @@ -119,6 +121,8 @@ async def execute_module(
dns_client=dns_client,
cli=True,
direct_mode=direct_mode,
disable_mx_gate=disable_mx_gate,
disable_negative_signatures=disable_negative_signatures,
)
except BadDNSSignatureException as e:
log.error(f"Error loading signatures: {e}")
Expand Down Expand Up @@ -187,6 +191,18 @@ async def _main():
help="Minimum severity level to report. Levels: CRITICAL, HIGH, MEDIUM, LOW (exclude INFO)",
)

parser.add_argument(
"--disable-mx-gate",
action="store_true",
help="Run email-related modules (DMARC, SPF, MTA-STS misconfigurations) even when the domain has no MX records at the target or its registered domain.",
)

parser.add_argument(
"--disable-negative-signatures",
action="store_true",
help="Disable negative signatures so generic dangling CNAME/NS findings are still reported for known non-exploitable services.",
)

parser.add_argument("target", nargs="?", type=validate_target, help="subdomain to analyze")
args = parser.parse_args()

Expand Down Expand Up @@ -260,6 +276,8 @@ async def _main():
direct_mode=direct_mode,
min_confidence=args.min_confidence,
min_severity=args.min_severity,
disable_mx_gate=args.disable_mx_gate,
disable_negative_signatures=args.disable_negative_signatures,
)


Expand Down
63 changes: 63 additions & 0 deletions baddns/lib/email_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import logging

import tldextract

from baddns.base import BadDNS_base
from baddns.lib.dnsmanager import DNSManager

log = logging.getLogger(__name__)


class BadDNS_email_base(BadDNS_base):
"""Shared base for email-related modules (DMARC, SPF, MTA-STS).

Provides an MX-presence check so callers can skip checks that only matter
for domains that actually receive mail. The target's own MX takes precedence;
if absent, the registered (apex) domain is consulted, since email infra is
typically organizational rather than per-leaf-subdomain.
"""

def __init__(self, target, **kwargs):
super().__init__(target, **kwargs)
self.disable_mx_gate = kwargs.get("disable_mx_gate", False)
self._mx_present_cache = None

async def has_email_infra(self):
"""True if the target or its registered domain has any MX records."""
if self._mx_present_cache is not None:
return self._mx_present_cache

mx_omit = ["A", "AAAA", "CNAME", "NS", "SOA", "TXT", "NSEC"]

sub_dns = DNSManager(self.target, dns_client=self.dns_client, custom_nameservers=self.custom_nameservers)
await sub_dns.dispatchDNS(omit_types=mx_omit)
if sub_dns.answers.get("MX"):
self._mx_present_cache = True
return True

registered_domain = tldextract.extract(self.target).registered_domain
if registered_domain and registered_domain != self.target:
apex_dns = DNSManager(
registered_domain, dns_client=self.dns_client, custom_nameservers=self.custom_nameservers
)
await apex_dns.dispatchDNS(omit_types=mx_omit)
if apex_dns.answers.get("MX"):
self._mx_present_cache = True
return True

self._mx_present_cache = False
return False

async def mx_gate_skips(self):
"""True if the module should skip its email-config checks due to no MX.

Returns False when the gate is disabled via disable_mx_gate.
"""
if self.disable_mx_gate:
return False
if await self.has_email_infra():
return False
log.debug(
f"Skipping email checks for [{self.target}] in [{self.__class__.__name__}] - no MX at target or apex"
)
return True
2 changes: 1 addition & 1 deletion baddns/modules/cname.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def analyze(self):
)
break
# Check negative signatures before falling back to generic
if not signature_match:
if not signature_match and not self.disable_negative_signatures:
for sig in self.signatures:
if sig.signature["mode"] == "dns_nxdomain" and sig.signature.get("negative_signature", False):
sig_cnames = [c["value"] for c in sig.signature["identifiers"]["cnames"]]
Expand Down
6 changes: 4 additions & 2 deletions baddns/modules/dmarc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from baddns.base import BadDNS_base
from baddns.lib.email_base import BadDNS_email_base
from baddns.lib.dnsmanager import DNSManager
from baddns.lib.findings import Finding

Expand All @@ -8,7 +8,7 @@
log = logging.getLogger(__name__)


class BadDNS_dmarc(BadDNS_base):
class BadDNS_dmarc(BadDNS_email_base):
name = "DMARC"
description = "Check for missing or misconfigured DMARC records"
skip_cloud_targets = True
Expand Down Expand Up @@ -40,6 +40,8 @@ def parse_dmarc_record(record):
return tags

async def _dispatch(self):
if await self.mx_gate_skips():
return False
# Step 1: Check _dmarc.<target> (RFC 7489 Section 6.6.3)
await self.target_dnsmanager.dispatchDNS(omit_types=["A", "AAAA", "CNAME", "NS", "SOA", "MX", "NSEC"])
txt_records = self.target_dnsmanager.answers["TXT"]
Expand Down
21 changes: 15 additions & 6 deletions baddns/modules/mtasts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from blasthttp import BlastHTTP

from baddns.base import BadDNS_base
from baddns.lib.email_base import BadDNS_email_base
from baddns.lib.dnsmanager import DNSManager
from baddns.lib.httpmanager import USER_AGENT
from baddns.lib.whoismanager import WhoisManager
Expand All @@ -13,7 +13,7 @@
log = logging.getLogger(__name__)


class BadDNS_mtasts(BadDNS_base):
class BadDNS_mtasts(BadDNS_email_base):
name = "MTA-STS"
description = "Check for MTA-STS misconfigurations and dangling mta-sts subdomains"

Expand All @@ -33,6 +33,7 @@ def __init__(self, target, **kwargs):
self.policy_error = None
self.mx_whois_results = {}
self.mta_sts_host = f"mta-sts.{target}"
self._has_mx = None

async def _dispatch(self):
# Step 1: Check for _mta-sts TXT record
Expand Down Expand Up @@ -135,6 +136,9 @@ async def _dispatch(self):
await whois_mgr.dispatchWHOIS()
self.mx_whois_results[mx_entry] = whois_mgr

# Cache MX presence for analyze() to gate misconfig-only findings
self._has_mx = await self.has_email_infra()

return True

@staticmethod
Expand Down Expand Up @@ -186,14 +190,19 @@ def _convert_cname_findings(self, finding_sets):
def analyze(self):
findings = []

# Finding 1: Dangling mta-sts subdomain via CNAME delegation
# Finding 1: Dangling mta-sts subdomain via CNAME delegation — runs
# regardless of MX presence; this is a takeover vector independent of mail flow.
if self.cname_findings_direct:
findings.extend(self._convert_cname_findings(self.cname_findings_direct))
if self.cname_findings:
findings.extend(self._convert_cname_findings(self.cname_findings))

# Misconfig findings (orphaned TXT, policy/MX mismatch) only matter if mail
# actually flows. Skip them when the domain has no MX (gate honors disable_mx_gate).
emit_misconfig = self.disable_mx_gate or self._has_mx

# Finding 2: Orphaned TXT record with unreachable policy
if self.policy_error and not self.cname_findings_direct and not self.cname_findings:
if emit_misconfig and self.policy_error and not self.cname_findings_direct and not self.cname_findings:
findings.append(
Finding(
{
Expand All @@ -212,8 +221,8 @@ def analyze(self):
if self.policy:
actual_mx = self.mx_dnsmanager.answers.get("MX") or []

# Finding 3: Policy MX mismatch (only in enforce mode)
if self.policy.get("mode") == "enforce" and actual_mx:
# Finding 3: Policy MX mismatch (only in enforce mode) — gated on MX presence
if emit_misconfig and self.policy.get("mode") == "enforce" and actual_mx:
unmatched = []
for mx_host in actual_mx:
if not any(self._mx_matches(pattern, mx_host) for pattern in self.policy["mx"]):
Expand Down
19 changes: 10 additions & 9 deletions baddns/modules/ns.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,16 @@ def analyze(self):
)
return findings
# Check negative signatures before falling back to generic
for sig in self.signatures:
if sig.signature["mode"] == "dns_nosoa" and sig.signature.get("negative_signature", False):
sig_nameservers = [ns for ns in sig.signature["identifiers"]["nameservers"]]
r = self.get_substring_matches(target_nameservers, sig_nameservers)
if r:
log.debug(
f"Negative signature match [{sig.signature['service_name']}] for nameservers {', '.join(target_nameservers)}, suppressing generic finding"
)
return findings
if not self.disable_negative_signatures:
for sig in self.signatures:
if sig.signature["mode"] == "dns_nosoa" and sig.signature.get("negative_signature", False):
sig_nameservers = [ns for ns in sig.signature["identifiers"]["nameservers"]]
r = self.get_substring_matches(target_nameservers, sig_nameservers)
if r:
log.debug(
f"Negative signature match [{sig.signature['service_name']}] for nameservers {', '.join(target_nameservers)}, suppressing generic finding"
)
return findings
log.debug(
f"No signature found, falling back to report generic dangling NS record for nameservers: [{', '.join(target_nameservers)}]]"
)
Expand Down
6 changes: 4 additions & 2 deletions baddns/modules/spf.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from baddns.base import BadDNS_base
from baddns.lib.email_base import BadDNS_email_base
from baddns.lib.dnsmanager import DNSManager
from baddns.lib.whoismanager import WhoisManager
from baddns.lib.findings import Finding
Expand All @@ -9,7 +9,7 @@
log = logging.getLogger(__name__)


class BadDNS_spf(BadDNS_base):
class BadDNS_spf(BadDNS_email_base):
name = "SPF"
description = "Check for missing or misconfigured SPF records and hijackable include/redirect domains"
skip_cloud_targets = True
Expand Down Expand Up @@ -93,6 +93,8 @@ def parse_spf_record(record):
return result

async def _dispatch(self):
if await self.mx_gate_skips():
return False
await self.target_dnsmanager.dispatchDNS(omit_types=["A", "AAAA", "CNAME", "NS", "SOA", "MX", "NSEC"])
txt_records = self.target_dnsmanager.answers["TXT"]

Expand Down
35 changes: 35 additions & 0 deletions tests/cname_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,41 @@ async def test_cname_dnsnxdomain_generic_with_negative_loaded(fs, mock_dispatch_
assert any(expected == finding.to_dict() for finding in findings)


@pytest.mark.asyncio
async def test_cname_dnsnxdomain_negative_signature_disabled(fs, mock_dispatch_whois, configure_mock_resolver):
"""With disable_negative_signatures, generic finding fires even when negative signature matches."""
mock_data = {
"bad.dns": {"CNAME": ["7mkvokl.ng.impervadns.net."]},
"_NXDOMAIN": ["7mkvokl.ng.impervadns.net"],
}
mock_resolver = configure_mock_resolver(mock_data)

target = "bad.dns"
mock_signature_load(fs, "nucleitemplates_azure-takeover-detection.yml")
mock_signature_load(fs, "negative_imperva_cname.yml")
signatures = load_signatures("/tmp/signatures")
baddns_cname = BadDNS_cname(
target, signatures=signatures, dns_client=mock_resolver, disable_negative_signatures=True
)

findings = None
if await baddns_cname.dispatch():
findings = baddns_cname.analyze()

assert findings
expected = {
"target": "bad.dns",
"description": "Dangling CNAME, possible subdomain takeover (NXDOMAIN technique)",
"confidence": "MEDIUM",
"severity": "MEDIUM",
"signature": "GENERIC",
"indicator": "Generic Dangling CNAME",
"trigger": "7mkvokl.ng.impervadns.net",
"module": "CNAME",
}
assert any(expected == finding.to_dict() for finding in findings)


@pytest.mark.asyncio
async def test_cname_dnsnxdomain_generic_negative(fs, mock_dispatch_whois, configure_mock_resolver):
mock_data = {"bad.dns": {"CNAME": ["baddns.somerandomthing.dns."]}, "_NXDOMAIN": ["baddns.somerandomthing.dns"]}
Expand Down
20 changes: 20 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,26 @@ def cached_suffix_list(fs):
yield


# When pyfakefs is active, tldextract's HTTPS fetch of the public suffix list
# tries to read certifi's cacert.pem and fails with OSError (path not in fake fs).
# That OSError is not caught by tldextract's snapshot fallback, which only catches
# requests.exceptions.RequestException. Exposing the real cacert.pem lets the
# fetch fail at the network layer instead, so the bundled snapshot is used.
@pytest.fixture(autouse=True)
def _expose_certifi_to_pyfakefs(request):
if "fs" not in request.fixturenames:
yield
return
fs = request.getfixturevalue("fs")
try:
import certifi

fs.add_real_file(certifi.where(), read_only=True)
except (ImportError, FileExistsError, OSError):
pass
yield


@pytest.fixture()
def configure_mock_resolver(monkeypatch):
def mock_ns_trace_method_generator(return_list):
Expand Down
Loading
Loading