|
19 | 19 | """ |
20 | 20 |
|
21 | 21 | from datetime import UTC, datetime |
| 22 | +import hashlib |
22 | 23 | import logging |
| 24 | +import re |
23 | 25 | from typing import Any, Dict, List, Optional, Tuple |
24 | 26 |
|
25 | 27 | from acapy_agent.core.profile import Profile, ProfileSession |
@@ -231,10 +233,15 @@ async def store_certificate( |
231 | 233 | key_id: str, |
232 | 234 | metadata: Optional[Dict] = None, |
233 | 235 | ) -> None: |
234 | | - """Store a PEM certificate.""" |
| 236 | + """Store a PEM certificate and auto-register it as a trust anchor.""" |
235 | 237 | await certificates.store_certificate( |
236 | 238 | session, cert_id, certificate_pem, key_id, metadata |
237 | 239 | ) |
| 240 | + # Automatically register every cert in the PEM chain as a trust anchor |
| 241 | + # so the trust registry stays in sync without manual steps. |
| 242 | + await self.auto_register_trust_anchors( |
| 243 | + session, certificate_pem, source="cert-store" |
| 244 | + ) |
238 | 245 |
|
239 | 246 | async def get_certificate( |
240 | 247 | self, session: ProfileSession, cert_id: str |
@@ -364,3 +371,57 @@ async def get_all_trust_anchor_pems(self, session: ProfileSession) -> List[str]: |
364 | 371 | async def delete_trust_anchor(self, session: ProfileSession, anchor_id: str) -> bool: |
365 | 372 | """Delete a trust anchor by ID.""" |
366 | 373 | return await trust_anchors.delete_trust_anchor(session, anchor_id) |
| 374 | + |
| 375 | + async def auto_register_trust_anchors( |
| 376 | + self, |
| 377 | + session: ProfileSession, |
| 378 | + certificate_pem: str, |
| 379 | + source: str = "auto", |
| 380 | + ) -> List[str]: |
| 381 | + """Parse a PEM (possibly a chain) and register each cert as a trust anchor. |
| 382 | +
|
| 383 | + Duplicate certificates (by PEM content) are silently skipped so this |
| 384 | + method is idempotent. |
| 385 | +
|
| 386 | + Args: |
| 387 | + session: Active database session |
| 388 | + certificate_pem: One or more concatenated PEM certificate blocks |
| 389 | + source: Label for metadata (e.g. ``"key-generation"``) |
| 390 | +
|
| 391 | + Returns: |
| 392 | + List of anchor IDs that were newly created. |
| 393 | + """ |
| 394 | + _PEM_RE = re.compile( |
| 395 | + r"-----BEGIN CERTIFICATE-----[A-Za-z0-9+/=\s]+?" |
| 396 | + r"-----END CERTIFICATE-----\n?", |
| 397 | + re.DOTALL, |
| 398 | + ) |
| 399 | + individual_pems = _PEM_RE.findall(certificate_pem) |
| 400 | + if not individual_pems: |
| 401 | + return [] |
| 402 | + |
| 403 | + # Fetch existing trust anchor PEMs to avoid duplicates |
| 404 | + existing_pems = set(await trust_anchors.get_all_trust_anchor_pems(session)) |
| 405 | + |
| 406 | + created: List[str] = [] |
| 407 | + for pem in individual_pems: |
| 408 | + normalized = pem.strip() |
| 409 | + if normalized in existing_pems: |
| 410 | + continue |
| 411 | + # Deterministic-ish anchor ID based on cert fingerprint |
| 412 | + fingerprint = hashlib.sha256(normalized.encode()).hexdigest()[:12] |
| 413 | + anchor_id = f"auto-{source}-{fingerprint}" |
| 414 | + try: |
| 415 | + await trust_anchors.store_trust_anchor( |
| 416 | + session, |
| 417 | + anchor_id=anchor_id, |
| 418 | + certificate_pem=normalized, |
| 419 | + metadata={"source": source, "auto_registered": True}, |
| 420 | + ) |
| 421 | + created.append(anchor_id) |
| 422 | + existing_pems.add(normalized) |
| 423 | + except Exception: |
| 424 | + LOGGER.debug( |
| 425 | + "Skipping duplicate trust anchor %s", anchor_id, exc_info=True |
| 426 | + ) |
| 427 | + return created |
0 commit comments