Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions application/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ prometheus-client = "0.22.3"
[features]
prom = ["metrics"]
bench = []
permissioned = []
31 changes: 27 additions & 4 deletions application/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use commonware_consensus::types::{Epoch, Epocher, Round, View};
use commonware_cryptography::bls12381::primitives::variant::Variant;
use commonware_cryptography::{PublicKey, Signer};
use std::marker::PhantomData;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
#[cfg(feature = "permissioned")]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use summit_finalizer::FinalizerMailbox;
use tracing::{debug, info, warn};

Expand All @@ -47,6 +47,8 @@ pub struct Actor<
genesis_hash: [u8; 32],
epocher: ES,
cancellation_token: CancellationToken,
#[cfg(feature = "permissioned")]
paused: Arc<AtomicBool>,
_scheme_marker: PhantomData<S>,
_key_marker: PhantomData<P>,
_signer_marker: PhantomData<K>,
Expand Down Expand Up @@ -77,6 +79,8 @@ impl<
genesis_hash,
epocher: cfg.epocher,
cancellation_token: cfg.cancellation_token,
#[cfg(feature = "permissioned")]
paused: cfg.paused,
_scheme_marker: PhantomData,
_key_marker: PhantomData,
_signer_marker: PhantomData,
Expand Down Expand Up @@ -126,6 +130,12 @@ impl<
parent,
mut response,
} => {
#[cfg(feature = "permissioned")]
if self.paused.load(Ordering::Relaxed) {
warn!("consensus paused, skipping proposal for round {round}");
continue;
}

debug!("{rand_id} application: Handling message Propose for round {} (epoch {}, view {}), parent view: {}",
round, round.epoch(), round.view(), parent.0);

Expand Down Expand Up @@ -190,6 +200,12 @@ impl<
}
}
Message::Broadcast { payload: _ } => {
#[cfg(feature = "permissioned")]
if self.paused.load(Ordering::Relaxed) {
warn!("consensus paused, skipping broadcast");
continue;
}

info!("{rand_id} Handling message Broadcast");

let built_block = self.built_block.lock().expect("poisoned lock").take();
Expand All @@ -207,6 +223,13 @@ impl<
payload,
mut response,
} => {
#[cfg(feature = "permissioned")]
if self.paused.load(Ordering::Relaxed) {
warn!("consensus paused, rejecting verify for round {round}");
let _ = response.send(false);
continue;
}

debug!("{rand_id} application: Handling message Verify for round {} (epoch {}, view {}), parent view: {}",
round, round.epoch(), round.view(), parent.0);

Expand Down
9 changes: 9 additions & 0 deletions application/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use commonware_consensus::types::Epocher;
#[cfg(feature = "permissioned")]
use std::sync::Arc;
#[cfg(feature = "permissioned")]
use std::sync::atomic::AtomicBool;
use summit_types::EngineClient;
use tokio_util::sync::CancellationToken;

Expand All @@ -18,4 +22,9 @@ pub struct ApplicationConfig<C: EngineClient, ES: Epocher> {
pub epocher: ES,

pub cancellation_token: CancellationToken,

/// When true, the node will not participate in consensus
/// (skip proposals, reject verifications, skip broadcasts).
#[cfg(feature = "permissioned")]
pub paused: Arc<AtomicBool>,
}
1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,5 +151,6 @@ prom = [
tokio-console = ["console-subscriber"]
jemalloc = ["dep:tikv-jemalloc-ctl"]
bench = ["summit-application/bench", "summit-types/bench"]
permissioned = ["summit-application/permissioned", "summit-rpc/permissioned"]
e2e = ["summit-types/e2e"]
bad-blocks = ["summit-types/bad-blocks"]
47 changes: 43 additions & 4 deletions node/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,23 @@ pub struct RunFlags {
/// When set, events emitted with target "critical" are written to files in this directory.
#[arg(long)]
pub critical_log_dir: Option<String>,

/// Path to a file containing the bearer token for admin RPC calls (pause, unpause).
/// The file should contain the token as a single line (whitespace is trimmed).
/// When set, these endpoints require an `Authorization: Bearer <token>` header.
#[arg(long)]
pub admin_token_file: Option<String>,
}

fn read_admin_token(path: &Option<String>) -> Option<String> {
let path = path.as_ref()?;
let contents = std::fs::read_to_string(path)
.unwrap_or_else(|e| panic!("failed to read admin token file {path}: {e}"));
let token = contents.trim().to_string();
if token.is_empty() {
panic!("admin token file {path} is empty");
}
Some(token)
}

impl Command {
Expand Down Expand Up @@ -411,6 +428,8 @@ impl Command {
Engine::new(context.with_label("engine"), config).await;

let finalizer_mailbox = engine.finalizer_mailbox.clone();
#[cfg(feature = "permissioned")]
let paused = engine.paused.clone();

// Start engine
let engine = engine.start(pending, recovered, resolver, broadcaster, backfiller);
Expand All @@ -421,10 +440,19 @@ impl Command {
// Start RPC server
let key_store_path = flags.key_store_path.clone();
let rpc_port = flags.rpc_port;
let admin_token = read_admin_token(&flags.admin_token_file);
let stop_signal = context.stopped();
let rpc_handle = context.with_label("rpc").spawn(move |_context| async move {
if let Err(e) =
start_rpc_server(finalizer_mailbox, key_store_path, rpc_port, stop_signal).await
if let Err(e) = start_rpc_server(
finalizer_mailbox,
key_store_path,
rpc_port,
stop_signal,
#[cfg(feature = "permissioned")]
paused,
admin_token,
)
.await
{
error!("RPC server failed: {}", e);
}
Expand Down Expand Up @@ -594,6 +622,8 @@ pub fn run_node_local(
let engine: Engine<_, _, _, _> = Engine::new(context.with_label("engine"), config).await;

let finalizer_mailbox = engine.finalizer_mailbox.clone();
#[cfg(feature = "permissioned")]
let paused = engine.paused.clone();
// Start engine
let engine = engine.start(pending, recovered, resolver, broadcaster, backfiller);

Expand All @@ -620,10 +650,19 @@ pub fn run_node_local(
// Start RPC server
let key_store_path = flags.key_store_path.clone();
let rpc_port = flags.rpc_port;
let admin_token = read_admin_token(&flags.admin_token_file);
let stop_signal = context.stopped();
let rpc_handle = context.with_label("rpc").spawn(move |_context| async move {
if let Err(e) =
start_rpc_server(finalizer_mailbox, key_store_path, rpc_port, stop_signal).await
if let Err(e) = start_rpc_server(
finalizer_mailbox,
key_store_path,
rpc_port,
stop_signal,
#[cfg(feature = "permissioned")]
paused,
admin_token,
)
.await
{
error!("RPC server failed: {}", e);
}
Expand Down
1 change: 1 addition & 0 deletions node/src/bin/protocol_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,5 +412,6 @@ fn get_node_flags(node: usize, genesis_path: &str) -> RunFlags {
ip: None,
bootstrappers: None,
critical_log_dir: None,
admin_token_file: None,
}
}
1 change: 1 addition & 0 deletions node/src/bin/stake_and_checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ fn get_node_flags(node: usize, genesis_path: &str) -> RunFlags {
ip: None,
bootstrappers: None,
critical_log_dir: None,
admin_token_file: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions node/src/bin/stake_and_join_with_outdated_ckpt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,7 @@ fn get_node_flags(node: usize, genesis_path: &str) -> RunFlags {
ip: None,
bootstrappers: None,
critical_log_dir: None,
admin_token_file: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions node/src/bin/sync_from_genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,5 +682,6 @@ fn get_node_flags(node: usize, genesis_path: &str) -> RunFlags {
ip: None,
bootstrappers: None,
critical_log_dir: None,
admin_token_file: None,
}
}
1 change: 1 addition & 0 deletions node/src/bin/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,5 +199,6 @@ fn get_node_flags(node: usize) -> RunFlags {
ip: None,
bootstrappers: None,
critical_log_dir: None,
admin_token_file: None,
}
}
1 change: 1 addition & 0 deletions node/src/bin/verify_consensus_state_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,5 +559,6 @@ fn get_node_flags(node: usize, genesis_path: &str) -> RunFlags {
ip: None,
bootstrappers: None,
critical_log_dir: None,
admin_token_file: None,
}
}
1 change: 1 addition & 0 deletions node/src/bin/withdraw_and_exit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,5 +407,6 @@ fn get_node_flags(node: usize, genesis_path: &str) -> RunFlags {
ip: None,
bootstrappers: None,
critical_log_dir: None,
admin_token_file: None,
}
}
12 changes: 12 additions & 0 deletions node/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ use governor::clock::Clock as GClock;
use rand::{CryptoRng, Rng};
use std::marker::PhantomData;
use std::num::NonZero;
#[cfg(feature = "permissioned")]
use std::sync::Arc;
#[cfg(feature = "permissioned")]
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use summit_application::ApplicationConfig;
use summit_finalizer::actor::Finalizer;
Expand Down Expand Up @@ -109,6 +113,8 @@ pub struct Engine<
sync_start: SyncStart,
checkpoint: Option<SyncCheckpoint<Block, MultisigScheme>>,
cancellation_token: CancellationToken,
#[cfg(feature = "permissioned")]
pub paused: Arc<AtomicBool>,
}

impl<
Expand Down Expand Up @@ -136,6 +142,8 @@ where
SummitSchemeProvider::new(private_scalar, cfg.namespace.as_bytes().to_vec());

let cancellation_token = CancellationToken::new();
#[cfg(feature = "permissioned")]
let paused = Arc::new(AtomicBool::new(false));

// create finalizer
let (finalizer, initial_state, finalizer_mailbox) = Finalizer::new(
Expand Down Expand Up @@ -174,6 +182,8 @@ where
genesis_hash: cfg.genesis_hash,
epocher: epocher.clone(),
cancellation_token: cancellation_token.clone(),
#[cfg(feature = "permissioned")]
paused: paused.clone(),
},
)
.await;
Expand Down Expand Up @@ -357,6 +367,8 @@ where
sync_start,
checkpoint,
cancellation_token,
#[cfg(feature = "permissioned")]
paused,
}
}

Expand Down
3 changes: 3 additions & 0 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ hyper = "1.0"
bytes = { workspace = true }
http-body-util = { workspace = true }

[features]
permissioned = []

[dev-dependencies]
tempfile = "3"
tokio = { workspace = true, features = ["net", "rt-multi-thread", "macros"] }
Expand Down
15 changes: 14 additions & 1 deletion rpc/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub trait SummitApi {
public_key: String,
) -> RpcResult<ValidatorAccountResponse>;

#[method(name = "getDepositSignature")]
#[method(name = "getDepositSignature", with_extensions)]
async fn get_deposit_signature(
&self,
amount: u64,
Expand Down Expand Up @@ -79,6 +79,19 @@ pub trait SummitApi {
) -> RpcResult<PendingWithdrawalResponse>;
}

#[cfg(feature = "permissioned")]
#[rpc(server, client)]
pub trait SummitPermissionedApi {
#[method(name = "pause", with_extensions)]
async fn pause(&self) -> RpcResult<bool>;

#[method(name = "unpause", with_extensions)]
async fn unpause(&self) -> RpcResult<bool>;

#[method(name = "isPaused")]
async fn is_paused(&self) -> RpcResult<bool>;
}

#[rpc(server, client)]
pub trait SummitProofApi {
#[method(name = "getStateRoot")]
Expand Down
53 changes: 53 additions & 0 deletions rpc/src/auth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// -- Bearer token extension (set by HTTP middleware, read by RPC middleware) --

/// Marker inserted into request extensions by HTTP middleware when a valid
/// `Authorization: Bearer <token>` header is present.
#[derive(Clone, Debug)]
pub struct BearerToken(pub String);

/// Tower layer that extracts `Authorization: Bearer <token>` from HTTP
/// request headers and inserts a [`BearerToken`] into request extensions.
#[derive(Clone)]
pub struct BearerTokenLayer;

impl<S> tower::Layer<S> for BearerTokenLayer {
type Service = BearerTokenService<S>;
fn layer(&self, inner: S) -> Self::Service {
BearerTokenService { inner }
}
}

#[derive(Clone)]
pub struct BearerTokenService<S> {
inner: S,
}

impl<S, B> tower::Service<http::Request<B>> for BearerTokenService<S>
where
S: tower::Service<http::Request<B>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
let token = req
.headers()
.get(http::header::AUTHORIZATION)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.strip_prefix("Bearer "))
.map(|t| t.to_string());

if let Some(token) = token {
req.extensions_mut().insert(BearerToken(token));
}
self.inner.call(req)
}
}
Loading
Loading