Skip to content

Commit

Permalink
feat: add reCAPTCHA verdict to auth blocking functions
Browse files Browse the repository at this point in the history
  • Loading branch information
exaby73 committed Dec 5, 2023
1 parent 80c0968 commit 03670c3
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 7 deletions.
11 changes: 11 additions & 0 deletions src/firebase_functions/identity_fn.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ class AdditionalUserInfo:
is_new_user: bool
"""A boolean indicating if the user is new or not."""

recaptcha_score: float | None
"""The user's reCAPTCHA score, if available."""


@_dataclasses.dataclass(frozen=True)
class Credential:
Expand Down Expand Up @@ -282,6 +285,12 @@ class AuthBlockingEvent:
The time the event was triggered."""


RecaptchaActionOptions = _typing.Literal["ALLOW", "BLOCK"]
"""
The reCAPTCHA action options.
"""


class BeforeCreateResponse(_typing.TypedDict, total=False):
"""
The handler response type for 'before_user_created' blocking events.
Expand All @@ -302,6 +311,8 @@ class BeforeCreateResponse(_typing.TypedDict, total=False):
custom_claims: dict[str, _typing.Any] | None
"""The user's custom claims object if available."""

recaptcha_action_override: RecaptchaActionOptions | None


class BeforeSignInResponse(BeforeCreateResponse, total=False):
"""
Expand Down
35 changes: 28 additions & 7 deletions src/firebase_functions/private/_identity_fn.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def _additional_user_info_from_token_data(token_data: dict[str, _typing.Any]):
profile=profile,
username=username,
is_new_user=is_new_user,
recaptcha_score=token_data.get("recaptcha_score"),
)


Expand Down Expand Up @@ -302,9 +303,35 @@ def _validate_auth_response(
auth_response_dict["customClaims"] = auth_response["custom_claims"]
if "session_claims" in auth_response_keys:
auth_response_dict["sessionClaims"] = auth_response["session_claims"]
if "recaptcha_action_override" in auth_response_keys:
auth_response_dict["recaptchaActionOverride"] = auth_response[
"recaptcha_action_override"]
return auth_response_dict


def _generate_response_payload(
auth_response_dict: dict[str, _typing.Any] | None
) -> dict[str, _typing.Any]:
if not auth_response_dict:
return {}

formatted_auth_response = auth_response_dict.copy()
recaptcha_action_override = formatted_auth_response.pop(
"recaptchaActionOverride", None)
result = {}
update_mask = ",".join(formatted_auth_response.keys())

if len(update_mask):
result["userRecord"] = {
**formatted_auth_response, "updateMask": update_mask
}

if recaptcha_action_override is not None:
result["recaptchaActionOverride"] = recaptcha_action_override

return result


def before_operation_handler(
func: _typing.Callable,
event_type: str,
Expand All @@ -329,13 +356,7 @@ def before_operation_handler(
if not auth_response:
return _jsonify({})
auth_response_dict = _validate_auth_response(event_type, auth_response)
update_mask = ",".join(auth_response_dict.keys())
result = {
"userRecord": {
**auth_response_dict,
"updateMask": update_mask,
}
}
result = _generate_response_payload(auth_response_dict)
return _jsonify(result)
# Disable broad exceptions lint since we want to handle all exceptions.
# pylint: disable=broad-except
Expand Down

0 comments on commit 03670c3

Please sign in to comment.