Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions .github/workflows/spdm-validator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,27 @@ jobs:
cargo xtask all-build
cargo t -p tests-integration -- --test test_mctp_spdm_responder_conformance --nocapture --include-ignored
sccache --show-stats

- name: Setup EAT validation environment and test EAT
env:
SPDM_VALIDATOR_DIR: ${{ github.workspace }}/spdm-emu/build/bin
run: |
# Setup Python environment
sudo apt-get install -qy python3-pip python3-venv
cd runtime/userspace/api/eat/decoder
python3 -m venv eat_venv
source eat_venv/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
echo "Virtual environment created with working library versions"
pip list | grep -E "(cryptography|asn1crypto|cbor2)"

# Test EAT decoder
echo "SPDM_NONCE in EAT decoder stage: ${SPDM_NONCE}"
echo "SPDM_NONCE length: ${#SPDM_NONCE}"
echo "Using virtual environment with working cryptography version:"
pip list | grep -E "(cryptography|asn1crypto|cbor2)"
python3 decode_json.py 0

- name: Upload logs and traces for MCTP transport
if: always()
Expand Down Expand Up @@ -135,6 +156,21 @@ jobs:
exit 1
fi

- name: Upload logs and traces for MCTP transport
if: always()
uses: actions/upload-artifact@v4
env:
SPDM_VALIDATOR_DIR: ${{ github.workspace }}/spdm-emu/build/bin
with:
name: spdm-mctp-validator-test-results
path: |
${{ env.SPDM_VALIDATOR_DIR }}/test.log
${{ env.SPDM_VALIDATOR_DIR }}/caliptra_spdm_validator.pcap
${{ env.SPDM_VALIDATOR_DIR }}/spdm_device_validator_output.txt
${{ env.SPDM_VALIDATOR_DIR }}/slot_0_certs
${{ env.SPDM_VALIDATOR_DIR }}/measurement_block_fd.bin
${{ env.SPDM_VALIDATOR_DIR }}/eat_claims.json

- name: Run SPDM validator tests on DOE transport
env:
SPDM_VALIDATOR_DIR: ${{ github.workspace }}/spdm-emu/build/bin
Expand All @@ -143,6 +179,12 @@ jobs:
cargo t -p tests-integration -- --test test_doe_spdm_responder_conformance --nocapture --include-ignored
sccache --show-stats

- name: Display SPDM Validator test results for DOE transport
env:
SPDM_VALIDATOR_DIR: ${{ github.workspace }}/spdm-emu/build/bin
run: |
cat $SPDM_VALIDATOR_DIR/test.log

- name: Upload logs and traces for DOE transport
if: always()
uses: actions/upload-artifact@v4
Expand All @@ -155,12 +197,6 @@ jobs:
${{ env.SPDM_VALIDATOR_DIR }}/caliptra_spdm_validator.pcap
${{ env.SPDM_VALIDATOR_DIR }}/spdm_device_validator_output.txt

- name: Display SPDM Validator test results for DOE transport
env:
SPDM_VALIDATOR_DIR: ${{ github.workspace }}/spdm-emu/build/bin
run: |
cat $SPDM_VALIDATOR_DIR/test.log

- name: Check for test failures on DOE transport
env:
SPDM_VALIDATOR_DIR: ${{ github.workspace }}/spdm-emu/build/bin
Expand Down Expand Up @@ -208,5 +244,4 @@ jobs:
SPDM_VALIDATOR_DIR: ${{ github.workspace }}/spdm-rs/target/debug
with:
name: spdm-tdisp-ide-validator-test-results
path: |
${{ env.SPDM_VALIDATOR_DIR }}/tdisp_ide_validator_output.txt
path: ${{ env.SPDM_VALIDATOR_DIR }}/tdisp_ide_validator_output.txt
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/target*/
test_key
*env*

# By default, ignore Cargo.lock files in non-workspace directories.
*/**/Cargo.lock
Expand Down
216 changes: 216 additions & 0 deletions runtime/userspace/api/eat/decoder/certchain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
#!/usr/bin/env python3
"""Certificate chain assembly and lightweight validation utilities.

Builds an ordered chain from a provided leaf (attestation / AK certificate) plus
locally stored DER files (if present) in the fixed search order:
[ leaf, RT_CERT, FMC_CERT, LDEVID_CERT, IDEVID_CERT, ROOT_CA ]

Missing files are silently skipped. Chain validation is intentionally *lightweight*:
- Expiration check
- basicConstraints CA checks for intermediates & root
- Issuer/Subject linkage (with relaxed DN comparison & normalization)
- Signature verification (cryptography, with OpenSSL fallback)
- Root self-issued check
- Diagnostic SKI/AKI correlation (including derived SKI when missing)
- Detailed ASN.1 dumps for DN mismatches

The heavier PKIX features (policies, path length, EKU, revocation) are out of scope
for this helper.
"""
from __future__ import annotations

import logging
import os
from typing import List, Tuple, Dict
from asn1crypto import x509
from dice_cert_parser import parse_certchain

logger = logging.getLogger(__name__)


def write_certs_to_file(certs: List[x509.Certificate], slot_id: int) -> None:
"""Write certificates to individual files in SPDM_VALIDATOR_DIR.

Files are written in leaf-first order (matching chain validation order):
- cert0.der = Leaf certificate (first in chain validation)
- cert1.der = Intermediate certificate
- ...
- cert5.der = Root certificate (last in chain validation)
"""
spdm_validator_dir = os.environ.get('SPDM_VALIDATOR_DIR')
if not spdm_validator_dir:
raise ValueError("SPDM_VALIDATOR_DIR environment variable is not set")

# Create output directory in SPDM_VALIDATOR_DIR
output_dir = os.path.join(spdm_validator_dir, f"slot_{slot_id}_certs")
os.makedirs(output_dir, exist_ok=True)

# Reverse the certificates to store leaf-first (cert0.der = leaf)
reversed_certs = list(reversed(certs))
for i, cert in enumerate(reversed_certs, 0):
# Write certificate to file (leaf-first order)
cert_filename = os.path.join(output_dir, f"cert{i}.der")
with open(cert_filename, "wb") as cert_file:
cert_file.write(cert.dump())
logger.info("Written to: %s", cert_filename)


def get_leaf_public_key(cert_chain: List[bytes]) -> bytes:
"""Extract the public key from the leaf certificate in DER format."""
if not cert_chain:
raise ValueError("Certificate chain is empty")

leaf_cert_der = cert_chain[0]
leaf_cert = x509.Certificate.load(leaf_cert_der)
public_key_info = leaf_cert['tbs_certificate']['subject_public_key_info']
return public_key_info.dump()

def decode_spdm_certchain(slot_id) -> List[bytes]:
"""Parse a certificate chain from SPDM_VALIDATOR_DIR and save individual certificates."""
# Construct blob path from environment variable and slot_id
spdm_validator_dir = os.environ.get('SPDM_VALIDATOR_DIR')
if not spdm_validator_dir:
raise ValueError("SPDM_VALIDATOR_DIR environment variable is not set")

blob_path = os.path.join(spdm_validator_dir, f"certificate_chain_slot_{slot_id:02d}.der")

with open(blob_path, "rb") as f:
data = f.read()

certs = []
offset = 0
while offset < len(data):
try:
cert = x509.Certificate.load(data[offset:])
certs.append(cert)
# Advance offset by the length of the parsed cert
offset += len(cert.dump())
except Exception as e:
logger.warning("Stopped parsing at offset %d: %s", offset, e)
break

# Write certificates to files (original order)
write_certs_to_file(certs, slot_id)

# Return list of DER certificate bytes in reverse order (leaf first)
return [cert.dump() for cert in reversed(certs)]


def validate_certchain(cert_chain: List[bytes], verbose: bool = False, parse: bool = False) -> Tuple[bool, List[str]]:
"""Validate certificate chain and optionally parse certificate fields."""
if parse:
logger.info("Parse the certificate chain")
if parse_certchain(cert_chain, verbose=verbose):
logger.info("Certificate parsing completed")
else:
logger.info("Certificate parsing failed")

# Just validate without parsing
try:
chain_valid, issues = _validate_certificate_chain(cert_chain)
if chain_valid:
logger.info("Certificate chain validation: SUCCESS")
else:
logger.warning("Certificate chain validation: FAILED (%d issue(s))", len(issues))
for iss in issues:
logger.warning(" - %s", iss)
return chain_valid, issues
except Exception as e:
logger.error("Chain validation error: %s", e)
return cert_chain, []

def _validate_certificate_chain(chain: List[bytes]) -> Tuple[bool, List[str]]:
"""Certificate chain validation using cryptography library's built-in validation."""
try:
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, rsa, padding
except ImportError:
return False, ["cryptography library not available for certificate validation"]

try:
# Load certificates using cryptography
certs = [x509.load_der_x509_certificate(der) for der in chain]

if not certs:
return False, ["No certificates in chain"]

# Reject insufficient certificates - we expect a proper certificate chain
if len(certs) < 2:
return False, [f"Certificate chain must contain at least 2 certificates (leaf + issuer), found {len(certs)}"]

# Perform chain validation manually since cryptography doesn't have a simple built-in validator
leaf_cert = certs[0] # First certificate is leaf

# Validate each certificate against its issuer
for idx in range(len(certs) - 1):
child_cert = certs[idx]
issuer_cert = certs[idx + 1]

# Check expiration
now = child_cert.not_valid_before.__class__.now()
if child_cert.not_valid_after < now:
return False, [f"Certificate {idx} has expired"]

# Check issuer/subject linkage
if child_cert.issuer != issuer_cert.subject:
return False, [f"Certificate {idx} issuer does not match certificate {idx+1} subject"]

# Verify signature
try:
issuer_public_key = issuer_cert.public_key()
signature = child_cert.signature
tbs_bytes = child_cert.tbs_certificate_bytes
signature_algorithm = child_cert.signature_hash_algorithm

if isinstance(issuer_public_key, ec.EllipticCurvePublicKey):
issuer_public_key.verify(signature, tbs_bytes, ec.ECDSA(signature_algorithm))
elif isinstance(issuer_public_key, rsa.RSAPublicKey):
issuer_public_key.verify(signature, tbs_bytes, padding.PKCS1v15(), signature_algorithm)
else:
return False, [f"Certificate {idx}: Unsupported public key type in issuer {idx+1}"]
except Exception as sig_err:
return False, [f"Certificate {idx}: Signature verification failed: {sig_err}"]

# Check root certificate is self-issued
root_cert = certs[-1]
if root_cert.issuer != root_cert.subject:
return False, ["Root certificate is not self-issued"]

return True, []

except Exception as e:
# If cryptography validation fails, fall back to basic manual checks for debugging
try:
# Load with asn1crypto for basic parsing checks
asn1_certs = [x509.Certificate.load(der) for der in chain]
issues = []

# Basic parsing validation
for idx, cert in enumerate(asn1_certs):
try:
# Check if certificate can be parsed
tbs = cert['tbs_certificate']
subject = tbs['subject'].native
issuer = tbs['issuer'].native
validity = tbs['validity'].native

# Basic expiration check
import datetime
now = datetime.datetime.now(datetime.timezone.utc)
not_after = validity.get('not_after')
if not_after and not_after < now:
issues.append(f"Cert {idx} expired")

except Exception as parse_err:
issues.append(f"Cert {idx} parsing failed: {parse_err}")

# Return cryptography error with basic diagnostic info
error_msg = f"Cryptography validation failed: {e}"
if issues:
error_msg += f" | Basic checks found: {'; '.join(issues)}"

return False, [error_msg]

except Exception as fallback_err:
return False, [f"Certificate validation failed: {e} (fallback also failed: {fallback_err})"]
2 changes: 1 addition & 1 deletion runtime/userspace/api/eat/decoder/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ def parse_generic_claim_value(payload, claims_offset, value_type, value_info):

def parse_eat_claims(payload):
"""Parse EAT claims from payload"""
print("\n=== EAT Claims Analysis ===")
print("\n--- EAT Claims Analysis ---")
claims_offset = 0
claims_header, claims_offset = parse_cbor_header(payload, claims_offset)

Expand Down
Loading
Loading