Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,64 @@ jobs:
- name: OpenClaw E2E (simulated runtime)
run: npm run e2e

clawdstrike-policy:
  # Type-checks, builds, and tests the canonical TypeScript policy engine.
  name: Canonical Policy Engine (TS)
  runs-on: ubuntu-latest
  defaults:
    run:
      # Every step runs inside the package directory.
      working-directory: packages/clawdstrike-policy
  steps:
    - uses: actions/checkout@v6

    - name: Setup Node.js
      uses: actions/setup-node@v6
      with:
        node-version: '24'
        cache: 'npm'
        # Scope the npm cache key to this package's lockfile.
        cache-dependency-path: packages/clawdstrike-policy/package-lock.json

    - name: Install dependencies
      run: npm ci

    - name: Type check
      run: npm run typecheck

    - name: Build
      run: npm run build

    - name: Test
      run: npm test

policy-parity:
  # Cross-checks that the Rust (hush) and TypeScript policy engines agree.
  name: Policy Parity (Rust vs TS)
  runs-on: ubuntu-latest
  steps:
    - uses: actions/checkout@v6

    - name: Install Rust toolchain
      uses: dtolnay/rust-toolchain@stable

    - name: Setup Node.js
      uses: actions/setup-node@v6
      with:
        node-version: '24'
        cache: 'npm'
        cache-dependency-path: packages/clawdstrike-policy/package-lock.json

    - name: Build hush
      run: cargo build -p hush-cli --bin hush

    - name: Install clawdstrike-policy deps
      working-directory: packages/clawdstrike-policy
      run: npm ci

    - name: Build clawdstrike-policy
      working-directory: packages/clawdstrike-policy
      run: npm run build

    # Runs from the repo root so it can see both the hush binary and the TS build.
    - name: Run parity script
      run: node tools/scripts/policy-parity.mjs

python-sdk:
name: Python SDK
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions crates/clawdstrike/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,24 @@ regex.workspace = true
glob.workspace = true
chrono.workspace = true
globset.workspace = true
dashmap.workspace = true
futures.workspace = true
uuid = { workspace = true, features = ["v4"] }
unicode-normalization = "0.1"
base64.workspace = true
reqwest = { workspace = true, optional = true }
reqwest.workspace = true
hex.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
proptest = "1.9"
tokio-test = "0.4"
tempfile = "3.24"
axum.workspace = true

[features]
default = []
llm-judge-openai = ["reqwest"]
llm-judge-openai = []

[lints]
workspace = true
130 changes: 130 additions & 0 deletions crates/clawdstrike/src/async_guards/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::{Duration, Instant};

use dashmap::DashMap;

use crate::guards::GuardResult;

/// A single cached value: serialized payload plus expiry and size metadata.
struct CacheEntry {
    // JSON-serialized `GuardResult` bytes.
    bytes: Vec<u8>,
    // Absolute instant after which this entry is stale.
    expires_at: Instant,
    // `bytes.len()` captured at insert time; used for byte accounting on removal.
    size: usize,
}

/// A small in-memory TTL cache with best-effort LRU eviction.
///
/// This is intentionally simple: it is a safety/performance primitive, not a general-purpose cache.
pub struct TtlCache {
    // Keyed entries; `DashMap` provides sharded concurrent access.
    map: DashMap<String, CacheEntry>,
    // Recency queue: front = least recently used, back = most recently used.
    order: Mutex<VecDeque<String>>,
    // Soft cap on the total serialized bytes held in `map`.
    max_bytes: usize,
    // Running total of the `size` fields of live entries (relaxed, best-effort accounting).
    total_bytes: AtomicUsize,
}

impl TtlCache {
    /// Creates an empty cache bounded by `max_bytes` of serialized payload.
    pub fn new(max_bytes: usize) -> Self {
        Self {
            map: DashMap::new(),
            order: Mutex::new(VecDeque::new()),
            max_bytes,
            total_bytes: AtomicUsize::new(0),
        }
    }

    /// Looks up a cached `GuardResult`.
    ///
    /// Returns `None` on a miss, on an expired entry (which is removed
    /// eagerly), or when the stored payload no longer deserializes.
    pub fn get_guard_result(&self, key: &str) -> Option<GuardResult> {
        let now = Instant::now();
        let entry = self.map.get(key)?;

        if entry.expires_at <= now {
            // Release the map guard before mutating so `remove` can take it.
            drop(entry);
            self.remove(key);
            return None;
        }

        // Best-effort LRU bookkeeping: mark this key most-recently used.
        self.touch(key);

        serde_json::from_slice::<GuardResult>(&entry.bytes).ok()
    }

    /// Serializes `value` and stores it under `key` for `ttl`.
    ///
    /// Values that fail to serialize, or whose payload alone exceeds
    /// `max_bytes`, are silently not cached.
    pub fn set_guard_result(&self, key: String, value: &GuardResult, ttl: Duration) {
        let Ok(payload) = serde_json::to_vec(value) else {
            return;
        };

        let size = payload.len();
        if size > self.max_bytes {
            // A single entry this large would immediately evict everything else.
            return;
        }

        // Drop any previous entry for this key first so the byte accounting
        // stays consistent.
        self.remove(&key);

        self.map.insert(
            key.clone(),
            CacheEntry {
                bytes: payload,
                expires_at: Instant::now() + ttl,
                size,
            },
        );
        self.total_bytes.fetch_add(size, Ordering::Relaxed);
        self.push_key(key);
        self.evict_if_needed();
    }

    /// Removes `key` from the map and the recency queue, updating the
    /// byte accounting for any entry that was present.
    fn remove(&self, key: &str) {
        if let Some((_, dropped)) = self.map.remove(key) {
            self.total_bytes.fetch_sub(dropped.size, Ordering::Relaxed);
        }

        let mut order = self.lock_order();
        if let Some(idx) = order.iter().position(|queued| queued == key) {
            order.remove(idx);
        }
    }

    /// Appends `key` as the most-recently-used entry.
    fn push_key(&self, key: String) {
        self.lock_order().push_back(key);
    }

    /// Moves `key` to the most-recently-used position, if it is queued.
    fn touch(&self, key: &str) {
        let mut order = self.lock_order();
        if let Some(idx) = order.iter().position(|queued| queued == key) {
            if let Some(moved) = order.remove(idx) {
                order.push_back(moved);
            }
        }
    }

    /// Evicts least-recently-used entries until the byte total fits under
    /// `max_bytes`, or the queue runs dry.
    fn evict_if_needed(&self) {
        while self.total_bytes.load(Ordering::Relaxed) > self.max_bytes {
            // Take the LRU key in a tight scope so the queue lock is not
            // held across the map operation.
            let victim = self.lock_order().pop_front();

            let Some(key) = victim else {
                break;
            };

            if let Some((_, dropped)) = self.map.remove(&key) {
                self.total_bytes.fetch_sub(dropped.size, Ordering::Relaxed);
            }
        }
    }

    /// Locks the recency queue, recovering from poisoning: a panic in
    /// another thread only means best-effort LRU state, which is acceptable.
    fn lock_order(&self) -> std::sync::MutexGuard<'_, VecDeque<String>> {
        self.order
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }
}
108 changes: 108 additions & 0 deletions crates/clawdstrike/src/async_guards/circuit_breaker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use std::time::{Duration, Instant};

use tokio::sync::Mutex;

use crate::async_guards::types::CircuitBreakerConfig;

/// Lifecycle state of the breaker.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BreakerState {
    // Requests flow normally; consecutive failures are counted.
    Closed,
    // Requests are rejected until the reset timeout elapses.
    Open,
    // Trial phase: enough successes close the circuit, any failure re-opens it.
    HalfOpen,
}

/// Mutable breaker state, guarded by the mutex in [`CircuitBreaker`].
#[derive(Debug)]
struct Inner {
    state: BreakerState,
    // Consecutive failures observed while `Closed`.
    failures: u32,
    // Consecutive successes observed while `HalfOpen`.
    successes: u32,
    // Instant of the most recent transition to `Open`, if any.
    opened_at: Option<Instant>,
}

/// An async circuit breaker driven by explicit success/failure reports.
#[derive(Debug)]
pub struct CircuitBreaker {
    // Thresholds and reset timeout.
    cfg: CircuitBreakerConfig,
    // Guarded state; a tokio mutex so it can be locked from async contexts.
    inner: Mutex<Inner>,
}

impl CircuitBreaker {
    /// Creates a breaker in the `Closed` state with the given configuration.
    pub fn new(cfg: CircuitBreakerConfig) -> Self {
        let initial = Inner {
            state: BreakerState::Closed,
            failures: 0,
            successes: 0,
            opened_at: None,
        };
        Self {
            cfg,
            inner: Mutex::new(initial),
        }
    }

    /// Gates an outgoing request.
    ///
    /// Returns `Ok(())` when the call may proceed (`Closed` or `HalfOpen`),
    /// or `Err(wait)` with the remaining cool-down when the circuit is open.
    /// Once the cool-down has elapsed the breaker moves to `HalfOpen` and
    /// admits a trial request.
    pub async fn before_request(&self) -> Result<(), Duration> {
        let mut inner = self.inner.lock().await;

        // Closed and HalfOpen both admit requests.
        if inner.state != BreakerState::Open {
            return Ok(());
        }

        let Some(opened_at) = inner.opened_at else {
            // Defensive repair: an Open breaker should always carry a
            // timestamp. Stamp it now and report a full cool-down.
            inner.opened_at = Some(Instant::now());
            return Err(self.cfg.reset_timeout);
        };

        let waited = opened_at.elapsed();
        if waited < self.cfg.reset_timeout {
            return Err(self.cfg.reset_timeout.saturating_sub(waited));
        }

        // Cool-down elapsed: allow a trial request.
        inner.state = BreakerState::HalfOpen;
        inner.successes = 0;
        inner.failures = 0;
        Ok(())
    }

    /// Reports a successful call to the protected resource.
    pub async fn record_success(&self) {
        let mut inner = self.inner.lock().await;

        match inner.state {
            // A success while closed clears any partial failure streak.
            BreakerState::Closed => inner.failures = 0,
            BreakerState::HalfOpen => {
                inner.successes = inner.successes.saturating_add(1);
                if inner.successes >= self.cfg.success_threshold {
                    // Enough trial successes: fully close the circuit.
                    *inner = Inner {
                        state: BreakerState::Closed,
                        failures: 0,
                        successes: 0,
                        opened_at: None,
                    };
                }
            }
            // Successes reported while open carry no information.
            BreakerState::Open => {}
        }
    }

    /// Reports a failed call to the protected resource.
    pub async fn record_failure(&self) {
        let mut inner = self.inner.lock().await;

        match inner.state {
            BreakerState::Closed => {
                inner.failures = inner.failures.saturating_add(1);
                if inner.failures >= self.cfg.failure_threshold {
                    // Failure streak reached the trip point: open the circuit.
                    inner.state = BreakerState::Open;
                    inner.opened_at = Some(Instant::now());
                    inner.successes = 0;
                }
            }
            BreakerState::HalfOpen => {
                // Any failure during the trial phase re-opens immediately.
                inner.state = BreakerState::Open;
                inner.opened_at = Some(Instant::now());
                inner.failures = 0;
                inner.successes = 0;
            }
            BreakerState::Open => {}
        }
    }
}
Loading