Skip to content

Commit 8cf0115

Browse files
committed
feat: BYOC agent auth + install script + Helm chart
Enable customers to skip deploying the warpd control plane and connect their own compute directly to the WarpGrid cloud. Agents authenticate with tenant-scoped tokens (wg_agent_*) on cluster Join, binding nodes to the customer's namespace for placement isolation.

- AgentTokenStore: issue/validate/revoke/list tokens (in-memory + libSQL)
- cloud_agent_tokens DB table with SHA-256 hashed tokens
- gRPC JoinRequest.auth_token field + server-side validation
- TokenValidator trait (NoopTokenValidator for self-hosted, pluggable for cloud)
- Namespace injected into node labels on authenticated join
- REST API: POST/GET /api/v1/cloud/agent-tokens, POST .../revoke
- CLI: --auth-token / WARPGRID_AGENT_TOKEN on warpd agent
- deploy/agent/install.sh: curl|bash installer for bare metal Linux
- deploy/helm/warpgrid-agent: Helm chart (DaemonSet on KVM nodes)

https://claude.ai/code/session_01M7skTe44vjYKG2DiZ9HWy8
1 parent 1e3fc32 commit 8cf0115

File tree

20 files changed

+1034
-5
lines changed

20 files changed

+1034
-5
lines changed

crates/warpd/src/agent_mode.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub async fn run_agent(
4747
turso_url: Option<String>,
4848
turso_auth_token: Option<String>,
4949
sync_interval: u64,
50+
auth_token: Option<String>,
5051
) -> anyhow::Result<()> {
5152
info!(region = %region, "WarpGrid daemon starting in agent mode");
5253
std::fs::create_dir_all(&data_dir)?;
@@ -213,13 +214,17 @@ pub async fn run_agent(
213214
);
214215

215216
// ── Join cluster ──────────────────────────────────────────────
217+
if auth_token.is_some() {
218+
info!("BYOC mode: agent will authenticate with cloud control plane");
219+
}
216220
let agent_config = AgentConfig {
217221
control_plane_addr,
218222
address: address.clone(),
219223
port,
220224
labels: HashMap::from([("region".to_string(), region.clone())]),
221225
capacity_memory_bytes,
222226
capacity_cpu_weight,
227+
auth_token,
223228
};
224229

225230
let mut agent = NodeAgent::new(agent_config);

crates/warpd/src/cloud/admin.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,7 @@ mod tests {
832832
usage: UsageTracker::new(),
833833
logs: crate::cloud::routes::new_log_buffer(),
834834
admin_key: None,
835+
agent_tokens: crate::cloud::agent_tokens::AgentTokenStore::new(),
835836
}
836837
}
837838

Lines changed: 355 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,355 @@
1+
//! Agent token management for BYOC (bring-your-own-compute) deployments.
2+
//!
3+
//! Customers generate agent tokens in the console, then pass them to
4+
//! `warpd agent --auth-token <token>` on their infrastructure. The agent
5+
//! presents the token on cluster Join; the control plane validates it
6+
//! and binds the node to the owning namespace for tenant-scoped placement.
7+
//!
8+
//! Token format: `wg_agent_<32 hex chars>`
9+
10+
use rand::Rng;
11+
use serde::{Deserialize, Serialize};
12+
use sha2::{Digest, Sha256};
13+
use std::collections::HashMap;
14+
use std::sync::{Arc, RwLock};
15+
16+
/// A registered agent token.
17+
#[derive(Debug, Clone, Serialize, Deserialize)]
18+
pub struct AgentToken {
19+
pub id: String,
20+
pub namespace: String,
21+
pub name: String,
22+
pub revoked: bool,
23+
pub created_at: u64,
24+
pub last_used_at: Option<u64>,
25+
}
26+
27+
/// Outcome of a successful agent-token validation.
#[derive(Debug, Clone)]
pub struct ValidatedAgent {
    /// Identifier of the token that was presented.
    pub token_id: String,
    /// Namespace that token belongs to.
    pub namespace: String,
}
33+
34+
/// Store for agent tokens with pluggable backend.
35+
#[derive(Clone)]
36+
pub struct AgentTokenStore {
37+
backend: AgentTokenBackend,
38+
}
39+
40+
#[derive(Clone)]
41+
enum AgentTokenBackend {
42+
Memory {
43+
tokens: Arc<RwLock<HashMap<String, AgentToken>>>,
44+
hash_to_id: Arc<RwLock<HashMap<String, String>>>,
45+
},
46+
LibSql {
47+
conn: libsql::Connection,
48+
},
49+
}
50+
51+
impl AgentTokenStore {
52+
/// Create an in-memory store (for tests).
53+
pub fn new() -> Self {
54+
Self {
55+
backend: AgentTokenBackend::Memory {
56+
tokens: Arc::new(RwLock::new(HashMap::new())),
57+
hash_to_id: Arc::new(RwLock::new(HashMap::new())),
58+
},
59+
}
60+
}
61+
62+
/// Create a persistent store backed by libSQL.
63+
pub fn with_libsql(conn: libsql::Connection) -> Self {
64+
Self {
65+
backend: AgentTokenBackend::LibSql { conn },
66+
}
67+
}
68+
69+
/// Issue a new agent token for a namespace. Returns the raw token (shown once).
70+
pub async fn issue(&self, namespace: &str, name: &str) -> anyhow::Result<(String, AgentToken)> {
71+
let raw_token = generate_agent_token();
72+
let token_hash = hash_token(&raw_token);
73+
let token_id = generate_token_id();
74+
let created_at = epoch_secs();
75+
76+
let token = AgentToken {
77+
id: token_id.clone(),
78+
namespace: namespace.to_string(),
79+
name: name.to_string(),
80+
revoked: false,
81+
created_at,
82+
last_used_at: None,
83+
};
84+
85+
match &self.backend {
86+
AgentTokenBackend::Memory {
87+
tokens,
88+
hash_to_id,
89+
} => {
90+
tokens.write().unwrap().insert(token_id.clone(), token.clone());
91+
hash_to_id.write().unwrap().insert(token_hash, token_id);
92+
}
93+
AgentTokenBackend::LibSql { conn } => {
94+
conn.execute(
95+
"INSERT INTO cloud_agent_tokens (id, namespace, token_hash, name, created_at) VALUES (?, ?, ?, ?, ?)",
96+
libsql::params![token.id.clone(), token.namespace.clone(), token_hash, token.name.clone(), created_at as i64],
97+
).await?;
98+
}
99+
}
100+
101+
Ok((raw_token, token))
102+
}
103+
104+
/// Validate a raw agent token. Returns the namespace if valid and not revoked.
105+
pub async fn validate(&self, raw_token: &str) -> Option<ValidatedAgent> {
106+
let token_hash = hash_token(raw_token);
107+
108+
match &self.backend {
109+
AgentTokenBackend::Memory {
110+
tokens,
111+
hash_to_id,
112+
} => {
113+
let id = hash_to_id.read().unwrap().get(&token_hash)?.clone();
114+
let token = tokens.read().unwrap().get(&id)?.clone();
115+
if token.revoked {
116+
return None;
117+
}
118+
// Update last_used_at.
119+
if let Some(t) = tokens.write().unwrap().get_mut(&id) {
120+
t.last_used_at = Some(epoch_secs());
121+
}
122+
Some(ValidatedAgent {
123+
token_id: token.id,
124+
namespace: token.namespace,
125+
})
126+
}
127+
AgentTokenBackend::LibSql { conn } => {
128+
let mut rows = conn
129+
.query(
130+
"SELECT id, namespace, revoked FROM cloud_agent_tokens WHERE token_hash = ?",
131+
libsql::params![token_hash.clone()],
132+
)
133+
.await
134+
.ok()?;
135+
136+
let row = rows.next().await.ok()??;
137+
let id: String = row.get(0).ok()?;
138+
let namespace: String = row.get(1).ok()?;
139+
let revoked: i64 = row.get(2).ok()?;
140+
141+
if revoked != 0 {
142+
return None;
143+
}
144+
145+
// Update last_used_at (best-effort).
146+
let _ = conn
147+
.execute(
148+
"UPDATE cloud_agent_tokens SET last_used_at = ? WHERE token_hash = ?",
149+
libsql::params![epoch_secs() as i64, token_hash],
150+
)
151+
.await;
152+
153+
Some(ValidatedAgent {
154+
token_id: id,
155+
namespace,
156+
})
157+
}
158+
}
159+
}
160+
161+
/// List all tokens for a namespace.
162+
pub async fn list(&self, namespace: &str) -> anyhow::Result<Vec<AgentToken>> {
163+
match &self.backend {
164+
AgentTokenBackend::Memory { tokens, .. } => {
165+
let all = tokens.read().unwrap();
166+
Ok(all
167+
.values()
168+
.filter(|t| t.namespace == namespace)
169+
.cloned()
170+
.collect())
171+
}
172+
AgentTokenBackend::LibSql { conn } => {
173+
let mut rows = conn
174+
.query(
175+
"SELECT id, namespace, name, revoked, created_at, last_used_at FROM cloud_agent_tokens WHERE namespace = ? ORDER BY created_at DESC",
176+
libsql::params![namespace.to_string()],
177+
)
178+
.await?;
179+
180+
let mut result = Vec::new();
181+
while let Some(row) = rows.next().await? {
182+
result.push(AgentToken {
183+
id: row.get(0)?,
184+
namespace: row.get(1)?,
185+
name: row.get(2)?,
186+
revoked: row.get::<i64>(3)? != 0,
187+
created_at: row.get::<i64>(4)? as u64,
188+
last_used_at: row.get::<Option<i64>>(5)?.map(|v| v as u64),
189+
});
190+
}
191+
Ok(result)
192+
}
193+
}
194+
}
195+
196+
/// Revoke a token by ID.
197+
pub async fn revoke(&self, namespace: &str, token_id: &str) -> anyhow::Result<bool> {
198+
match &self.backend {
199+
AgentTokenBackend::Memory { tokens, .. } => {
200+
let mut all = tokens.write().unwrap();
201+
if let Some(t) = all.get_mut(token_id) {
202+
if t.namespace != namespace {
203+
return Ok(false);
204+
}
205+
t.revoked = true;
206+
Ok(true)
207+
} else {
208+
Ok(false)
209+
}
210+
}
211+
AgentTokenBackend::LibSql { conn } => {
212+
let affected = conn
213+
.execute(
214+
"UPDATE cloud_agent_tokens SET revoked = 1 WHERE id = ? AND namespace = ?",
215+
libsql::params![token_id.to_string(), namespace.to_string()],
216+
)
217+
.await?;
218+
Ok(affected > 0)
219+
}
220+
}
221+
}
222+
}
223+
224+
/// Generate a random agent token: `wg_agent_<32 hex chars>`.
225+
fn generate_agent_token() -> String {
226+
let mut rng = rand::thread_rng();
227+
let bytes: [u8; 16] = rng.r#gen();
228+
format!("wg_agent_{}", hex::encode(bytes))
229+
}
230+
231+
/// Generate a random token ID.
232+
fn generate_token_id() -> String {
233+
let mut rng = rand::thread_rng();
234+
let bytes: [u8; 8] = rng.r#gen();
235+
format!("agt_{}", hex::encode(bytes))
236+
}
237+
238+
/// Hash a token with SHA-256 for storage.
239+
pub fn hash_token(raw: &str) -> String {
240+
let mut hasher = Sha256::new();
241+
hasher.update(raw.as_bytes());
242+
hex::encode(hasher.finalize())
243+
}
244+
245+
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn epoch_secs() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
251+
252+
#[cfg(test)]
mod tests {
    //! Tests for both `AgentTokenStore` backends. Every test runs under
    //! `#[tokio::test]` so the async API is driven the same way throughout.

    use super::*;

    /// Issuing on the memory backend yields a well-formed token that validates.
    #[tokio::test]
    async fn issue_and_validate_memory() {
        let store = AgentTokenStore::new();

        let (raw, token) = store.issue("acme", "prod-node-1").await.unwrap();
        assert!(raw.starts_with("wg_agent_"));
        assert_eq!(raw.len(), 9 + 32); // "wg_agent_" + 32 hex
        assert_eq!(token.namespace, "acme");
        assert_eq!(token.name, "prod-node-1");

        let validated = store.validate(&raw).await.unwrap();
        assert_eq!(validated.namespace, "acme");
        assert_eq!(validated.token_id, token.id);
    }

    /// A revoked token no longer validates on the memory backend.
    #[tokio::test]
    async fn revoked_token_fails_validation() {
        let store = AgentTokenStore::new();

        let (raw, token) = store.issue("acme", "test").await.unwrap();
        assert!(store.revoke("acme", &token.id).await.unwrap());

        assert!(store.validate(&raw).await.is_none());
    }

    /// A token that was never issued is rejected.
    #[tokio::test]
    async fn invalid_token_returns_none() {
        let store = AgentTokenStore::new();
        assert!(store.validate("wg_agent_bogus").await.is_none());
    }

    /// Listing on the memory backend returns only the requested namespace.
    #[tokio::test]
    async fn list_filters_by_namespace() {
        let store = AgentTokenStore::new();

        store.issue("acme", "node-1").await.unwrap();
        store.issue("acme", "node-2").await.unwrap();
        store.issue("other", "node-3").await.unwrap();

        let acme = store.list("acme").await.unwrap();
        assert_eq!(acme.len(), 2);
        assert!(acme.iter().all(|t| t.namespace == "acme"));
    }

    /// Revoking through the wrong namespace is refused and leaves the token valid.
    #[tokio::test]
    async fn revoke_wrong_namespace_fails() {
        let store = AgentTokenStore::new();

        let (raw, token) = store.issue("acme", "test").await.unwrap();
        let result = store.revoke("other", &token.id).await.unwrap();
        assert!(!result);
        assert!(store.validate(&raw).await.is_some());
    }

    /// Issuing on the libSQL backend yields a token that validates.
    #[tokio::test]
    async fn libsql_issue_and_validate() {
        let db = crate::cloud::db::open_memory().await.unwrap();
        let conn = db.connect().unwrap();
        crate::cloud::db::migrate(&conn).await.unwrap();

        let store = AgentTokenStore::with_libsql(conn);
        let (raw, token) = store.issue("acme", "prod-1").await.unwrap();

        assert!(raw.starts_with("wg_agent_"));
        assert_eq!(token.namespace, "acme");

        let validated = store.validate(&raw).await.unwrap();
        assert_eq!(validated.namespace, "acme");
        assert_eq!(validated.token_id, token.id);
    }

    /// A revoked token no longer validates on the libSQL backend.
    #[tokio::test]
    async fn libsql_revoke_and_validate() {
        let db = crate::cloud::db::open_memory().await.unwrap();
        let conn = db.connect().unwrap();
        crate::cloud::db::migrate(&conn).await.unwrap();

        let store = AgentTokenStore::with_libsql(conn);
        let (raw, token) = store.issue("acme", "test").await.unwrap();

        assert!(store.revoke("acme", &token.id).await.unwrap());
        assert!(store.validate(&raw).await.is_none());
    }

    /// Listing on the libSQL backend returns only the requested namespace.
    #[tokio::test]
    async fn libsql_list() {
        let db = crate::cloud::db::open_memory().await.unwrap();
        let conn = db.connect().unwrap();
        crate::cloud::db::migrate(&conn).await.unwrap();

        let store = AgentTokenStore::with_libsql(conn);
        store.issue("acme", "a").await.unwrap();
        store.issue("acme", "b").await.unwrap();
        store.issue("other", "c").await.unwrap();

        let acme = store.list("acme").await.unwrap();
        assert_eq!(acme.len(), 2);
        assert!(acme.iter().all(|t| t.namespace == "acme"));
    }
}

crates/warpd/src/cloud/console.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,7 @@ mod tests {
10311031
usage: UsageTracker::new(),
10321032
logs: crate::cloud::routes::new_log_buffer(),
10331033
admin_key: None,
1034+
agent_tokens: crate::cloud::agent_tokens::AgentTokenStore::new(),
10341035
}
10351036
}
10361037

0 commit comments

Comments
 (0)