Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
96ba848
feat(context): Start working on contexts feature
CCXLV Feb 20, 2026
856eb99
Fix clippy error by moving args to a struct
CCXLV Feb 21, 2026
fed2e2a
Move structs below the run_worker func
CCXLV Feb 21, 2026
eba4d34
Start working on context in the worker
CCXLV Feb 22, 2026
e65bef3
Fix clippy error
CCXLV Feb 22, 2026
154a1ef
Remove dispatcher pool, was no different without it
CCXLV Feb 22, 2026
8f1b706
Add tests for tasks
CCXLV Feb 22, 2026
3a30b06
Remove commented lines
CCXLV Feb 22, 2026
315d0f4
Finish context subclasses feature
CCXLV Feb 23, 2026
302aaed
Fix clippy errors
CCXLV Feb 23, 2026
0897f5d
Add process exit on task registry init failure
CCXLV Feb 24, 2026
60baa8a
Add thread storage to the context
CCXLV Feb 24, 2026
04df64d
Add run-worker wrapper script
CCXLV Feb 24, 2026
5a01f6d
Fix unnecessary check in context decorator
CCXLV Feb 24, 2026
180aa29
Update thread storage to ContextVar instead of thread.local()
CCXLV Feb 25, 2026
acdb468
Add task metadata feat
CCXLV Feb 25, 2026
e24bd55
Finish
CCXLV Feb 26, 2026
0844781
Update cargo tests
CCXLV Feb 26, 2026
d690219
Fix tests
CCXLV Feb 26, 2026
e6db82f
Fix tests
CCXLV Feb 26, 2026
ece79b7
Fix tests
CCXLV Feb 26, 2026
d4c06ce
Fix tests
CCXLV Feb 26, 2026
7f57bf4
Fix tests once again
CCXLV Feb 26, 2026
371f6b1
Fix tests
CCXLV Feb 26, 2026
a27a5f2
Move tests to bash script
CCXLV Feb 26, 2026
8435fa6
Fix run-tests
CCXLV Feb 26, 2026
fc4d10d
Fix run-tests
CCXLV Feb 26, 2026
d507fd9
Fix tests
CCXLV Feb 26, 2026
7f1bd54
Update tests
CCXLV Feb 26, 2026
fd2c979
Revert "Update tests"
CCXLV Feb 26, 2026
5e593a3
Remove uv installation
CCXLV Feb 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions .github/actions/tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@ runs:
with:
python-version: ${{ inputs.python-version }}

- name: Install uv
uses: astral-sh/setup-uv@v7

- name: Install dependencies
shell: bash -euxo pipefail {0}
run: uv sync --extra tests
run: pip install -e .[tests]

- name: Set up Rust
uses: dtolnay/rust-toolchain@stable
Expand All @@ -29,5 +26,5 @@ runs:
- name: Run Tests
shell: bash -euxo pipefail {0}
run: |
uv run pytest tests/
pytest tests/
cargo test --workspace --locked
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.13.11
8 changes: 6 additions & 2 deletions crates/fluxqueue-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ name = "fluxqueue-worker"
path = "src/main.rs"

[dependencies]
pyo3 = { version = "0.27.2", features = ["auto-initialize"] }
pyo3-async-runtimes = { version = "0.27.0", features = ["tokio-runtime"] }
uuid = { version = "1.20.0", features = ["v4"] }
redis = "0.32.7"
Expand All @@ -22,12 +23,15 @@ tokio = { version = "1.49.0", features = [
"signal",
"time",
] }
pyo3 = { version = "0.27.2", features = ["auto-initialize"] }
rmp-serde = "1.3.1"
rmpv = { version = "1.3.1", features = ["with-serde"] }
serde = { version = "1.0.228", features = ["derive"] }
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "fmt", "time"] }
tracing-subscriber = { version = "0.3.22", features = [
"env-filter",
"fmt",
"time",
] }
time = { version = "0.3.47", features = ["local-offset", "macros"] }
clap = { version = "4.5.56", features = ["derive", "env"] }
anyhow = "1.0.100"
Expand Down
27 changes: 0 additions & 27 deletions crates/fluxqueue-worker/scripts/get_functions.py

This file was deleted.

59 changes: 59 additions & 0 deletions crates/fluxqueue-worker/scripts/get_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import importlib
import inspect
import sys
from pathlib import Path
from typing import get_type_hints

from fluxqueue import Context


def get_registry(module_path: str, queue: str, module_dir: str | None = None):
if module_dir:
module_dir_path = Path(module_dir).resolve()
if str(module_dir_path) not in sys.path:
sys.path.insert(0, str(module_dir_path))

module = importlib.import_module(module_path)
registry = {"tasks": {}, "contexts": {}}
for _name, obj in inspect.getmembers(module):
if inspect.isfunction(obj):
task_name = getattr(obj, "task_name", None)
task_queue = getattr(obj, "queue", None)
if not task_queue or task_queue != queue:
continue

if registry["tasks"].get(task_name):
raise ValueError(f"Task '{task_name}' is duplicated")

original_func = getattr(obj, "__wrapped__", obj)

hints = get_type_hints(original_func)
sig = inspect.signature(original_func)
context_params = {
name: hints[name]
for name in sig.parameters
if name in hints
and isinstance(hints[name], type)
and issubclass(hints[name], Context)
}
if not context_params:
context_name = None
else:
context = context_params[next(iter(context_params))]
context_name = getattr(context, "__fluxqueue_context__", None)

registry["tasks"][task_name] = {
"func": original_func,
"context_name": context_name,
}
elif inspect.isclass(obj):
if not issubclass(obj, Context):
continue

context_name = getattr(obj, "__fluxqueue_context__", None)
if registry["contexts"].get(context_name):
raise ValueError(f"Context '{context_name}' is duplicated")

registry["contexts"][context_name] = obj

return registry
4 changes: 3 additions & 1 deletion crates/fluxqueue-worker/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ pub fn initial_logs(
concurrency: usize,
redis_url: &str,
tasks_module_path: &str,
tasks: &Vec<&String>,
tasks: Vec<String>,
contexts: Vec<String>,
) {
info!("Queue: {}", queue_name);
info!("Concurrency: {}", concurrency);
info!("Redis: {}", redis_url);
info!("Tasks module: {}", tasks_module_path);
info!("Tasks found: {:?}", tasks);
info!("Contexts found: {:?}", contexts);
info!("Starting up the executors...");
}

Expand Down
4 changes: 2 additions & 2 deletions crates/fluxqueue-worker/src/redis_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl RedisClient {
Ok(())
}

pub async fn set_executors_heartbeat(&self, executor_ids: Arc<Vec<Arc<str>>>) -> Result<()> {
pub async fn set_executors_heartbeat(&self, executor_ids: Arc<Vec<Arc<String>>>) -> Result<()> {
for id in executor_ids.iter() {
self.set_executor_heartbeat(id).await?;
}
Expand All @@ -60,7 +60,7 @@ impl RedisClient {
pub async fn cleanup_executors_registry(
&self,
queue_name: &str,
ids: Arc<Vec<Arc<str>>>,
ids: Arc<Vec<Arc<String>>>,
) -> Result<()> {
let mut conn = self.redis_pool.get().await?;
let executors_key = keys::get_executors_key(queue_name);
Expand Down
Loading