Skip to content

Commit 19d7e4d

Browse files
authored
feat(guard): sync White Rabbit edge events
Adds White Rabbit Guard v1 edge event envelopes, local queue persistence, v1 event upload during normal sync/connect flows, resilient fallback when the v1 endpoint is unavailable, and terminal rejection handling so queued events cannot starve newer sync work.
1 parent 0d1933b commit 19d7e4d

9 files changed

Lines changed: 669 additions & 40 deletions

File tree

src/codex_plugin_scanner/guard/cli/connect_flow.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -107,36 +107,37 @@ def run_guard_connect_command(
107107
sync_payload = sync_receipts(store)
108108
except GuardSyncNotAvailableError as plan_error:
109109
plan_msg = str(plan_error).strip() or "Cloud sync requires a paid Guard plan."
110+
pending_sync_payload = dict(runtime_sync_summary)
111+
pending_sync_payload["synced_at"] = None
110112
pending_state = _record_connect_result(
111113
daemon_client=daemon_client,
112114
store=store,
113115
request_id=str(connect_request["request_id"]),
114-
status="connected",
115-
milestone="sync_not_available",
116+
status="retry_required",
117+
milestone="first_sync_failed",
116118
reason=plan_msg,
119+
sync=pending_sync_payload,
117120
)
118121
return build_connect_payload(
119122
state=pending_state,
120123
browser_opened=browser_opened,
121124
connect_url=browser_url,
122125
sync_url=sync_url,
123-
connected=True,
126+
connected=False,
127+
sync=pending_sync_payload,
124128
sync_available=False,
125129
sync_message=plan_msg,
126130
)
127131
except (RuntimeError, OSError, urllib.error.URLError, json.JSONDecodeError) as error:
128132
sync_message = str(error)
129-
if runtime_sync_error and not _is_paid_plan_sync_error(sync_message):
130-
sync_message = runtime_sync_error
131-
sync_is_plan_limited = _is_paid_plan_sync_error(sync_message)
132133
pending_sync_payload = dict(runtime_sync_summary)
133134
pending_sync_payload["synced_at"] = None
134135
pending_state = _record_connect_result(
135136
daemon_client=daemon_client,
136137
store=store,
137138
request_id=str(connect_request["request_id"]),
138-
status="connected",
139-
milestone="first_sync_pending",
139+
status="retry_required",
140+
milestone="first_sync_failed",
140141
reason=sync_message,
141142
sync=pending_sync_payload,
142143
)
@@ -145,10 +146,9 @@ def run_guard_connect_command(
145146
browser_opened=browser_opened,
146147
connect_url=browser_url,
147148
sync_url=sync_url,
148-
connected=True,
149+
connected=False,
149150
sync=pending_sync_payload,
150151
sync_message=sync_message,
151-
sync_available=False if sync_is_plan_limited else None,
152152
)
153153
sync_payload["runtime_session_synced_at"] = runtime_sync_summary["runtime_session_synced_at"]
154154
sync_payload["runtime_session_id"] = runtime_sync_summary["runtime_session_id"]
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""Builders for Guard Cloud v1 events emitted by the local edge runtime."""
2+
3+
from __future__ import annotations
4+
5+
import hashlib
6+
from typing import cast
7+
8+
from .models import GuardReceipt
9+
from .schemas.guard_event_v1 import GuardEventType, GuardEventV1
10+
11+
12+
def build_receipt_event(
13+
receipt: GuardReceipt,
14+
*,
15+
device_id: str | None = None,
16+
workspace_id: str | None = None,
17+
) -> GuardEventV1:
18+
payload: dict[str, object] = {
19+
"receiptId": receipt.receipt_id,
20+
"harness": receipt.harness,
21+
"artifactId": receipt.artifact_id,
22+
"artifactHash": receipt.artifact_hash,
23+
"artifactName": receipt.artifact_name,
24+
"sourceScope": receipt.source_scope,
25+
"policyDecision": receipt.policy_decision,
26+
"capabilitiesSummary": receipt.capabilities_summary,
27+
"changedCapabilities": list(receipt.changed_capabilities),
28+
"provenanceSummary": receipt.provenance_summary,
29+
"userOverride": receipt.user_override,
30+
}
31+
event_id = f"guard-event-{_fingerprint('receipt.created', receipt.receipt_id)[:32]}"
32+
return GuardEventV1(
33+
event_id=event_id,
34+
idempotency_key=f"receipt.created:{receipt.receipt_id}",
35+
event_type="receipt.created",
36+
source="edge",
37+
occurred_at=receipt.timestamp,
38+
workspace_id=workspace_id,
39+
device_id=device_id,
40+
payload=payload,
41+
)
42+
43+
44+
def build_approval_event(
45+
*,
46+
request_id: str,
47+
event_type: str,
48+
occurred_at: str,
49+
payload: dict[str, object],
50+
device_id: str | None = None,
51+
workspace_id: str | None = None,
52+
) -> GuardEventV1:
53+
if event_type not in {"approval.created", "approval.resolved"}:
54+
raise ValueError("Approval event type must be approval.created or approval.resolved")
55+
return GuardEventV1(
56+
event_id=f"guard-event-{_fingerprint(event_type, request_id, occurred_at)[:32]}",
57+
idempotency_key=f"{event_type}:{request_id}:{occurred_at}",
58+
event_type=cast(GuardEventType, event_type),
59+
source="approval-center",
60+
occurred_at=occurred_at,
61+
workspace_id=workspace_id,
62+
device_id=device_id,
63+
payload=payload,
64+
)
65+
66+
67+
def build_policy_event(
68+
*,
69+
policy_key: str,
70+
occurred_at: str,
71+
payload: dict[str, object],
72+
device_id: str | None = None,
73+
workspace_id: str | None = None,
74+
) -> GuardEventV1:
75+
return GuardEventV1(
76+
event_id=f"guard-event-{_fingerprint('policy.changed', policy_key, occurred_at)[:32]}",
77+
idempotency_key=f"policy.changed:{policy_key}:{occurred_at}",
78+
event_type="policy.changed",
79+
source="policy",
80+
occurred_at=occurred_at,
81+
workspace_id=workspace_id,
82+
device_id=device_id,
83+
payload=payload,
84+
)
85+
86+
87+
def _fingerprint(*parts: str) -> str:
88+
return hashlib.sha256(":".join(parts).encode()).hexdigest()

src/codex_plugin_scanner/guard/runtime/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
GuardSyncNotAvailableError,
55
GuardSyncNotConfiguredError,
66
guard_run,
7+
sync_guard_events,
78
sync_receipts,
89
sync_runtime_session,
910
)
@@ -12,6 +13,7 @@
1213
"GuardSyncNotAvailableError",
1314
"GuardSyncNotConfiguredError",
1415
"guard_run",
16+
"sync_guard_events",
1517
"sync_receipts",
1618
"sync_runtime_session",
1719
]

src/codex_plugin_scanner/guard/runtime/runner.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,10 +480,69 @@ def sync_receipts(store: GuardStore) -> dict[str, object]:
480480
"inventory": 0,
481481
"inventory_tracked": len(inventory),
482482
}
483+
summary["guard_events_v1"] = sync_guard_events(store)
483484
store.set_sync_payload("sync_summary", summary, now)
484485
return summary
485486

486487

488+
def sync_guard_events(store: GuardStore) -> dict[str, object]:
489+
"""Push pending GuardEventV1 envelopes to Guard Cloud."""
490+
491+
credentials = store.get_sync_credentials()
492+
if credentials is None:
493+
raise GuardSyncNotConfiguredError("Guard is not logged in.")
494+
sync_url = _guard_events_sync_url(str(credentials["sync_url"]))
495+
total_events = 0
496+
total_accepted = 0
497+
synced_at = _now()
498+
while True:
499+
pending_events = store.list_guard_events_v1(uploaded=False, limit=200)
500+
if not pending_events:
501+
break
502+
body = json.dumps({"events": [event["payload"] for event in pending_events]}).encode("utf-8")
503+
request = urllib.request.Request(
504+
sync_url,
505+
data=body,
506+
method="POST",
507+
headers=_guard_sync_headers(str(credentials["token"])),
508+
)
509+
try:
510+
payload = _urlopen_json_with_timeout_retry(
511+
request=request,
512+
timeout_seconds=_SYNC_HTTP_TIMEOUT_SECONDS,
513+
retry_timeout_seconds=_SYNC_HTTP_RETRY_TIMEOUT_SECONDS,
514+
)
515+
except urllib.error.HTTPError as error:
516+
if error.code == 404:
517+
summary = {
518+
"synced_at": synced_at,
519+
"events": total_events,
520+
"accepted": total_accepted,
521+
"sync_skipped": True,
522+
"sync_reason": "guard_events_endpoint_unavailable",
523+
}
524+
store.set_sync_payload("guard_events_v1_summary", summary, synced_at)
525+
return summary
526+
if error.code == 403:
527+
is_plan, message = _check_plan_restriction_403(error)
528+
if is_plan:
529+
raise GuardSyncNotAvailableError(message) from error
530+
raise RuntimeError(message) from error
531+
raise RuntimeError(_sync_http_error_message(error)) from error
532+
except OSError as error:
533+
raise RuntimeError(_sync_url_error_message(error)) from error
534+
completed_ids = _completed_guard_event_ids(payload)
535+
synced_at = _sync_timestamp(payload)
536+
uploaded = store.mark_guard_events_v1_uploaded(completed_ids, synced_at)
537+
total_events += len(pending_events)
538+
total_accepted += uploaded
539+
if uploaded == 0 or len(pending_events) < 200:
540+
break
541+
summary = {"synced_at": synced_at, "events": total_events, "accepted": total_accepted}
542+
store.set_sync_payload("guard_events_v1_summary", summary, synced_at)
543+
return summary
544+
545+
487546
def sync_runtime_session(
488547
store: GuardStore,
489548
*,
@@ -951,6 +1010,45 @@ def _normalized_runtime_sessions_sync_url(sync_url: str) -> str:
9511010
)
9521011

9531012

1013+
def _guard_events_sync_url(sync_url: str) -> str:
1014+
parsed = urllib.parse.urlsplit(_normalized_receipts_sync_url(sync_url))
1015+
if parsed.path.rstrip("/").endswith("/api/v1/guard/events"):
1016+
return urllib.parse.urlunsplit((parsed.scheme, parsed.netloc, parsed.path.rstrip("/"), parsed.query, ""))
1017+
path = parsed.path.rstrip("/")
1018+
for suffix in (
1019+
"/api/guard/receipts/sync",
1020+
"/guard/receipts/sync",
1021+
"/registry/api/v1/guard/receipts/sync",
1022+
):
1023+
if path.endswith(suffix):
1024+
path = path[: -len(suffix)]
1025+
break
1026+
return urllib.parse.urlunsplit(
1027+
(
1028+
parsed.scheme,
1029+
parsed.netloc,
1030+
path.rstrip("/") + "/api/v1/guard/events",
1031+
parsed.query,
1032+
"",
1033+
)
1034+
)
1035+
1036+
1037+
def _completed_guard_event_ids(payload: dict[str, object]) -> list[str]:
1038+
statuses = payload.get("statuses")
1039+
if not isinstance(statuses, list):
1040+
return []
1041+
completed: list[str] = []
1042+
for item in statuses:
1043+
if not isinstance(item, dict):
1044+
continue
1045+
status = str(item.get("status") or "")
1046+
event_id = item.get("eventId")
1047+
if status in {"accepted", "duplicate", "rejected"} and isinstance(event_id, str):
1048+
completed.append(event_id)
1049+
return completed
1050+
1051+
9541052
def _cloud_sync_receipts_payload(store: GuardStore, receipts: list[dict[str, object]]) -> list[dict[str, object]]:
9551053
device_id, device_name = _guard_device_metadata(store)
9561054
return [_cloud_sync_receipt_payload(receipt, device_id=device_id, device_name=device_name) for receipt in receipts]
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""Guard Cloud event schema shared by the edge runtime and v1 ingest API."""
2+
3+
from __future__ import annotations
4+
5+
from dataclasses import dataclass, field
6+
from typing import Literal, cast
7+
8+
GuardEventSource = Literal["edge", "approval-center", "policy", "protect-api"]
9+
GuardEventType = Literal["receipt.created", "approval.created", "approval.resolved", "policy.changed"]
10+
11+
12+
@dataclass(frozen=True, slots=True)
13+
class GuardEventV1:
14+
"""Versioned event envelope for replay-safe Guard Cloud sync."""
15+
16+
event_id: str
17+
idempotency_key: str
18+
event_type: GuardEventType
19+
source: GuardEventSource
20+
occurred_at: str
21+
workspace_id: str | None = None
22+
device_id: str | None = None
23+
payload: dict[str, object] = field(default_factory=dict)
24+
schema_version: str = "guard.event.v1"
25+
26+
def to_dict(self) -> dict[str, object]:
27+
return {
28+
"schemaVersion": self.schema_version,
29+
"eventId": self.event_id,
30+
"idempotencyKey": self.idempotency_key,
31+
"eventType": self.event_type,
32+
"source": self.source,
33+
"occurredAt": self.occurred_at,
34+
"workspaceId": self.workspace_id,
35+
"deviceId": self.device_id,
36+
"payload": self.payload,
37+
}
38+
39+
@classmethod
40+
def from_dict(cls, payload: dict[str, object]) -> GuardEventV1:
41+
schema_version = str(payload.get("schemaVersion") or "")
42+
if schema_version != "guard.event.v1":
43+
raise ValueError("Guard event schemaVersion must be guard.event.v1")
44+
event_id = _required_string(payload, "eventId")
45+
idempotency_key = _required_string(payload, "idempotencyKey")
46+
event_type = _required_string(payload, "eventType")
47+
source = _required_string(payload, "source")
48+
occurred_at = _required_string(payload, "occurredAt")
49+
event_payload = payload.get("payload")
50+
if not isinstance(event_payload, dict):
51+
raise ValueError("Guard event payload must be an object")
52+
if event_type not in {"receipt.created", "approval.created", "approval.resolved", "policy.changed"}:
53+
raise ValueError(f"Unsupported Guard event type: {event_type}")
54+
if source not in {"edge", "approval-center", "policy", "protect-api"}:
55+
raise ValueError(f"Unsupported Guard event source: {source}")
56+
workspace_id = payload.get("workspaceId")
57+
device_id = payload.get("deviceId")
58+
return cls(
59+
event_id=event_id,
60+
idempotency_key=idempotency_key,
61+
event_type=cast(GuardEventType, event_type),
62+
source=cast(GuardEventSource, source),
63+
occurred_at=occurred_at,
64+
workspace_id=workspace_id if isinstance(workspace_id, str) else None,
65+
device_id=device_id if isinstance(device_id, str) else None,
66+
payload={str(key): value for key, value in event_payload.items()},
67+
)
68+
69+
70+
def _required_string(payload: dict[str, object], key: str) -> str:
71+
value = payload.get(key)
72+
if not isinstance(value, str) or not value.strip():
73+
raise ValueError(f"Guard event {key} must be a non-empty string")
74+
return value

0 commit comments

Comments
 (0)