diff --git a/.sqlx/query-072675ef78b55b473101ad5e556529177403872c30a9f1f95829fcfe03a3cc68.json b/.sqlx/query-072675ef78b55b473101ad5e556529177403872c30a9f1f95829fcfe03a3cc68.json deleted file mode 100644 index db64f7f..0000000 --- a/.sqlx/query-072675ef78b55b473101ad5e556529177403872c30a9f1f95829fcfe03a3cc68.json +++ /dev/null @@ -1,54 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n id,\n \"dealId\" AS deal_id,\n \"claimId\" AS claim_id,\n \"clientId\" AS client_id,\n \"providerId\" AS provider_id,\n \"pieceCid\" AS piece_cid\n FROM unified_verified_deal\n WHERE \n \"providerId\" = $1\n ORDER BY id DESC\n LIMIT $2\n OFFSET $3\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int4" - }, - { - "ordinal": 1, - "name": "deal_id", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "claim_id", - "type_info": "Int4" - }, - { - "ordinal": 3, - "name": "client_id", - "type_info": "Varchar" - }, - { - "ordinal": 4, - "name": "provider_id", - "type_info": "Varchar" - }, - { - "ordinal": 5, - "name": "piece_cid", - "type_info": "Varchar" - } - ], - "parameters": { - "Left": [ - "Text", - "Int8", - "Int8" - ] - }, - "nullable": [ - false, - false, - false, - true, - true, - true - ] - }, - "hash": "072675ef78b55b473101ad5e556529177403872c30a9f1f95829fcfe03a3cc68" -} diff --git a/.sqlx/query-7ef60661a9826e8e3e00158e7d018c465227cff3b07498d8793f310cf78135ef.json b/.sqlx/query-7ef60661a9826e8e3e00158e7d018c465227cff3b07498d8793f310cf78135ef.json new file mode 100644 index 0000000..a283a36 --- /dev/null +++ b/.sqlx/query-7ef60661a9826e8e3e00158e7d018c465227cff3b07498d8793f310cf78135ef.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT DISTINCT \n \"providerId\" AS provider_id\n FROM \n unified_verified_deal\n WHERE \n \"clientId\" = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "provider_id", + "type_info": "Varchar" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + true + ] + }, + "hash": "7ef60661a9826e8e3e00158e7d018c465227cff3b07498d8793f310cf78135ef" +} diff --git a/.sqlx/query-e130ef133dc7185f8dc726e7a08b14a1d39191f9c64ec351eeec3683a808228b.json b/.sqlx/query-e130ef133dc7185f8dc726e7a08b14a1d39191f9c64ec351eeec3683a808228b.json deleted file mode 100644 index 6494b77..0000000 --- a/.sqlx/query-e130ef133dc7185f8dc726e7a08b14a1d39191f9c64ec351eeec3683a808228b.json +++ /dev/null @@ -1,55 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n id,\n \"dealId\" AS deal_id,\n \"claimId\" AS claim_id,\n \"clientId\" AS client_id,\n \"providerId\" AS provider_id,\n \"pieceCid\" AS piece_cid\n FROM unified_verified_deal\n WHERE \n \"providerId\" = $1\n AND \"clientId\" = $2\n ORDER BY id DESC\n LIMIT $3\n OFFSET $4\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int4" - }, - { - "ordinal": 1, - "name": "deal_id", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "claim_id", - "type_info": "Int4" - }, - { - "ordinal": 3, - "name": "client_id", - "type_info": "Varchar" - }, - { - "ordinal": 4, - "name": "provider_id", - "type_info": "Varchar" - }, - { - "ordinal": 5, - "name": "piece_cid", - "type_info": "Varchar" - } - ], - "parameters": { - "Left": [ - "Text", - "Text", - "Int8", - "Int8" - ] - }, - "nullable": [ - false, - false, - false, - true, - true, - true - ] - }, - "hash": "e130ef133dc7185f8dc726e7a08b14a1d39191f9c64ec351eeec3683a808228b" -} diff --git a/url_finder/src/api/api_doc.rs b/url_finder/src/api/api_doc.rs index 9071952..a6ee305 100644 --- a/url_finder/src/api/api_doc.rs +++ b/url_finder/src/api/api_doc.rs @@ -34,6 +34,7 @@ The service is using [CID Contact](https://cid.contact) as source of HTTP entry handle_find_url_sp_client, handle_find_retri_by_client_and_sp, handle_find_retri_by_sp, + handle_find_client, handle_create_job, handle_get_job, @@ -52,6 +53,10 @@ The service is using [CID Contact](https://cid.contact) as source of HTTP entry FindRetriByClientAndSpPath, FindRetriByClientAndSpResponse, + // Client + FindByClientPath, + FindByClientResponse, + // Job CreateJobPayload, CreateJobResponse, diff --git a/url_finder/src/api/create_job.rs b/url_finder/src/api/create_job.rs index 943bbc9..1041e71 100644 --- a/url_finder/src/api/create_job.rs +++ b/url_finder/src/api/create_job.rs @@ -16,7 +16,7 @@ use super::ResultCode; #[derive(Deserialize, Serialize, ToSchema)] pub struct CreateJobPayload { - provider: String, + provider: Option, client: Option, } @@ -33,13 +33,14 @@ pub struct CreateJobResponse { id: Option, } -/// Create a job to find working urls or retrievability for a given SP and Client address +/// Create a job to find working urls or retrievability for +/// Either by SP or Client address or both #[utoipa::path( post, path = "/jobs", request_body(content = CreateJobPayload), description = r#" -**Create a job to find working urls or retrievability for a given SP and Client address** +**Create a job to find working urls or retrievability for either by SP or Client address or both** "#, responses( (status = 200, description = "Successful job creation", body = CreateJobResponse), @@ -60,28 +61,38 @@ pub async fn handle_create_job( &payload.provider, &payload.client ); - // validate provider and client addresses - let address_pattern = Regex::new(r"^f0\d{1,8}$").unwrap(); - if !address_pattern.is_match(&payload.provider) - || (payload.client.is_some() && !address_pattern.is_match(payload.client.as_ref().unwrap())) - { + if payload.provider.is_none() && payload.client.is_none() { return Err(bad_request( - "Invalid provider or client address".to_string(), + "Either provider, client address or both must be provided".to_string(), )); } - // Verify that we have http endpoint for the provider - let _ = match provider_endpoints::get_provider_endpoints(&payload.provider).await { - Ok((result_code, endpoints)) if endpoints.is_none() => { - debug!("No endpoints found"); - return Ok(ok_response(CreateJobResponse { - result: result_code, - id: None, - })); - } - Err(e) => return Err(internal_server_error(e.to_string())), - Ok(result) => result, - }; + if let Some(client) = &payload.client { + validate_address(client).map_err(|e| { + error!("Invalid client address: {}", e); + bad_request(e) + })?; + } + + if let Some(provider) = &payload.provider { + validate_address(provider).map_err(|e| { + error!("Invalid provider address: {}", e); + bad_request(e) + })?; + + // Verify that we have http endpoint for the provider + let _ = match provider_endpoints::get_provider_endpoints(provider).await { + Ok((result_code, endpoints)) if endpoints.is_none() => { + debug!("No endpoints found"); + return Ok(ok_response(CreateJobResponse { + result: result_code, + id: None, + })); + } + Err(e) => return Err(internal_server_error(e.to_string())), + Ok(result) => result, + }; + } let job = state .job_repo @@ -97,3 +108,11 @@ pub async fn handle_create_job( id: Some(job.id), })) } + +fn validate_address(address: &str) -> Result<(), String> { + let address_pattern = Regex::new(r"^f0\d{1,8}$").unwrap(); + if !address_pattern.is_match(address) { + return Err("Invalid provider or client address".to_string()); + } + Ok(()) +} diff --git a/url_finder/src/api/find_client.rs b/url_finder/src/api/find_client.rs new file mode 100644 index 0000000..ae618d7 --- /dev/null +++ b/url_finder/src/api/find_client.rs @@ -0,0 +1,207 @@ +use std::{sync::Arc, time::Duration}; + +use axum::{ + debug_handler, + extract::{Path, State}, +}; +use axum_extra::extract::WithRejection; +use color_eyre::Result; +use common::api_response::*; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use tokio::time::timeout; +use tracing::debug; +use utoipa::{IntoParams, ToSchema}; + +use crate::{deal_service, provider_endpoints, url_tester, AppState}; + +use super::ResultCode; + +#[derive(Deserialize, ToSchema, IntoParams)] +pub struct FindByClientPath { + pub client: String, +} + +#[derive(Serialize, ToSchema)] +pub struct ProviderResult { + pub provider: String, + pub result: ResultCode, + pub working_url: Option, + pub retrievability_percent: f64, +} + +#[derive(Serialize, ToSchema)] +pub struct FindByClientResponse { + pub client: String, + pub result: ResultCode, + pub providers: Vec, +} + +const RETRIEVABILITY_TIMEOUT_SEC: u64 = 60; // 1 min for each provider + +/// Find retrivabiliy of urls for a given SP and Client address +#[utoipa::path( + get, + path = "/url/client/{client}", + params (FindByClientPath), + description = r#" +**Find client SPs with working url and retrievabiliy of urls for for each found SP** + "#, + responses( + (status = 200, description = "Successful check", body = FindByClientResponse), + (status = 400, description = "Bad Request", body = ErrorResponse), + (status = 500, description = "Internal Server Error", body = ErrorResponse), + ), + tags = ["URL"], +)] +#[debug_handler] +pub async fn handle_find_client( + State(state): State>, + WithRejection(Path(path), _): WithRejection, ApiResponse>, +) -> Result, ApiResponse<()>> { + debug!( + "find client working url and retri for input client address: {:?}", + &path.client + ); + + // validate provider and client addresses + let address_pattern = Regex::new(r"^f0\d{1,8}$").unwrap(); + if !address_pattern.is_match(&path.client) { + return Err(bad_request( + "Invalid provider or client address".to_string(), + )); + } + + let providers = match deal_service::get_distinct_providers_by_client( + &state.deal_repo, + &path.client, + ) + .await + { + Ok(providers) => providers, + Err(e) => { + debug!( + "Failed to get providers for client {}: {:?}", + &path.client, e + ); + + return Err(internal_server_error(format!( + "Failed to get providers for client {0}", + path.client + ))); + } + }; + + if providers.is_empty() { + debug!("No providers found for client {}", &path.client); + + return Ok(ok_response(FindByClientResponse { + result: ResultCode::Error, + client: path.client.clone(), + providers: Vec::new(), + })); + } + + let mut results = Vec::new(); + + for provider in providers { + let (result_code, endpoints) = + match provider_endpoints::get_provider_endpoints(&provider).await { + Ok(endpoints) => endpoints, + Err(e) => return Err(internal_server_error(e.to_string())), + }; + + if endpoints.is_none() { + debug!("No endpoints found for provider {}", &provider); + + results.push(ProviderResult { + provider: provider.clone(), + result: result_code, + working_url: None, + retrievability_percent: 0.0, + }); + continue; + } + let endpoints = endpoints.unwrap(); + + let provider_db = provider.strip_prefix("f0").unwrap_or(&provider).to_string(); + let client = path + .client + .strip_prefix("f0") + .unwrap_or(&path.client) + .to_string(); + + let piece_ids = deal_service::get_random_piece_ids_by_provider_and_client( + &state.deal_repo, + &provider_db, + &client, + ) + .await + .map_err(|e| { + debug!("Failed to get piece ids: {:?}", e); + + internal_server_error("Failed to get piece ids") + })?; + + if piece_ids.is_empty() { + debug!("No deals found for provider {}", &provider); + + results.push(ProviderResult { + provider: provider.clone(), + result: ResultCode::NoDealsFound, + working_url: None, + retrievability_percent: 0.0, + }); + continue; + } + + let urls = deal_service::get_piece_url(endpoints, piece_ids).await; + let first_url = if !urls.is_empty() { + if urls[0].is_empty() { + None + } else { + Some(urls[0].clone()) + } + } else { + None + }; + + // Get retrievability percent + // Make sure that the task is not running for too long + let (_, retrievability_percent) = match timeout( + Duration::from_secs(RETRIEVABILITY_TIMEOUT_SEC), + url_tester::get_retrivability_with_head(urls), + ) + .await + { + Ok(result) => result, + Err(_) => { + debug!( + "Timeout while checking retrievability for provider {}", + &provider + ); + // In case of timeout + results.push(ProviderResult { + provider: provider.clone(), + result: ResultCode::TimedOut, + working_url: first_url, + retrievability_percent: 0.0, + }); + continue; + } + }; + + results.push(ProviderResult { + provider: provider.clone(), + result: ResultCode::Success, + working_url: first_url, + retrievability_percent, + }); + } + + Ok(ok_response(FindByClientResponse { + result: ResultCode::Success, + client: path.client.clone(), + providers: results, + })) +} diff --git a/url_finder/src/api/get_job.rs b/url_finder/src/api/get_job.rs index c019846..72876e5 100644 --- a/url_finder/src/api/get_job.rs +++ b/url_finder/src/api/get_job.rs @@ -43,11 +43,16 @@ pub async fn handle_get_job( State(state): State>, WithRejection(Path(path), _): WithRejection, ApiResponse>, ) -> Result, ApiResponse<()>> { - let job = state.job_repo.get_job(path.id).await.map_err(|e| { + let mut job = state.job_repo.get_job(path.id).await.map_err(|e| { error!("Error getting job: {:?}", e); not_found("Failed to get the job".to_string()) })?; + // Modify the job to include the first result's working URL and retrievability for FE compatibility + job.working_url = job.results.first().and_then(|r| r.working_url.clone()); + job.retrievability = job.results.first().map(|r| r.retrievability as i64); + job.result = job.results.first().map(|r| r.result.clone()); + Ok(ok_response(GetJobResponse { job })) } diff --git a/url_finder/src/api/mod.rs b/url_finder/src/api/mod.rs index 87b4273..55dfe12 100644 --- a/url_finder/src/api/mod.rs +++ b/url_finder/src/api/mod.rs @@ -10,6 +10,9 @@ pub use healthcheck::*; mod api_doc; pub use api_doc::*; +mod find_client; +pub use find_client::*; + mod find_retri_sp; pub use find_retri_sp::*; diff --git a/url_finder/src/api/responses.rs b/url_finder/src/api/responses.rs index 53dd186..0a0af35 100644 --- a/url_finder/src/api/responses.rs +++ b/url_finder/src/api/responses.rs @@ -19,6 +19,8 @@ pub enum ResultCode { #[allow(clippy::enum_variant_names)] #[derive(Serialize, ToSchema, Clone)] pub enum ErrorCode { + NoProviderOrClient, + NoProvidersFound, FailedToRetrieveCidContactData, FailedToGetPeerId, FailedToGetDeals, @@ -26,6 +28,8 @@ pub enum ErrorCode { impl fmt::Display for ErrorCode { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let s = match self { + ErrorCode::NoProviderOrClient => "NoProviderOrClient", + ErrorCode::NoProvidersFound => "NoProvidersFound", ErrorCode::FailedToRetrieveCidContactData => "FailedToRetrieveCidContactData", ErrorCode::FailedToGetPeerId => "FailedToGetPeerId", ErrorCode::FailedToGetDeals => "FailedToGetDeals", diff --git a/url_finder/src/background/job_handler.rs b/url_finder/src/background/job_handler.rs index e40e881..83fc219 100644 --- a/url_finder/src/background/job_handler.rs +++ b/url_finder/src/background/job_handler.rs @@ -5,14 +5,38 @@ use tracing::{debug, info}; use crate::{ deal_repo::DealRepository, deal_service, provider_endpoints, url_tester, ErrorCode, Job, - JobRepository, ResultCode, + JobRepository, JobStatus, ResultCode, }; const LOOP_DELAY: Duration = Duration::from_secs(5); -pub(super) enum JobHandlerError { +pub struct JobFailed { + pub error: Option, + pub result: Option, + pub reason: String, +} + +pub struct JobSuccessResult { + pub provider: String, + pub client: Option, + pub working_url: Option, + pub retrievability: f64, + pub result: ResultCode, +} + +pub struct JobErrorResult { + pub provider: String, + pub client: Option, + pub error: Option, + pub result: Option, +} + +pub(super) enum JobHandlerResult { Skip(String), - FailedJob(Option, Option, String), + FailedJob(JobFailed), + ErrorResult(JobErrorResult), + SuccessResult(JobSuccessResult), + MultipleResults(Vec, Vec), } pub async fn job_handler(job_repo: Arc, deal_repo: Arc) { @@ -30,86 +54,242 @@ pub async fn job_handler(job_repo: Arc, deal_repo: Arc debug!("Job processed successfully"), - Err(JobHandlerError::Skip(reason)) => { + match process_pending_job(&deal_repo, &job).await { + JobHandlerResult::Skip(reason) => { debug!("Skipping job: {}", reason); continue; } - Err(JobHandlerError::FailedJob(result_code, error_code, reason)) => { - debug!("Failed job: {}", reason); - job_repo.fail_job(job.id, result_code, error_code).await; + JobHandlerResult::FailedJob(job_failed) => { + debug!("Failed job: {}, reason: {}", job.id, job_failed.reason); + job_repo + .fail_job(job.id, job_failed.result, job_failed.error) + .await; } + JobHandlerResult::ErrorResult(error_result) => { + debug!("Error processing job: {}", job.id); + job_repo + .add_error_result( + job.id, + error_result.provider, + error_result.client, + error_result.error, + error_result.result, + ) + .await; + + job_repo.set_status(job.id, JobStatus::Completed).await; + } + JobHandlerResult::SuccessResult(success_result) => { + debug!("Job completed successfully: {}", job.id); + job_repo + .add_success_result( + job.id, + success_result.provider, + success_result.client, + success_result.working_url, + success_result.retrievability, + success_result.result, + ) + .await; + + job_repo.set_status(job.id, JobStatus::Completed).await; + } + JobHandlerResult::MultipleResults(success_results, error_resuls) => { + debug!("Multiple results processed for job: {}", job.id); + + for success_result in success_results { + debug!( + "Job success result: provider: {}, client: {:?}, working_url: {:?}, retrievability: {}", + success_result.provider, + success_result.client, + success_result.working_url, + success_result.retrievability + ); + job_repo + .add_success_result( + job.id, + success_result.provider, + success_result.client, + success_result.working_url, + success_result.retrievability, + success_result.result, + ) + .await; + } + for error_result in error_resuls { + debug!( + "Job error result: provider: {}, client: {:?}", + error_result.provider, error_result.client, + ); + job_repo + .add_error_result( + job.id, + error_result.provider, + error_result.client, + error_result.error, + error_result.result, + ) + .await; + } + job_repo.set_status(job.id, JobStatus::Completed).await; + } + } + } +} + +async fn process_pending_job(deal_repo: &DealRepository, job: &Job) -> JobHandlerResult { + match (&job.provider, &job.client) { + (Some(provider), None) => process_job_with_provider(deal_repo, provider).await, + (None, Some(client)) => process_job_with_client(deal_repo, client).await, + (Some(provider), Some(client)) => { + process_job_with_provider_and_client(deal_repo, provider, client).await + } + (None, None) => { + // should not happen + JobHandlerResult::FailedJob(JobFailed { + error: Some(ErrorCode::NoProviderOrClient), + result: Some(ResultCode::Error), + reason: "No provider or client specified".to_string(), + }) } } } -async fn process_pending_job( - job_repo: &JobRepository, +async fn process_job_with_client(deal_repo: &DealRepository, client: &str) -> JobHandlerResult { + let providers = match deal_service::get_distinct_providers_by_client(deal_repo, client).await { + Ok(providers) => providers, + Err(e) => { + debug!("Failed to get providers for client {}: {:?}", client, e); + return JobHandlerResult::Skip(format!("Failed to get providers for client {client}")); + } + }; + + if providers.is_empty() { + return JobHandlerResult::FailedJob(JobFailed { + error: Some(ErrorCode::NoProvidersFound), + result: Some(ResultCode::Error), + reason: format!("No providers found for client: {client}"), + }); + } + + let mut success_results = Vec::new(); + let mut error_results = Vec::new(); + + for provider in providers { + debug!("Processing job with provider: {}", &provider); + + match process_job(deal_repo, &provider, Some(client)).await { + JobHandlerResult::SuccessResult(result) => success_results.push(result), + JobHandlerResult::ErrorResult(result) => error_results.push(result), + JobHandlerResult::Skip(reason) => { + return JobHandlerResult::Skip(reason); + } + JobHandlerResult::FailedJob(job_failed) => { + return JobHandlerResult::FailedJob(job_failed); + } + // should not happen here + JobHandlerResult::MultipleResults(_, _) => continue, + } + } + + JobHandlerResult::MultipleResults(success_results, error_results) +} + +async fn process_job_with_provider_and_client( deal_repo: &DealRepository, - job: &Job, -) -> Result<(), JobHandlerError> { - let (_, endpoints) = match provider_endpoints::get_provider_endpoints(&job.provider).await { + provider: &str, + client: &str, +) -> JobHandlerResult { + debug!( + "Processing job with provider: {} and client: {}", + provider, client + ); + + process_job(deal_repo, provider, Some(client)).await +} + +async fn process_job_with_provider(deal_repo: &DealRepository, provider: &str) -> JobHandlerResult { + debug!("Processing job with provider: {}", provider); + + process_job(deal_repo, provider, None).await +} + +async fn process_job( + deal_repo: &DealRepository, + provider: &str, + client: Option<&str>, +) -> JobHandlerResult { + let (_, endpoints) = match provider_endpoints::get_provider_endpoints(provider).await { Ok((result_code, _)) if result_code != ResultCode::Success => { - return Err(JobHandlerError::FailedJob( - Some(result_code), - None, - "Provider endpoints not found".to_string(), - )) + return JobHandlerResult::ErrorResult(JobErrorResult { + provider: provider.to_string(), + client: client.map(|c| c.to_string()), + result: Some(result_code), + error: None, + }); } Ok(result) => result, Err(error_code) => { - return Err(JobHandlerError::FailedJob( - Some(ResultCode::Error), - Some(error_code.clone()), - error_code.to_string(), - )) + return JobHandlerResult::ErrorResult(JobErrorResult { + provider: provider.to_string(), + client: client.map(|c| c.to_string()), + result: Some(ResultCode::Error), + error: Some(error_code), + }); } }; if endpoints.is_none() || endpoints.as_ref().unwrap().is_empty() { debug!("No endpoints found"); - return Err(JobHandlerError::FailedJob( - Some(ResultCode::NoDealsFound), - None, - "No endpoints found".to_string(), - )); + return JobHandlerResult::ErrorResult(JobErrorResult { + provider: provider.to_string(), + client: client.map(|c| c.to_string()), + result: Some(ResultCode::NoDealsFound), + error: None, + }); } let endpoints = endpoints.unwrap(); - let provider = job.provider.strip_prefix("f0").unwrap_or(&job.provider); - - let client = job - .client - .as_ref() - .map(|c| c.strip_prefix("f0").unwrap_or(c)); + let provider_db = provider.strip_prefix("f0").unwrap_or(provider); + let client_db = client.as_ref().map(|c| c.strip_prefix("f0").unwrap_or(c)); - let piece_ids = deal_service::get_piece_ids_by_provider(deal_repo, provider, client) - .await - .map_err(|e| { - debug!("Failed to get piece ids: {:?}", e); + let piece_ids = + match deal_service::get_piece_ids_by_provider(deal_repo, provider_db, client_db).await { + Ok(ids) => ids, + Err(e) => { + debug!("Failed to get piece ids: {:?}", e); - JobHandlerError::Skip("Failed to get piece ids".to_string()) - })?; + return JobHandlerResult::Skip("Failed to get piece ids".to_string()); + } + }; if piece_ids.is_empty() { debug!("No deals found"); - return Err(JobHandlerError::FailedJob( - Some(ResultCode::NoDealsFound), - None, - "No deals found".to_string(), - )); + return JobHandlerResult::ErrorResult(JobErrorResult { + provider: provider.to_string(), + client: client.map(|c| c.to_string()), + result: Some(ResultCode::NoDealsFound), + error: None, + }); } let urls = deal_service::get_piece_url(endpoints, piece_ids).await; let (working_url, retrievability) = url_tester::get_retrivability_with_head(urls).await; - job_repo - .update_job_result(job.id, working_url, retrievability, ResultCode::Success) - .await; + let result_code = if working_url.is_some() { + ResultCode::Success + } else { + ResultCode::FailedToGetWorkingUrl + }; - Ok(()) + JobHandlerResult::SuccessResult(JobSuccessResult { + provider: provider.to_string(), + client: client.map(|c| c.to_string()), + working_url, + retrievability, + result: result_code, + }) } diff --git a/url_finder/src/deal_repo.rs b/url_finder/src/deal_repo.rs index 91fc317..e2bac6a 100644 --- a/url_finder/src/deal_repo.rs +++ b/url_finder/src/deal_repo.rs @@ -17,6 +17,11 @@ pub struct UnifiedVerifiedDeal { pub piece_cid: Option, } +#[derive(Debug, Serialize, Deserialize)] +pub struct Provider { + pub provider_id: Option, +} + impl DealRepository { pub fn new(pool: PgPool) -> Self { Self { pool } @@ -41,7 +46,7 @@ impl DealRepository { FROM unified_verified_deal WHERE "providerId" = $1 - ORDER BY id DESC + ORDER BY random() LIMIT $2 OFFSET $3 "#, @@ -76,7 +81,7 @@ impl DealRepository { WHERE "providerId" = $1 AND "clientId" = $2 - ORDER BY id DESC + ORDER BY random() LIMIT $3 OFFSET $4 "#, @@ -159,4 +164,26 @@ impl DealRepository { Ok(data) } + + pub async fn get_distinct_providers_by_client( + &self, + client: &str, + ) -> Result, sqlx::Error> { + let data = sqlx::query_as!( + Provider, + r#" + SELECT DISTINCT + "providerId" AS provider_id + FROM + unified_verified_deal + WHERE + "clientId" = $1 + "#, + client, + ) + .fetch_all(&self.pool) + .await?; + + Ok(data) + } } diff --git a/url_finder/src/deal_service.rs b/url_finder/src/deal_service.rs index bee9922..b0c15d2 100644 --- a/url_finder/src/deal_service.rs +++ b/url_finder/src/deal_service.rs @@ -33,6 +33,34 @@ pub async fn get_piece_ids_by_provider( Ok(piece_ids) } +pub async fn get_distinct_providers_by_client( + deal_repo: &DealRepository, + client: &str, +) -> Result> { + let client_db = client.strip_prefix("f0").unwrap_or(client); + let deals = deal_repo + .get_distinct_providers_by_client(client_db) + .await?; + + if deals.is_empty() { + return Ok(vec![]); + } + + let providers: Vec = deals + .iter() + .filter_map(|deal| deal.provider_id.clone()) + .map(|provider| { + if !provider.starts_with("f0") { + format!("f0{provider}") + } else { + provider + } + }) + .collect(); + + Ok(providers) +} + pub async fn get_random_piece_ids_by_provider_and_client( deal_repo: &DealRepository, provider: &str, diff --git a/url_finder/src/lotus_rpc.rs b/url_finder/src/lotus_rpc.rs index be63dd4..012f098 100644 --- a/url_finder/src/lotus_rpc.rs +++ b/url_finder/src/lotus_rpc.rs @@ -4,6 +4,8 @@ use serde_json::json; use tracing::debug; pub async fn get_peer_id(address: &str) -> Result { + debug!("get_peer_id address: {}", address); + let client = Client::new(); let res = client .post("https://api.node.glif.io/rpc/v1") diff --git a/url_finder/src/repository/job_repo.rs b/url_finder/src/repository/job_repo.rs index 9a9703c..d964c9b 100644 --- a/url_finder/src/repository/job_repo.rs +++ b/url_finder/src/repository/job_repo.rs @@ -11,15 +11,27 @@ use uuid::Uuid; use crate::{ErrorCode, ResultCode}; +#[derive(Clone, Serialize, ToSchema)] +pub struct ProviderResult { + pub provider: String, + pub client: Option, + pub working_url: Option, + pub retrievability: f64, + pub result: ResultCode, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + #[derive(Clone, Serialize, ToSchema)] pub struct Job { pub id: Uuid, + // working_url and retrievability are kept for FE compatibility pub working_url: Option, pub retrievability: Option, - pub provider: String, + pub results: Vec, + pub provider: Option, pub client: Option, pub status: JobStatus, - #[serde(skip_serializing_if = "Option::is_none")] pub result: Option, #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, @@ -27,11 +39,12 @@ pub struct Job { pub updated_at: DateTime, } impl Job { - pub fn new(provider: String, client: Option) -> Self { + pub fn new(provider: Option, client: Option) -> Self { Self { id: Uuid::new_v4(), working_url: None, retrievability: None, + results: Vec::new(), provider, client, status: JobStatus::Pending, @@ -67,7 +80,11 @@ impl JobRepository { } } - pub async fn create_job(&self, provider: String, client: Option) -> Result { + pub async fn create_job( + &self, + provider: Option, + client: Option, + ) -> Result { let job = Job::new(provider, client); let mut db = self.db.write().unwrap(); @@ -76,9 +93,11 @@ impl JobRepository { Ok(job) } - pub async fn update_job_result( + pub async fn add_success_result( &self, job_id: Uuid, + provider: String, + client: Option, working_url: Option, retrievability: f64, result: ResultCode, @@ -86,10 +105,50 @@ impl JobRepository { let mut db = self.db.write().unwrap(); if let Some(job) = db.get_mut(&job_id) { - job.working_url = working_url; - job.retrievability = Some(retrievability as i64); - job.status = JobStatus::Completed; - job.result = Some(result); + let provider_result = ProviderResult { + provider, + client, + working_url: working_url.clone(), + retrievability, + result, + error: None, + }; + + job.results.push(provider_result); + job.updated_at = Utc::now(); + } + } + + pub async fn add_error_result( + &self, + job_id: Uuid, + provider: String, + client: Option, + error: Option, + result: Option, + ) { + let mut db = self.db.write().unwrap(); + + if let Some(job) = db.get_mut(&job_id) { + let provider_result = ProviderResult { + provider, + client, + working_url: None, + retrievability: 0.0, + result: result.unwrap_or(ResultCode::Error), + error, + }; + + job.results.push(provider_result); + job.updated_at = Utc::now(); + } + } + + pub async fn set_status(&self, job_id: Uuid, status: JobStatus) { + let mut db = self.db.write().unwrap(); + + if let Some(job) = db.get_mut(&job_id) { + job.status = status; job.updated_at = Utc::now(); } } diff --git a/url_finder/src/routes.rs b/url_finder/src/routes.rs index f01a8ea..ebd967c 100644 --- a/url_finder/src/routes.rs +++ b/url_finder/src/routes.rs @@ -92,6 +92,7 @@ pub fn create_routes(app_state: Arc) -> Router> { "/url/retrievability/:provider", get(handle_find_retri_by_sp), ) + .route("/url/client/:client", get(handle_find_client)) .layer(middleware::from_fn_with_state( app_state.clone(), cache_middleware,