diff --git a/.env.example b/.env.example index a1fd43c..7202d7d 100644 --- a/.env.example +++ b/.env.example @@ -4,4 +4,8 @@ DATABASE_URL=postgres://pguser:pgpassword@localhost:5434/sp_uf DMOB_DATABASE_URL=postgres://pguser:pgpassword@localhost:5434/sp_uf # URL for GLIF RPC endpoint -GLIF_URL=https://api.node.glif.io/rpc/v1 \ No newline at end of file +GLIF_URL=https://api.node.glif.io/rpc/v1 + +PROXY_URL= +PROXY_USER= +PROXY_PASSWORD= diff --git a/.gitignore b/.gitignore index 367e00f..fc850a5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ target z_notes dump +.env \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index c1f353f..f211594 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5889,6 +5889,7 @@ dependencies = [ "multiaddr", "once_cell", "pretty_assertions", + "rand 0.9.2", "regex", "reqwest", "reqwest-middleware", diff --git a/url_finder/Cargo.toml b/url_finder/Cargo.toml index b13b5d3..e966523 100644 --- a/url_finder/Cargo.toml +++ b/url_finder/Cargo.toml @@ -56,6 +56,7 @@ alloy = { version = "1.0.41", default-features = false, features = ["sol-types", urlencoding = "2.1.3" multiaddr = "0.18.2" dotenvy = "0.15.7" +rand = "0.9.2" [dev-dependencies] wiremock = "0.6.5" diff --git a/url_finder/src/config.rs b/url_finder/src/config.rs index 90ea8de..4cd9715 100644 --- a/url_finder/src/config.rs +++ b/url_finder/src/config.rs @@ -11,6 +11,10 @@ pub struct Config { pub log_level: String, pub glif_url: String, pub cid_contact_url: String, + pub proxy_url: Option, + pub proxy_user: Option, + pub proxy_password: Option, + pub proxy_ip_count: Option, } impl Config { @@ -32,6 +36,10 @@ impl Config { glif_url: env::var("GLIF_URL").unwrap_or("https://api.node.glif.io/rpc/v1".to_string()), cid_contact_url: env::var("CID_CONTACT_URL") .unwrap_or("https://cid.contact".to_string()), + proxy_url: env::var("PROXY_URL").unwrap_or("US".to_string()).into(), + proxy_user: env::var("PROXY_USER").ok(), + proxy_password: env::var("PROXY_PASSWORD").ok(), + proxy_ip_count: env::var("PROXY_IP_COUNT").ok().and_then(|s| s.parse().ok()), }) } @@ -43,6 +51,10 @@ impl Config { log_level: "info".to_string(), glif_url, cid_contact_url, + proxy_password: None, + proxy_url: None, + proxy_user: None, + proxy_ip_count: None, } } } diff --git a/url_finder/src/http_client.rs b/url_finder/src/http_client.rs new file mode 100644 index 0000000..3d8eb46 --- /dev/null +++ b/url_finder/src/http_client.rs @@ -0,0 +1,67 @@ +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use crate::config::Config; +use rand::Rng; +use reqwest::{Client, Proxy}; +use tracing::info; + +const RETRI_TIMEOUT_SEC: u64 = 15; +static ATOMIC_PROXY_PORT: AtomicU32 = AtomicU32::new(8001); +static ATOMIC_PROXY_LAST_CHANGE: AtomicU64 = AtomicU64::new(0); + +fn get_sticky_port_atomic(ip_count: u32) -> u16 { + // if no proxy ip count configured, return default port + if ip_count == 0 { + return 8000; + } + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + let last = ATOMIC_PROXY_LAST_CHANGE.load(Ordering::Relaxed); + + let expired = now - last > 24 * 3600; // rotate every 24 hours + + if expired { + let start = 8001; + let end = start + ip_count - 1; + + let mut rng = rand::rng(); + let new_port = rng.random_range(start..=end); + + ATOMIC_PROXY_PORT.store(new_port, Ordering::Relaxed); + ATOMIC_PROXY_LAST_CHANGE.store(now, Ordering::Relaxed); + + return new_port as u16; + } + + ATOMIC_PROXY_PORT.load(Ordering::Relaxed) as u16 +} + +pub fn build_client(config: &Config) -> Result { + let mut builder = Client::builder().timeout(std::time::Duration::from_secs(RETRI_TIMEOUT_SEC)); + + if let (Some(proxy_url), Some(proxy_user), Some(proxy_password), Some(proxy_ip_count)) = ( + &config.proxy_url, + &config.proxy_user, + &config.proxy_password, + &config.proxy_ip_count, + ) { + let ip_count = *proxy_ip_count as u32; + + let port = get_sticky_port_atomic(ip_count); + let proxy_url_result = format!("{}:{}", proxy_url, port); + + info!("Using proxy: {}", proxy_url_result); + + let proxy = (Proxy::http(proxy_url_result))?.basic_auth(proxy_user, proxy_password); + builder = builder + .proxy(proxy) + .pool_idle_timeout(Duration::from_secs(60 * 60 * 24)); + } + + builder.build() +} diff --git a/url_finder/src/lib.rs b/url_finder/src/lib.rs index a5a90c7..f503188 100644 --- a/url_finder/src/lib.rs +++ b/url_finder/src/lib.rs @@ -7,6 +7,7 @@ pub mod api_response; pub mod background; mod cid_contact; pub mod config; +mod http_client; mod lotus_rpc; mod multiaddr_parser; mod pix_filspark; diff --git a/url_finder/src/services/url_discovery_service.rs b/url_finder/src/services/url_discovery_service.rs index 7a7be1e..ccc6783 100644 --- a/url_finder/src/services/url_discovery_service.rs +++ b/url_finder/src/services/url_discovery_service.rs @@ -119,7 +119,7 @@ pub async fn discover_url( ); debug!("Testing URLs: {:?}", urls); let (working_url, retrievability_percent) = - url_tester::check_retrievability_with_get(urls, true).await; + url_tester::check_retrievability_with_get(config, urls, true).await; debug!( "URL test result - working_url: {:?}, retrievability: {:?}", working_url, retrievability_percent diff --git a/url_finder/src/url_tester.rs b/url_finder/src/url_tester.rs index 209d9f0..6f3dfb9 100644 --- a/url_finder/src/url_tester.rs +++ b/url_finder/src/url_tester.rs @@ -7,9 +7,10 @@ use futures::{StreamExt, stream}; use reqwest::Client; use tracing::{debug, info}; +use crate::{config::Config, http_client::build_client}; + const FILTER_CONCURENCY_LIMIT: usize = 5; const RETRI_CONCURENCY_LIMIT: usize = 20; -const RETRI_TIMEOUT_SEC: u64 = 15; /// return first working url through head requests /// let's keep both head and get versions for now @@ -61,11 +62,11 @@ pub async fn filter_working_with_head(urls: Vec) -> Option { /// return retrivable percent of the urls /// let's keep both head and get versions for now #[allow(dead_code)] -pub async fn get_retrivability_with_head(urls: Vec) -> (Option, f64) { - let client = Client::builder() - .timeout(std::time::Duration::from_secs(RETRI_TIMEOUT_SEC)) - .build() - .unwrap(); +pub async fn get_retrivability_with_head( + config: &Config, + urls: Vec, +) -> (Option, f64) { + let client: Client = build_client(config).unwrap(); let success_counter = Arc::new(AtomicUsize::new(0)); let total_counter = Arc::new(AtomicUsize::new(0)); @@ -121,13 +122,11 @@ pub async fn get_retrivability_with_head(urls: Vec) -> (Option, } pub async fn check_retrievability_with_get( + config: &Config, urls: Vec, with_stats: bool, ) -> (Option, Option) { - let client = Client::builder() - .timeout(std::time::Duration::from_secs(RETRI_TIMEOUT_SEC)) - .build() - .expect("Failed to build reqwest client"); + let client = build_client(config).unwrap(); let success_counter = Arc::new(AtomicUsize::new(0)); let total_counter = Arc::new(AtomicUsize::new(0));