From d53d528e951b099e2f258afa292aafc717c55564 Mon Sep 17 00:00:00 2001 From: Mariusz Reichert Date: Thu, 10 Apr 2025 23:11:50 +0200 Subject: [PATCH] RPC log IP anonymization --- Cargo.lock | 12 ++++---- Cargo.toml | 2 +- src/bin/electrs.rs | 59 ++++++++++++++++++++++++++++++++---- src/config.rs | 60 ++++++++++++++++++++----------------- src/electrum/server.rs | 68 ++++++++++++++++++++++++++---------------- tests/common.rs | 8 ++++- 6 files changed, 142 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d625d04f..695a530e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1038,7 +1038,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1639,7 +1639,7 @@ checksum = "e19b23d53f35ce9f56aebc7d1bb4e6ac1e9c0db7ac85c8d1760c04379edced37" dependencies = [ "hermit-abi 0.4.0", "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1753,7 +1753,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -2620,7 +2620,7 @@ dependencies = [ "errno 0.3.10", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3170,7 +3170,7 @@ dependencies = [ "getrandom 0.3.1", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3927,7 +3927,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c60007ff1..7ec330366 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ arraydeque = "0.5.1" arrayref = "0.3.6" base64 = "0.22" bincode = "1.3.1" -bitcoin = { version = "0.32.4", features = ["serde"] } +bitcoin = { version = "0.32.4", features = ["serde", "rand"] } clap = "2.33.3" crossbeam-channel = "0.5.0" dirs = "5.0.1" diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 42f5cf024..247ddc6c3 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -6,10 +6,11 @@ extern crate electrs; use crossbeam_channel::{self as channel}; use error_chain::ChainedError; -use std::process; +use std::{env, process, thread}; use std::sync::{Arc, RwLock}; use std::time::Duration; - +use bitcoin::hex::DisplayHex; +use bitcoin::secp256k1::rand; use electrs::{ config::Config, daemon::Daemon, @@ -28,6 +29,9 @@ use electrs::otlp_trace; use electrs::elements::AssetRegistry; use electrs::metrics::MetricOpts; +/// Default salt rotation interval in seconds (24 hours) +const DEFAULT_SALT_ROTATION_INTERVAL_SECS: u64 = 24 * 3600; + fn fetch_from(config: &Config, store: &Store) -> FetchFrom { let mut jsonrpc_import = config.jsonrpc_import; if !jsonrpc_import { @@ -44,7 +48,7 @@ fn fetch_from(config: &Config, store: &Store) -> FetchFrom { } } -fn run_server(config: Arc) -> Result<()> { +fn run_server(config: Arc, salt_rwlock: Arc>) -> Result<()> { let (block_hash_notify, block_hash_receive) = channel::bounded(1); let signal = Waiter::start(block_hash_receive); let metrics = Metrics::new(config.monitoring_addr); @@ -116,7 +120,12 @@ fn run_server(config: Arc) -> Result<()> { // TODO: configuration for which servers to start let rest_server = rest::start(Arc::clone(&config), Arc::clone(&query)); - let electrum_server = ElectrumRPC::start(Arc::clone(&config), Arc::clone(&query), &metrics); + let electrum_server = ElectrumRPC::start( + Arc::clone(&config), + Arc::clone(&query), + &metrics, + Arc::clone(&salt_rwlock), + ); let main_loop_count = metrics.gauge(MetricOpts::new( "electrs_main_loop_count", @@ -151,9 +160,49 @@ fn run_server(config: Arc) -> Result<()> { Ok(()) } +fn generate_salt() -> String { + let random_bytes: [u8; 32] = rand::random(); + random_bytes.to_lower_hex_string() +} + +fn rotate_salt(salt: &mut String) { + *salt = generate_salt(); +} + +fn get_salt_rotation_interval() -> Duration { + let var_name = "SALT_ROTATION_INTERVAL_SECS"; + let secs = env::var(var_name) + .ok() + .and_then(|val| val.parse::().ok()) + .unwrap_or(DEFAULT_SALT_ROTATION_INTERVAL_SECS); + + Duration::from_secs(secs) +} + +fn spawn_salt_rotation_thread() -> Arc> { + let salt = generate_salt(); + let salt_rwlock = Arc::new(RwLock::new(salt)); + let writer_arc = Arc::clone(&salt_rwlock); + let interval = get_salt_rotation_interval(); + + thread::spawn(move || { + loop { + thread::sleep(interval); // 24 hours + { + let mut guard = writer_arc.write().unwrap(); + rotate_salt(&mut *guard); + info!("Salt rotated"); + } + } + }); + salt_rwlock +} + fn main_() { + let salt_rwlock = spawn_salt_rotation_thread(); + let config = Arc::new(Config::from_args()); - if let Err(e) = run_server(config) { + if let Err(e) = run_server(config, Arc::clone(&salt_rwlock)) { error!("server failed: {}", e.display_chain()); process::exit(1); } diff --git a/src/config.rs b/src/config.rs index 10fd174ca..a0abdda08 100644 --- a/src/config.rs +++ b/src/config.rs @@ -40,7 +40,7 @@ pub struct Config { pub utxos_limit: usize, pub electrum_txs_limit: usize, pub electrum_banner: String, - pub electrum_rpc_logging: Option, + pub rpc_logging: RpcLogging, pub zmq_addr: Option, /// Enable compaction during initial sync @@ -74,10 +74,6 @@ fn str_to_socketaddr(address: &str, what: &str) -> SocketAddr { impl Config { pub fn from_args() -> Config { let network_help = format!("Select network type ({})", Network::names().join(", ")); - let rpc_logging_help = format!( - "Select RPC logging option ({})", - RpcLogging::options().join(", ") - ); let args = App::new("Electrum Rust Server") .version(crate_version!()) @@ -201,10 +197,20 @@ impl Config { .help("Welcome banner for the Electrum server, shown in the console to clients.") .takes_value(true) ).arg( - Arg::with_name("electrum_rpc_logging") - .long("electrum-rpc-logging") - .help(&rpc_logging_help) - .takes_value(true), + Arg::with_name("enable_json_rpc_logging") + .long("enable-json-rpc-logging") + .help("turns on rpc logging") + .takes_value(false) + ).arg( + Arg::with_name("hide_json_rpc_logging_parameters") + .long("hide-json-rpc-logging-parameters") + .help("disables parameter printing in rpc logs") + .takes_value(false) + ).arg( + Arg::with_name("anonymize_json_rpc_logging_source_ip") + .long("anonymize-json-rpc-logging-source-ip") + .help("enables ip anonymization in rpc logs") + .takes_value(false) ).arg( Arg::with_name("initial_sync_compaction") .long("initial-sync-compaction") @@ -427,9 +433,15 @@ impl Config { electrum_rpc_addr, electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize), electrum_banner, - electrum_rpc_logging: m - .value_of("electrum_rpc_logging") - .map(|option| RpcLogging::from(option)), + rpc_logging: { + let params = RpcLogging { + enabled: m.is_present("enable_json_rpc_logging"), + hide_params: m.is_present("hide_json_rpc_logging_parameters"), + anonymize_ip: m.is_present("anonymize_json_rpc_logging_source_ip"), + }; + params.validate(); + params + }, http_addr, http_socket_file, monitoring_addr, @@ -471,25 +483,17 @@ impl Config { } } -#[derive(Debug, Clone)] -pub enum RpcLogging { - Full, - NoParams, +#[derive(Debug, Default, Clone)] +pub struct RpcLogging { + pub enabled: bool, + pub hide_params: bool, + pub anonymize_ip: bool, } impl RpcLogging { - pub fn options() -> Vec { - return vec!["full".to_string(), "no-params".to_string()]; - } -} - -impl From<&str> for RpcLogging { - fn from(option: &str) -> Self { - match option { - "full" => RpcLogging::Full, - "no-params" => RpcLogging::NoParams, - - _ => panic!("unsupported RPC logging option: {:?}", option), + pub fn validate(&self) { + if !self.enabled && (self.hide_params || self.anonymize_ip) { + panic!("Flags '--hide-json-rpc-logging-parameters' or '--anonymize-json-rpc-logging-source-ip' require '--enable-json-rpc-logging'"); } } } diff --git a/src/electrum/server.rs b/src/electrum/server.rs index f3ae3db39..ea5579699 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::io::{BufRead, BufReader, Write}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; use std::sync::mpsc::{self, Receiver, Sender, SyncSender, TrySendError}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread; use std::time::Instant; @@ -19,7 +19,6 @@ use electrs_macros::trace; use bitcoin::consensus::encode::serialize_hex; #[cfg(feature = "liquid")] use elements::encode::serialize_hex; - use crate::chain::Txid; use crate::config::{Config, RpcLogging}; use crate::electrum::{get_electrum_height, ProtocolVersion}; @@ -95,7 +94,7 @@ fn get_status_hash(txs: Vec<(Txid, Option)>, query: &Query) -> Option { - if $self.rpc_logging.is_some() { + if $self.rpc_logging.enabled { $self.log_rpc_event($event); } }; @@ -112,7 +111,8 @@ struct Connection { txs_limit: usize, #[cfg(feature = "electrum-discovery")] discovery: Option>, - rpc_logging: Option, + rpc_logging: RpcLogging, + salt: String, } impl Connection { @@ -124,7 +124,8 @@ impl Connection { stats: Arc, txs_limit: usize, #[cfg(feature = "electrum-discovery")] discovery: Option>, - rpc_logging: Option, + rpc_logging: RpcLogging, + salt: String, ) -> Connection { Connection { query, @@ -138,6 +139,7 @@ impl Connection { #[cfg(feature = "electrum-discovery")] discovery, rpc_logging, + salt, } } @@ -525,11 +527,25 @@ impl Connection { Ok(result) } + fn hash_ip_with_salt(&self, ip: &str) -> String { + let mut hasher = Sha256::new(); + hasher.input(self.salt.as_bytes()); + hasher.input(ip.as_bytes()); + hasher.result_str() + } + fn log_rpc_event(&self, mut log: Value) { + let real_ip = self.addr.ip().to_string(); + let ip_to_log = if self.rpc_logging.anonymize_ip { + self.hash_ip_with_salt(&real_ip) + } else { + real_ip + }; + log.as_object_mut().unwrap().insert( "source".into(), json!({ - "ip": self.addr.ip().to_string(), + "ip": ip_to_log, "port": self.addr.port(), }), ); @@ -594,28 +610,20 @@ impl Connection { cmd.get("id"), ) { (Some(&Value::String(ref method)), &Value::Array(ref params), Some(ref id)) => { - conditionally_log_rpc_event!( - self, - json!({ - "event": "rpc request", - "id": id, - "method": method, - "params": if let Some(RpcLogging::Full) = self.rpc_logging { - json!(params) - } else { - Value::Null - } - }) - ); - let reply = self.handle_command(method, params, id)?; conditionally_log_rpc_event!( self, json!({ - "event": "rpc response", + "event": "rpc_response", "method": method, - "payload_size": reply.to_string().as_bytes().len(), + "params": if self.rpc_logging.hide_params { + Value::Null + } else { + json!(params) + }, + "request_size": serde_json::to_vec(&cmd).map(|v| v.len()).unwrap_or(0), + "response_size": reply.to_string().as_bytes().len(), "duration_micros": start_time.elapsed().as_micros(), "id": id, }) @@ -666,7 +674,7 @@ impl Connection { pub fn run(mut self, receiver: Receiver) { self.stats.clients.inc(); - conditionally_log_rpc_event!(self, json!({ "event": "connection established" })); + conditionally_log_rpc_event!(self, json!({ "event": "connection_established" })); let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); let sender = self.sender.clone(); @@ -684,7 +692,7 @@ impl Connection { .sub(self.status_hashes.len() as i64); debug!("[{}] shutting down connection", self.addr); - conditionally_log_rpc_event!(self, json!({ "event": "connection closed" })); + conditionally_log_rpc_event!(self, json!({ "event": "connection_closed" })); let _ = self.stream.shutdown(Shutdown::Both); if let Err(err) = child.join().expect("receiver panicked") { @@ -787,7 +795,12 @@ impl RPC { chan } - pub fn start(config: Arc, query: Arc, metrics: &Metrics) -> RPC { + pub fn start( + config: Arc, + query: Arc, + metrics: &Metrics, + salt_rwlock: Arc> + ) -> RPC { let stats = Arc::new(Stats { latency: metrics.histogram_vec( HistogramOpts::new("electrum_rpc", "Electrum RPC latency (seconds)"), @@ -847,13 +860,15 @@ impl RPC { let query = Arc::clone(&query); let stats = Arc::clone(&stats); let garbage_sender = garbage_sender.clone(); - let rpc_logging = config.electrum_rpc_logging.clone(); + let rpc_logging = config.rpc_logging.clone(); #[cfg(feature = "electrum-discovery")] let discovery = discovery.clone(); let (sender, receiver) = mpsc::sync_channel(10); senders.lock().unwrap().push(sender.clone()); + let salt = salt_rwlock.read().unwrap().clone(); + let spawned = spawn_thread("peer", move || { info!("[{}] connected peer", addr); let conn = Connection::new( @@ -866,6 +881,7 @@ impl RPC { #[cfg(feature = "electrum-discovery")] discovery, rpc_logging, + salt, ); conn.run(receiver); info!("[{}] disconnected peer", addr); diff --git a/tests/common.rs b/tests/common.rs index e4a7e8015..2ec8b99c3 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -27,6 +27,7 @@ use electrs::{ rest, signal::Waiter, }; +use electrs::config::RpcLogging; pub struct TestRunner { config: Arc, @@ -38,6 +39,7 @@ pub struct TestRunner { daemon: Arc, mempool: Arc>, metrics: Metrics, + salt_rwlock: Arc>, } impl TestRunner { @@ -108,7 +110,7 @@ impl TestRunner { utxos_limit: 100, electrum_txs_limit: 100, electrum_banner: "".into(), - electrum_rpc_logging: None, + rpc_logging: RpcLogging::default(), zmq_addr: None, #[cfg(feature = "liquid")] @@ -179,6 +181,8 @@ impl TestRunner { None, // TODO )); + let salt_rwlock = Arc::new(RwLock::new(String::from("foobar"))); + Ok(TestRunner { config, node, @@ -188,6 +192,7 @@ impl TestRunner { daemon, mempool, metrics, + salt_rwlock, }) } @@ -280,6 +285,7 @@ pub fn init_electrum_tester() -> Result<(ElectrumRPC, net::SocketAddr, TestRunne Arc::clone(&tester.config), Arc::clone(&tester.query), &tester.metrics, + Arc::clone(&tester.salt_rwlock), ); log::info!( "Electrum server running on {}",