Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions rabbitmq/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ pub struct JobMessage {
pub url: String,
pub start_time: DateTime<Utc>,
pub download_start_time: DateTime<Utc>,
pub start_range: u64,
pub end_range: u64,
pub start_range: i64,
pub end_range: i64,
pub excluded_workers: Vec<String>,
pub log_interval_ms: i64,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down
95 changes: 58 additions & 37 deletions scheduler/src/api/jobs/create_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ pub struct CreateJobInput {
pub worker_count: Option<i64>,
pub entity: Option<String>,
pub note: Option<String>,
#[schema(minimum = 10, maximum = 1024)]
pub size_mb: Option<i64>,
#[schema(minimum = 100, maximum = 1000)]
pub log_interval_ms: Option<i64>,
}

#[derive(Serialize, ToSchema)]
Expand All @@ -41,6 +45,42 @@ pub struct CreateJobResponse {
pub sub_jobs: Vec<SubJob>,
}

/// Validated and normalized job-creation parameters, produced from a raw
/// `CreateJobInput` via the `TryFrom` impl below. Fields hold post-validation
/// values only (parsed URL, clamped numeric ranges), so handlers can use them
/// without re-checking.
#[derive(Debug)]
struct CreateJobParams {
    // Parsed download target; scheme is guaranteed http/https by validation.
    pub url: Url,
    // Non-empty RabbitMQ routing key selecting the worker queue.
    pub routing_key: String,
    // Number of workers to run the job on; clamped to 1..=40 (default 10).
    pub worker_count: i64,
    // Optional free-form entity label attached to the job.
    pub entity: Option<String>,
    // Optional operator note attached to the job.
    pub note: Option<String>,
    // Size of the byte range to download, in MB; clamped to 10..=1024 (default 100).
    pub size_mb: i64,
    // Worker progress-log interval in ms; clamped to 100..=1000 (default 1000).
    pub log_interval_ms: i64,
}

impl TryFrom<CreateJobInput> for CreateJobParams {
    type Error = ApiResponse<()>;

    /// Validate the raw API payload and normalize it into job parameters.
    ///
    /// Fails with a 400-style response when the URL does not parse, uses a
    /// scheme other than http/https, or the routing key is empty. Numeric
    /// knobs are defaulted and clamped into their documented ranges.
    fn try_from(input: CreateJobInput) -> Result<Self, Self::Error> {
        let url = Url::parse(&input.url).map_err(|_| bad_request("Invalid URL provided"))?;
        match url.scheme() {
            "http" | "https" => {}
            _ => return Err(bad_request("URL scheme must be http or https")),
        }

        if input.routing_key.is_empty() {
            return Err(bad_request("Routing key cannot be empty"));
        }

        // Fall back to defaults, then clamp into the allowed range.
        let worker_count = input.worker_count.unwrap_or(10).clamp(1, 40);
        let size_mb = input.size_mb.unwrap_or(100).clamp(10, 1024); // default 100 MB, allowed 10-1024 MB
        let log_interval_ms = input.log_interval_ms.unwrap_or(1000).clamp(100, 1000); // default 1000 ms, allowed 100-1000 ms

        Ok(Self {
            url,
            routing_key: input.routing_key,
            worker_count,
            entity: input.entity,
            note: input.note,
            size_mb,
            log_interval_ms,
        })
    }
}

/// Creates a new Job to be processed by the worker
#[utoipa::path(
post,
Expand Down Expand Up @@ -74,29 +114,31 @@ pub async fn handle_create_job(
info!("Creating job with payload: {:?}", payload);

// Validation
let url = validate_url(&payload)?;
validate_routing_key(&payload)?;

let target_worker_count = payload.worker_count.unwrap_or(10).clamp(1, 40);
let params: CreateJobParams = payload.try_into()?;
let target_worker_count = params.worker_count;

// Create the job
let (start_range, end_range) = get_file_range_for_file(url.as_ref()).await?;
let (start_range, end_range) =
get_file_range_for_file(params.url.as_ref(), &params.size_mb).await?;

let job_id = Uuid::new_v4();

let job = state
.repo
.job
.create_job(
job_id,
url.to_string(),
&payload.routing_key,
params.url.to_string(),
&params.routing_key,
JobStatus::Pending,
JobDetails::new(
start_range,
end_range,
target_worker_count,
payload.entity.clone(),
payload.note.clone(),
params.entity.clone(),
params.note.clone(),
params.log_interval_ms,
params.size_mb,
),
)
.await
Expand Down Expand Up @@ -132,32 +174,13 @@ pub async fn handle_create_job(
Ok(ok_response(CreateJobResponse { job, sub_jobs }))
}

/// Parse the payload URL and ensure it uses the http or https scheme.
///
/// Returns the parsed [`Url`] on success, or a bad-request response when the
/// URL is malformed or uses an unsupported scheme.
fn validate_url(payload: &CreateJobInput) -> Result<Url, ApiResponse<()>> {
    let url = Url::parse(&payload.url).map_err(|_| bad_request("Invalid URL provided"))?;
    if matches!(url.scheme(), "http" | "https") {
        Ok(url)
    } else {
        Err(bad_request("URL scheme must be http or https"))
    }
}

/// Validate the routing key, rejecting empty values.
///
/// NOTE(review): in the future this should check the key against a set of
/// allowed routing keys rather than only rejecting the empty string.
fn validate_routing_key(payload: &CreateJobInput) -> Result<(), ApiResponse<()>> {
    match payload.routing_key.is_empty() {
        true => Err(bad_request("Routing key cannot be empty")),
        false => Ok(()),
    }
}

/// Get a random byte range of `size_mb` MB from the file using a HEAD request
async fn get_file_range_for_file(url: &str) -> Result<(i64, i64), ApiResponse<()>> {
async fn get_file_range_for_file(url: &str, size_mb: &i64) -> Result<(i64, i64), ApiResponse<()>> {
let response = Client::new()
.head(url)
.send()
.await
.map_err(|e| bad_request(format!("Failed to execute HEAD request {}", e)))?;
.map_err(|e| bad_request(format!("Failed to execute HEAD request {e}")))?;

debug!("Response: {:?}", response);

Expand All @@ -167,26 +190,24 @@ async fn get_file_range_for_file(url: &str) -> Result<(i64, i64), ApiResponse<()
.get(reqwest::header::CONTENT_LENGTH)
.ok_or_else(|| bad_request("Content-Length header is missing in the response"))?
.to_str()
.map_err(|e| bad_request(format!("Failed to parse Content-Length header: {}", e)))?
.map_err(|e| bad_request(format!("Failed to parse Content-Length header: {e}")))?
.parse::<i64>()
.map_err(|e| bad_request(format!("Failed to parse Content-Length header: {}", e)))?;
.map_err(|e| bad_request(format!("Failed to parse Content-Length header: {e}")))?;

debug!("Content-Length: {:?}", content_length);

let size_mb = 100; // 100 MB
let size = size_mb * 1024 * 1024;

if content_length < size {
return Err(bad_request(format!(
"File size is less than {} MB",
size_mb
)));
return Err(bad_request(format!("File size is less than {size_mb} MB")));
}

let mut rng = rand::thread_rng();
let start_range = rng.gen_range(0..content_length - size);
let end_range = start_range + size;

debug!("Selected range: {} - {}", start_range, end_range);

Ok((start_range, end_range))
}

Expand Down
2 changes: 1 addition & 1 deletion scheduler/src/api/services/services_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub async fn handle_services_info(
.inspect_err(|e| {
error!("ServiceScaler get info error: {:?}", e);
})
.map_err(|e| internal_server_error(format!("ServiceScaler get info: {:?}", e)))?;
.map_err(|e| internal_server_error(format!("ServiceScaler get info: {e:?}")))?;

debug!(
"Successfully got service info name: {}, instances: {}",
Expand Down
4 changes: 2 additions & 2 deletions scheduler/src/api/services/services_scale_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub async fn handle_services_scale_down(
.inspect_err(|e| {
error!("ServiceScaler scale down error: {:?}", e);
})
.map_err(|e| internal_server_error(format!("ServiceScaler scale down: {:?}", e)))?;
.map_err(|e| internal_server_error(format!("ServiceScaler scale down: {e:?}")))?;

debug!("Successfull worker scale down");

Expand All @@ -89,7 +89,7 @@ pub async fn handle_services_scale_down(
.inspect_err(|e| {
error!("ServiceScaler get info error: {:?}", e);
})
.map_err(|e| internal_server_error(format!("ServiceScaler get info: {:?}", e)))?;
.map_err(|e| internal_server_error(format!("ServiceScaler get info: {e:?}")))?;

// Clear descale deadline if desired count is down to 0
if let Some(desired_count) = service_info.desired_count {
Expand Down
4 changes: 2 additions & 2 deletions scheduler/src/api/services/services_scale_up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub async fn handle_services_scale_up(
.inspect_err(|e| {
error!("ServiceScaler scale up error: {:?}", e);
})
.map_err(|e| internal_server_error(format!("ServiceScaler scale up: {:?}", e)))?;
.map_err(|e| internal_server_error(format!("ServiceScaler scale up: {e:?}")))?;

debug!("Successfull worker scale up");

Expand All @@ -92,7 +92,7 @@ pub async fn handle_services_scale_up(
.inspect_err(|e| {
error!("ServiceScaler get info error: {:?}", e);
})
.map_err(|e| internal_server_error(format!("ServiceScaler get info: {:?}", e)))?;
.map_err(|e| internal_server_error(format!("ServiceScaler get info: {e:?}")))?;

debug!(
"Successfully got service info name: {}, instances: {}",
Expand Down
11 changes: 6 additions & 5 deletions scheduler/src/background/sub_job_combineddhp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ async fn process_status_created(
url: job.url.clone(),
start_time,
download_start_time,
start_range: job.details.start_range as u64,
end_range: job.details.end_range as u64,
start_range: job.details.start_range,
end_range: job.details.end_range,
excluded_workers,
log_interval_ms: job.details.log_interval_ms,
},
};

Expand Down Expand Up @@ -153,7 +154,7 @@ async fn process_status_processing(
.data
.get_data_by_sub_job_id(&sub_job.id)
.await
.map_err(|e| SubJobHandlerError::Skip(format!("Failed to get data: {}", e)))?;
.map_err(|e| SubJobHandlerError::Skip(format!("Failed to get data: {e}")))?;

if data.len() >= workers_count as usize {
repo.sub_job
Expand All @@ -167,7 +168,7 @@ async fn process_status_processing(
.count_pending_sub_jobs(SubJobType::CombinedDHP, &sub_job.job_id)
.await
.map_err(|e| {
SubJobHandlerError::Skip(format!("Failed to count pending sub jobs: {}", e))
SubJobHandlerError::Skip(format!("Failed to count pending sub jobs: {e}"))
})?;

debug!("Pending sub jobs: {}", pending_sub_jobs);
Expand All @@ -180,7 +181,7 @@ async fn process_status_processing(
.update_job_status(&sub_job.job_id, JobStatus::Completed)
.await
.map_err(|e| {
SubJobHandlerError::Skip(format!("Failed to update job status: {}", e))
SubJobHandlerError::Skip(format!("Failed to update job status: {e}"))
})?;
}
}
Expand Down
6 changes: 6 additions & 0 deletions scheduler/src/background/sub_job_scaling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ async fn process_scaling_created(

// Scale up each service
for service in services {
debug!("Scaling up service: {} id: {}", service.name, service.id);
let scaler = service_scaler_registry
.get_scaler(&service.provider_type)
.ok_or(SubJobHandlerError::FailedJob("No scaler found".to_string()))?;
Expand All @@ -103,6 +104,11 @@ async fn process_scaling_created(
.scale_up(&service, scale_each_by.try_into().unwrap_or(0))
.await
.map_err(|e| SubJobHandlerError::Skip(e.to_str()))?;

info!(
"Scaled up service: {} id: {} by {}",
service.name, service.id, scale_each_by
);
}

// Update sub job status to processing
Expand Down
7 changes: 7 additions & 0 deletions scheduler/src/repository/job_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct JobRepository {
}

#[derive(Serialize, Debug, FromRow, Type)]
#[allow(dead_code)]
pub struct JobWithData {
pub id: Uuid,
pub url: Option<String>,
Expand Down Expand Up @@ -95,6 +96,8 @@ pub struct JobDetails {
pub entity: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub note: Option<String>,
pub log_interval_ms: i64,
pub size_mb: i64,
}
impl JobDetails {
pub fn new(
Expand All @@ -103,6 +106,8 @@ impl JobDetails {
target_worker_count: i64,
entity: Option<String>,
note: Option<String>,
log_interval_ms: i64,
size_mb: i64,
) -> Self {
JobDetails {
start_range,
Expand All @@ -111,6 +116,8 @@ impl JobDetails {
workers_count: None,
entity,
note,
log_interval_ms,
size_mb,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions scheduler/src/service_scaler/docker_scaler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl DockerScaler {
.args([
"ps",
"--filter",
&format!("label=com.docker.compose.service={}", service_name),
&format!("label=com.docker.compose.service={service_name}"),
"--format",
"{{.ID}}",
])
Expand All @@ -80,7 +80,7 @@ impl DockerScaler {
"up",
"-d",
"--scale",
&format!("{}={}", service_name, count),
&format!("{service_name}={count}"),
service_name,
])
.status()
Expand Down
4 changes: 2 additions & 2 deletions scheduler/src/service_scaler/fargate_scaler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl FargateScaler {
.await
.map_err(|e| {
error!("DescribeServices failed: {:?}", e);
ServiceScalerError::GenericError(format!("DescribeServices failed: {}", e))
ServiceScalerError::GenericError(format!("DescribeServices failed: {e}"))
})?;

debug!("DescribeServices response: {:?}", resp);
Expand Down Expand Up @@ -147,7 +147,7 @@ impl FargateScaler {
.await
.map_err(|e| {
error!("UpdateService failed: {:?}", e);
ServiceScalerError::GenericError(format!("UpdateService failed: {}", e))
ServiceScalerError::GenericError(format!("UpdateService failed: {e}"))
})?;

let aws_service = res
Expand Down
6 changes: 3 additions & 3 deletions scheduler/src/service_scaler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ impl ServiceScalerError {
pub fn to_str(&self) -> String {
match self {
ServiceScalerError::CommandError(msg) => {
format!("ServiceScalerError::CommandError: {}", msg)
format!("ServiceScalerError::CommandError: {msg}")
}
ServiceScalerError::GenericError(msg) => {
format!("ServiceScalerError::GenericError: {}", msg)
format!("ServiceScalerError::GenericError: {msg}")
}
ServiceScalerError::InvalidService(msg) => {
format!("ServiceScalerError::InvalidService: {}", msg)
format!("ServiceScalerError::InvalidService: {msg}")
}
}
}
Expand Down
Loading