Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions gixy/directives/directive.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,50 @@ def __str__(self):
return f"{self.name} {' '.join(self.args)};"


class MapDirective(Directive):
"""
map $source $destination {
default value;
key value;
}
"""

nginx_name = "map" # XXX: Should also work for "charset_map"

def __init__(self, source, destination):
super().__init__(source, destination)
self.source = source if source else None
self.destination = destination[0] if destination and len(destination) == 1 else None
self.default = ""

if self.source and self.source == 'default' and self.destination:
self.default = self.destination

# TODO: `include` is supported inside a map block.


class GeoDirective(Directive):
r"""
geo [$remote_addr] $geo {
default 0;
127.0.0.1 2;
}
"""

nginx_name = "geo"

def __init__(self, source, destination):
super().__init__(source, destination)
self.source = source if source else None
self.destination = destination[0] if destination and len(destination) == 1 else None
self.default = ""

if self.source and self.source == 'default' and self.destination:
self.default = self.destination

# TODO: Invalid values default to 255.255.255.255


class AddHeaderDirective(Directive):
"""The add_header directive is used to add a header to the response"""

Expand Down
2 changes: 2 additions & 0 deletions gixy/parser/nginx_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ def _get_directive_class(self, parsed_type, parsed_name):
return block.Block
elif parsed_type == "directive":
return directive.Directive
elif parsed_type == "hash_value":
return directive.MapDirective
elif parsed_type == "unparsed_block":
LOG.warning('Skip unparseable block: "%s"', parsed_name)
return None
Expand Down
4 changes: 1 addition & 3 deletions gixy/plugins/error_log_off.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
class error_log_off(Plugin):
"""
Insecure example:
location / {
try_files $uri $uri/ /index.php$is_args$args;
}
error_log off;
"""

summary = "The error_log directive does not take the off parameter."
Expand Down
4 changes: 3 additions & 1 deletion gixy/plugins/if_is_evil.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ class if_is_evil(Plugin):
"""
Insecure example:
location /files {
alias /home/;
if ($request_method = POST) {
add_header X-Second 2;
}
}
"""
summary = 'If is Evil... when used in location context.'
Expand Down
5 changes: 4 additions & 1 deletion gixy/plugins/proxy_pass_normalized.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ def __init__(self, config):

def audit(self, directive):
proxy_pass_args = directive.args

rewrite_fail = False
parent = directive.parent

if not proxy_pass_args:
return

if not parent or parent.name != 'location' or parent.modifier == '=':
return

if proxy_pass_args[0].startswith("$") and '/' not in proxy_pass_args[0]:
# If proxy pass destination is defined by only a variable, it is not possible to check for path normalization issues
return
Expand Down
134 changes: 99 additions & 35 deletions gixy/plugins/regex_redos.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import requests
import gixy
from gixy.plugins.plugin import Plugin

from gixy.directives.block import MapBlock

class regex_redos(Plugin):
r"""
Expand Down Expand Up @@ -40,7 +40,7 @@ class regex_redos(Plugin):
'resources (also known as ReDoS).'
)
help_url = 'https://joshua.hu/regex-redos-recheck-nginx-gixy'
directives = ['location'] # XXX: Missing server_name, rewrite, if, map, proxy_redirect
directives = ['location', 'server_name', 'proxy_redirect', 'if', 'rewrite', 'map']
options = {
'url': ""
}
Expand All @@ -50,20 +50,58 @@ def __init__(self, config):
super(regex_redos, self).__init__(config)
self.redos_server = self.config.get('url')

self.cached_results = {}

def audit(self, directive):
# If we have no ReDoS check URL, skip.
if not self.redos_server:
return

# Only process directives that have regex modifiers.
if directive.modifier not in ('~', '~*'):
regex_patterns = set()
child_val = {}

if directive.name == 'server_name':
for server_name in directive.args:
if server_name[0] != '~':
continue
# server_name www.example.com ~^www.\d+\.example\.com$;
regex_patterns.add(server_name[1:])
elif directive.name == 'proxy_redirect':
if directive.args[0][0] == '~':
# proxy_redirect ~^//(.*)$ $scheme://$1;
if directive.args[0][1] == '*':
regex_patterns.add(directive.args[0][2:])
else:
regex_patterns.add(directive.args[0][1:])
elif directive.name == 'location':
if directive.modifier in ['~', '~*']:
# location ~ ^/example$ { }
regex_patterns.add(directive.path)
elif directive.name == 'if':
if directive.operand in ['~', '~*', '!~', '!~*']:
# if ($http_referer ~* ^https://example\.com/$") { }
regex_patterns.add(directive.value)
elif directive.name == 'rewrite':
# rewrite ^/1(/.*) $1 break;
regex_patterns.add(directive.pattern)
elif isinstance(directive, MapBlock):
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This incorrectly checks every type of map{}, regardless of whether it's `location', 'server_name', 'proxy_redirect', 'if', 'rewrite'. This is kind of deliberate, but I wonder if it can be handled better, i.e. if it could be possible to resolve the variable, determine that it resolves to a Mapblock, and then loop over the Mapblock's children..

for child in directive.children:
for map_val in [child.source, child.destination]:
if map_val and map_val[0] == '~':
map_val = map_val[1:]
if map_val[0] == '*':
map_val = map_val[1:]
regex_patterns.add(map_val)
child_val[map_val] = child

if not regex_patterns:
return

regex_pattern = directive.path
fail_reason = f'Could not check regex {regex_pattern} for ReDoS.'
json_data = {}

modifier = "" if directive.modifier == "~" else "i"
json_data = {"1": {"pattern": regex_pattern, "modifier": modifier}}
for regex_pattern in regex_patterns:
if regex_pattern not in self.cached_results:
json_data[regex_pattern] = {"pattern": regex_pattern, "modifier": ""}

# Attempt to contact the ReDoS check server.
try:
Expand All @@ -73,46 +111,72 @@ def audit(self, directive):
headers={"Content-Type": "application/json"},
timeout=60
)
except requests.RequestException:
except requests.RequestException as e:
fail_reason = f'HTTP request to "{self.redos_server}" failed: {e}'
self.add_issue(directive=directive, reason=fail_reason, severity=self.unknown_severity)
return

# If we get a non-200 response, skip.
if response.status_code != 200:
fail_reason = f'HTTP request to "{self.redos_server}" with status code "{response.status_code}": {response.text}'
self.add_issue(directive=directive, reason=fail_reason, severity=self.unknown_severity)
return

# Attempt to parse the JSON response.
try:
response_json = response.json()
except ValueError:
except ValueError as e:
fail_reason = f'Failed to parse JSON "{e}": "{response.text}"'
self.add_issue(directive=directive, reason=fail_reason, severity=self.unknown_severity)
return

# Ensure the expected data structure is present and matches the pattern.
if (
"1" not in response_json or
response_json["1"] is None or
"source" not in response_json["1"] or
response_json["1"]["source"] != regex_pattern
):
unchecked_patterns = set()

for regex_pattern in regex_patterns:
if regex_pattern in self.cached_results:
continue
if (
regex_pattern not in response_json or
response_json[regex_pattern] is None or
"source" not in response_json[regex_pattern] or
response_json[regex_pattern]["source"] != regex_pattern
):
if regex_pattern not in unchecked_patterns:
unchecked_patterns.add(regex_pattern)

if unchecked_patterns:
unchecked_patterns = '", "'.join(unchecked_patterns)
fail_reason = 'Could not check expression(s) "{unchecked_patterns}" for ReDoS.'.format(unchecked_patterns=unchecked_patterns)
self.add_issue(directive=directive, reason=fail_reason, severity=self.unknown_severity)
return

recheck = response_json["1"]
status = recheck.get("status")

# If status is neither 'vulnerable' nor 'unknown', the expression is safe.
if status not in ("vulnerable", "unknown"):
return

# If the status is unknown, add a low-severity issue (likely the server timed out)
if status == "unknown":
reason = f'Could not check complexity of regex {regex_pattern}.'
self.add_issue(directive=directive, reason=reason, severity=self.unknown_severity)
return

# Status is 'vulnerable' here. Report as a high-severity issue.
complexity_summary = recheck.get("complexity", {}).get("summary", "unknown")
reason = f'Regex is vulnerable to {complexity_summary} ReDoS: {regex_pattern}.'
self.add_issue(directive=directive, reason=reason, severity=self.severity)
vulnerable_patterns = set()
unknown_patterns = set()

for regex_pattern in regex_patterns:
if regex_pattern in self.cached_results:
if self.cached_results[regex_pattern] == "vulnerable":
vulnerable_patterns.add(regex_pattern)
else:
continue
elif regex_pattern not in unchecked_patterns:
recheck = response_json[regex_pattern]
status = recheck.get("status")

if status == "unknown":
unknown_patterns.add(regex_pattern)
continue

self.cached_results[regex_pattern] = status
if status == "vulnerable":
vulnerable_patterns.add(regex_pattern)
continue

if unknown_patterns:
unknown_patterns_string = '", "'.join(unknown_patterns)
fail_reason = 'Unknown complexity of expression(s) "{unknown_patterns}".'.format(unknown_patterns=unknown_patterns_string)
self.add_issue(directive=[directive] + [child_val[mv] for mv in unknown_patterns if mv in child_val], reason=fail_reason, severity=self.unknown_severity)

if vulnerable_patterns:
vulnerable_patterns_string = '", "'.join(vulnerable_patterns)
fail_reason = 'ReDoS possible due to expression(s) "{vulnerable_patterns}".'.format(vulnerable_patterns=vulnerable_patterns_string)
self.add_issue(directive=[directive] + [child_val[mv] for mv in vulnerable_patterns if mv in child_val], reason=fail_reason, severity=self.severity)