Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ FluxQueue supports async functions too. Just define an async function and use th

```python
@fluxqueue.task()
async def send_email(data: dict):
async def send_email(to_email: str, subject: str, body: str):
async with email_context() as email_client:
message = EmailMessage()
message["From"] = "test@example.com"
Expand Down
1 change: 1 addition & 0 deletions crates/fluxqueue-worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod logger;
mod redis_client;
mod task;
mod version_check;
mod worker;

pub use worker::*;
188 changes: 188 additions & 0 deletions crates/fluxqueue-worker/src/version_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
fn normalize(version: &str) -> String {
version
.replace("-rc-", "rc")
.replace("-beta-", "b")
.replace("-alpha-", "a")
}

fn parse_part(part: &str) -> (u32, i32, u32) {
let mut digits = String::new();
let mut suffix = String::new();

for c in part.chars() {
if c.is_ascii_digit() && suffix.is_empty() {
digits.push(c);
} else {
suffix.push(c);
}
}

let number = digits.parse().unwrap_or(0);

let (ptype, pnum) = if let Some(stripped) = suffix.strip_prefix("rc") {
(2, stripped.parse().unwrap_or(0))
} else if let Some(stripped) = suffix.strip_prefix('b') {
(1, stripped.parse().unwrap_or(0))
} else if let Some(stripped) = suffix.strip_prefix('a') {
(0, stripped.parse().unwrap_or(0))
} else {
(3, 0) // stable
};

(number, ptype, pnum)
}

pub fn compare_versions(v1: &str, v2: &str) -> i8 {
let v1 = normalize(v1);
let v2 = normalize(v2);

let parts1: Vec<_> = v1.split('.').collect();
let parts2: Vec<_> = v2.split('.').collect();

let len = parts1.len().max(parts2.len());

for i in 0..len {
let p1 = parts1.get(i).unwrap_or(&"0");
let p2 = parts2.get(i).unwrap_or(&"0");

let (n1, t1, r1) = parse_part(p1);
let (n2, t2, r2) = parse_part(p2);

if n1 != n2 {
return if n1 > n2 { 1 } else { -1 };
}

if t1 != t2 {
return if t1 > t2 { 1 } else { -1 };
}

if r1 != r2 {
return if r1 > r2 { 1 } else { -1 };
}
}

0
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_compare_versions() {
let worker_version = "0.3.1";
let client_lib_version = "0.1.2";

assert!(
compare_versions(worker_version, client_lib_version) == 1,
"left version is greater than the right one"
);

assert!(
compare_versions(client_lib_version, worker_version) == -1,
"left version is less than the right one"
);

assert!(
compare_versions("0.2.0", "0.2.0") == 0,
"both are equal versions"
);

assert!(
compare_versions("1.2.0", "1.2") == 0,
"both are equal versions"
);

assert!(
compare_versions("1.2.3", "1.2.3a1") == 1,
"alpha version is less than the latest"
);

assert!(
compare_versions("1.2.3", "1.2.3b1") == 1,
"beta version is less than the latest"
);

assert!(
compare_versions("1.2.3", "1.2.3rc1") == 1,
"rc version is less than the latest"
);

assert!(
compare_versions("1.2.3-rc-1", "1.2.3rc1") == 0,
"both are equal"
);

assert!(
compare_versions("1.2.3-beta-1", "1.2.3b1") == 0,
"both are equal"
);

assert!(
compare_versions("1.2.3-alpha-1", "1.2.3a1") == 0,
"both are equal"
);

assert!(
compare_versions("1.2.3-rc-1", "1.2.3b1") == 1,
"left is greater"
);

assert!(compare_versions("1.2.3a1", "1.2.3b1") == -1, "alpha < beta");

assert!(compare_versions("1.2.3b1", "1.2.3rc1") == -1, "beta < rc");

assert!(compare_versions("1.2.3a1", "1.2.3rc1") == -1, "alpha < rc");

assert!(compare_versions("1.2.3rc2", "1.2.3rc1") == 1, "rc2 > rc1");

assert!(compare_versions("1.2.3b2", "1.2.3b10") == -1, "b2 < b10");

assert!(compare_versions("1.2.3a10", "1.2.3a2") == 1, "a10 > a2");

assert!(
compare_versions("1.2.3.0", "1.2.3") == 0,
"trailing zeros ignored"
);

assert!(
compare_versions("1.2.3.1", "1.2.3") == 1,
"extra segment greater"
);

assert!(
compare_versions("1.2", "1.2.1") == -1,
"missing segment treated as zero"
);

assert!(
compare_versions("1.2.3-rc-2", "1.2.3rc1") == 1,
"rust rc2 > python rc1"
);

assert!(
compare_versions("1.2.3-beta-2", "1.2.3b1") == 1,
"rust beta2 > python b1"
);

assert!(
compare_versions("1.2.3-alpha-2", "1.2.3a1") == 1,
"rust alpha2 > python a1"
);

assert!(
compare_versions("1.2.3", "1.2.4a1") == -1,
"next version prerelease is higher"
);

assert!(
compare_versions("1.2.4a1", "1.2.3") == 1,
"next version prerelease is higher"
);

assert!(
compare_versions("1.2.3rc1", "1.2.3-rc-1") == 0,
"symmetry equality"
);
}
}
120 changes: 100 additions & 20 deletions crates/fluxqueue-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ fn check_client_library_version() -> Result<()> {
Ok(version.to_string())
})?;

let comparison = compare_versions(worker_version, &library_version);
let comparison = crate::version_check::compare_versions(worker_version, &library_version);

if comparison == -1 {
return Err(anyhow!(
Expand All @@ -367,25 +367,6 @@ fn check_client_library_version() -> Result<()> {
Ok(())
}

fn compare_versions(v1: &str, v2: &str) -> i8 {
let mut parts1: Vec<u32> = v1.split('.').map(|p| p.parse().unwrap_or(0)).collect();
let mut parts2: Vec<u32> = v2.split('.').map(|p| p.parse().unwrap_or(0)).collect();

let len = parts1.len().max(parts2.len());
parts1.resize(len, 0);
parts2.resize(len, 0);

for (a, b) in parts1.iter().zip(parts2.iter()) {
if a > b {
return 1;
}
if a < b {
return -1;
}
}
0
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -464,6 +445,105 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_sync_task_with_context() -> Result<()> {
let module_path_str = get_test_module_path("test_tasks_with_context.py");
let task_registry = Arc::new(TaskRegistry::new(&module_path_str, "default")?);
let dispatcher_pool = Arc::new(PythonDispatcher::new(task_registry.clone())?);

let task = task_registry.get_task(Arc::new("sync-func-with-context".to_string()));
assert!(task.is_some());

if let Some(task_func) = task {
let task = Task {
id: "test-id".to_string(),
name: "name".to_string(),
args: vec![144],
kwargs: vec![128],
created_at: 0,
retries: 0,
max_retries: 3,
};

let result = run_task(
Arc::new("test".to_string()),
dispatcher_pool.clone(),
Arc::new(task),
task_func,
)
.await;
assert!(!result.is_err());
}

Ok(())
}

#[tokio::test]
async fn test_async_task_with_context() -> Result<()> {
let module_path_str = get_test_module_path("test_tasks_with_context.py");
let task_registry = Arc::new(TaskRegistry::new(&module_path_str, "default")?);
let dispatcher_pool = Arc::new(PythonDispatcher::new(task_registry.clone())?);

let task = task_registry.get_task(Arc::new("async-func-with-context".to_string()));
assert!(task.is_some());

if let Some(task_func) = task {
let task = Task {
id: "test-id".to_string(),
name: "name".to_string(),
args: vec![144],
kwargs: vec![128],
created_at: 0,
retries: 0,
max_retries: 3,
};

let result = run_task(
Arc::new("test".to_string()),
dispatcher_pool.clone(),
Arc::new(task),
task_func,
)
.await;
assert!(!result.is_err());
}

Ok(())
}

#[tokio::test]
async fn test_task_with_custom_context() -> Result<()> {
let module_path_str = get_test_module_path("test_tasks_with_context.py");
let task_registry = Arc::new(TaskRegistry::new(&module_path_str, "default")?);
let dispatcher_pool = Arc::new(PythonDispatcher::new(task_registry.clone())?);

let task = task_registry.get_task(Arc::new("test-custom-context".to_string()));
assert!(task.is_some());

if let Some(task_func) = task {
let task = Task {
id: "test-id".to_string(),
name: "name".to_string(),
args: vec![144],
kwargs: vec![128],
created_at: 0,
retries: 0,
max_retries: 3,
};

let result = run_task(
Arc::new("test".to_string()),
dispatcher_pool.clone(),
Arc::new(task),
task_func,
)
.await;
assert!(!result.is_err());
}

Ok(())
}

async fn enqueue_tasks(redis_url: &str) -> Result<()> {
use deadpool_redis::{Config, Runtime};
use fluxqueue_common::{
Expand Down
3 changes: 3 additions & 0 deletions crates/fluxqueue-worker/tests/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from fluxqueue import FluxQueue

fluxqueue = FluxQueue()
15 changes: 6 additions & 9 deletions crates/fluxqueue-worker/tests/test_tasks_duplicate.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
def task1():
pass

from .main import fluxqueue

task1.task_name = "task-1" # type: ignore
task1.queue = "default" # type: ignore


def task2():
@fluxqueue.task(name="task-1")
def task_1():
pass


task2.task_name = "task-1" # type: ignore
task2.queue = "default" # type: ignore
@fluxqueue.task(name="task-1")
def task_2():
pass
Loading