Skip to content

Commit 9ee8ce6

Browse files
authored
feat(worker): Add client library version check before worker initialization (#128)
* feat(worker): Add client library version check before worker initialization * Update versions * Update
1 parent 9b31023 commit 9ee8ce6

6 files changed

Lines changed: 61 additions & 5 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/fluxqueue-core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fluxqueue_core"
3-
version = "0.2.1"
3+
version = "0.3.0-rc1"
44
repository.workspace = true
55
homepage.workspace = true
66
authors.workspace = true

crates/fluxqueue-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ impl FluxQueueCore {
7676

7777
#[pymodule]
7878
fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
79+
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
7980
m.add_class::<FluxQueueCore>()?;
8081
Ok(())
8182
}

crates/fluxqueue-worker/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fluxqueue-worker"
3-
version = "0.2.1"
3+
version = "0.3.0-rc1"
44
edition = "2024"
55

66
[lib]

crates/fluxqueue-worker/src/worker.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use anyhow::Result;
1+
use anyhow::{Result, anyhow};
2+
use pyo3::Python;
3+
use pyo3::types::PyAnyMethods;
24
use std::sync::Arc;
35
use std::sync::atomic::{AtomicUsize, Ordering};
46
use std::time::Duration;
@@ -18,6 +20,11 @@ pub async fn run_worker(
1820
queue_name: String,
1921
save_dead_tasks: bool,
2022
) -> Result<()> {
23+
check_client_library_version().map_err(|e| {
24+
tracing::error!("{}", e);
25+
std::process::exit(1);
26+
})?;
27+
2128
let redis_client = RedisClient::new(&redis_url).await.map_err(|e| {
2229
tracing::error!("{}", e);
2330
std::process::exit(1);
@@ -331,6 +338,53 @@ fn check_worker_is_ready(ready_check: ReadyCheck) {
331338
}
332339
}
333340

341+
fn check_client_library_version() -> Result<()> {
342+
let worker_version = env!("CARGO_PKG_VERSION");
343+
let library_version = Python::attach(|py| -> Result<String> {
344+
let module = py.import("fluxqueue")?;
345+
let version = module.getattr("__version__")?;
346+
Ok(version.to_string())
347+
})?;
348+
349+
let comparison = compare_versions(worker_version, &library_version);
350+
if comparison == -1 {
351+
tracing::warn!(
352+
"Worker version '{}' is older than client library '{}'. For full functionality, update the worker to match the client version.",
353+
worker_version,
354+
&library_version
355+
);
356+
}
357+
358+
if comparison == 1 {
359+
return Err(anyhow!(
360+
"Minimum required client library version is: {}, found: {}",
361+
worker_version,
362+
&library_version
363+
));
364+
}
365+
366+
Ok(())
367+
}
368+
369+
fn compare_versions(v1: &str, v2: &str) -> i8 {
370+
let mut parts1: Vec<u32> = v1.split('.').map(|p| p.parse().unwrap_or(0)).collect();
371+
let mut parts2: Vec<u32> = v2.split('.').map(|p| p.parse().unwrap_or(0)).collect();
372+
373+
let len = parts1.len().max(parts2.len());
374+
parts1.resize(len, 0);
375+
parts2.resize(len, 0);
376+
377+
for (a, b) in parts1.iter().zip(parts2.iter()) {
378+
if a > b {
379+
return 1;
380+
}
381+
if a < b {
382+
return -1;
383+
}
384+
}
385+
0
386+
}
387+
334388
#[cfg(test)]
335389
mod tests {
336390
use super::*;

python/fluxqueue/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from ._core import __version__ as __version__
12
from .client import FluxQueue as FluxQueue
23
from .context import Context as Context
34
from .models import TaskMetadata as TaskMetadata

0 commit comments

Comments
 (0)