Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 85827ea

Browse files
bors[bot]psarna
andauthored
Merge #311
311: RFC: read-only access JWT tokens r=psarna a=psarna I wanted to quickly evaluate how we could add coarse-grained access control to sqld with JWT and came up with this short patch, I'd be grateful for any comments/opinions. The mechanism of authenticating JWT is extended a little bit. So far we only cared if the token is there and is valid. What JWT offers is an easily extensible mechanism for "claims", which is more or less just a JSON object which allows you to define properties that the token bearer holds. This patch introduces another JWT claim, with a working name "access". If this field is set to "read-only", `sqld` will refuse to execute any requests except ones categorized as reads. The change turned out to be extremely small and took ~30 minutes tests included, since thanks to `@MarinPostma's` work we already categorize queries anyway. The class of use cases I have in mind here are browser apps. We certainly don't want to push full access tokens to users' browsers, but it might be just fine to "leak" a read-only token that lets you access the database, but not modify it. To bring a concrete example, one of my demo apps I use for testing sqld (http://sorry.idont.date) would no longer need to run on Cloudflare Workers and could instead fetch data straight from the browser - far edge! Co-authored-by: Piotr Sarna <[email protected]>
2 parents 9a5c900 + 8604638 commit 85827ea

File tree

14 files changed

+221
-75
lines changed

14 files changed

+221
-75
lines changed

scripts/gen_jwt.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@
4040
}
4141
token = jwt.encode(claims, privkey_pem, "EdDSA")
4242

43+
claims["a"] = "ro"
44+
ro_token = jwt.encode(claims, privkey_pem, "EdDSA")
45+
4346
open("jwt_key.pem", "wb").write(pubkey_pem)
4447
open("jwt_key.base64", "wb").write(pubkey_base64)
45-
print(token)
48+
print(f"Full access: {token}")
49+
print(f"Read-only: {ro_token}")

sqld/proto/proxy.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,15 @@ message OrCond {
130130
repeated Cond conds = 1;
131131
}
132132

133+
enum Authorized {
134+
READONLY = 0;
135+
FULL = 1;
136+
}
137+
133138
message ProgramReq {
134139
string client_id = 1;
135140
Program pgm = 2;
141+
optional Authorized authorized = 3;
136142
}
137143

138144
service Proxy {

sqld/src/auth.rs

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,28 @@ pub enum AuthError {
3737
Other,
3838
}
3939

40+
#[non_exhaustive]
41+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
42+
pub enum Authorized {
43+
FullAccess,
44+
ReadOnly,
45+
}
46+
4047
/// A witness that the user has been authenticated.
41-
#[derive(Debug)]
42-
pub struct Authenticated(());
48+
#[non_exhaustive]
49+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
50+
pub enum Authenticated {
51+
Anonymous,
52+
Authorized(Authorized),
53+
}
4354

4455
impl Auth {
4556
pub fn authenticate_http(
4657
&self,
4758
auth_header: Option<&hyper::header::HeaderValue>,
4859
) -> Result<Authenticated, AuthError> {
4960
if self.disabled {
50-
return Ok(Authenticated(()));
61+
return Ok(Authenticated::Authorized(Authorized::FullAccess));
5162
}
5263

5364
let Some(auth_header) = auth_header else {
@@ -64,7 +75,7 @@ impl Auth {
6475
let actual_value = actual_value.trim_end_matches('=');
6576
let expected_value = expected_value.trim_end_matches('=');
6677
if actual_value == expected_value {
67-
Ok(Authenticated(()))
78+
Ok(Authenticated::Authorized(Authorized::FullAccess))
6879
} else {
6980
Err(AuthError::BasicRejected)
7081
}
@@ -75,7 +86,7 @@ impl Auth {
7586

7687
pub fn authenticate_jwt(&self, jwt: Option<&str>) -> Result<Authenticated, AuthError> {
7788
if self.disabled {
78-
return Ok(Authenticated(()));
89+
return Ok(Authenticated::Authorized(Authorized::FullAccess));
7990
}
8091

8192
let Some(jwt) = jwt else {
@@ -128,8 +139,18 @@ fn validate_jwt(
128139
let mut validation = jsonwebtoken::Validation::new(jsonwebtoken::Algorithm::EdDSA);
129140
validation.required_spec_claims.remove("exp");
130141

131-
match jsonwebtoken::decode::<serde_json::Value>(jwt, jwt_key, &validation) {
132-
Ok(_token) => Ok(Authenticated(())),
142+
match jsonwebtoken::decode::<serde_json::Value>(jwt, jwt_key, &validation).map(|t| t.claims) {
143+
Ok(serde_json::Value::Object(claims)) => {
144+
tracing::trace!("Claims: {claims:#?}");
145+
Ok(match claims.get("a").and_then(|s| s.as_str()) {
146+
Some("ro") => Authenticated::Authorized(Authorized::ReadOnly),
147+
Some("rw") => Authenticated::Authorized(Authorized::FullAccess),
148+
Some(_) => Authenticated::Anonymous,
149+
// Backward compatibility - no access claim means full access
150+
None => Authenticated::Authorized(Authorized::FullAccess),
151+
})
152+
}
153+
Ok(_) => Err(AuthError::JwtInvalid),
133154
Err(error) => Err(match error.kind() {
134155
ErrorKind::InvalidToken
135156
| ErrorKind::InvalidSignature
@@ -201,10 +222,13 @@ mod tests {
201222
auth.authenticate_http(Some(&HeaderValue::from_str(header).unwrap()))
202223
}
203224

204-
const VALID_JWT: &str = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFZERTQSJ9.\
205-
eyJleHAiOjQ4MzEwOTI5NDh9.\
206-
TbPFJBxqb0fPPXj74DgmIZO41skmNEx-8b3PfAXv7IJMeLa3fNgBi7J5xxLm_-0SMEV3f6KMgUN0dBFbGRk4Ag";
207-
const VALID_JWT_KEY: &str = "3dwzg2D96T4GcyZkK4MezpRQxU321g7aTrUn1iwOF0s";
225+
const VALID_JWT_KEY: &str = "zaMv-aFGmB7PXkjM4IrMdF6B5zCYEiEGXW3RgMjNAtc";
226+
const VALID_JWT: &str = "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.\
227+
eyJleHAiOjc5ODg0ODM4Mjd9.\
228+
MatB2aLnPFusagqH2RMoVExP37o2GFLmaJbmd52OdLtAehRNeqeJZPrefP1t2GBFidApUTLlaBRL6poKq_s3CQ";
229+
const VALID_READONLY_JWT: &str = "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.\
230+
eyJleHAiOjc5ODg0ODM4MjcsImEiOiJybyJ9.\
231+
_2ZZiO2HC8b3CbCHSCufXXBmwpl-dLCv5O9Owvpy7LZ9aiQhXODpgV-iCdTsLQJ5FVanWhfn3FtJSnmWHn25DQ";
208232

209233
macro_rules! assert_ok {
210234
($e:expr) => {
@@ -268,6 +292,11 @@ mod tests {
268292
&auth,
269293
&format!("Bearer {}", &VALID_JWT[..80])
270294
));
295+
296+
assert_eq!(
297+
authenticate_http(&auth, &format!("Bearer {VALID_READONLY_JWT}")).unwrap(),
298+
Authenticated::Authorized(Authorized::ReadOnly)
299+
);
271300
}
272301

273302
#[test]

sqld/src/database/libsql.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ use rusqlite::{OpenFlags, StatementStatus};
77
use tokio::sync::oneshot;
88
use tracing::warn;
99

10+
use crate::auth::{Authenticated, Authorized};
1011
use crate::error::Error;
1112
use crate::libsql::wal_hook::WalHook;
1213
use crate::query::{Column, Query, QueryResponse, QueryResult, ResultSet, Row};
13-
use crate::query_analysis::{State, Statement};
14+
use crate::query_analysis::{State, Statement, StmtKind};
1415
use crate::stats::Stats;
1516
use crate::Result;
1617

@@ -344,9 +345,37 @@ fn eval_cond(cond: &Cond, results: &[Option<QueryResult>]) -> Result<bool> {
344345
})
345346
}
346347

348+
fn check_auth(auth: Authenticated, pgm: &Program) -> Result<()> {
349+
for step in pgm.steps() {
350+
let query = &step.query;
351+
match (query.stmt.kind, &auth) {
352+
(_, Authenticated::Anonymous) => {
353+
return Err(Error::NotAuthorized(
354+
"anonymous access not allowed".to_string(),
355+
));
356+
}
357+
(StmtKind::Read, Authenticated::Authorized(_)) => (),
358+
(StmtKind::TxnBegin, _) | (StmtKind::TxnEnd, _) => (),
359+
(_, Authenticated::Authorized(Authorized::FullAccess)) => (),
360+
_ => {
361+
return Err(Error::NotAuthorized(format!(
362+
"Current session is not authorized to run: {}",
363+
query.stmt.stmt
364+
)));
365+
}
366+
}
367+
}
368+
Ok(())
369+
}
370+
347371
#[async_trait::async_trait]
348372
impl Database for LibSqlDb {
349-
async fn execute_program(&self, pgm: Program) -> Result<(Vec<Option<QueryResult>>, State)> {
373+
async fn execute_program(
374+
&self,
375+
pgm: Program,
376+
auth: Authenticated,
377+
) -> Result<(Vec<Option<QueryResult>>, State)> {
378+
check_auth(auth, &pgm)?;
350379
let (resp, receiver) = oneshot::channel();
351380
let msg = Message { pgm, resp };
352381
let _ = self.sender.send(msg);

sqld/src/database/mod.rs

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::Arc;
22

3+
use crate::auth::Authenticated;
34
use crate::query::{Params, Query, QueryResult};
45
use crate::query_analysis::{State, Statement};
56
use crate::Result;
@@ -50,13 +51,17 @@ pub enum Cond {
5051
#[async_trait::async_trait]
5152
pub trait Database: Send + Sync {
5253
/// Executes a query program
53-
async fn execute_program(&self, pgm: Program) -> Result<(Vec<Option<QueryResult>>, State)>;
54+
async fn execute_program(
55+
&self,
56+
pgm: Program,
57+
auth: Authenticated,
58+
) -> Result<(Vec<Option<QueryResult>>, State)>;
5459

5560
/// Unconditionnaly execute a query as part of a program
56-
async fn execute_one(&self, query: Query) -> Result<(QueryResult, State)> {
61+
async fn execute_one(&self, query: Query, auth: Authenticated) -> Result<(QueryResult, State)> {
5762
let pgm = Program::new(vec![Step { cond: None, query }]);
5863

59-
let (results, state) = self.execute_program(pgm).await?;
64+
let (results, state) = self.execute_program(pgm, auth).await?;
6065
Ok((results.into_iter().next().unwrap().unwrap(), state))
6166
}
6267

@@ -66,6 +71,7 @@ pub trait Database: Send + Sync {
6671
async fn execute_batch_or_rollback(
6772
&self,
6873
batch: Vec<Query>,
74+
auth: Authenticated,
6975
) -> Result<(Vec<Option<QueryResult>>, State)> {
7076
let mut steps = make_batch_program(batch);
7177

@@ -86,7 +92,7 @@ pub trait Database: Send + Sync {
8692

8793
let pgm = Program::new(steps);
8894

89-
let (mut results, state) = self.execute_program(pgm).await?;
95+
let (mut results, state) = self.execute_program(pgm, auth).await?;
9096
// remove the rollback result
9197
results.pop();
9298

@@ -95,18 +101,25 @@ pub trait Database: Send + Sync {
95101

96102
/// Execute all the queries in the batch sequentially.
97103
/// If an query in the batch fails, the remaining queries are ignored
98-
async fn execute_batch(&self, batch: Vec<Query>) -> Result<(Vec<Option<QueryResult>>, State)> {
104+
async fn execute_batch(
105+
&self,
106+
batch: Vec<Query>,
107+
auth: Authenticated,
108+
) -> Result<(Vec<Option<QueryResult>>, State)> {
99109
let steps = make_batch_program(batch);
100110
let pgm = Program::new(steps);
101-
self.execute_program(pgm).await
111+
self.execute_program(pgm, auth).await
102112
}
103113

104-
async fn rollback(&self) -> Result<()> {
114+
async fn rollback(&self, auth: Authenticated) -> Result<()> {
105115
let (results, _) = self
106-
.execute_one(Query {
107-
stmt: Statement::parse("ROLLBACK").next().unwrap().unwrap(),
108-
params: Params::empty(),
109-
})
116+
.execute_one(
117+
Query {
118+
stmt: Statement::parse("ROLLBACK").next().unwrap().unwrap(),
119+
params: Params::empty(),
120+
},
121+
auth,
122+
)
110123
.await?;
111124

112125
results?;

sqld/src/database/write_proxy.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use tokio::sync::Mutex;
55
use tonic::transport::Channel;
66
use uuid::Uuid;
77

8+
use crate::auth::{Authenticated, Authorized};
89
use crate::error::Error;
910
use crate::query::{QueryResponse, QueryResult};
1011
use crate::query_analysis::State;
@@ -74,11 +75,18 @@ impl WriteProxyDatabase {
7475
&self,
7576
pgm: Program,
7677
state: &mut State,
78+
auth: Authenticated,
7779
) -> Result<(Vec<Option<QueryResult>>, State)> {
7880
let mut client = self.write_proxy.clone();
81+
let authorized: Option<i32> = match auth {
82+
Authenticated::Anonymous => None,
83+
Authenticated::Authorized(Authorized::ReadOnly) => Some(0),
84+
Authenticated::Authorized(Authorized::FullAccess) => Some(1),
85+
};
7986
let req = crate::rpc::proxy::rpc::ProgramReq {
8087
client_id: self.client_id.to_string(),
8188
pgm: Some(pgm.into()),
89+
authorized,
8290
};
8391
match client.execute(req).await {
8492
Ok(r) => {
@@ -110,21 +118,25 @@ impl WriteProxyDatabase {
110118

111119
#[async_trait::async_trait]
112120
impl Database for WriteProxyDatabase {
113-
async fn execute_program(&self, pgm: Program) -> Result<(Vec<Option<QueryResult>>, State)> {
121+
async fn execute_program(
122+
&self,
123+
pgm: Program,
124+
auth: Authenticated,
125+
) -> Result<(Vec<Option<QueryResult>>, State)> {
114126
let mut state = self.state.lock().await;
115127
if *state == State::Init && pgm.is_read_only() {
116128
// We know that this program won't perform any writes. We attempt to run it on the
117129
// replica. If it leaves an open transaction, then this program is an interactive
118130
// transaction, so we rollback the replica, and execute again on the primary.
119-
let (results, new_state) = self.read_db.execute_program(pgm.clone()).await?;
131+
let (results, new_state) = self.read_db.execute_program(pgm.clone(), auth).await?;
120132
if new_state != State::Init {
121-
self.read_db.rollback().await?;
122-
self.execute_remote(pgm, &mut state).await
133+
self.read_db.rollback(auth).await?;
134+
self.execute_remote(pgm, &mut state, auth).await
123135
} else {
124136
Ok((results, new_state))
125137
}
126138
} else {
127-
self.execute_remote(pgm, &mut state).await
139+
self.execute_remote(pgm, &mut state, auth).await
128140
}
129141
}
130142
}

sqld/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ pub enum Error {
2323
Internal(String),
2424
#[error("Invalid batch step: {0}")]
2525
InvalidBatchStep(usize),
26+
#[error("Not authorized to execute query: {0}")]
27+
NotAuthorized(String),
2628
}
2729

2830
impl From<tokio::sync::oneshot::error::RecvError> for Error {

sqld/src/hrana/batch.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::Result;
22

3+
use crate::auth::Authenticated;
34
use crate::database::{Cond, Database, Program, Step};
45

56
use super::proto;
@@ -71,11 +72,15 @@ fn batch_to_program(batch: &proto::Batch) -> Result<Program> {
7172
Ok(Program::new(steps))
7273
}
7374

74-
pub async fn execute_batch(db: &dyn Database, batch: &proto::Batch) -> Result<proto::BatchResult> {
75+
pub async fn execute_batch(
76+
db: &dyn Database,
77+
auth: Authenticated,
78+
batch: &proto::Batch,
79+
) -> Result<proto::BatchResult> {
7580
let pgm = batch_to_program(batch)?;
7681
let mut step_results = Vec::with_capacity(pgm.steps.len());
7782
let mut step_errors = Vec::with_capacity(pgm.steps.len());
78-
let (results, _state) = db.execute_program(pgm).await?;
83+
let (results, _state) = db.execute_program(pgm, auth).await?;
7984
for result in results {
8085
let (step_result, step_error) = match result {
8186
Some(Ok(r)) => (Some(proto_stmt_result_from_query_response(r)), None),

0 commit comments

Comments
 (0)