Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@ DATABASE_URL=postgres://pguser:pgpassword@localhost:5434/sp_uf
DMOB_DATABASE_URL=postgres://pguser:pgpassword@localhost:5434/sp_uf

# URL for GLIF RPC endpoint
GLIF_URL=https://api.node.glif.io/rpc/v1
GLIF_URL=https://api.node.glif.io/rpc/v1

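# Optional proxy for URL retrievability checks. Besides the values below,
# PROXY_IP_COUNT (number of proxy ports, starting at 8001) must be set for the proxy to be used.
PROXY_URL=
PROXY_USER=
PROXY_PASSWORD=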
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
target
z_notes
dump
.env
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions url_finder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ alloy = { version = "1.0.41", default-features = false, features = ["sol-types",
urlencoding = "2.1.3"
multiaddr = "0.18.2"
dotenvy = "0.15.7"
rand = "0.9.2"

[dev-dependencies]
wiremock = "0.6.5"
Expand Down
12 changes: 12 additions & 0 deletions url_finder/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ pub struct Config {
pub log_level: String,
pub glif_url: String,
pub cid_contact_url: String,
pub proxy_url: Option<String>,
pub proxy_user: Option<String>,
pub proxy_password: Option<String>,
pub proxy_ip_count: Option<usize>,
}

impl Config {
Expand All @@ -32,6 +36,10 @@ impl Config {
glif_url: env::var("GLIF_URL").unwrap_or("https://api.node.glif.io/rpc/v1".to_string()),
cid_contact_url: env::var("CID_CONTACT_URL")
.unwrap_or("https://cid.contact".to_string()),
proxy_url: env::var("PROXY_URL").unwrap_or("US".to_string()).into(),
proxy_user: env::var("PROXY_USER").ok(),
proxy_password: env::var("PROXY_PASSWORD").ok(),
proxy_ip_count: env::var("PROXY_IP_COUNT").ok().and_then(|s| s.parse().ok()),
})
}

Expand All @@ -43,6 +51,10 @@ impl Config {
log_level: "info".to_string(),
glif_url,
cid_contact_url,
proxy_password: None,
proxy_url: None,
proxy_user: None,
proxy_ip_count: None,
}
}
}
66 changes: 66 additions & 0 deletions url_finder/src/http_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use crate::config::Config;
use rand::Rng;
use reqwest::{Client, Proxy};

/// Request timeout, in seconds, applied to every client built by `build_client`.
const RETRI_TIMEOUT_SEC: u64 = 15;
/// Proxy port currently in use. Starts at 8001 and is overwritten each time
/// `get_sticky_port_atomic` rotates to a new port.
static ATOMIC_PROXY_PORT: AtomicU32 = AtomicU32::new(8001);
/// UNIX timestamp (seconds) of the last port rotation; 0 until the first rotation happens.
static ATOMIC_PROXY_LAST_CHANGE: AtomicU64 = AtomicU64::new(0);

/// Returns the proxy port to use, picking a new random one at most once every 24 hours.
///
/// With `ip_count == 0` the fixed port 8000 is returned and no shared state is touched.
/// Otherwise the port is drawn uniformly from `8001..=8001 + ip_count - 1` (capped at
/// `u16::MAX`, the largest valid port) and kept in `ATOMIC_PROXY_PORT` until more than
/// 24 hours have passed since the timestamp stored in `ATOMIC_PROXY_LAST_CHANGE`.
///
/// The load/store pair is not a single atomic transaction: callers racing on an expired
/// port may each pick a port, and the last store wins for subsequent calls.
fn get_sticky_port_atomic(ip_count: u32) -> u16 {
    const DEFAULT_PORT: u16 = 8000;
    const FIRST_PORT: u32 = 8001;
    const ROTATION_PERIOD_SECS: u64 = 24 * 3600;

    // if no proxy ip count configured, return default port
    if ip_count == 0 {
        return DEFAULT_PORT;
    }

    // A system clock set before the UNIX epoch is treated as time 0 instead of panicking.
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);

    let last = ATOMIC_PROXY_LAST_CHANGE.load(Ordering::Relaxed);

    // saturating_sub: if the clock stepped backwards (`now < last`) the elapsed time is
    // treated as zero rather than underflowing.
    let expired = now.saturating_sub(last) > ROTATION_PERIOD_SECS;

    let port = if expired {
        // Clamp the upper bound so the addition cannot overflow and the chosen port always
        // fits in a u16 (no silent truncation on the final conversion).
        let last_port = FIRST_PORT
            .saturating_add(ip_count - 1)
            .min(u32::from(u16::MAX));

        let mut rng = rand::rng();
        let new_port = rng.random_range(FIRST_PORT..=last_port);

        ATOMIC_PROXY_PORT.store(new_port, Ordering::Relaxed);
        ATOMIC_PROXY_LAST_CHANGE.store(now, Ordering::Relaxed);

        new_port
    } else {
        ATOMIC_PROXY_PORT.load(Ordering::Relaxed)
    };

    // Every value stored above is <= u16::MAX; the fallback only guards the conversion.
    u16::try_from(port).unwrap_or(u16::MAX)
}

/// Builds the `reqwest` client used for outbound requests.
///
/// Every client gets a request timeout of `RETRI_TIMEOUT_SEC` seconds. When `proxy_url`,
/// `proxy_user`, `proxy_password` and `proxy_ip_count` are all present in `config`, the
/// client is additionally routed through `<proxy_url>:<port>` with basic authentication,
/// where the port comes from `get_sticky_port_atomic`, and idle pooled connections are
/// kept for 24 hours.
///
/// # Errors
///
/// Returns the `reqwest::Error` produced when the proxy URL cannot be parsed or the
/// client cannot be built.
pub fn build_client(config: &Config) -> Result<Client, reqwest::Error> {
    let mut builder = Client::builder().timeout(Duration::from_secs(RETRI_TIMEOUT_SEC));

    if let (Some(proxy_url), Some(proxy_user), Some(proxy_password), Some(proxy_ip_count)) = (
        config.proxy_url.as_deref(),
        config.proxy_user.as_deref(),
        config.proxy_password.as_deref(),
        config.proxy_ip_count,
    ) {
        // A port count can never meaningfully exceed the u16 port space, so saturate there
        // instead of letting an `as` cast silently wrap a huge `usize` (e.g. 2^32 -> 0).
        let ip_count = u32::from(u16::try_from(proxy_ip_count).unwrap_or(u16::MAX));

        let port = get_sticky_port_atomic(ip_count);
        let proxy_endpoint = format!("{}:{}", proxy_url, port);

        println!("Using proxy: {}", proxy_endpoint);

        // NOTE(review): per the reqwest docs `Proxy::http` only intercepts `http://`
        // requests; confirm whether `https://` targets should be proxied too (`Proxy::all`).
        let proxy = Proxy::http(proxy_endpoint)?.basic_auth(proxy_user, proxy_password);
        builder = builder
            .proxy(proxy)
            .pool_idle_timeout(Duration::from_secs(60 * 60 * 24));
    }

    builder.build()
}
1 change: 1 addition & 0 deletions url_finder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod api_response;
pub mod background;
mod cid_contact;
pub mod config;
mod http_client;
mod lotus_rpc;
mod multiaddr_parser;
mod pix_filspark;
Expand Down
2 changes: 1 addition & 1 deletion url_finder/src/services/url_discovery_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub async fn discover_url(
);
debug!("Testing URLs: {:?}", urls);
let (working_url, retrievability_percent) =
url_tester::check_retrievability_with_get(urls, true).await;
url_tester::check_retrievability_with_get(config, urls, true).await;
debug!(
"URL test result - working_url: {:?}, retrievability: {:?}",
working_url, retrievability_percent
Expand Down
19 changes: 9 additions & 10 deletions url_finder/src/url_tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use futures::{StreamExt, stream};
use reqwest::Client;
use tracing::{debug, info};

use crate::{config::Config, http_client::build_client};

const FILTER_CONCURENCY_LIMIT: usize = 5;
const RETRI_CONCURENCY_LIMIT: usize = 20;
const RETRI_TIMEOUT_SEC: u64 = 15;

/// return first working url through head requests
/// let's keep both head and get versions for now
Expand Down Expand Up @@ -61,11 +62,11 @@ pub async fn filter_working_with_head(urls: Vec<String>) -> Option<String> {
/// return retrivable percent of the urls
/// let's keep both head and get versions for now
#[allow(dead_code)]
pub async fn get_retrivability_with_head(urls: Vec<String>) -> (Option<String>, f64) {
let client = Client::builder()
.timeout(std::time::Duration::from_secs(RETRI_TIMEOUT_SEC))
.build()
.unwrap();
pub async fn get_retrivability_with_head(
config: &Config,
urls: Vec<String>,
) -> (Option<String>, f64) {
let client: Client = build_client(config).unwrap();
let success_counter = Arc::new(AtomicUsize::new(0));
let total_counter = Arc::new(AtomicUsize::new(0));

Expand Down Expand Up @@ -121,13 +122,11 @@ pub async fn get_retrivability_with_head(urls: Vec<String>) -> (Option<String>,
}

pub async fn check_retrievability_with_get(
config: &Config,
urls: Vec<String>,
with_stats: bool,
) -> (Option<String>, Option<f64>) {
let client = Client::builder()
.timeout(std::time::Duration::from_secs(RETRI_TIMEOUT_SEC))
.build()
.expect("Failed to build reqwest client");
let client = build_client(config).unwrap();

let success_counter = Arc::new(AtomicUsize::new(0));
let total_counter = Arc::new(AtomicUsize::new(0));
Expand Down