Skip to content

Commit 39da521

Browse files
committed
feat(worker): Add client library version check before worker initialization
1 parent 9b31023 commit 39da521

3 files changed

Lines changed: 58 additions & 1 deletion

File tree

crates/fluxqueue-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ impl FluxQueueCore {
7676

7777
#[pymodule]
7878
fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
79+
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
7980
m.add_class::<FluxQueueCore>()?;
8081
Ok(())
8182
}

crates/fluxqueue-worker/src/worker.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use anyhow::Result;
1+
use anyhow::{Result, anyhow};
2+
use pyo3::Python;
3+
use pyo3::types::PyAnyMethods;
24
use std::sync::Arc;
35
use std::sync::atomic::{AtomicUsize, Ordering};
46
use std::time::Duration;
@@ -10,6 +12,8 @@ use crate::redis_client::RedisClient;
1012
use crate::task::{PythonDispatcher, TaskData, TaskRegistry};
1113
use fluxqueue_common::{Task, deserialize_raw_task_data};
1214

15+
static MIN_CLIENT_LIB_VERSION: &str = "0.3.0";
16+
1317
pub async fn run_worker(
1418
mut shutdown: watch::Receiver<bool>,
1519
concurrency: usize,
@@ -18,6 +22,11 @@ pub async fn run_worker(
1822
queue_name: String,
1923
save_dead_tasks: bool,
2024
) -> Result<()> {
25+
check_client_library_version().map_err(|e| {
26+
tracing::error!("{}", e);
27+
std::process::exit(1);
28+
})?;
29+
2130
let redis_client = RedisClient::new(&redis_url).await.map_err(|e| {
2231
tracing::error!("{}", e);
2332
std::process::exit(1);
@@ -331,6 +340,52 @@ fn check_worker_is_ready(ready_check: ReadyCheck) {
331340
}
332341
}
333342

343+
fn check_client_library_version() -> Result<()> {
344+
let library_version = Python::attach(|py| -> Result<String> {
345+
let module = py.import("fluxqueue")?;
346+
let version = module.getattr("__version__")?;
347+
Ok(version.to_string())
348+
})?;
349+
350+
let comparison = compare_versions(MIN_CLIENT_LIB_VERSION, &library_version);
351+
if comparison == -1 {
352+
tracing::warn!(
353+
"Worker version '{}' is older than client library '{}'. For full functionality, update the worker to match the client version.",
354+
MIN_CLIENT_LIB_VERSION,
355+
&library_version
356+
);
357+
}
358+
359+
if comparison == 1 {
360+
return Err(anyhow!(
361+
"Minimum required client library version is: {}, found: {}",
362+
MIN_CLIENT_LIB_VERSION,
363+
&library_version
364+
));
365+
}
366+
367+
Ok(())
368+
}
369+
370+
fn compare_versions(v1: &str, v2: &str) -> i8 {
371+
let mut parts1: Vec<u32> = v1.split('.').map(|p| p.parse().unwrap_or(0)).collect();
372+
let mut parts2: Vec<u32> = v2.split('.').map(|p| p.parse().unwrap_or(0)).collect();
373+
374+
let len = parts1.len().max(parts2.len());
375+
parts1.resize(len, 0);
376+
parts2.resize(len, 0);
377+
378+
for (a, b) in parts1.iter().zip(parts2.iter()) {
379+
if a > b {
380+
return 1;
381+
}
382+
if a < b {
383+
return -1;
384+
}
385+
}
386+
0
387+
}
388+
334389
#[cfg(test)]
335390
mod tests {
336391
use super::*;

python/fluxqueue/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from ._core import __version__ as __version__
12
from .client import FluxQueue as FluxQueue
23
from .context import Context as Context
34
from .models import TaskMetadata as TaskMetadata

0 commit comments

Comments
 (0)