diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 87fe41e66..179c44008 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,11 +12,13 @@ Fixed - Validate key against allowed types for Algorithm family in `#964 `__ - Add iterator for JWKSet in `#1041 `__ - Validate `iss` claim is a string during encoding and decoding by @pachewise in `#1040 `__ +- Improve typing/logic for `options` in decode, decode_complete by @pachewise in `#1045 `__ Added ~~~~~ - Docs: Add example of using leeway with nbf by @djw8605 in `#1034 `__ +- Docs: Refactored docs with ``autodoc``; added ``PyJWS`` and ``jwt.algorithms`` docs by @pachewise in `#1045 `__ `v2.10.1 `__ ----------------------------------------------------------------------- diff --git a/docs/api.rst b/docs/api.rst index cafc65606..59a9fa766 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -3,230 +3,57 @@ API Reference .. module:: jwt -.. function:: encode(payload, key, algorithm="HS256", headers=None, json_encoder=None) +.. autofunction:: encode(payload, key, algorithm="HS256", headers=None, json_encoder=None) -> str - Encode the ``payload`` as JSON Web Token. +.. autofunction:: decode(jwt, key="", algorithms=None, options=None, audience=None, issuer=None, leeway=0) -> dict[str, typing.Any] - :param dict payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)`` - :param key: a key suitable for the chosen algorithm: +.. autoclass:: PyJWK + :class-doc-from: init + :members: - * for **asymmetric algorithms**: PEM-formatted private key, a multiline string - * for **symmetric algorithms**: plain string, sufficiently long for security + .. property:: algorithm_name - :type key: str or bytes or jwt.PyJWK - :param str algorithm: algorithm to sign the token with, e.g. ``"ES256"``. - If ``headers`` includes ``alg``, it will be preferred to this parameter. - If ``key`` is a :class:`jwt.PyJWK` object, by default the key algorithm will be used. - :param dict headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``. - :param json.JSONEncoder json_encoder: custom JSON encoder for ``payload`` and ``headers`` - :rtype: str - :returns: a JSON Web Token + :type: str -.. function:: decode(jwt, key="", algorithms=None, options=None, audience=None, issuer=None, leeway=0) + The name of the algorithm used by the key. - Verify the ``jwt`` token signature and return the token claims. + .. property:: Algorithm - :param str jwt: the token to be decoded - :param key: the key suitable for the allowed algorithm - :type key: str or bytes or jwt.PyJWK - - :param list algorithms: allowed algorithms, e.g. ``["ES256"]`` - If ``key`` is a :class:`jwt.PyJWK` object, allowed algorithms will default to the key algorithm. - - .. warning:: - - Do **not** compute the ``algorithms`` parameter based on - the ``alg`` from the token itself, or on any other data - that an attacker may be able to influence, as that might - expose you to various vulnerabilities (see `RFC 8725 §2.1 - `_). Instead, - either hard-code a fixed value for ``algorithms``, or - configure it in the same place you configure the - ``key``. Make sure not to mix symmetric and asymmetric - algorithms that interpret the ``key`` in different ways - (e.g. HS\* and RS\*). - - :param dict options: extended decoding and validation options - - * ``verify_signature=True`` verify the JWT cryptographic signature - * ``require=[]`` list of claims that must be present. - Example: ``require=["exp", "iat", "nbf"]``. - **Only verifies that the claims exists**. Does not verify that the claims are valid. - * ``verify_aud=verify_signature`` check that ``aud`` (audience) claim matches ``audience`` - * ``verify_iss=verify_signature`` check that ``iss`` (issuer) claim matches ``issuer`` - * ``verify_exp=verify_signature`` check that ``exp`` (expiration) claim value is in the future - * ``verify_iat=verify_signature`` check that ``iat`` (issued at) claim value is an integer - * ``verify_nbf=verify_signature`` check that ``nbf`` (not before) claim value is in the past - * ``strict_aud=False`` check that the ``aud`` claim is a single value (not a list), and matches ``audience`` exactly - - .. warning:: - - ``exp``, ``iat`` and ``nbf`` will only be verified if present. - Please pass respective value to ``require`` if you want to make - sure that they are always present (and therefore always verified - if ``verify_exp``, ``verify_iat``, and ``verify_nbf`` respectively - is set to ``True``). - - :param audience: optional, the value for ``verify_aud`` check - :type audience: Union[str, Iterable] - :param str issuer: optional, the value for ``verify_iss`` check - :param float leeway: a time margin in seconds for the expiration check - :rtype: dict - :returns: the JWT claims - -.. class:: PyJWK - - A class that represents a `JSON Web Key `_. - - .. method:: __init__(self, jwk_data, algorithm=None) - - :param dict data: The decoded JWK data. - :param algorithm: The key algorithm. If not specific, the key's ``alg`` will be used. - :type algorithm: str or None - - .. staticmethod:: from_json(data, algorithm=None) - - :param str data: The JWK data, as a JSON string. - :param algorithm: The key algorithm. If not specific, the key's ``alg`` will be used. - :type algorithm: str or None - - :returntype: jwt.PyJWK - - Create a :class:`jwt.PyJWK` object from a JSON string. - - .. property:: algorithm_name - - :type: str - - The name of the algorithm used by the key. - - .. property:: Algorithm - - The ``Algorithm`` class associated with the key. - - .. property:: key_type - - :type: str or None - - The ``kty`` property from the JWK. - - .. property:: key_id - - :type: str or None - - The ``kid`` property from the JWK. - - .. property:: public_key_use - - :type: str or None - - The ``use`` property from the JWK. + The :py:class:`Algorithm` class associated with the key. .. module:: jwt.api_jwt -.. function:: decode_complete(jwt, key="", algorithms=None, options=None, audience=None, issuer=None, leeway=0) +.. autofunction:: decode_complete(jwt, key="", algorithms=None, options=None, audience=None, issuer=None, leeway=0) -> dict[str, typing.Any] - Identical to ``jwt.decode`` except for return value which is a dictionary containing the token header (JOSE Header), - the token payload (JWT Payload), and token signature (JWT Signature) on the keys "header", "payload", - and "signature" respectively. +.. note:: TODO: Finish documenting PyJWS class +.. module:: jwt.api_jws - :param str jwt: the token to be decoded - :param str key: the key suitable for the allowed algorithm +.. autoclass:: jwt.api_jws.PyJWS + :members: - :param list algorithms: allowed algorithms, e.g. ``["ES256"]`` - - .. warning:: - - Do **not** compute the ``algorithms`` parameter based on - the ``alg`` from the token itself, or on any other data - that an attacker may be able to influence, as that might - expose you to various vulnerabilities (see `RFC 8725 §2.1 - `_). Instead, - either hard-code a fixed value for ``algorithms``, or - configure it in the same place you configure the - ``key``. Make sure not to mix symmetric and asymmetric - algorithms that interpret the ``key`` in different ways - (e.g. HS\* and RS\*). - - :param dict options: extended decoding and validation options - - * ``verify_signature=True`` verify the JWT cryptographic signature - * ``require=[]`` list of claims that must be present. - Example: ``require=["exp", "iat", "nbf"]``. - **Only verifies that the claims exists**. Does not verify that the claims are valid. - * ``verify_aud=verify_signature`` check that ``aud`` (audience) claim matches ``audience`` - * ``verify_iss=verify_signature`` check that ``iss`` (issuer) claim matches ``issuer`` - * ``verify_exp=verify_signature`` check that ``exp`` (expiration) claim value is in the future - * ``verify_iat=verify_signature`` check that ``iat`` (issued at) claim value is an integer - * ``verify_nbf=verify_signature`` check that ``nbf`` (not before) claim value is in the past - * ``strict_aud=False`` check that the ``aud`` claim is a single value (not a list), and matches ``audience`` exactly +Algorithms +---------- - .. warning:: +.. automodule:: jwt.algorithms + :members: Algorithm, AllowedPrivateKeys, AllowedPublicKeys - ``exp``, ``iat`` and ``nbf`` will only be verified if present. - Please pass respective value to ``require`` if you want to make - sure that they are always present (and therefore always verified - if ``verify_exp``, ``verify_iat``, and ``verify_nbf`` respectively - is set to ``True``). - :param Iterable audience: optional, the value for ``verify_aud`` check - :param str issuer: optional, the value for ``verify_iss`` check - :param float leeway: a time margin in seconds for the expiration check - :rtype: dict - :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS - Payload on the key ``payload``, and the JWS Signature on the key ``signature``. +Types +---------- -.. note:: TODO: Document PyJWS class +.. module:: jwt.types + :synopsis: Type validation used in the JWT API +.. autoclass:: jwt.types.SigOptions + :members: + :undoc-members: +.. autoclass:: jwt.types.Options + :members: + :undoc-members: Exceptions ---------- -.. currentmodule:: jwt.exceptions - - -.. class:: InvalidTokenError - - Base exception when ``decode()`` fails on a token - -.. class:: DecodeError - - Raised when a token cannot be decoded because it failed validation - -.. class:: InvalidSignatureError - - Raised when a token's signature doesn't match the one provided as part of - the token. - -.. class:: ExpiredSignatureError - - Raised when a token's ``exp`` claim indicates that it has expired - -.. class:: InvalidAudienceError - - Raised when a token's ``aud`` claim does not match one of the expected - audience values - -.. class:: InvalidIssuerError - - Raised when a token's ``iss`` claim does not match the expected issuer - -.. class:: InvalidIssuedAtError - - Raised when a token's ``iat`` claim is non-numeric - -.. class:: ImmatureSignatureError - - Raised when a token's ``nbf`` or ``iat`` claims represent a time in the future - -.. class:: InvalidKeyError - - Raised when the specified key is not in the proper format - -.. class:: InvalidAlgorithmError - - Raised when the specified algorithm is not recognized by PyJWT - -.. class:: MissingRequiredClaimError - - Raised when a claim that is required to be present is not contained - in the claimset +.. automodule:: jwt.exceptions + :members: + :inherited-members: + :show-inheritance: diff --git a/docs/conf.py b/docs/conf.py index 54663e1dc..6aa60871f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -83,8 +83,12 @@ def find_version(*file_paths) -> str: # Intersphinx extension. intersphinx_mapping = { "python": ("https://docs.python.org/3/", None), + "cryptography": ("https://cryptography.io/en/latest/", None), } +# Hack for allowing aliases within TYPE_CHECKING to be documented +os.environ["SPHINX_BUILD"] = "1" + # -- Options for HTML output ---------------------------------------------- diff --git a/jwt/algorithms.py b/jwt/algorithms.py index b5f16151e..47d77df05 100644 --- a/jwt/algorithms.py +++ b/jwt/algorithms.py @@ -3,6 +3,7 @@ import hashlib import hmac import json +import os from abc import ABC, abstractmethod from typing import TYPE_CHECKING, Any, ClassVar, Literal, NoReturn, cast, overload @@ -91,32 +92,36 @@ Ed448PublicKey, ) - has_crypto = True -except ModuleNotFoundError: - has_crypto = False - + if TYPE_CHECKING or bool(os.getenv("SPHINX_BUILD", "")): + from typing import TypeAlias -if TYPE_CHECKING: - from typing import TypeAlias + from cryptography.hazmat.primitives.asymmetric.types import ( + PrivateKeyTypes, + PublicKeyTypes, + ) - from cryptography.hazmat.primitives.asymmetric.types import ( - PrivateKeyTypes, - PublicKeyTypes, - ) + # Type aliases for convenience in algorithms method signatures + AllowedRSAKeys: TypeAlias = RSAPrivateKey | RSAPublicKey + AllowedECKeys: TypeAlias = EllipticCurvePrivateKey | EllipticCurvePublicKey + AllowedOKPKeys: TypeAlias = ( + Ed25519PrivateKey | Ed25519PublicKey | Ed448PrivateKey | Ed448PublicKey + ) + AllowedKeys: TypeAlias = AllowedRSAKeys | AllowedECKeys | AllowedOKPKeys + #: Type alias for allowed ``cryptography`` private keys (requires ``cryptography`` to be installed) + AllowedPrivateKeys: TypeAlias = ( + RSAPrivateKey + | EllipticCurvePrivateKey + | Ed25519PrivateKey + | Ed448PrivateKey + ) + #: Type alias for allowed ``cryptography`` public keys (requires ``cryptography`` to be installed) + AllowedPublicKeys: TypeAlias = ( + RSAPublicKey | EllipticCurvePublicKey | Ed25519PublicKey | Ed448PublicKey + ) - # Type aliases for convenience in algorithms method signatures - AllowedRSAKeys: TypeAlias = RSAPrivateKey | RSAPublicKey - AllowedECKeys: TypeAlias = EllipticCurvePrivateKey | EllipticCurvePublicKey - AllowedOKPKeys: TypeAlias = ( - Ed25519PrivateKey | Ed25519PublicKey | Ed448PrivateKey | Ed448PublicKey - ) - AllowedKeys: TypeAlias = AllowedRSAKeys | AllowedECKeys | AllowedOKPKeys - AllowedPrivateKeys: TypeAlias = ( - RSAPrivateKey | EllipticCurvePrivateKey | Ed25519PrivateKey | Ed448PrivateKey - ) - AllowedPublicKeys: TypeAlias = ( - RSAPublicKey | EllipticCurvePublicKey | Ed25519PublicKey | Ed448PublicKey - ) + has_crypto = True +except ModuleNotFoundError: + has_crypto = False requires_cryptography = { @@ -139,7 +144,7 @@ def get_default_algorithms() -> dict[str, Algorithm]: """ Returns the algorithms that are implemented by the library. """ - default_algorithms = { + default_algorithms: dict[str, Algorithm] = { "none": NoneAlgorithm(), "HS256": HMACAlgorithm(HMACAlgorithm.SHA256), "HS384": HMACAlgorithm(HMACAlgorithm.SHA384), @@ -202,13 +207,12 @@ def compute_hash_digest(self, bytestr: bytes) -> bytes: def check_crypto_key_type(self, key: PublicKeyTypes | PrivateKeyTypes): """Check that the key belongs to the right cryptographic family. - Note that this method only works when `cryptography` is installed. + Note that this method only works when ``cryptography`` is installed. - Args: - key (Any): Potentially a cryptography key - Raises: - ValueError: if `cryptography` is not installed, or this method is called by a non-cryptography algorithm - InvalidKeyError: if the key doesn't match the expected key classes + :param key: Potentially a cryptography key + :type key: :py:data:`PublicKeyTypes ` | :py:data:`PrivateKeyTypes ` + :raises ValueError: if ``cryptography`` is not installed, or this method is called by a non-cryptography algorithm + :raises InvalidKeyError: if the key doesn't match the expected key classes """ if not has_crypto or self._crypto_key_types is None: raise ValueError( diff --git a/jwt/api_jwk.py b/jwt/api_jwk.py index 2e0ed5ac6..b6b33790f 100644 --- a/jwt/api_jwk.py +++ b/jwt/api_jwk.py @@ -17,6 +17,16 @@ class PyJWK: def __init__(self, jwk_data: JWKDict, algorithm: str | None = None) -> None: + """A class that represents a `JSON Web Key `_. + + :param jwk_data: The decoded JWK data. + :type jwk_data: dict[str, typing.Any] + :param algorithm: The key algorithm. If not specified, the key's ``alg`` will be used. + :type algorithm: str or None + :raises InvalidKeyError: If the key type (``kty``) is not found or unsupported, or if the curve (``crv``) is not found or unsupported. + :raises MissingCryptographyError: If the algorithm requires ``cryptography`` to be installed and it is not available. + :raises PyJWKError: If unable to find an algorithm for the key. + """ self._algorithms = get_default_algorithms() self._jwk_data = jwk_data @@ -71,23 +81,52 @@ def __init__(self, jwk_data: JWKDict, algorithm: str | None = None) -> None: @staticmethod def from_dict(obj: JWKDict, algorithm: str | None = None) -> PyJWK: + """Creates a :class:`PyJWK` object from a JSON-like dictionary. + + :param obj: The JWK data, as a dictionary + :type obj: dict[str, typing.Any] + :param algorithm: The key algorithm. If not specified, the key's ``alg`` will be used. + :type algorithm: str or None + :rtype: PyJWK + """ return PyJWK(obj, algorithm) @staticmethod def from_json(data: str, algorithm: None = None) -> PyJWK: + """Create a :class:`PyJWK` object from a JSON string. + Implicitly calls :meth:`PyJWK.from_dict()`. + + :param str data: The JWK data, as a JSON string. + :param algorithm: The key algorithm. If not specific, the key's ``alg`` will be used. + :type algorithm: str or None + + :rtype: PyJWK + """ obj = json.loads(data) return PyJWK.from_dict(obj, algorithm) @property def key_type(self) -> str | None: + """The `kty` property from the JWK. + + :rtype: str or None + """ return self._jwk_data.get("kty", None) @property def key_id(self) -> str | None: + """The `kid` property from the JWK. + + :rtype: str or None + """ return self._jwk_data.get("kid", None) @property def public_key_use(self) -> str | None: + """The `use` property from the JWK. + + :rtype: str or None + """ return self._jwk_data.get("use", None) diff --git a/jwt/api_jws.py b/jwt/api_jws.py index 654ee0b74..9b2d957fb 100644 --- a/jwt/api_jws.py +++ b/jwt/api_jws.py @@ -24,6 +24,7 @@ if TYPE_CHECKING: from .algorithms import AllowedPrivateKeys, AllowedPublicKeys + from .types import SigOptions class PyJWS: @@ -32,7 +33,7 @@ class PyJWS: def __init__( self, algorithms: Sequence[str] | None = None, - options: dict[str, Any] | None = None, + options: SigOptions | None = None, ) -> None: self._algorithms = get_default_algorithms() self._valid_algs = ( @@ -44,17 +45,21 @@ def __init__( if key not in self._valid_algs: del self._algorithms[key] - if options is None: - options = {} - self.options = {**self._get_default_options(), **options} + self.options: SigOptions = self._get_default_options() + if options is not None: + self.options = {**self.options, **options} @staticmethod - def _get_default_options() -> dict[str, bool]: + def _get_default_options() -> SigOptions: return {"verify_signature": True} def register_algorithm(self, alg_id: str, alg_obj: Algorithm) -> None: """ Registers a new Algorithm for use when creating and verifying tokens. + + :param str alg_id: the ID of the Algorithm + :param alg_obj: the Algorithm object + :type alg_obj: Algorithm """ if alg_id in self._algorithms: raise ValueError("Algorithm already has a handler.") @@ -68,7 +73,8 @@ def register_algorithm(self, alg_id: str, alg_obj: Algorithm) -> None: def unregister_algorithm(self, alg_id: str) -> None: """ Unregisters an Algorithm for use when creating and verifying tokens - Throws KeyError if algorithm is not registered. + :param str alg_id: the ID of the Algorithm + :raises KeyError: if algorithm is not registered. """ if alg_id not in self._algorithms: raise KeyError( @@ -81,7 +87,9 @@ def unregister_algorithm(self, alg_id: str) -> None: def get_algorithms(self) -> list[str]: """ - Returns a list of supported values for the 'alg' parameter. + Returns a list of supported values for the `alg` parameter. + + :rtype: list[str] """ return list(self._valid_algs) @@ -90,8 +98,12 @@ def get_algorithm_by_name(self, alg_name: str) -> Algorithm: For a given string name, return the matching Algorithm object. Example usage: - + >>> jws_obj = PyJWS() >>> jws_obj.get_algorithm_by_name("RS256") + + :param alg_name: The name of the algorithm to retrieve + :type alg_name: str + :rtype: Algorithm """ try: return self._algorithms[alg_name] @@ -112,7 +124,7 @@ def encode( is_payload_detached: bool = False, sort_headers: bool = True, ) -> str: - segments = [] + segments: list[bytes] = [] # declare a new var to narrow the type for type checkers if algorithm is None: @@ -184,9 +196,9 @@ def decode_complete( jwt: str | bytes, key: AllowedPublicKeys | PyJWK | str | bytes = "", algorithms: Sequence[str] | None = None, - options: dict[str, Any] | None = None, + options: SigOptions | None = None, detached_payload: bytes | None = None, - **kwargs, + **kwargs: dict[str, Any], ) -> dict[str, Any]: if kwargs: warnings.warn( @@ -196,9 +208,12 @@ def decode_complete( RemovedInPyjwt3Warning, stacklevel=2, ) + merged_options: SigOptions if options is None: - options = {} - merged_options = {**self.options, **options} + merged_options = self.options + else: + merged_options = {**self.options, **options} + verify_signature = merged_options["verify_signature"] if verify_signature and not algorithms and not isinstance(key, PyJWK): @@ -230,9 +245,9 @@ def decode( jwt: str | bytes, key: AllowedPublicKeys | PyJWK | str | bytes = "", algorithms: Sequence[str] | None = None, - options: dict[str, Any] | None = None, + options: SigOptions | None = None, detached_payload: bytes | None = None, - **kwargs, + **kwargs: dict[str, Any], ) -> Any: if kwargs: warnings.warn( @@ -248,7 +263,7 @@ def decode( return decoded["payload"] def get_unverified_header(self, jwt: str | bytes) -> dict[str, Any]: - """Returns back the JWT header parameters as a dict() + """Returns back the JWT header parameters as a `dict` Note: The signature is not verified so the header parameters should not be fully trusted until signature verification is complete @@ -277,7 +292,7 @@ def _load(self, jwt: str | bytes) -> tuple[bytes, bytes, dict[str, Any], bytes]: raise DecodeError("Invalid header padding") from err try: - header = json.loads(header_data) + header: dict[str, Any] = json.loads(header_data) except ValueError as e: raise DecodeError(f"Invalid header string: {e}") from e diff --git a/jwt/api_jwt.py b/jwt/api_jwt.py index 5d6d27719..418163a83 100644 --- a/jwt/api_jwt.py +++ b/jwt/api_jwt.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import os import warnings from calendar import timegm from collections.abc import Iterable, Sequence @@ -21,19 +22,32 @@ ) from .warnings import RemovedInPyjwt3Warning -if TYPE_CHECKING: - from .algorithms import AllowedPrivateKeys, AllowedPublicKeys +if TYPE_CHECKING or bool(os.getenv("SPHINX_BUILD", "")): + from typing import TypeAlias + + from .algorithms import has_crypto from .api_jwk import PyJWK + from .types import FullOptions, Options, SigOptions + + if has_crypto: + from .algorithms import AllowedPrivateKeys, AllowedPublicKeys + + AllowedPrivateKeyTypes: TypeAlias = AllowedPrivateKeys | PyJWK | str | bytes # type: ignore + AllowedPublicKeyTypes: TypeAlias = AllowedPublicKeys | PyJWK | str | bytes # type: ignore + else: + AllowedPrivateKeyTypes: TypeAlias = PyJWK | str | bytes # type: ignore + AllowedPublicKeyTypes: TypeAlias = PyJWK | str | bytes # type: ignore class PyJWT: - def __init__(self, options: dict[str, Any] | None = None) -> None: - if options is None: - options = {} - self.options: dict[str, Any] = {**self._get_default_options(), **options} + def __init__(self, options: Options | None = None) -> None: + self.options: FullOptions + self.options = self._get_default_options() + if options is not None: + self.options = self._merge_options(options) @staticmethod - def _get_default_options() -> dict[str, bool | list[str]]: + def _get_default_options() -> FullOptions: return { "verify_signature": True, "verify_exp": True, @@ -44,17 +58,57 @@ def _get_default_options() -> dict[str, bool | list[str]]: "verify_sub": True, "verify_jti": True, "require": [], + "strict_aud": False, } + def _merge_options(self, options: Options | None = None) -> FullOptions: + if options is None: + return self.options + + # (defensive) set defaults for verify_x to False if verify_signature is False + if not options.get("verify_signature", True): + options["verify_exp"] = options.get("verify_exp", False) + options["verify_nbf"] = options.get("verify_nbf", False) + options["verify_iat"] = options.get("verify_iat", False) + options["verify_aud"] = options.get("verify_aud", False) + options["verify_iss"] = options.get("verify_iss", False) + options["verify_sub"] = options.get("verify_sub", False) + options["verify_jti"] = options.get("verify_jti", False) + return {**self.options, **options} + def encode( self, payload: dict[str, Any], - key: AllowedPrivateKeys | PyJWK | str | bytes, + key: AllowedPrivateKeyTypes, algorithm: str | None = None, headers: dict[str, Any] | None = None, json_encoder: type[json.JSONEncoder] | None = None, sort_headers: bool = True, ) -> str: + """Encode the ``payload`` as JSON Web Token. + + :param payload: JWT claims, e.g. ``dict(iss=..., aud=..., sub=...)`` + :type payload: dict[str, typing.Any] + :param key: a key suitable for the chosen algorithm: + + * for **asymmetric algorithms**: PEM-formatted private key, a multiline string + * for **symmetric algorithms**: plain string, sufficiently long for security + + :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPrivateKeys` + :param algorithm: algorithm to sign the token with, e.g. ``"ES256"``. + If ``headers`` includes ``alg``, it will be preferred to this parameter. + If ``key`` is a :class:`PyJWK` object, by default the key algorithm will be used. + :type algorithm: str or None + :param headers: additional JWT header fields, e.g. ``dict(kid="my-key-id")``. + :type headers: dict[str, typing.Any] or None + :param json_encoder: custom JSON encoder for ``payload`` and ``headers`` + :type json_encoder: json.JSONEncoder or None + + :rtype: str + :returns: a JSON Web Token + + :raises TypeError: if ``payload`` is not a ``dict`` + """ # Check that we get a dict if not isinstance(payload, dict): raise TypeError( @@ -109,9 +163,9 @@ def _encode_payload( def decode_complete( self, jwt: str | bytes, - key: AllowedPublicKeys | PyJWK | str | bytes = "", + key: AllowedPublicKeyTypes = "", algorithms: Sequence[str] | None = None, - options: dict[str, Any] | None = None, + options: Options | None = None, # deprecated arg, remove in pyjwt3 verify: bool | None = None, # could be used as passthrough to api_jws, consider removal in pyjwt3 @@ -119,12 +173,50 @@ def decode_complete( # passthrough arguments to _validate_claims # consider putting in options audience: str | Iterable[str] | None = None, - issuer: str | Sequence[str] | None = None, + issuer: str | Container[str] | None = None, subject: str | None = None, leeway: float | timedelta = 0, # kwargs **kwargs: Any, ) -> dict[str, Any]: + """Identical to ``jwt.decode`` except for return value which is a dictionary containing the token header (JOSE Header), + the token payload (JWT Payload), and token signature (JWT Signature) on the keys "header", "payload", + and "signature" respectively. + + :param jwt: the token to be decoded + :type jwt: str or bytes + :param key: the key suitable for the allowed algorithm + :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys` + + :param algorithms: allowed algorithms, e.g. ``["ES256"]`` + + .. warning:: + + Do **not** compute the ``algorithms`` parameter based on + the ``alg`` from the token itself, or on any other data + that an attacker may be able to influence, as that might + expose you to various vulnerabilities (see `RFC 8725 §2.1 + `_). Instead, + either hard-code a fixed value for ``algorithms``, or + configure it in the same place you configure the + ``key``. Make sure not to mix symmetric and asymmetric + algorithms that interpret the ``key`` in different ways + (e.g. HS\\* and RS\\*). + :type algorithms: typing.Sequence[str] or None + + :param jwt.types.Options options: extended decoding and validation options + Refer to :py:class:`jwt.types.Options` for more information. + + :param audience: optional, the value for ``verify_aud`` check + :type audience: str or typing.Iterable[str] or None + :param issuer: optional, the value for ``verify_iss`` check + :type issuer: str or typing.Container[str] or None + :param leeway: a time margin in seconds for the expiration check + :type leeway: float or datetime.timedelta + :rtype: dict[str, typing.Any] + :returns: Decoded JWT with the JOSE Header on the key ``header``, the JWS + Payload on the key ``payload``, and the JWS Signature on the key ``signature``. + """ if kwargs: warnings.warn( "passing additional kwargs to decode_complete() is deprecated " @@ -133,13 +225,16 @@ def decode_complete( RemovedInPyjwt3Warning, stacklevel=2, ) - options = dict(options or {}) # shallow-copy or initialize an empty dict - options.setdefault("verify_signature", True) + + if options is None: + verify_signature = True + else: + verify_signature = options.get("verify_signature", True) # If the user has set the legacy `verify` argument, and it doesn't match # what the relevant `options` entry for the argument is, inform the user # that they're likely making a mistake. - if verify is not None and verify != options["verify_signature"]: + if verify is not None and verify != verify_signature: warnings.warn( "The `verify` argument to `decode` does nothing in PyJWT 2.0 and newer. " "The equivalent is setting `verify_signature` to False in the `options` dictionary. " @@ -148,26 +243,18 @@ def decode_complete( stacklevel=2, ) - if not options["verify_signature"]: - options.setdefault("verify_exp", False) - options.setdefault("verify_nbf", False) - options.setdefault("verify_iat", False) - options.setdefault("verify_aud", False) - options.setdefault("verify_iss", False) - options.setdefault("verify_sub", False) - options.setdefault("verify_jti", False) - + sig_options: SigOptions = {"verify_signature": verify_signature} decoded = api_jws.decode_complete( jwt, key=key, algorithms=algorithms, - options=options, + options=sig_options, detached_payload=detached_payload, ) payload = self._decode_payload(decoded) - merged_options = {**self.options, **options} + merged_options = self._merge_options(options) self._validate_claims( payload, merged_options, @@ -180,7 +267,7 @@ def decode_complete( decoded["payload"] = payload return decoded - def _decode_payload(self, decoded: dict[str, Any]) -> Any: + def _decode_payload(self, decoded: dict[str, Any]) -> dict[str, Any]: """ Decode the payload from a JWS dictionary (payload, signature, header). @@ -189,7 +276,7 @@ def _decode_payload(self, decoded: dict[str, Any]) -> Any: payloads. """ try: - payload = json.loads(decoded["payload"]) + payload: dict[str, Any] = json.loads(decoded["payload"]) except ValueError as e: raise DecodeError(f"Invalid payload string: {e}") from e if not isinstance(payload, dict): @@ -201,7 +288,7 @@ def decode( jwt: str | bytes, key: AllowedPublicKeys | PyJWK | str | bytes = "", algorithms: Sequence[str] | None = None, - options: dict[str, Any] | None = None, + options: Options | None = None, # deprecated arg, remove in pyjwt3 verify: bool | None = None, # could be used as passthrough to api_jws, consider removal in pyjwt3 @@ -210,11 +297,49 @@ def decode( # consider putting in options audience: str | Iterable[str] | None = None, subject: str | None = None, - issuer: str | Sequence[str] | None = None, + issuer: str | Container[str] | None = None, leeway: float | timedelta = 0, # kwargs **kwargs: Any, - ) -> Any: + ) -> dict[str, Any]: + """Verify the ``jwt`` token signature and return the token claims. + + :param jwt: the token to be decoded + :type jwt: str or bytes + :param key: the key suitable for the allowed algorithm + :type key: str or bytes or PyJWK or :py:class:`jwt.algorithms.AllowedPublicKeys` + + :param algorithms: allowed algorithms, e.g. ``["ES256"]`` + If ``key`` is a :class:`PyJWK` object, allowed algorithms will default to the key algorithm. + + .. warning:: + + Do **not** compute the ``algorithms`` parameter based on + the ``alg`` from the token itself, or on any other data + that an attacker may be able to influence, as that might + expose you to various vulnerabilities (see `RFC 8725 §2.1 + `_). Instead, + either hard-code a fixed value for ``algorithms``, or + configure it in the same place you configure the + ``key``. Make sure not to mix symmetric and asymmetric + algorithms that interpret the ``key`` in different ways + (e.g. HS\* and RS\*). + :type algorithms: typing.Sequence[str] or None + + :param jwt.types.Options options: extended decoding and validation options + Refer to :py:class:`jwt.types.Options` for more information. + + :param audience: optional, the value for ``verify_aud`` check + :type audience: str or typing.Iterable[str] or None + :param subject: optional, the value for ``verify_sub`` check + :type subject: str or None + :param issuer: optional, the value for ``verify_iss`` check + :type issuer: str or typing.Container[str] or None + :param leeway: a time margin in seconds for the expiration check + :type leeway: float or datetime.timedelta + :rtype: dict[str, typing.Any] + :returns: the JWT claims + """ if kwargs: warnings.warn( "passing additional kwargs to decode() is deprecated " @@ -240,9 +365,9 @@ def decode( def _validate_claims( self, payload: dict[str, Any], - options: dict[str, Any], - audience=None, - issuer=None, + options: FullOptions, + audience: Iterable[str] | str | None = None, + issuer: Container[str] | str | None = None, subject: str | None = None, leeway: float | timedelta = 0, ) -> None: @@ -252,7 +377,7 @@ def _validate_claims( if audience is not None and not isinstance(audience, (str, Iterable)): raise TypeError("audience must be a string, iterable or None") - self._validate_required_claims(payload, options) + self._validate_required_claims(payload, options["require"]) now = datetime.now(tz=timezone.utc).timestamp() @@ -282,13 +407,15 @@ def _validate_claims( def _validate_required_claims( self, payload: dict[str, Any], - options: dict[str, Any], + claims: Iterable[str], ) -> None: - for claim in options["require"]: + for claim in claims: if payload.get(claim) is None: raise MissingRequiredClaimError(claim) - def _validate_sub(self, payload: dict[str, Any], subject=None) -> None: + def _validate_sub( + self, payload: dict[str, Any], subject: str | None = None + ) -> None: """ Checks whether "sub" if in the payload is valid or not. This is an Optional claim diff --git a/jwt/exceptions.py b/jwt/exceptions.py index 9b45ae48c..57559fc43 100644 --- a/jwt/exceptions.py +++ b/jwt/exceptions.py @@ -7,46 +7,71 @@ class PyJWTError(Exception): class InvalidTokenError(PyJWTError): + """Base exception when ``decode()`` fails on a token""" + pass class DecodeError(InvalidTokenError): + """Raised when a token cannot be decoded because it failed validation""" + pass class InvalidSignatureError(DecodeError): + """Raised when a token's signature doesn't match the one provided as part of + the token.""" + pass class ExpiredSignatureError(InvalidTokenError): + """Raised when a token's ``exp`` claim indicates that it has expired""" + pass class InvalidAudienceError(InvalidTokenError): + """Raised when a token's ``aud`` claim does not match one of the expected + audience values""" + pass class InvalidIssuerError(InvalidTokenError): + """Raised when a token's ``iss`` claim does not match the expected issuer""" + pass class InvalidIssuedAtError(InvalidTokenError): + """Raised when a token's ``iat`` claim is non-numeric""" + pass class ImmatureSignatureError(InvalidTokenError): + """Raised when a token's ``nbf`` or ``iat`` claims represent a time in the future""" + pass class InvalidKeyError(PyJWTError): + """Raised when the specified key is not in the proper format""" + pass class InvalidAlgorithmError(InvalidTokenError): + """Raised when the specified algorithm is not recognized by PyJWT""" + pass class MissingRequiredClaimError(InvalidTokenError): + """Raised when a claim that is required to be present is not contained + in the claimset""" + def __init__(self, claim: str) -> None: self.claim = claim @@ -59,6 +84,8 @@ class PyJWKError(PyJWTError): class MissingCryptographyError(PyJWKError): + """Raised if the algorithm requires ``cryptography`` to be installed and it is not available.""" + pass @@ -75,8 +102,12 @@ class PyJWKClientConnectionError(PyJWKClientError): class InvalidSubjectError(InvalidTokenError): + """Raised when a token's ``sub`` claim is not a string or doesn't match the expected ``subject``""" + pass class InvalidJTIError(InvalidTokenError): + """Raised when a token's ``jti`` claim is not a string""" + pass diff --git a/jwt/types.py b/jwt/types.py index 7d9935205..cc1ea6a1e 100644 --- a/jwt/types.py +++ b/jwt/types.py @@ -1,5 +1,64 @@ -from typing import Any, Callable, Dict +from typing import Any, Callable, Dict, TypedDict JWKDict = Dict[str, Any] HashlibHash = Callable[..., Any] + + +class SigOptions(TypedDict): + """Options for PyJWS class (TypedDict). Note that this is a smaller set of options than + for :py:func:`jwt.decode()`.""" + + verify_signature: bool + """verify the JWT cryptographic signature""" + + +class Options(TypedDict, total=False): + """Options for :py:func:`jwt.decode()` and :py:func:`jwt.api_jwt.decode_complete()` (TypedDict). + + .. warning:: + + Some claims, such as ``exp``, ``iat``, ``jti``, ``nbf``, and ``sub``, + will only be verified if present. Please refer to the documentation below + for which ones, and make sure to include them in the ``require`` param + if you want to make sure that they are always present (and therefore always verified + if ``verify_{claim} = True`` for that claim). + """ + + verify_signature: bool + """Default: ``True``. Verify the JWT cryptographic signature.""" + require: list[str] + """Default: ``[]``. List of claims that must be present. + Example: ``require=["exp", "iat", "nbf"]``. + **Only verifies that the claims exists**. Does not verify that the claims are valid.""" + strict_aud: bool + """Default: ``False``. (requires ``verify_aud=True``) Check that the ``aud`` claim is a single value (not a list), and matches ``audience`` exactly.""" + verify_aud: bool + """Default: ``verify_signature``. Check that ``aud`` (audience) claim matches ``audience``.""" + verify_exp: bool + """Default: ``verify_signature``. Check that ``exp`` (expiration) claim value is in the future (if present in payload). """ + verify_iat: bool + """Default: ``verify_signature``. Check that ``iat`` (issued at) claim value is an integer (if present in payload). """ + verify_iss: bool + """Default: ``verify_signature``. Check that ``iss`` (issuer) claim matches ``issuer``. """ + verify_jti: bool + """Default: ``verify_signature``. Check that ``jti`` (JWT ID) claim is a string (if present in payload). """ + verify_nbf: bool + """Default: ``verify_signature``. Check that ``nbf`` (not before) claim value is in the past (if present in payload). """ + verify_sub: bool + """Default: ``verify_signature``. Check that ``sub`` (subject) claim is a string and matches ``subject`` (if present in payload). """ + + +# The only difference between Options and FullOptions is that FullOptions +# required _every_ value to be there; Options doesn't require any +class FullOptions(TypedDict): + verify_signature: bool + require: list[str] + strict_aud: bool + verify_aud: bool + verify_exp: bool + verify_iat: bool + verify_iss: bool + verify_jti: bool + verify_nbf: bool + verify_sub: bool diff --git a/pyproject.toml b/pyproject.toml index b49b5f58a..c09ee9366 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,7 +79,7 @@ source = [ [tool.coverage.report] exclude_lines = [ - "if TYPE_CHECKING:", + "if TYPE_CHECKING", "pragma: no cover", ] show_missing = true diff --git a/tests/test_api_jwt.py b/tests/test_api_jwt.py index ec4eed8a8..2077b7b93 100644 --- a/tests/test_api_jwt.py +++ b/tests/test_api_jwt.py @@ -36,6 +36,14 @@ def payload(): class TestJWT: + def test_jwt_with_options(self): + jwt = PyJWT(options={"verify_signature": False}) + assert jwt.options["verify_signature"] is False + # assert that unrelated option is unchanged from default + assert jwt.options["strict_aud"] is False + # assert that verify_signature is respected unless verify_exp is overridden + assert jwt.options["verify_exp"] is False + def test_decodes_valid_jwt(self, jwt): example_payload = {"hello": "world"} example_secret = "secret"