Skip to content

Added support for AWS S3 credentials to do AWS S3 fuzzing #957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions restler/engine/transport_layer/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,43 @@ def _append_to_header(message, content):
if sequence_id is not None:
message = _append_to_header(message, f"x-restler-sequence-id: {sequence_id}")

# Add request signing if enabled in authentication settings
if Settings().authentication and Settings().authentication.get('module', {}).get('signing'):
try:
auth_module = Settings().authentication['module']
signing_function = auth_module.get('function', 'sign_request')
signing_module = __import__(auth_module['name'], fromlist=[signing_function])
sign_request = getattr(signing_module, signing_function)

# Extract request components needed for signing
method = self._get_method_from_message(message)
headers_end = _get_end_of_header(message)
headers_str = message[:headers_end]
body = message[_get_start_of_body(message):]

# Convert headers string to dictionary
headers = {}
for line in headers_str.split('\r\n')[1:]: # Skip first line (HTTP method line)
if ': ' in line:
key, value = line.split(': ', 1)
headers[key] = value

# Call signing function with request components
signed_headers = sign_request(
method=method,
headers=headers,
body=body,
auth_data=auth_module.get('data', {})
)

# Update message with signed headers
for header_name, header_value in signed_headers.items():
message = _append_to_header(message, f"{header_name}: {header_value}")

except Exception as error:
RAW_LOGGING(f'Failed to sign request: {error!s}\n')
raise TransportLayerException(f"Request signing failed: {error!s}")

# Attempt to throttle the request if necessary
self._begin_throttle_request()

Expand Down
5 changes: 5 additions & 0 deletions restler/restler_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,11 @@ def version(self):
def wait_for_async_resource_creation(self):
return self._wait_for_async_resource_creation.val

@property
def authentication(self):
"""Returns the authentication settings dictionary"""
return self._authentication_settings.val

def include_request(self, endpoint, method):
""""Returns whether the specified endpoint and method should be tested according to
the include/exclude settings
Expand Down
99 changes: 99 additions & 0 deletions restler/utils/authentication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import datetime
import hashlib
import hmac
import urllib.parse

def sign(key, msg):
"""Helper function to calculate HMAC-SHA256"""
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

def get_signature_key(key, datestamp, region, service):
"""Derives the signing key for AWS SigV4"""
kDate = sign(f'AWS4{key}'.encode('utf-8'), datestamp)
kRegion = sign(kDate, region)
kService = sign(kRegion, service)
kSigning = sign(kService, 'aws4_request')
return kSigning

def sign_request(method, headers, body, auth_data):
"""
Sign an HTTP request using AWS SigV4

Args:
method: HTTP method (GET, POST, etc)
headers: Dictionary of request headers
body: Request body
auth_data: Dictionary containing:
- access_key: AWS access key ID
- secret_key: AWS secret access key
- region: AWS region (e.g. us-east-1)
- service: AWS service (e.g. s3)
- session_token: Optional AWS session token

Returns:
Dictionary of headers to add to the request
"""
# Get required auth parameters
access_key = auth_data['access_key']
secret_key = auth_data['secret_key']
region = auth_data['region']
service = auth_data['service']
session_token = auth_data.get('session_token')

# Create timestamp strings
t = datetime.datetime.utcnow()
amzdate = t.strftime('%Y%m%dT%H%M%SZ')
datestamp = t.strftime('%Y%m%d')

# Get the canonical URI and query string from headers
first_line = headers.get('first_line', '')
url_parts = first_line.split(' ')[1] if first_line else '/'
url_parsed = urllib.parse.urlparse(url_parts)
canonical_uri = urllib.parse.quote(url_parsed.path if url_parsed.path else '/')
canonical_querystring = url_parsed.query if url_parsed.query else ''

# Create canonical headers
canonical_headers = {
'host': headers.get('Host', headers.get('host', '')),
'x-amz-date': amzdate
}
if session_token:
canonical_headers['x-amz-security-token'] = session_token

# Sort and format canonical headers
signed_headers = ';'.join(sorted(canonical_headers.keys()))
canonical_headers_str = ''.join([f"{k}:{v}\n" for k, v in sorted(canonical_headers.items())])

# Create payload hash
payload_hash = hashlib.sha256(body.encode('utf-8')).hexdigest()

# Create canonical request
canonical_request = f"{method}\n{canonical_uri}\n{canonical_querystring}\n" \
f"{canonical_headers_str}\n{signed_headers}\n{payload_hash}"

# Create string to sign
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = f"{datestamp}/{region}/{service}/aws4_request"
string_to_sign = f"{algorithm}\n{amzdate}\n{credential_scope}\n" \
f"{hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()}"

# Calculate signature
signing_key = get_signature_key(secret_key, datestamp, region, service)
signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()

# Create authorization header
authorization_header = (
f"{algorithm} Credential={access_key}/{credential_scope}, "
f"SignedHeaders={signed_headers}, Signature={signature}"
)

# Return headers to be added to the request
signed_headers_dict = {
'Authorization': authorization_header,
'x-amz-date': amzdate,
'x-amz-content-sha256': payload_hash
}
if session_token:
signed_headers_dict['x-amz-security-token'] = session_token

return signed_headers_dict
Loading