Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions acapy_agent/messaging/decorators/attach_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@
)
self.jws_ = AttachDecoratorDataJWS.deserialize(jws)

async def verify(

Check failure on line 415 in acapy_agent/messaging/decorators/attach_decorator.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=openwallet-foundation_acapy&issues=AZzi1aDDSua2IGoDVNFz&open=AZzi1aDDSua2IGoDVNFz&pullRequest=4085
self, wallet: BaseWallet, signer_verkey: Optional[str] = None
) -> bool:
"""Verify the signature(s).
Expand Down Expand Up @@ -442,8 +442,15 @@
if not await wallet.verify_message(sign_input, b_sig, verkey, ED25519):
return False

if "kid" in jwk:
encoded_pk = DIDKey.from_did(protected["jwk"]["kid"]).public_key_b58
# Prefer kid from JWS header (canonical per spec); fall back to jwk.kid
kid = None
if getattr(sig, "header", None) and getattr(sig.header, "kid", None):
kid = sig.header.kid
elif "kid" in jwk:
kid = protected["jwk"]["kid"]

if kid:
encoded_pk = DIDKey.from_did(kid).public_key_b58
verkey_to_check.append(encoded_pk)
if not await wallet.verify_message(
sign_input, b_sig, encoded_pk, ED25519
Expand Down
82 changes: 81 additions & 1 deletion acapy_agent/messaging/decorators/tests/test_attach_decorator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
from copy import deepcopy
from datetime import datetime, timezone
from types import SimpleNamespace
from unittest import TestCase

import pytest
Expand All @@ -12,7 +13,7 @@
from ....wallet.base import BaseWallet
from ....wallet.did_method import SOV, DIDMethods
from ....wallet.key_type import ED25519
from ....wallet.util import b64_to_bytes, bytes_to_b64
from ....wallet.util import b58_to_bytes, b64_to_bytes, bytes_to_b64, str_to_b64
from ..attach_decorator import (
AttachDecorator,
AttachDecoratorData,
Expand Down Expand Up @@ -522,3 +523,82 @@ async def test_indy_sign(self, wallet, seed):
deco_dict["data"]["links"] = "https://en.wikipedia.org/wiki/Potato"
with pytest.raises(BaseModelError):
AttachDecorator.deserialize(deco_dict) # now has base64 and links

@pytest.mark.asyncio
async def test_verify_uses_kid_from_header_when_jwk_has_no_kid(self, wallet, seed):
"""Verify uses kid from JWS header when jwk has no kid (Credo/interop case)."""
# Build a JWS where kid is only in the unprotected header, not in jwk.
# This is the format some agents (e.g. Credo) send; verification must use header.kid.
did_info = await wallet.create_local_did(SOV, ED25519, seed[0])
verkey_b58 = did_info.verkey
kid_full = did_key(verkey_b58)

payload = b"payload for header-kid test"
b64_payload = bytes_to_b64(payload)
# Protected header with jwk but NO kid in jwk
protected = {
"alg": "EdDSA",
"jwk": {
"kty": "OKP",
"crv": "Ed25519",
"x": bytes_to_b64(b58_to_bytes(verkey_b58), urlsafe=True, pad=False),
},
}
b64_protected = str_to_b64(
json.dumps(protected, separators=(",", ":")),
urlsafe=True,
pad=False,
)
sign_input = (b64_protected + "." + b64_payload).encode("ascii")
sig_bytes = await wallet.sign_message(
message=sign_input,
from_verkey=verkey_b58,
)
b64_sig = bytes_to_b64(sig_bytes, urlsafe=True, pad=False)

data = AttachDecoratorData.deserialize(
{
"base64": b64_payload,
"jws": {
"header": {"kid": kid_full},
"protected": b64_protected,
"signature": b64_sig,
},
}
)
assert await data.verify(wallet)
assert await data.verify(wallet, signer_verkey=verkey_b58)

@pytest.mark.asyncio
async def test_verify_uses_kid_from_jwk_when_header_has_no_kid(self, wallet, seed):
"""Verify uses jwk.kid when header has no kid (fallback path)."""
deco = AttachDecorator.data_base64(
mapping=INDY_CRED,
ident=IDENT,
description=DESCRIPTION,
)
did_info = await wallet.create_local_did(SOV, ED25519, seed[0])
await deco.data.sign(did_info.verkey, wallet)
# Force fallback: header has no kid, so verify must use jwk.kid from protected
deco.data.jws_.header = SimpleNamespace(kid=None)
assert await deco.data.verify(wallet)
assert await deco.data.verify(wallet, signer_verkey=did_info.verkey)

@pytest.mark.asyncio
async def test_verify_returns_false_when_signer_verkey_does_not_match(
self, wallet, seed
):
"""Verify returns False when signer_verkey is not the signing key."""
deco = AttachDecorator.data_base64(
mapping=INDY_CRED,
ident=IDENT,
description=DESCRIPTION,
)
did_infos = [await wallet.create_local_did(SOV, ED25519, seed[i]) for i in [0, 1]]
await deco.data.sign(did_infos[0].verkey, wallet)
# Sign with key 0; require key 1 -> should fail
assert not await deco.data.verify(wallet, signer_verkey=did_infos[1].verkey)
# No signer_verkey constraint -> should pass
assert await deco.data.verify(wallet)
# Correct signer_verkey -> should pass
assert await deco.data.verify(wallet, signer_verkey=did_infos[0].verkey)
Loading