Skip to content

Commit d1e607b

Browse files
committed
Add context tests
1 parent 1dc1d1c commit d1e607b

2 files changed

Lines changed: 115 additions & 1 deletion

File tree

crates/fluxqueue-worker/src/worker.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,105 @@ mod tests {
445445
Ok(())
446446
}
447447

448+
#[tokio::test]
449+
async fn test_sync_task_with_context() -> Result<()> {
450+
let module_path_str = get_test_module_path("test_tasks_with_context.py");
451+
let task_registry = Arc::new(TaskRegistry::new(&module_path_str, "default")?);
452+
let dispatcher_pool = Arc::new(PythonDispatcher::new(task_registry.clone())?);
453+
454+
let task = task_registry.get_task(Arc::new("sync-func-with-context".to_string()));
455+
assert!(task.is_some());
456+
457+
if let Some(task_func) = task {
458+
let task = Task {
459+
id: "test-id".to_string(),
460+
name: "name".to_string(),
461+
args: vec![144],
462+
kwargs: vec![128],
463+
created_at: 0,
464+
retries: 0,
465+
max_retries: 3,
466+
};
467+
468+
let result = run_task(
469+
Arc::new("test".to_string()),
470+
dispatcher_pool.clone(),
471+
Arc::new(task),
472+
task_func,
473+
)
474+
.await;
475+
assert!(!result.is_err());
476+
}
477+
478+
Ok(())
479+
}
480+
481+
#[tokio::test]
482+
async fn test_async_task_with_context() -> Result<()> {
483+
let module_path_str = get_test_module_path("test_tasks_with_context.py");
484+
let task_registry = Arc::new(TaskRegistry::new(&module_path_str, "default")?);
485+
let dispatcher_pool = Arc::new(PythonDispatcher::new(task_registry.clone())?);
486+
487+
let task = task_registry.get_task(Arc::new("async-func-with-context".to_string()));
488+
assert!(task.is_some());
489+
490+
if let Some(task_func) = task {
491+
let task = Task {
492+
id: "test-id".to_string(),
493+
name: "name".to_string(),
494+
args: vec![144],
495+
kwargs: vec![128],
496+
created_at: 0,
497+
retries: 0,
498+
max_retries: 3,
499+
};
500+
501+
let result = run_task(
502+
Arc::new("test".to_string()),
503+
dispatcher_pool.clone(),
504+
Arc::new(task),
505+
task_func,
506+
)
507+
.await;
508+
assert!(!result.is_err());
509+
}
510+
511+
Ok(())
512+
}
513+
514+
#[tokio::test]
515+
async fn test_task_with_custom_context() -> Result<()> {
516+
let module_path_str = get_test_module_path("test_tasks_with_context.py");
517+
let task_registry = Arc::new(TaskRegistry::new(&module_path_str, "default")?);
518+
let dispatcher_pool = Arc::new(PythonDispatcher::new(task_registry.clone())?);
519+
520+
let task = task_registry.get_task(Arc::new("test-custom-context".to_string()));
521+
assert!(task.is_some());
522+
523+
if let Some(task_func) = task {
524+
let task = Task {
525+
id: "test-id".to_string(),
526+
name: "name".to_string(),
527+
args: vec![144],
528+
kwargs: vec![128],
529+
created_at: 0,
530+
retries: 0,
531+
max_retries: 3,
532+
};
533+
534+
let result = run_task(
535+
Arc::new("test".to_string()),
536+
dispatcher_pool.clone(),
537+
Arc::new(task),
538+
task_func,
539+
)
540+
.await;
541+
assert!(!result.is_err());
542+
}
543+
544+
Ok(())
545+
}
546+
448547
async fn enqueue_tasks(redis_url: &str) -> Result<()> {
449548
use deadpool_redis::{Config, Runtime};
450549
use fluxqueue_common::{

crates/fluxqueue-worker/tests/test_tasks_with_context.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,19 @@
55

66
@fluxqueue.task_with_context()
77
def sync_func_with_context(ctx: Context):
8-
print(ctx.metadata)
8+
print("sync_func_with_context metadata: ", ctx.metadata)
9+
10+
11+
@fluxqueue.task_with_context()
12+
async def async_func_with_context(ctx: Context):
13+
print("async_func_with_context metadata: ", ctx.metadata)
14+
15+
16+
class TestContext(Context):
17+
def get_test_connection(self):
18+
return "conn"
19+
20+
21+
@fluxqueue.task_with_context()
22+
def test_custom_context(ctx: TestContext):
23+
print("test_custom_context metadata: ", ctx.metadata)

0 commit comments

Comments
 (0)