diff --git a/consensus/api/proto/consensus_config.proto b/consensus/api/proto/consensus_config.proto
index 42a7667078..e8b7e886e5 100644
--- a/consensus/api/proto/consensus_config.proto
+++ b/consensus/api/proto/consensus_config.proto
@@ -69,4 +69,24 @@ message ConsensusNodeConfig {
 
     // SCP message signing key.
     external.Ed25519Public scp_message_signing_key = 8;
+
+    // Maximum number of client session tracking structures to retain in
+    // a least-recently-used cache.
+    //
+    // This corresponds to Config::client_tracking_capacity.
+    uint64 client_tracking_capacity = 9;
+
+    // How many seconds to retain records of proposed-transaction
+    // failures, per client. This is used to implement DoS protection,
+    // guarding against clients that make too many failed
+    // transaction proposals within this span.
+    uint64 tx_failure_window_seconds = 10;
+
+    // How many tx proposal failures within the rolling window are required
+    // before a client is considered abusive, tripping the denial-of-service
+    // protection.
+    // In other words, this is how many failed transaction proposals are
+    // permitted within the last tx_failure_window_seconds before a client
+    // is disconnected or temporarily blocked.
+    uint32 tx_failure_limit = 11;
 }
diff --git a/consensus/service/config/src/lib.rs b/consensus/service/config/src/lib.rs
index e01c53598d..68e6d575da 100644
--- a/consensus/service/config/src/lib.rs
+++ b/consensus/service/config/src/lib.rs
@@ -136,6 +136,24 @@ pub struct Config {
     /// config setting to match.
     #[clap(long, default_value = "10000", env = "MC_CLIENT_TRACKING_CAPACITY")]
     pub client_tracking_capacity: usize,
+
+    /// How long to retain records of proposed-transaction failures,
+    /// per client. This is used to implement DoS protection, along the
+    /// lines of kicking clients who make too many failed transaction
+    /// proposals within this window.
+    // TODO: slam-testing to derive a reasonable default
+    #[clap(long, default_value = "30", value_parser = parse_duration_in_seconds, env = "MC_CLIENT_TX_FAILURE_WINDOW")]
+    pub tx_failure_window: Duration,
+
+    /// How many tx proposal failures within the rolling window are required
+    /// before a client is considered abusive, tripping the denial-of-service
+    /// protection.
+    /// In other words, this is how many failed transaction proposals are
+    /// permitted within the last tx_failure_window before a client is
+    /// disconnected or temporarily blocked.
+    // TODO: slam-testing to derive a reasonable default
+    #[clap(long, default_value = "16384", env = "MC_CLIENT_TX_FAILURE_LIMIT")]
+    pub tx_failure_limit: u32,
 }
 
 impl Config {
@@ -224,6 +242,8 @@ mod tests {
             tokens_path: None,
             block_version: BlockVersion::ZERO,
             client_tracking_capacity: 4096,
+            tx_failure_window: Duration::from_secs(30),
+            tx_failure_limit: 16384,
         };
 
         assert_eq!(
@@ -293,6 +313,8 @@ mod tests {
             tokens_path: None,
             block_version: BlockVersion::ZERO,
             client_tracking_capacity: 4096,
+            tx_failure_window: Duration::from_secs(30),
+            tx_failure_limit: 16384,
         };
 
         assert_eq!(
diff --git a/consensus/service/src/api/client_api_service.rs b/consensus/service/src/api/client_api_service.rs
index 89e726296e..386d9b8ccd 100644
--- a/consensus/service/src/api/client_api_service.rs
+++ b/consensus/service/src/api/client_api_service.rs
@@ -10,10 +10,13 @@ use crate::{
     tx_manager::{TxManager, TxManagerError},
     SVC_COUNTERS,
 };
-use grpcio::{RpcContext, RpcStatus, UnarySink};
+use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
 use mc_attest_api::attest::Message;
 use mc_attest_enclave_api::ClientSession;
-use mc_common::{logger::Logger, LruCache};
+use mc_common::{
+    logger::{log, Logger},
+    LruCache,
+};
 use mc_consensus_api::{
     consensus_client::{ProposeMintConfigTxResponse, ProposeMintTxResponse},
     consensus_client_grpc::ConsensusClientApi,
@@ -55,6 +58,17 @@ impl ClientSessionTracking {
         }
     }
 
+    pub fn get_proposetx_failures(&self) -> usize {
+        self.tx_proposal_failures.len()
+    }
+
+    /// Remove any transaction proposal failure record that is older than our
+    /// tracking window.
+    fn clear_stale_records(&mut self, now: Instant, tracking_window: Duration) {
+        self.tx_proposal_failures
+            .retain(|past_failure| now.saturating_duration_since(*past_failure) <= tracking_window);
+    }
+
     /// Push a new failed tx proposal record, clear out samples older than
     /// our tracking window, and return the number of tx failures remaining
     /// on the list - as-in, tells you "there have been x number of failures
@@ -67,8 +81,7 @@ impl ClientSessionTracking {
     /// have existed for longer than this value will be dropped when this
     /// method is called.
     pub fn fail_tx_proposal(&mut self, now: Instant, tracking_window: Duration) -> usize {
-        self.tx_proposal_failures
-            .retain(|past_failure| now.saturating_duration_since(*past_failure) <= tracking_window);
+        self.clear_stale_records(now, tracking_window);
         self.tx_proposal_failures.push_back(now);
         self.tx_proposal_failures.len()
     }
@@ -143,13 +156,12 @@ impl ClientApiService {
         // in pull request #3296 "Failure limit on tx proposals"
         let tracking_window = Duration::from_secs(60);
         let mut tracker = self.tracked_sessions.lock().expect("Mutex poisoned");
-        if !tracker.contains(&session_id) {
+        if tracker.get_mut(&session_id).is_none() {
             tracker.put(session_id.clone(), ClientSessionTracking::new());
         }
         let record = tracker
             .get_mut(&session_id)
-            .expect("Session id {session_id} should be tracked.");
-
+            .expect("Session-tracking record should exist after insertion");
         let _recent_failure_count = record.fail_tx_proposal(Instant::now(), tracking_window);
 
         // Dropping the client after a limit has been reached will be
@@ -261,6 +273,10 @@ impl ClientApiService {
         response.set_block_version(*self.config.block_version);
         response.set_scp_message_signing_key((&self.config.msg_signer_key.public_key()).into());
 
+        response.set_client_tracking_capacity(self.config.client_tracking_capacity as u64);
+        response.set_tx_failure_window_seconds(self.config.tx_failure_window.as_secs());
+        response.set_tx_failure_limit(self.config.tx_failure_limit);
+
         Ok(response)
     }
 }
@@ -274,13 +290,54 @@ impl ConsensusClientApi for ClientApiService {
     ) {
         let _timer = SVC_COUNTERS.req(&ctx);
 
+        let session_id = ClientSession::from(msg.channel_id.clone());
+
         {
-            let session = ClientSession::from(msg.channel_id.clone());
             let mut tracker = self.tracked_sessions.lock().expect("Mutex poisoned");
             // Calling get() on the LRU bumps the entry to show up as more
             // recently-used.
-            if tracker.get(&session).is_none() {
-                tracker.put(session, ClientSessionTracking::new());
+            if tracker.get(&session_id).is_none() {
+                tracker.put(session_id.clone(), ClientSessionTracking::new());
+            }
+
+            let session_info = tracker
+                .get(&session_id)
+                .expect("Session should be present after insert");
+            let recent_failures = session_info.get_proposetx_failures() as u32;
+            if recent_failures >= self.config.tx_failure_limit {
+                log::debug!(
+                    self.logger,
+                    "Client has {} recent failed tx proposals within the \
+                    last {} seconds - dropping connection.",
+                    recent_failures,
+                    self.config.tx_failure_window.as_secs_f32()
+                );
+                // Rate-limiting is performed at the auth endpoint, so
+                // merely dropping the connection will be enough.
+                let close_result = self.enclave.client_close(session_id.clone());
+                // At the time of writing (30th March, 2023), it should
+                // only be possible for client_close() to error if a
+                // mutex is poisoned. However, because the
+                // implementation of this method might change, it
+                // seems wise to handle any error this might throw.
+                if let Err(e) = close_result {
+                    log::error!(
+                        self.logger,
+                        "Failed to drop session {:?} due to: {:?}",
+                        &session_id,
+                        e
+                    );
+                } else {
+                    let _ = tracker.pop(&session_id);
+                }
+
+                // Send an error indicating the rate-limiting.
+                let rpc_code = RpcStatusCode::RESOURCE_EXHAUSTED;
+                let rpc_error = ConsensusGrpcError::RpcStatus(RpcStatus::new(rpc_code));
+                let result: Result<_, RpcStatus> = rpc_error.into();
+
+                // Send the error and return early.
+                return send_result(ctx, sink, result, &self.logger);
             }
         }
 
@@ -308,8 +365,19 @@ impl ConsensusClientApi for ClientApiService {
                 ConsensusGrpcError::NotServing.into()
             }
         } else {
-            self.handle_proposed_tx(msg)
-                .or_else(ConsensusGrpcError::into)
+            let result = self.handle_proposed_tx(msg);
+            // Record the failure, so that repeatedly-failing sessions get rate-limited.
+            if let Err(_err) = &result {
+                let mut tracker = self.tracked_sessions.lock().expect("Mutex poisoned");
+                if tracker.get_mut(&session_id).is_none() {
+                    tracker.put(session_id.clone(), ClientSessionTracking::new());
+                }
+                let record = tracker
+                    .get_mut(&session_id)
+                    .expect("Session-tracking record should exist after insertion");
+                record.fail_tx_proposal(Instant::now(), self.config.tx_failure_window);
+            }
+            result.or_else(ConsensusGrpcError::into)
         };
 
         result = result.and_then(|mut response| {
@@ -1825,4 +1893,112 @@ mod client_api_tests {
             .expect("Attempt to lock session-tracking mutex failed.");
         assert_eq!(tracker.len(), 1);
     }
+
+    #[test_with_logger]
+    #[serial(counters)]
+    fn test_get_kicked_failure_limit(logger: Logger) {
+        let limit = 3;
+
+        let mut consensus_enclave = MockConsensusEnclave::new();
+
+        let scp_client_value_sender = Arc::new(
+            |_value: ConsensusValue,
+             _node_id: Option<&NodeID>,
+             _responder_id: Option<&ResponderId>| {
+                // TODO: store inputs for inspection.
+            },
+        );
+
+        const NUM_BLOCKS: u64 = 5;
+        let mut ledger = MockLedger::new();
+        ledger
+            .expect_num_blocks()
+            .times(limit as usize)
+            .return_const(Ok(NUM_BLOCKS));
+
+        let tx_manager = MockTxManager::new();
+        let is_serving_fn = Arc::new(|| -> bool { true });
+        let authenticator = AnonymousAuthenticator::default();
+
+        const LRU_CAPACITY: usize = 4096;
+        let tracked_sessions = Arc::new(Mutex::new(LruCache::new(LRU_CAPACITY)));
+
+        let mut config = get_config();
+        // Permit only `limit` failed tx proposals within the rolling window.
+        config.tx_failure_limit = limit;
+
+        // Cause the mock enclave to consistently fail each request.
+        consensus_enclave
+            .expect_client_tx_propose()
+            .times(limit as usize)
+            .return_const(Err(EnclaveError::MalformedTx(
+                TransactionValidationError::ContainsSpentKeyImage,
+            )));
+        // Expect a close, since the follow-up call will exceed our limit.
+        consensus_enclave
+            .expect_client_close()
+            .times(1)
+            .return_const(Ok(()));
+
+        let instance = ClientApiService::new(
+            config,
+            Arc::new(consensus_enclave),
+            scp_client_value_sender,
+            Arc::new(ledger),
+            Arc::new(tx_manager),
+            Arc::new(MockMintTxManager::new()),
+            is_serving_fn,
+            Arc::new(authenticator),
+            logger,
+            // Clone this, maintaining our own Arc reference into the tracked
+            // sessions structure so that we can inspect it later.
+            tracked_sessions.clone(),
+        );
+
+        // gRPC client and server.
+        let (client, _server) = get_client_server(instance);
+        let message = Message::default();
+
+        for _ in 0..limit {
+            let propose_tx_response = client
+                .client_tx_propose(&message)
+                .expect("Client tx propose error");
+            assert_eq!(
+                propose_tx_response.get_result(),
+                ProposeTxResult::ContainsSpentKeyImage
+            );
+        }
+
+        // No failed transaction proposals over the limit yet, so the session
+        // shouldn't have been dropped.
+        {
+            let tracker = tracked_sessions
+                .lock()
+                .expect("Attempt to lock session-tracking mutex failed.");
+            assert_eq!(tracker.len(), 1);
+            let (_session_id, tracking_data) = tracker.iter().next().unwrap();
+            assert_eq!(tracking_data.tx_proposal_failures.len(), limit as usize);
+        }
+
+        let propose_tx_response = client.client_tx_propose(&message);
+        assert!(propose_tx_response.is_err());
+
+        match propose_tx_response {
+            Err(grpcio::Error::RpcFailure(rpc_status)) => {
+                assert_eq!(rpc_status.code(), RpcStatusCode::RESOURCE_EXHAUSTED);
+            }
+            _ => panic!(
+                "Unexpected response upon continuing to use \
+                a rate-limited session: {propose_tx_response:?}"
+            ),
+        }
+
+        let tracker = tracked_sessions
+            .lock()
+            .expect("Attempt to lock session-tracking mutex failed.");
+        // This session should have been dropped at this point.
+        assert_eq!(tracker.len(), 0);
+
+        // Because of the behavior of Mockall, if this returns without calling
+        // client_close() exactly once, it will panic and the test will fail.
+    }
 }
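Reviewer note, separate from the patch itself: the rolling-window bookkeeping that `fail_tx_proposal` implements can be exercised in isolation. The sketch below is a minimal model using only `std`; `FailureWindow` and `fail` are hypothetical stand-ins for `ClientSessionTracking` and `fail_tx_proposal`, and the 30-second window and limit of 3 mirror the `tx_failure_window` default and the test above.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Simplified stand-in for ClientSessionTracking: a rolling window of
/// failure timestamps, pruned on every insertion.
struct FailureWindow {
    failures: VecDeque<Instant>,
}

impl FailureWindow {
    fn new() -> Self {
        Self {
            failures: VecDeque::new(),
        }
    }

    /// Record a failure at `now`, drop samples older than `window`, and
    /// return how many failures remain in the window (including this one).
    fn fail(&mut self, now: Instant, window: Duration) -> usize {
        self.failures
            .retain(|past| now.saturating_duration_since(*past) <= window);
        self.failures.push_back(now);
        self.failures.len()
    }
}

fn main() {
    let window = Duration::from_secs(30);
    let limit = 3usize;
    let mut tracking = FailureWindow::new();
    let start = Instant::now();

    // Three failures in quick succession stay at or under the limit...
    for i in 0..limit {
        let count = tracking.fail(start + Duration::from_secs(i as u64), window);
        assert_eq!(count, i + 1);
    }
    // ...so a fourth failure inside the window trips the limit,
    assert!(tracking.fail(start + Duration::from_secs(3), window) > limit);
    // while a failure after the window has elapsed starts fresh.
    assert_eq!(tracking.fail(start + Duration::from_secs(60), window), 1);
    println!("rolling-window semantics check out");
}
```

Because `now` is a parameter rather than being sampled inside the method, window expiry can be tested with synthetic timestamps instead of sleeps - the same property the patch's `fail_tx_proposal` signature provides.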