Skip to content

Commit 8d13fc6

Browse files
authored
feat(context): Add context subclass feature (#114)
* feat(context): Start working on contexts feature * Fix clippy error by moving args to a struct * Move structs below the run_worker func * Start working on context in the worker * Fix clippy error * Remove dispatcher pool, was no different without it * Add tests for tasks * Remove commented lines * Finish context subclasses feature * Fix clippy errors * Add process exit on task registry init failure * Add thread storage to the context * Add run-worker wrapper script * Fix unnecessary check in context decorator * Update thread storage to ContextVar instead of thread.local() * Add task metadata feat * Finish * Update cargo tests * Fix tests * Fix tests * Fix tests * Fix tests * Fix tests once again * Fix tests * Move tests to bash script * Fix run-tests * Fix run-tests * Fix tests * Update tests * Revert "Update tests" This reverts commit 7f1bd54. * Remove uv installation
1 parent e058006 commit 8d13fc6

16 files changed

Lines changed: 843 additions & 372 deletions

File tree

.github/actions/tests/action.yml

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,9 @@ runs:
1313
with:
1414
python-version: ${{ inputs.python-version }}
1515

16-
- name: Install uv
17-
uses: astral-sh/setup-uv@v7
18-
1916
- name: Install dependencies
2017
shell: bash -euxo pipefail {0}
21-
run: uv sync --extra tests
18+
run: pip install -e .[tests]
2219

2320
- name: Set up Rust
2421
uses: dtolnay/rust-toolchain@stable
@@ -29,5 +26,5 @@ runs:
2926
- name: Run Tests
3027
shell: bash -euxo pipefail {0}
3128
run: |
32-
uv run pytest tests/
29+
pytest tests/
3330
cargo test --workspace --locked

.python-version

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.13.11

crates/fluxqueue-worker/Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ name = "fluxqueue-worker"
1212
path = "src/main.rs"
1313

1414
[dependencies]
15+
pyo3 = { version = "0.27.2", features = ["auto-initialize"] }
1516
pyo3-async-runtimes = { version = "0.27.0", features = ["tokio-runtime"] }
1617
uuid = { version = "1.20.0", features = ["v4"] }
1718
redis = "0.32.7"
@@ -22,12 +23,15 @@ tokio = { version = "1.49.0", features = [
2223
"signal",
2324
"time",
2425
] }
25-
pyo3 = { version = "0.27.2", features = ["auto-initialize"] }
2626
rmp-serde = "1.3.1"
2727
rmpv = { version = "1.3.1", features = ["with-serde"] }
2828
serde = { version = "1.0.228", features = ["derive"] }
2929
tracing = "0.1.44"
30-
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "fmt", "time"] }
30+
tracing-subscriber = { version = "0.3.22", features = [
31+
"env-filter",
32+
"fmt",
33+
"time",
34+
] }
3135
time = { version = "0.3.47", features = ["local-offset", "macros"] }
3236
clap = { version = "4.5.56", features = ["derive", "env"] }
3337
anyhow = "1.0.100"

crates/fluxqueue-worker/scripts/get_functions.py

Lines changed: 0 additions & 27 deletions
This file was deleted.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import importlib
2+
import inspect
3+
import sys
4+
from pathlib import Path
5+
from typing import get_type_hints
6+
7+
from fluxqueue import Context
8+
9+
10+
def get_registry(module_path: str, queue: str, module_dir: str | None = None):
11+
if module_dir:
12+
module_dir_path = Path(module_dir).resolve()
13+
if str(module_dir_path) not in sys.path:
14+
sys.path.insert(0, str(module_dir_path))
15+
16+
module = importlib.import_module(module_path)
17+
registry = {"tasks": {}, "contexts": {}}
18+
for _name, obj in inspect.getmembers(module):
19+
if inspect.isfunction(obj):
20+
task_name = getattr(obj, "task_name", None)
21+
task_queue = getattr(obj, "queue", None)
22+
if not task_queue or task_queue != queue:
23+
continue
24+
25+
if registry["tasks"].get(task_name):
26+
raise ValueError(f"Task '{task_name}' is duplicated")
27+
28+
original_func = getattr(obj, "__wrapped__", obj)
29+
30+
hints = get_type_hints(original_func)
31+
sig = inspect.signature(original_func)
32+
context_params = {
33+
name: hints[name]
34+
for name in sig.parameters
35+
if name in hints
36+
and isinstance(hints[name], type)
37+
and issubclass(hints[name], Context)
38+
}
39+
if not context_params:
40+
context_name = None
41+
else:
42+
context = context_params[next(iter(context_params))]
43+
context_name = getattr(context, "__fluxqueue_context__", None)
44+
45+
registry["tasks"][task_name] = {
46+
"func": original_func,
47+
"context_name": context_name,
48+
}
49+
elif inspect.isclass(obj):
50+
if not issubclass(obj, Context):
51+
continue
52+
53+
context_name = getattr(obj, "__fluxqueue_context__", None)
54+
if registry["contexts"].get(context_name):
55+
raise ValueError(f"Context '{context_name}' is duplicated")
56+
57+
registry["contexts"][context_name] = obj
58+
59+
return registry

crates/fluxqueue-worker/src/logger.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@ pub fn initial_logs(
3434
concurrency: usize,
3535
redis_url: &str,
3636
tasks_module_path: &str,
37-
tasks: &Vec<&String>,
37+
tasks: Vec<String>,
38+
contexts: Vec<String>,
3839
) {
3940
info!("Queue: {}", queue_name);
4041
info!("Concurrency: {}", concurrency);
4142
info!("Redis: {}", redis_url);
4243
info!("Tasks module: {}", tasks_module_path);
4344
info!("Tasks found: {:?}", tasks);
45+
info!("Contexts found: {:?}", contexts);
4446
info!("Starting up the executors...");
4547
}
4648

crates/fluxqueue-worker/src/redis_client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ impl RedisClient {
3333
Ok(())
3434
}
3535

36-
pub async fn set_executors_heartbeat(&self, executor_ids: Arc<Vec<Arc<str>>>) -> Result<()> {
36+
pub async fn set_executors_heartbeat(&self, executor_ids: Arc<Vec<Arc<String>>>) -> Result<()> {
3737
for id in executor_ids.iter() {
3838
self.set_executor_heartbeat(id).await?;
3939
}
@@ -60,7 +60,7 @@ impl RedisClient {
6060
pub async fn cleanup_executors_registry(
6161
&self,
6262
queue_name: &str,
63-
ids: Arc<Vec<Arc<str>>>,
63+
ids: Arc<Vec<Arc<String>>>,
6464
) -> Result<()> {
6565
let mut conn = self.redis_pool.get().await?;
6666
let executors_key = keys::get_executors_key(queue_name);

0 commit comments

Comments
 (0)